카프카 마스터 노트 시리즈 3편. Producer가 메시지를 보낼 때 거치는 4단계(직렬화·파티셔너·배치·전송), acks 옵션 0/1/all의 내구성-성능 트레이드오프, 멱등성 프로듀서가 중복을 막는 방법, Consumer의 Pull 모델, auto.offset.reset 옵션(latest/earliest), 직렬화·역직렬화 표준, batch.size·linger.ms 튜닝까지.
이 글은 카프카 마스터 노트 시리즈의 세 번째 편입니다. 2편(Topic/Partition)에서 Kafka 데이터 구조를 다졌다면, 이번엔 그 데이터를 쓰고 읽는 주체 — Producer·Consumer.
acks=all 한 줄이 시스템 신뢰성을 결정. auto.offset.reset 한 줄이 새 컨슈머의 데이터 손실 여부를 결정. 옵션 한두 개의 미세한 차이가 운영 사고를 가르는 자리예요.
처음 Producer·Consumer 옵션이 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, 옵션이 너무 많습니다. acks·linger.ms·batch.size·max.in.flight·enable.idempotence·auto.offset.reset… 어느 게 결정적이고 어느 게 미세 조정인지 안 보입니다. 둘째, "중복 vs 손실" 트레이드오프가 직관적이지 않습니다.
해결법은 한 가지예요. "acks와 auto.offset.reset 두 가지만 먼저 잡는 것". 나머지는 나중에 채워도 됩니다. acks = "쓰기 신뢰성", auto.offset.reset = "어디서부터 읽을까". 이 둘이 결정적입니다.
Producer — 메시지 발행자
4단계 동작
1. ProducerRecord 생성
2. Serializer (Key/Value를 바이트 배열로)
3. Partitioner (어느 파티션 갈지 결정)
4. Record Accumulator (배치 모음)
5. Sender Thread (브로커로 전송)
비동기 처리. 호출 즉시 반환, 백그라운드 스레드가 실제 전송.
코드 예시
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("acks", "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>("order-events", "user-123", orderJson);
// 비동기 전송 + 콜백
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("Send failed", exception);
} else {
log.info("Sent to partition {}, offset {}",
metadata.partition(), metadata.offset());
}
});
producer.flush();
producer.close();
acks — 가장 중요한 옵션
브로커가 몇 개 응답하면 OK라고 볼 것인가.
| acks | 의미 | 내구성 | 속도 |
|---|---|---|---|
| 0 | 응답 안 기다림 | 낮음 (손실 가능) | 매우 빠름 |
| 1 | Leader만 응답 | 중간 | 빠름 |
| all (-1) | 모든 ISR 응답 | 높음 | 느림 |
여기서 정말 중요한 시험 함정 — 운영 환경 표준 = acks=all. 성능 위해 acks=1은 위험. Leader가 받은 직후 다운되면 데이터 손실.
acks=all + min.insync.replicas
acks=all
min.insync.replicas=2
replication.factor=3
ISR이 2 이하로 줄면 쓰기 거부 (NotEnoughReplicas 예외). 데이터 무결성 보장.
멱등성 프로듀서 (Idempotent Producer)
문제 — 네트워크 오류로 producer가 재시도하면 중복 전송.
Producer → 메시지 보냄 → 브로커 받음 (디스크 쓰기 OK)
← 응답 (실패: 네트워크 끊김)
Producer → 재시도
← 같은 메시지 또 저장됨 (중복!)
해결 — enable.idempotence=true:
각 메시지에 PID(Producer ID) + Sequence Number 부여
브로커가 중복 감지 → 무시
여기서 정말 중요한 시험 함정 — enable.idempotence=true만으로 정확히 한 번 보장 X. 단일 파티션·단일 세션 안에서만. 진짜 정확히 한 번은 트랜잭션 + 멱등성 결합 (7편).
기본값 — Kafka 3.0+에선 enable.idempotence=true가 기본.
배치·압축 — 처리량 향상
batch.size · linger.ms
batch.size=16384 # 16KB까지 모으기
linger.ms=10 # 또는 10ms 대기
둘 중 먼저 충족되면 전송. 배치로 보내면 처리량 ↑.
compression.type
compression.type=lz4 # gzip, snappy, lz4, zstd 중
| 종류 | 속도 | 압축률 |
|---|---|---|
| gzip | 느림 | 높음 |
| snappy | 빠름 | 중간 |
| lz4 | 매우 빠름 | 중간 |
| zstd | 빠름 | 매우 높음 (Kafka 2.1+) |
여기서 시험 함정이 하나 있어요. 압축은 Producer 단에서. 브로커는 그대로 저장. Consumer가 받을 때 자동 압축 해제. 네트워크·디스크 비용 모두 절감.
max.in.flight.requests.per.connection
max.in.flight.requests.per.connection=5 (기본)
응답 안 받은 요청을 동시에 몇 개 보낼 수 있는지. 멱등성 프로듀서에선 5 이하 권장 (순서 보장).
ProducerRecord 구조
new ProducerRecord<>(
topic, // 필수
partition, // 선택 (직접 지정 가능)
timestamp, // 선택
key, // 선택 (파티션 결정)
value, // 필수
headers // 선택
);
Consumer — 메시지 소비자
Pull 모델
Consumer ─poll(timeout)→ Broker
← N개 메시지
처리...
Consumer ─poll(timeout)→ Broker
컨슈머가 능동적으로 가져감. 페이스 조절 가능.
코드 예시
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processor");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("order-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
consumer.commitSync();
}
auto.offset.reset — 새 그룹 시작점
새 컨슈머 그룹이 처음 토픽 구독할 때 어디서부터 읽을 것인가.
| 값 | 동작 |
|---|---|
| latest (기본) | 새로 들어오는 것만 (기존 메시지 무시) |
| earliest | 토픽 처음부터 |
| none | offset 없으면 예외 |
여기서 정말 중요한 시험 함정 — 개발·테스트 환경 = earliest, 운영 환경 = 신중히. 운영에서 earliest는 새 컨슈머가 토픽 전체를 다시 읽음 (수일치 데이터 폭주). 보통 latest 또는 명시적 offset 시작.
이 옵션은 새 그룹에만 적용. 기존 그룹은 마지막 commit한 offset부터.
Offset Commit — 어디까지 읽었나 기록
자동 커밋 (enable.auto.commit=true)
auto.commit.interval.ms=5000
5초마다 자동으로 마지막 처리 offset commit.
문제 — 처리 실패해도 commit. 메시지 손실 가능.
수동 커밋 (enable.auto.commit=false) — 권장
// 동기
consumer.commitSync();
// 비동기 (콜백)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
처리 완료 후 commit. 안전.
여기서 시험 함정이 하나 있어요. commitSync vs commitAsync. Sync는 정확하지만 느림. Async는 빠르지만 실패 시 재시도 X. 운영 — Async + 종료 시점에만 Sync.
직렬화·역직렬화 (Serialization)
| 종류 | 형식 | 장단점 |
|---|---|---|
| String | UTF-8 텍스트 | 단순 |
| JSON | JSON 문자열 | 가장 일반적, 가독성 ↑ |
| Avro | 바이너리 + 스키마 | 컴팩트, 스키마 진화 |
| Protobuf | 바이너리 + 스키마 | 매우 컴팩트, 빠름 |
Avro·Protobuf 권장 이유
- 작은 크기 (네트워크·디스크 절감)
- 스키마 진화 (필드 추가·삭제 안전)
- 타입 안전
Schema Registry와 결합 — 스키마 중앙 관리. Confluent 표준.
여기서 정말 중요한 시험 함정 — JSON은 시작용으로 OK, 운영 권장은 Avro/Protobuf. 메시지 크기·스키마 진화 모두 우월.
Consumer Poll Loop — 핵심 루프
while (running) {
// 1. Pull
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
// 2. 처리
for (var record : records) {
try {
process(record);
} catch (Exception e) {
// 에러 처리 (10편)
}
}
// 3. Commit
consumer.commitSync();
}
여기서 시험 함정이 하나 있어요. poll() 시간이 너무 길면 컨슈머 죽었다고 판단. max.poll.interval.ms (기본 5분) 안에 다음 poll 호출 필요. 안 그러면 그룹에서 추방·리밸런싱.
fetch.min.bytes · fetch.max.wait.ms
fetch.min.bytes=1 # 최소 N 바이트 모이면 전송
fetch.max.wait.ms=500 # 또는 N ms 대기
작은 메시지 자주 오는 환경에선 fetch.min.bytes 늘리면 처리량 ↑.
max.poll.records
max.poll.records=500 # poll 한 번에 최대 N 레코드
너무 많으면 처리 시간이 max.poll.interval.ms 초과 위험. 균형 필요.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 3편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Producer 4단계 — Record → Serializer → Partitioner → Accumulator → Sender
- 비동기 전송 + 콜백
- acks — 0 (안 기다림) / 1 (Leader) / all (모든 ISR)
- 운영 표준 = acks=all + min.insync.replicas=2 + replication.factor=3
- ISR 부족 시 쓰기 거부 (NotEnoughReplicas)
enable.idempotence=true= 중복 방지 (Kafka 3.0+ 기본)- 정확히 한 번 = 멱등성 + 트랜잭션 (7편)
- batch.size + linger.ms = 둘 중 먼저 충족되면 전송
- compression — gzip / snappy / lz4 / zstd
- 압축은 Producer 단에서, 브로커 그대로 저장
- max.in.flight.requests.per.connection ≤ 5 (멱등성 + 순서 보장)
- ProducerRecord — topic·partition·timestamp·key·value·headers
- Consumer는 Pull 모델
- auto.offset.reset — latest (기본) / earliest (개발·테스트) / none
- 운영에서 earliest는 위험 (전체 재처리)
- 새 그룹에만 적용 / 기존 그룹은 마지막 commit부터
- Offset Commit — 자동(위험) / 수동(권장)
commitSync정확·느림 /commitAsync빠름·재시도 X- 운영 = Async + 종료 시 Sync
- 직렬화 — String / JSON / Avro / Protobuf
- 운영 권장 = Avro/Protobuf + Schema Registry
max.poll.interval.ms(5분) 안에 다음 poll- 초과 시 그룹 추방·리밸런싱
max.poll.records너무 크면 시간 초과 위험
시리즈 다른 편
- 1편 — EDA·Kafka 기초·KRaft
- 2편 — Topic·Partition·Offset
- 3편 — Producer·Consumer 동작 (현재 글)
- 4편 — Consumer Group·리밸런싱
- 5편 — Reactor Kafka
- 6편 — Cluster·HA·Best Practices
- 7편 — 배치·에러·트랜잭션
- 8편 — Spring Kafka·테스트·보안
- 9편 — Spring Cloud Stream 기초
- 10편 — StreamBridge 동적 라우팅
- 11편 — Fan-Out / Fan-In
- 12편 — SCS Tips & Tricks
- 13편 — Saga 코레오그래피
- 14편 — Saga 오케스트레이터
- 15편 — Transactional Outbox
공식 문서: Kafka Producer Configs / Consumer Configs 에서 더 깊이.
다음 글(4편)에서는 Consumer Group — 그룹 코디네이터·리밸런싱·파티션 할당 전략·정확한 오프셋 커밋까지 풀어 갑니다.