카프카 마스터 노트 시리즈 5편. Reactor Kafka가 전통 Java Kafka 클라이언트를 리액티브로 풀어내는 방식, KafkaReceiver와 KafkaSender의 역할, ReceiverRecord의 acknowledge·메타데이터 구조, 백프레셔가 자동 페이스 조절하는 원리, KafkaSender의 send 결과를 SenderResult로 받기, 리액티브 파이프라인의 비동기·논블로킹 효율까지.
이 글은 카프카 마스터 노트 시리즈의 다섯 번째 편입니다. 4편(Consumer Group)까지 전통 Kafka API였다면, 이번엔 그것을 리액티브로 풀어낸 라이브러리 — Reactor Kafka.
WebFlux 기반 마이크로서비스에선 traditional consumer.poll() 루프와 어울리지 않습니다. Reactor Kafka는 Kafka를 Flux/Mono로 풀어내 백프레셔·논블로킹·리액티브 합성을 자연스럽게.
처음 Reactor Kafka가 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, "리액티브"가 모호합니다. 그냥 비동기 아닌가? 왜 Kafka에 Reactor를 입혀야 하는가? 둘째, API 이름이 익숙한 KafkaConsumer·KafkaProducer와 다릅니다. KafkaReceiver·KafkaSender·ReceiverRecord… 한 번에 안 들어옵니다.
해결법은 한 가지예요. "수도꼭지 vs 양동이" 비유로 묶는 것. 전통 Consumer = 양동이로 떠 옴 (poll 루프), Reactor Receiver = 수도꼭지 (Flux 스트림이 자동 흐름). 백프레셔 = 수도꼭지 잠그기. 이 그림이 잡히면 모든 동작이 따라옵니다.
왜 Reactor Kafka인가
전통 Java Kafka 클라이언트:
while (running) {
var records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record); // 블로킹 처리
}
consumer.commitSync();
}
문제 — WebFlux 환경엔:
- 블로킹 호출 (스레드 점유)
- 백프레셔 안 됨
- Flux/Mono 합성 어려움
- 외부 API 호출 시 비효율
Reactor Kafka:
KafkaReceiver.create(receiverOptions)
.receive()
.flatMap(record -> processAsync(record))
.doOnNext(record -> record.receiverOffset().acknowledge())
.subscribe();
논블로킹 + 백프레셔 + 자연스러운 합성.
여기서 정말 중요한 시험 함정 — WebFlux 마이크로서비스 + Kafka = Reactor Kafka가 표준. Spring Kafka(전통)도 가능하지만 리액티브 흐름 깨짐.
KafkaReceiver — 리액티브 컨슈머
기본 사용
ReceiverOptions<String, String> receiverOptions = ReceiverOptions
.<String, String>create(Map.of(
"bootstrap.servers", "localhost:9092",
"group.id", "order-processor",
"key.deserializer", StringDeserializer.class,
"value.deserializer", StringDeserializer.class,
"auto.offset.reset", "earliest"
))
.subscription(List.of("order-events"));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
receiver.receive()
.doOnNext(record -> {
log.info("Received: {}", record.value());
record.receiverOffset().acknowledge();
})
.subscribe();
ReceiverRecord — 응답 구조
ReceiverRecord
├── key()
├── value()
├── partition()
├── offset()
├── headers()
├── timestamp()
└── receiverOffset()
├── acknowledge() // 단일 ack
├── commit() // 즉시 commit
└── topicPartition()
receiverOffset()이 핵심 — 처리 완료 시 명시적으로 acknowledge·commit.
여기서 시험 함정이 하나 있어요. acknowledge() vs commit(). acknowledge는 "처리 끝, 내부 큐에 추가". commit은 "즉시 브로커에 commit 명령". 일반적으로 acknowledge → 자동 배치 commit.
처리 패턴 3종
1. 순차 처리 — concatMap
receiver.receive()
.concatMap(record ->
processAsync(record)
.doOnSuccess(v -> record.receiverOffset().acknowledge())
)
.subscribe();
순서 보장. 한 메시지 처리 끝난 후 다음. 같은 파티션 안에서 안전.
2. 병렬 처리 — flatMap
receiver.receive()
.flatMap(record ->
processAsync(record)
.doOnSuccess(v -> record.receiverOffset().acknowledge()),
16 // 동시 처리 수
)
.subscribe();
병렬·빠름. 다만 순서 깨짐. 멱등 처리 필수.
여기서 정말 중요한 시험 함정 — 순서 중요 = concatMap / 처리량 중요 = flatMap. flatMap에 동시 처리 수 명시 (기본 무제한 = 위험).
3. 그룹별 병렬 — groupBy + concatMap
receiver.receive()
.groupBy(record -> record.partition())
.flatMap(group ->
group.concatMap(record ->
processAsync(record)
.doOnSuccess(v -> record.receiverOffset().acknowledge())
)
)
.subscribe();
파티션 단위 순서 보장 + 파티션 사이 병렬. 7편에서 더 깊이.
백프레셔 — 자동 페이스 조절
전통 컨슈머의 한계:
while (true) {
var records = consumer.poll(...); // 1000개 한꺼번에
for (var r : records) {
slowProcess(r); // 못 따라감 → 메모리 폭주
}
}
Reactor:
receiver.receive()
.flatMap(record -> slowProcess(record), 4) // 동시 4개만
// 자동으로 poll 페이스 조절됨
.subscribe();
Subscriber가 천천히 가져가면 Publisher(receiver)도 천천히 보냄. 메모리 안전.
여기서 시험 함정이 하나 있어요. flatMap의 concurrency 파라미터가 백프레셔의 핵심. 기본 무제한 (Integer.MAX_VALUE)이라 명시 필수. 4·8·16 정도가 일반적.
KafkaSender — 리액티브 프로듀서
기본 사용
SenderOptions<String, String> senderOptions = SenderOptions
.<String, String>create(Map.of(
"bootstrap.servers", "localhost:9092",
"key.serializer", StringSerializer.class,
"value.serializer", StringSerializer.class,
"acks", "all",
"enable.idempotence", true
));
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
sender.send(Mono.just(SenderRecord.create(
new ProducerRecord<>("order-events", "user-123", orderJson),
"correlation-id-1" // 임의 메타데이터
)))
.doOnNext(result -> {
if (result.exception() != null) {
log.error("Send failed", result.exception());
} else {
log.info("Sent to partition {}, offset {}",
result.recordMetadata().partition(),
result.recordMetadata().offset());
}
})
.subscribe();
SenderResult — 응답
SenderResult
├── recordMetadata() // 토픽·파티션·오프셋·타임스탬프
├── correlationMetadata() // 발행자가 넣은 메타
└── exception() // null이면 성공
성공·실패를 명시적으로 처리.
처리 → 발행 파이프라인
마이크로서비스의 핵심 패턴 — 들어온 이벤트를 처리해 새 이벤트 발행.
receiver.receive()
.concatMap(record ->
processAsync(record)
.map(result -> SenderRecord.create(
new ProducerRecord<>("processed-events", record.key(), result),
record.receiverOffset() // ack는 발행 성공 후
))
)
.as(sender::send)
.doOnNext(senderResult -> {
ReceiverOffset offset = (ReceiverOffset) senderResult.correlationMetadata();
offset.acknowledge();
})
.subscribe();
핵심 — 발행 성공 후 ack. 발행 실패 시 ack 안 됨 → 재처리.
여기서 정말 중요한 시험 함정 — read → process → write → ack 순서. ack를 처리 직후로 옮기면 발행 실패 시 메시지 손실. 발행 성공 후 ack가 at-least-once 보장.
리액티브 합성 예시
외부 API 호출 + 발행
receiver.receive()
.flatMap(record ->
webClient.post().uri("/api/process")
.bodyValue(record.value())
.retrieve()
.bodyToMono(Result.class)
.map(result -> Tuples.of(record, result))
, 8)
.flatMap(tuple ->
sender.send(Mono.just(SenderRecord.create(
new ProducerRecord<>("results", tuple.getT2()),
tuple.getT1().receiverOffset()
)))
)
.doOnNext(sr -> ((ReceiverOffset) sr.correlationMetadata()).acknowledge())
.subscribe();
WebFlux + Kafka + 외부 API 자연스럽게.
Subscriber 종류
// 기본 — 영원히 구독
.subscribe();
// 명시적 — Disposable 반환 (취소 가능)
Disposable d = flux.subscribe(...);
d.dispose();
// 종료 시그널 받기
.subscribe(
next -> ...,
error -> ...,
() -> log.info("Completed")
);
운영 — Disposable을 보관하고 graceful shutdown 시 dispose().
ReceiverOptions 추가 설정
receiverOptions
.commitInterval(Duration.ofSeconds(5)) // 자동 commit 간격
.commitBatchSize(100) // 100 ack마다 commit
.pollTimeout(Duration.ofMillis(500)) // poll 타임아웃
.maxCommitAttempts(3) // commit 재시도
.closeTimeout(Duration.ofSeconds(10)); // 종료 대기
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 5편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Reactor Kafka = Kafka를 Flux/Mono로 풀어낸 라이브러리
- WebFlux 마이크로서비스 + Kafka = 표준
- 논블로킹 + 백프레셔 + 자연스러운 합성
- KafkaReceiver = 리액티브 컨슈머
- KafkaSender = 리액티브 프로듀서
- ReceiverRecord — value·partition·offset·headers +
receiverOffset() receiverOffset().acknowledge()= 처리 끝 표시commit()= 즉시 commit 명령- 일반 — acknowledge → 자동 배치 commit
- 처리 3 패턴 — concatMap (순서) / flatMap (병렬) / groupBy+concatMap (파티션별 병렬)
- flatMap concurrency 명시 필수 (기본 무제한 = 위험)
- 4·8·16 일반적
- 백프레셔 = subscriber 페이스로 자동 조절 (메모리 안전)
- SenderResult — recordMetadata·correlationMetadata·exception
- exception null이면 성공
- 발행 성공 후 ack — at-least-once 보장
- read → process → write → ack 순서 중요
- 발행 실패 시 ack 안 됨 → 재처리
- 리액티브 합성 — Kafka + WebClient + Kafka 자연스럽게
Disposable로 graceful shutdown- ReceiverOptions — commitInterval / commitBatchSize / pollTimeout / closeTimeout
시리즈 다른 편
- 1편 — EDA·Kafka 기초·KRaft
- 2편 — Topic·Partition·Offset
- 3편 — Producer·Consumer 동작
- 4편 — Consumer Group·리밸런싱
- 5편 — Reactor Kafka (현재 글)
- 6편 — Cluster·HA·Best Practices
- 7편 — 배치·에러·트랜잭션
- 8편 — Spring Kafka·테스트·보안
- 9편 — Spring Cloud Stream 기초
- 10편 — StreamBridge 동적 라우팅
- 11편 — Fan-Out / Fan-In
- 12편 — SCS Tips & Tricks
- 13편 — Saga 코레오그래피
- 14편 — Saga 오케스트레이터
- 15편 — Transactional Outbox
공식 문서: Reactor Kafka Reference 에서 더 깊이.
다음 글(6편)에서는 Cluster·HA·Best Practices — 리더/팔로워·ISR·복제·장애 복구·운영 모범 사례까지 풀어 갑니다.