백엔드 데이터 인프라 91편 — Kafka Producer API 깊이 (Serializer · Callback · Interceptor)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 91편. Kafka Producer API 깊이 — KafkaProducer 생성·send() 콜백·Future, Serializer 4가지(String·ByteArray·Avro·JSON), Custom Partitioner, Interceptor 패턴, Spring Boot KafkaTemplate 까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 91편 — Kafka Producer API 깊이 (Serializer · Callback · Interceptor)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 91편이에요. 90편 에서 5가지 API 의 큰 그림을 잡았고, 이번 91편은 Producer API 깊이 로 들어갑니다. 86편 Design: Producer설계 결정 을 다뤘다면, 이번 글은 실제 코드 작성 쪽이에요.

Kafka Producer API가 어렵게 느껴지는 이유

겉으로는 producer.send(record) 한 줄이지만, 그 뒤에 5가지 결정 이 숨어 있어요. 첫째, send() 의 반환값 — Future(미래에 결과를 받는 핸들), Callback(완료 시 호출되는 함수), 또는 둘 다 중 무엇을 쓸지 정해야 하고 응답 처리 패턴이 헷갈립니다. 둘째, Serializer(직렬화기) 선택 — String 만으로 충분한지, 아니면 Avro(스키마 기반 이진 포맷)·JSON·Protobuf(Google 의 이진 직렬화) 의 trade-off 를 따져야 하는지. 셋째, 에러 처리 — Retryable 인지 Non-retryable 인지, Custom error handler 를 둘지 결정해야 해요.

이 글에서 Producer API 깊이와 실무 패턴을 정리합니다.

KafkaProducer 생성

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("enable.idempotence", "true");
props.put("compression.type", "zstd");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

필수는 bootstrap.serverskey.serializer, value.serializer 세 개예요. 여러 broker(클러스터 노드) 를 명시하는 건 클라이언트가 클러스터 metadata 를 요청할 entry point 를 확보하기 위해서고, 한 broker 가 다운돼도 다른 broker 로 fallback 됩니다.

send() — 3가지 패턴

1. Fire-and-Forget (보내고 잊기, 가장 단순)

producer.send(new ProducerRecord<>("topic", "key", "value"));

응답을 안 기다리고 실패도 무시해요. 로그·메트릭 처럼 한두 건 빠져도 괜찮은 자리에만 씁니다.

2. Future (동기 대기)

RecordMetadata metadata = producer.send(record).get();   // 블로킹 대기
System.out.println(metadata.topic() + " p=" + metadata.partition()
                   + " offset=" + metadata.offset());

get()blocking 이라 응답 받을 때까지 대기합니다. 가장 안전하지만 느려요(batch 효율 X).

3. Callback (비동기, 권장)

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("send failed", exception);
    } else {
        log.info("sent to {} p={} offset={}",
                 metadata.topic(), metadata.partition(), metadata.offset());
    }
});

비동기로 보내면서 응답 처리까지 같이 합니다. 처리량과 안전성 둘 다 잡혀서 대부분 환경의 표준 패턴.

ProducerRecord — 5가지 옵션

new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
  • topic (필수)
  • partition (선택) — 명시 시 그 partition 으로 강제, 안 박으면 partitioner 가 선택
  • timestamp (선택) — 안 박으면 현재 시각
  • key (선택) — 같은 key = 같은 partition
  • value (필수)
  • headers (선택) — 메타데이터 (trace_id·content-type 등)

흔히 쓰는 단순형:

new ProducerRecord<>("topic", "key", "value");

Serializer — 4가지 선택

StringSerializer (기본 학습용)

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

문자열만 다루고 가장 단순합니다.

ByteArraySerializer (raw 바이트)

props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

직렬화를 직접 처리할 때 써요. JSON·Protobuf 를 직접 인코딩하는 경우입니다.

Avro (스키마 진화)

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.x.x</version>
</dependency>
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");

스키마 진화(필드 추가·제거·rename) 가 자연스럽고, Confluent Schema Registry(스키마 중앙 저장소) 와 함께 씁니다. 대규모 환경의 사실상 표준이에요.

JSON

props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");

(Spring Kafka 의존성이 필요합니다.) 또는 Confluent JsonSchemaSerializer 에 Schema Registry 를 붙여 씁니다.

선택 가이드

  • 학습·간단 → String
  • CDC(변경 데이터 캡처)·이미 직렬화 → ByteArray
  • 대규모·스키마 진화 필요Avro + Schema Registry
  • 개발자 친화·유연 → JSON

Custom Partitioner

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null) return 0;
        // 비즈니스 로직 — VIP 사용자는 partition 0 으로
        if (((String) key).startsWith("vip:")) return 0;
        return Math.abs(key.hashCode() % cluster.partitionCountForTopic(topic));
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
props.put("partitioner.class", "com.example.MyPartitioner");

특수 환경 — VIP 분리나 일부 partition 으로 hot data 격리 같은 자리에만 씁니다.

ProducerInterceptor — 가로채기 패턴

public class MyInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 헤더 추가·로깅 등
        record.headers().add("trace-id", traceId().getBytes());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 응답 후 처리 — 메트릭 수집 등
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
props.put("interceptor.classes", "com.example.MyInterceptor");

자주 쓰는 자리는 분산 trace 헤더 자동 추가 (OpenTelemetry(분산 추적 표준) 등), 메트릭(전송 횟수·실패율) 자동 수집, 그리고 감사 로그입니다.

에러 처리

Retryable 에러 (자동 retry)

  • LEADER_NOT_AVAILABLE
  • NOT_LEADER_FOR_PARTITION
  • NETWORK_EXCEPTION
  • REQUEST_TIMED_OUT

자동으로 retry 가 돌기 때문에 일반적으로 사용자가 따로 처리할 일은 없어요.

Non-Retryable 에러 (즉시 callback 으로)

  • RecordTooLargeException — 메시지 크기 초과
  • InvalidTopicException — topic 이름 잘못
  • SerializationException — 직렬화 실패
  • AuthenticationException — 권한

이런 에러는 재시도해도 의미가 없어서 callback 에서 명시적으로 처리 해야 합니다.

Callback 패턴

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof RetriableException) {
            // 이미 자동 retry 했는데 실패 → 정말 문제
            log.error("retriable but exhausted", exception);
        } else {
            // 즉시 실패 — 메시지 형식·권한 등 문제
            deadLetterQueue.send(record);   // DLQ 패턴
        }
    }
});

Producer Lifecycle — close()

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // ... send ...
}   // 자동 close (flush + 연결 종료)

또는 명시적으로:

producer.flush();      // 모든 batch 즉시 전송
producer.close();      // 종료

close() 를 안 부르면 마지막 batch 가 손실 됩니다. try-with-resources 로 묶거나 명시적으로 close 를 부르세요.

Spring Boot — KafkaTemplate

spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      properties:
        enable.idempotence: true
        compression.type: zstd
@Autowired
private KafkaTemplate<String, MyEvent> kafkaTemplate;

public void publish(String key, MyEvent event) {
    kafkaTemplate.send("events", key, event)
        .addCallback(
            result -> log.info("sent"),
            ex -> log.error("failed", ex)
        );
}

훨씬 깔끔합니다. callback 과 Future 둘 다 wrap 해줘요.

Thread Safety

여기서 시험 함정이 하나 있어요 — KafkaProducer 는 thread-safe 라서 여러 스레드가 같은 인스턴스를 공유 해도 되고, 오히려 권장 됩니다.

Connection pool 같은 패턴은 필요 없어요. 애플리케이션 전체에 KafkaProducer 인스턴스 하나 가 일반적입니다.

한계·실무 함정

1. send() Future 의 .get() 동기 사용

처리량이 폭망합니다. callback 패턴 을 쓰세요.

2. close() 누락

마지막 batch 가 손실됩니다. try-with-resources 로 묶어요.

3. 큰 메시지

max.request.size=1048576    # 기본 1MB

초과하면 RecordTooLargeException. 큰 메시지 환경 (이미지·동영상 메타데이터 등) 은 값을 조정하거나 외부 저장에 올리고 키만 메시지로 보냅니다.

4. Serializer 변경 시 호환성

기존 consumer 가 다른 deserializer 를 쓰고 있으면 읽기가 실패 합니다. Schema Registry 가 이런 마이그레이션을 도와줘요.

5. 한 broker 만 bootstrap

bootstrap.servers=kafka1:9092      # 위험
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092   # OK

한 broker 가 다운되면 client 가 cluster 를 못 찾습니다. 최소 3개 를 명시하세요.

시험 직전 한 번 더 — Kafka Producer API 함정 압축 노트

  • 필수 설정 = bootstrap.servers + key.serializer + value.serializer
  • bootstrap.servers = 여러 broker 명시 (fallback)
  • send() 3가지 = Fire-and-Forget · Future (블로킹) · Callback (권장)
  • ProducerRecord 옵션 = topic·partition·timestamp·key·value·headers
  • 같은 key = 같은 partition (순서 보장)
  • Serializer 4가지 = String·ByteArray·Avro (Schema Registry)·JSON
  • 대규모 = Avro + Confluent Schema Registry 표준
  • Custom Partitioner = partitioner.class, 비즈니스 로직 기반
  • ProducerInterceptor = interceptor.classes, trace·메트릭·감사
  • Retryable 에러 = 자동 retry (LEADER_NOT_AVAILABLE·NETWORK 등)
  • Non-Retryable = 즉시 callback (RecordTooLarge·InvalidTopic·SerializationException·Authentication)
  • DLQ 패턴 = non-retryable 메시지 별도 topic 저장
  • KafkaProducer thread-safe — 여러 스레드 공유 OK + 권장
  • close() 안 부르면 = 마지막 batch 손실, try-with-resources
  • 명시적 flush() + close()
  • 운영 표준 = acks=all + enable.idempotence=true + compression.type=zstd
  • Spring Boot = KafkaTemplate 추상화, callback wrap
  • 함정 — Future .get() 동기 사용 → 처리량 폭망
  • 함정 — close() 누락 → batch 손실
  • 함정 — max.request.size 초과 → RecordTooLargeException
  • 함정 — Serializer 변경 시 consumer 호환성
  • 함정 — bootstrap.servers 한 broker 만 명시 → 다운 시 client 못 찾음

공식 문서: KafkaProducer Javadoc 에서 자세한 API 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!