백엔드 데이터 인프라 92편. Kafka Consumer API 깊이 — subscribe·poll 패턴, Auto vs Manual Commit, ConsumerRebalanceListener 로 rebalance 대응, seek 으로 offset 제어, pause/resume backpressure, Spring @KafkaListener 까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 92편이에요. 91편 에서 Producer API 를 풀었다면, 이번 92편은 반대편 — Consumer API 깊이예요. 87편 Design: Consumer 가 설계였다면, 이번은 실제 코드 패턴이에요.
Consumer API가 어렵게 느껴지는 이유
Consumer 는 Producer 보다 훨씬 까다로워요. 상태 관리, rebalance(파티션 재분배), commit 타이밍 같은 영역이 한꺼번에 걸려요.
첫째, poll loop(메시지 가져오기 루프) 안 처리 시간이에요. 너무 길면 session timeout(세션 만료) 으로 rebalance 가 트리거돼요.
둘째, Commit 타이밍 결정이에요. 자동과 수동, 동기와 비동기, 언제 commit 하느냐 — 88편 delivery semantics(전달 보장) 와 직결돼요.
셋째, Rebalance 대응이에요. partition 을 빼앗기기 전에 cleanup 코드를 미리 짜둬야 해요.
이 글에서 Consumer 의 핵심 패턴과 실무 함정을 정리할게요.
KafkaConsumer 생성
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "order-workers");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
props.put("max.poll.records", "500");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
필수:
bootstrap.serversgroup.id— Consumer Group(같은 group.id 공유 컨슈머 묶음) 식별key.deserializer+value.deserializer
자주 박는 옵션:
auto.offset.reset— group 의 이전 offset 없을 때 시작 위치 (earliest·latest·none)enable.auto.commit— 자동 commit 여부 (true/false)max.poll.records— 한 poll 당 최대 메시지 수
Subscribe — Topic 등록
consumer.subscribe(Arrays.asList("orders", "payments"));
// 또는 pattern
consumer.subscribe(Pattern.compile("order-.*"));
// 또는 manual partition 할당 (consumer group 없이)
consumer.assign(Arrays.asList(
new TopicPartition("orders", 0),
new TopicPartition("orders", 1)
));
subscribe 는 consumer group 멤버로 들어가서 partition 을 자동 할당받아요. assign 은 manual 할당이고 group 이 없어요(특수 케이스만 써요). 대부분 환경에서는 subscribe 예요. TopicPartition(토픽+파티션 식별자) 은 이렇게 manual 할당이나 commit·seek 에서 자주 만나요.
Poll Loop — 기본 패턴
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
consumer.commitSync();
}
} finally {
consumer.close();
}
핵심은 세 가지예요. poll(timeout) 은 max 대기 시간이고 broker 측 long poll 과 consumer 측 wait 가 합쳐져 있어요. for-each 로 batch 안 메시지를 하나씩 처리해요. 마지막에 commitSync() 또는 commitAsync() 로 offset 을 commit 해요.
Commit 패턴 — 4가지
(1) Auto Commit (가장 단순, 위험)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // 5초마다
5초마다 자동 commit 이에요. 위험한 점은 메시지 처리 중간에 commit 이 일어나서 처리 안 한 메시지가 손실될 수 있다는 거예요(at-most-once(최대 한 번 전달) 변형).
(2) Sync Commit (안전)
consumer.commitSync();
blocking 이고 실패 시 retry 가 돌아요. 가장 안전하고 처리량이 약간 감소해요.
(3) Async Commit (빠름)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("commit failed", exception);
}
});
non-blocking 이라 처리량이 올라가요. 실패 시 자동 retry 가 안 돌기 때문에 마지막 commit 만 신경 쓰면 돼요.
(4) Hybrid (권장)
try {
while (true) {
records = consumer.poll(...);
process(records);
consumer.commitAsync(); // 일반 = async (빠름)
}
} finally {
try {
consumer.commitSync(); // 마지막 = sync (안전)
} finally {
consumer.close();
}
}
일반 commit 은 async 로 성능을 챙기고, shutdown 시점에는 sync 로 안전하게 마무리해요. 운영 표준이에요.
Per-partition Commit (정밀 제어)
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
process(record);
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
consumer.commitSync(offsets);
정확히 어디까지 처리했는지 명시해요. 처리 도중 실패해도 부분 commit 이 돼요.
Rebalance Listener — Partition 빼앗기 전 정리
consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// partition 빼앗기 전 — commit + cleanup
consumer.commitSync(currentOffsets);
log.info("Revoked partitions: {}", partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 새 partition 받은 후 — 초기화
log.info("Assigned partitions: {}", partitions);
// 필요 시 seek
}
});
가장 중요한 패턴이에요 — partition 을 빼앗기기 전에 현재까지 처리한 offset 을 commit 해줘야 해요. 안 하면 중복 처리가 폭증해요.
Seek — Offset 임의 제어
처음부터 다시
consumer.seekToBeginning(consumer.assignment());
끝부터
consumer.seekToEnd(consumer.assignment());
특정 offset
consumer.seek(new TopicPartition("orders", 0), 12345);
시간 기반
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("orders", 0), System.currentTimeMillis() - 3600000L); // 1시간 전
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
if (entry.getValue() != null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
}
"1시간 전부터 재처리" 같은 패턴이에요. 강력해요.
Pause / Resume — Backpressure
처리 시스템이 느려져서 backpressure(처리 속도 조절) 가 필요한 경우에 써요.
// pause
consumer.pause(consumer.assignment());
// 처리 시스템 회복 대기
Thread.sleep(5000);
// resume
consumer.resume(consumer.assignment());
특정 partition 만 멈추고 싶으면:
consumer.pause(Collections.singletonList(new TopicPartition("orders", 0)));
여기서 시험 함정이 하나 있어요 — poll() 은 paused partition 도 그대로 호출해야 해요. 단지 그 partition 메시지를 반환하지 않을 뿐이에요. poll 을 호출하지 않으면 rebalance 가 트리거돼요.
Heartbeat·Session Timeout
session.timeout.ms=45000 # 45초 (기본)
heartbeat.interval.ms=3000 # 3초
max.poll.interval.ms=300000 # 5분
session.timeout.ms는 broker 가 consumer 가 죽었다고 판단하는 시간이에요heartbeat.interval.ms는 consumer 가 heartbeat 를 보내는 주기예요 (session timeout 의 1/3 권장)max.poll.interval.ms는 두 poll 사이 최대 간격이고, 초과하면 consumer 죽음 처리예요
처리 시간이 긴 환경이라면 max.poll.interval.ms 를 늘려요 (예: 30분).
Spring Boot — @KafkaListener
spring:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092
consumer:
group-id: order-workers
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
listener:
ack-mode: MANUAL
@KafkaListener(topics = "orders", groupId = "order-workers")
public void handle(MyEvent event, Acknowledgment ack) {
try {
process(event);
ack.acknowledge(); // manual commit
} catch (Exception e) {
// dead letter queue 또는 retry
}
}
훨씬 깔끔해요. poll loop, rebalance listener, commit 까지 모두 wrap 해줘요. Acknowledgment(manual ack 객체) 만 받아서 처리 끝에 호출하면 끝이고, dead letter queue(처리 실패 메시지 보관 큐) 로 흘려보내는 것도 catch 블록에서 처리할 수 있어요.
Thread Safety
여기서 정말 중요한 자리 — KafkaConsumer 는 thread-safe 가 아니에요. 한 인스턴스에 한 스레드예요.
wakeup() 만 다른 스레드에서 호출할 수 있어요(consumer 를 안전하게 종료할 때 써요).
// 다른 스레드에서
consumer.wakeup(); // WakeupException 으로 poll 중단
WakeupException(poll 중단용 예외) 이 던져지면서 poll 이 빠져나와요. 병렬 처리는 consumer 인스턴스를 여러 개 만들어서 각자 별도 스레드에 태우는 방식이에요.
한계·실무 함정
1. 처리 시간 길어서 rebalance 트리거
max.poll.interval.ms 초과 = consumer 죽음 처리 + rebalance. 처리가 길면 늘리거나 작업을 분할해요.
2. Auto commit + 처리 도중 실패
처리 안 된 메시지가 영구 손실돼요. 반드시 manual commit 으로 가요.
3. Rebalance Listener 누락
partition 빼앗기 직전 commit 이 안 되어 중복 처리가 폭증해요.
4. poll 안 부르면 죽은 것처럼
paused 상태에서도 poll 을 호출해야 해요. 처리가 길어도 주기적으로 poll 을 돌려요.
5. Thread safe X
여러 스레드가 공유하면 동작이 예측 불가예요. 별도 인스턴스 또는 Spring Kafka container 가 처리하도록 맡겨요.
시험 직전 한 번 더 — Kafka Consumer API 함정 압축 노트
- 필수 =
bootstrap.servers·group.id·key/value.deserializer - 자주 박는 =
auto.offset.reset (earliest/latest)·enable.auto.commit (false)·max.poll.records - subscribe (group, 자동 할당) vs assign (manual, group 없음)
- 대부분 = subscribe
- Poll Loop =
poll(timeout)→ for-each → commit - Commit 4가지 = Auto · Sync · Async · Hybrid (권장)
- Hybrid = 일반 async (성능) + shutdown sync (안전)
- Per-partition Commit = 정밀 제어
- ConsumerRebalanceListener =
onPartitionsRevoked(commit + cleanup) ·onPartitionsAssigned(초기화·seek) - Seek =
seekToBeginning·seekToEnd·seek(tp, offset)·시간 기반 - "1시간 전부터 재처리" =
offsetsForTimes+ seek - Pause/Resume = backpressure, paused partition 도 poll 호출 필수
- Heartbeat·Timeout —
session.timeout.ms (45초)·heartbeat.interval.ms (3초)·max.poll.interval.ms (5분) - 처리 시간 긴 환경 =
max.poll.interval.ms늘림 - Spring Boot =
@KafkaListener+Acknowledgment(manual ack) KafkaConsumerthread-safe X — 한 인스턴스 = 한 스레드wakeup()만 다른 스레드 안전- 병렬 = 여러 consumer 인스턴스
- 함정 — 처리 시간 길어 rebalance 트리거
- 함정 — Auto commit + 처리 도중 실패 = 메시지 손실
- 함정 — Rebalance Listener 누락 → 중복 처리
- 함정 — pause 상태에서도 poll 호출 필수
- 함정 — thread safe X, 별도 인스턴스 필요
공식 문서: KafkaConsumer Javadoc 에서 자세한 API 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 87편 — Kafka Design: Consumer (Pull · Consumer Group · Offset)
- 88편 — Kafka Message Delivery Semantics (at-most·at-least·exactly-once)
- 89편 — Kafka Replication (ISR · Leader Election · Unclean)
- 90편 — Kafka API 5종 종합 (Producer · Consumer · Streams · Connect · Admin)
- 91편 — Kafka Producer API 깊이 (Serializer · Callback · Interceptor)
다음 글: