백엔드 데이터 인프라 91편. Kafka Producer API 깊이 — KafkaProducer 생성·send() 콜백·Future, Serializer 4가지(String·ByteArray·Avro·JSON), Custom Partitioner, Interceptor 패턴, Spring Boot KafkaTemplate 까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 91편이에요. 90편 에서 5가지 API 의 큰 그림을 잡았고, 이번 91편은 Producer API 깊이 로 들어갑니다. 86편 Design: Producer 가 설계 결정 을 다뤘다면, 이번 글은 실제 코드 작성 쪽이에요.
Kafka Producer API가 어렵게 느껴지는 이유
겉으로는 producer.send(record) 한 줄이지만, 그 뒤에 5가지 결정 이 숨어 있어요. 첫째, send() 의 반환값 — Future(미래에 결과를 받는 핸들), Callback(완료 시 호출되는 함수), 또는 둘 다 중 무엇을 쓸지 정해야 하고 응답 처리 패턴이 헷갈립니다. 둘째, Serializer(직렬화기) 선택 — String 만으로 충분한지, 아니면 Avro(스키마 기반 이진 포맷)·JSON·Protobuf(Google 의 이진 직렬화) 의 trade-off 를 따져야 하는지. 셋째, 에러 처리 — Retryable 인지 Non-retryable 인지, Custom error handler 를 둘지 결정해야 해요.
이 글에서 Producer API 깊이와 실무 패턴을 정리합니다.
KafkaProducer 생성
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("enable.idempotence", "true");
props.put("compression.type", "zstd");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
필수는 bootstrap.servers 와 key.serializer, value.serializer 세 개예요. 여러 broker(클러스터 노드) 를 명시하는 건 클라이언트가 클러스터 metadata 를 요청할 entry point 를 확보하기 위해서고, 한 broker 가 다운돼도 다른 broker 로 fallback 됩니다.
send() — 3가지 패턴
1. Fire-and-Forget (보내고 잊기, 가장 단순)
producer.send(new ProducerRecord<>("topic", "key", "value"));
응답을 안 기다리고 실패도 무시해요. 로그·메트릭 처럼 한두 건 빠져도 괜찮은 자리에만 씁니다.
2. Future (동기 대기)
RecordMetadata metadata = producer.send(record).get(); // 블로킹 대기
System.out.println(metadata.topic() + " p=" + metadata.partition()
+ " offset=" + metadata.offset());
get() 이 blocking 이라 응답 받을 때까지 대기합니다. 가장 안전하지만 느려요(batch 효율 X).
3. Callback (비동기, 권장)
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("send failed", exception);
} else {
log.info("sent to {} p={} offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
비동기로 보내면서 응답 처리까지 같이 합니다. 처리량과 안전성 둘 다 잡혀서 대부분 환경의 표준 패턴.
ProducerRecord — 5가지 옵션
new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
- topic (필수)
- partition (선택) — 명시 시 그 partition 으로 강제, 안 박으면 partitioner 가 선택
- timestamp (선택) — 안 박으면 현재 시각
- key (선택) — 같은 key = 같은 partition
- value (필수)
- headers (선택) — 메타데이터 (trace_id·content-type 등)
흔히 쓰는 단순형:
new ProducerRecord<>("topic", "key", "value");
Serializer — 4가지 선택
StringSerializer (기본 학습용)
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
문자열만 다루고 가장 단순합니다.
ByteArraySerializer (raw 바이트)
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
직렬화를 직접 처리할 때 써요. JSON·Protobuf 를 직접 인코딩하는 경우입니다.
Avro (스키마 진화)
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.x.x</version>
</dependency>
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");
스키마 진화(필드 추가·제거·rename) 가 자연스럽고, Confluent Schema Registry(스키마 중앙 저장소) 와 함께 씁니다. 대규모 환경의 사실상 표준이에요.
JSON
props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
(Spring Kafka 의존성이 필요합니다.) 또는 Confluent JsonSchemaSerializer 에 Schema Registry 를 붙여 씁니다.
선택 가이드
- 학습·간단 → String
- CDC(변경 데이터 캡처)·이미 직렬화 → ByteArray
- 대규모·스키마 진화 필요 → Avro + Schema Registry
- 개발자 친화·유연 → JSON
Custom Partitioner
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
if (key == null) return 0;
// 비즈니스 로직 — VIP 사용자는 partition 0 으로
if (((String) key).startsWith("vip:")) return 0;
return Math.abs(key.hashCode() % cluster.partitionCountForTopic(topic));
}
@Override
public void configure(Map<String, ?> configs) {}
@Override
public void close() {}
}
props.put("partitioner.class", "com.example.MyPartitioner");
특수 환경 — VIP 분리나 일부 partition 으로 hot data 격리 같은 자리에만 씁니다.
ProducerInterceptor — 가로채기 패턴
public class MyInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 헤더 추가·로깅 등
record.headers().add("trace-id", traceId().getBytes());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 응답 후 처리 — 메트릭 수집 등
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
props.put("interceptor.classes", "com.example.MyInterceptor");
자주 쓰는 자리는 분산 trace 헤더 자동 추가 (OpenTelemetry(분산 추적 표준) 등), 메트릭(전송 횟수·실패율) 자동 수집, 그리고 감사 로그입니다.
에러 처리
Retryable 에러 (자동 retry)
LEADER_NOT_AVAILABLENOT_LEADER_FOR_PARTITIONNETWORK_EXCEPTIONREQUEST_TIMED_OUT
자동으로 retry 가 돌기 때문에 일반적으로 사용자가 따로 처리할 일은 없어요.
Non-Retryable 에러 (즉시 callback 으로)
RecordTooLargeException— 메시지 크기 초과InvalidTopicException— topic 이름 잘못SerializationException— 직렬화 실패AuthenticationException— 권한
이런 에러는 재시도해도 의미가 없어서 callback 에서 명시적으로 처리 해야 합니다.
Callback 패턴
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// 이미 자동 retry 했는데 실패 → 정말 문제
log.error("retriable but exhausted", exception);
} else {
// 즉시 실패 — 메시지 형식·권한 등 문제
deadLetterQueue.send(record); // DLQ 패턴
}
}
});
Producer Lifecycle — close()
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// ... send ...
} // 자동 close (flush + 연결 종료)
또는 명시적으로:
producer.flush(); // 모든 batch 즉시 전송
producer.close(); // 종료
close() 를 안 부르면 마지막 batch 가 손실 됩니다. try-with-resources 로 묶거나 명시적으로 close 를 부르세요.
Spring Boot — KafkaTemplate
spring:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
properties:
enable.idempotence: true
compression.type: zstd
@Autowired
private KafkaTemplate<String, MyEvent> kafkaTemplate;
public void publish(String key, MyEvent event) {
kafkaTemplate.send("events", key, event)
.addCallback(
result -> log.info("sent"),
ex -> log.error("failed", ex)
);
}
훨씬 깔끔합니다. callback 과 Future 둘 다 wrap 해줘요.
Thread Safety
여기서 시험 함정이 하나 있어요 — KafkaProducer 는 thread-safe 라서 여러 스레드가 같은 인스턴스를 공유 해도 되고, 오히려 권장 됩니다.
Connection pool 같은 패턴은 필요 없어요. 애플리케이션 전체에 KafkaProducer 인스턴스 하나 가 일반적입니다.
한계·실무 함정
1. send() Future 의 .get() 동기 사용
처리량이 폭망합니다. callback 패턴 을 쓰세요.
2. close() 누락
마지막 batch 가 손실됩니다. try-with-resources 로 묶어요.
3. 큰 메시지
max.request.size=1048576 # 기본 1MB
초과하면 RecordTooLargeException. 큰 메시지 환경 (이미지·동영상 메타데이터 등) 은 값을 조정하거나 외부 저장에 올리고 키만 메시지로 보냅니다.
4. Serializer 변경 시 호환성
기존 consumer 가 다른 deserializer 를 쓰고 있으면 읽기가 실패 합니다. Schema Registry 가 이런 마이그레이션을 도와줘요.
5. 한 broker 만 bootstrap
bootstrap.servers=kafka1:9092 # 위험
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 # OK
한 broker 가 다운되면 client 가 cluster 를 못 찾습니다. 최소 3개 를 명시하세요.
시험 직전 한 번 더 — Kafka Producer API 함정 압축 노트
- 필수 설정 =
bootstrap.servers+key.serializer+value.serializer bootstrap.servers= 여러 broker 명시 (fallback)- send() 3가지 = Fire-and-Forget · Future (블로킹) · Callback (권장)
- ProducerRecord 옵션 = topic·partition·timestamp·key·value·headers
- 같은 key = 같은 partition (순서 보장)
- Serializer 4가지 = String·ByteArray·Avro (Schema Registry)·JSON
- 대규모 = Avro + Confluent Schema Registry 표준
- Custom Partitioner =
partitioner.class, 비즈니스 로직 기반 - ProducerInterceptor =
interceptor.classes, trace·메트릭·감사 - Retryable 에러 = 자동 retry (LEADER_NOT_AVAILABLE·NETWORK 등)
- Non-Retryable = 즉시 callback (RecordTooLarge·InvalidTopic·SerializationException·Authentication)
- DLQ 패턴 = non-retryable 메시지 별도 topic 저장
KafkaProducerthread-safe — 여러 스레드 공유 OK + 권장- close() 안 부르면 = 마지막 batch 손실, try-with-resources
- 명시적
flush()+close() - 운영 표준 =
acks=all+enable.idempotence=true+compression.type=zstd - Spring Boot =
KafkaTemplate추상화, callback wrap - 함정 — Future
.get()동기 사용 → 처리량 폭망 - 함정 — close() 누락 → batch 손실
- 함정 —
max.request.size초과 → RecordTooLargeException - 함정 — Serializer 변경 시 consumer 호환성
- 함정 — bootstrap.servers 한 broker 만 명시 → 다운 시 client 못 찾음
공식 문서: KafkaProducer Javadoc 에서 자세한 API 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 86편 — Kafka Design: Producer (Partition 선택·ACK·Idempotent)
- 87편 — Kafka Design: Consumer (Pull · Consumer Group · Offset)
- 88편 — Kafka Message Delivery Semantics (at-most·at-least·exactly-once)
- 89편 — Kafka Replication (ISR · Leader Election · Unclean)
- 90편 — Kafka API 5종 종합 (Producer · Consumer · Streams · Connect · Admin)
다음 글: