백엔드 데이터 인프라 124편. Kafka Streams 작성·운영 — KafkaStreams 라이프사이클, Spring Boot @EnableKafkaStreams, Uncaught Exception Handler, Deserialization Exception Handler, Production Exception Handler, rolling restart, graceful shutdown, healthcheck 같은 운영 패턴까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 124편이에요. 123편 까지 개념 이었다면, 이번 124편은 실제 코드 작성·운영 — Spring Boot 통합·error handler·운영 패턴.
KafkaStreams Lifecycle
상태 (State) 6가지
CREATED → REBALANCING → RUNNING → REBALANCING → RUNNING → ...
↓
ERROR (uncaught exception)
↓
NOT_RUNNING (close)
| State | 의미 |
|---|---|
| CREATED | 생성됨, 아직 start() X |
| REBALANCING | partition 할당 중 |
| RUNNING | 정상 처리 중 |
| PENDING_SHUTDOWN | close() 호출 후 정리 중 |
| NOT_RUNNING | 종료 완료 |
| ERROR | uncaught exception |
| PENDING_ERROR | error 처리 중 |
State Listener
streams.setStateListener((newState, oldState) -> {
log.info("State transition: {} -> {}", oldState, newState);
if (newState == KafkaStreams.State.ERROR) {
// Alert·재시작
}
});
Vanilla Java — 기본 패턴
public class MyStreamsApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
// ... other config
StreamsBuilder builder = new StreamsBuilder();
// ... topology 구성
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
// Error Handler
streams.setUncaughtExceptionHandler(exception -> {
log.error("Uncaught", exception);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
streams.setStateListener((newState, oldState) -> {
log.info("{} -> {}", oldState, newState);
});
// Graceful shutdown
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close(Duration.ofSeconds(30));
latch.countDown();
}));
streams.start();
try { latch.await(); } catch (InterruptedException e) { }
}
}
Spring Boot — @EnableKafkaStreams
spring:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092
streams:
application-id: my-streams-app
properties:
num.stream.threads: 4
processing.guarantee: exactly_once_v2
commit.interval.ms: 1000
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
@Configuration
@EnableKafkaStreams
public class StreamsConfig {
@Bean
public KStream<String, MyEvent> processStream(StreamsBuilder builder) {
KStream<String, MyEvent> input = builder.stream("input-topic");
input.filter(...).mapValues(...).to("output-topic");
return input;
}
}
Spring 이 KafkaStreams 인스턴스 생성과 관리, shutdown 까지 알아서 해주니까 코드가 훨씬 깔끔해져요.
StreamsBuilderFactoryBean
더 세밀하게 손대고 싶을 때 쓰는 방식이에요.
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return factoryBean -> {
factoryBean.setStateListener((newState, oldState) -> log.info(...));
factoryBean.setUncaughtExceptionHandler(handler);
};
}
Error Handler 3가지
1. Uncaught Exception Handler
Stream thread 에서 미처 잡지 못한 exception 을 처리하는 핸들러예요.
streams.setUncaughtExceptionHandler(exception -> {
log.error("Uncaught", exception);
// 응답:
return StreamThreadExceptionResponse.REPLACE_THREAD; // thread 만 재시작
// 또는
return StreamThreadExceptionResponse.SHUTDOWN_CLIENT; // 이 instance 만 종료
// 또는
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; // 모든 instance 종료
});
REPLACE_THREAD = 기본 권장 (다른 thread 영향 X).
2. Deserialization Exception Handler
Input message format 이 깨졌을 때 걸리는 핸들러예요.
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
LogAndFailExceptionHandler(기본) = stream 종료LogAndContinueExceptionHandler= log 만 + 메시지 skip (운영 권장)- Custom = DLQ 로 보내기 등
3. Production Exception Handler
Output 을 write 하는 단계에서 실패할 때 — record 가 너무 크거나 serialization 이 깨지는 경우예요.
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
MyProductionExceptionHandler.class);
public class MyProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(...) {
log.error("Production failed", exception);
return ProductionExceptionHandlerResponse.CONTINUE; // 또는 FAIL
}
}
DLQ Pattern
DLQ (Dead Letter Queue, 처리 실패 메시지를 따로 모으는 토픽) 로 빠진 메시지를 보내는 방식이에요.
streams.setUncaughtExceptionHandler(exception -> {
log.error("Sending to DLQ", exception);
// DLQ 로 메시지 발송 (별도 producer)
dlqProducer.send(new ProducerRecord<>("dlq-topic", failedRecord));
return REPLACE_THREAD;
});
또는 stream 안에서 branch 로 valid / invalid 를 갈라내는 방법도 있어요.
KStream<String, String> input = builder.stream("input");
KStream<String, String>[] branches = input.branch(
(key, value) -> isValid(value),
(key, value) -> true // 나머지 = invalid
);
branches[0].to("output");
branches[1].to("dlq");
Health Check — Spring Boot Actuator
management:
endpoints:
web:
exposure:
include: health,info,kafka-streams
health:
kafka-streams:
enabled: true
/actuator/health/kafka-streams 가 KafkaStreams state 반환:
{
"status": "UP",
"details": {
"applicationId": "my-streams-app",
"state": "RUNNING",
"threads": [...]
}
}
Kubernetes liveness·readiness probe (컨테이너 생존·트래픽 수용 가능 여부 점검) 와 그대로 엮어 쓸 수 있어요.
Graceful Shutdown
streams.close(Duration.ofSeconds(30));
30초 안에 모든 task 를 안전하게 종료하고 state 를 flush 한 뒤 offset 까지 commit 합니다. Spring Boot 에서는 @PreDestroy 가 알아서 처리해줘요.
Rolling Restart
instance 여러 개를 한 번에 한 대씩 재시작하는 방식이에요. 다른 instance 가 그 instance 의 task 를 이어받아서 무중단이 됩니다.
전제 조건:
num.standby.replicas ≥ 1= state 즉시 복구acceptable.recovery.lag적절
spring:
kafka:
streams:
properties:
num.standby.replicas: 1
acceptable.recovery.lag: 10000
max.warmup.replicas: 4
운영 권장 설정
spring:
kafka:
bootstrap-servers: kafka1:9093,kafka2:9093,kafka3:9093
streams:
application-id: my-streams-app-prod
properties:
# Performance
num.stream.threads: 4
cache.max.bytes.buffering: 10485760 # 10MB
commit.interval.ms: 1000
# EOS
processing.guarantee: exactly_once_v2
# Reliability
replication.factor: 3
min.insync.replicas: 2
num.standby.replicas: 1
acceptable.recovery.lag: 10000
# Error Handling
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.production.exception.handler: ...MyProductionExceptionHandler
# Security
security.protocol: SASL_SSL
sasl.mechanism: SCRAM-SHA-512
sasl.jaas.config: ...
ssl.truststore.location: ...
# State Store
state.dir: /var/kafka-streams-state
# Metrics
metrics.recording.level: INFO
Logging 권장
log4j.logger.org.apache.kafka.streams=INFO
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=INFO
log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractCoordinator=INFO
운영 환경에서 DEBUG 까지 켜면 로그 양이 부담스러워져요. INFO 면 충분합니다.
모니터링 (다시)
JMX (Java Management Extensions, 자바 런타임 지표 노출 표준) 는 123편에서 다뤘던 내용이에요.
stream-thread-metrics:process-rate·process-latency-avgstream-state-metrics:put-rate·put-latency-avg- Consumer lag (consumer group
<app-id>으로 확인)
Grafana 대시보드 + Prometheus.
한계·실무 함정
1. application.id collision
같은 cluster 안의 다른 application 과 id 가 부딪히면 internal topic 까지 충돌나요. prefix + 환경 분리.
2. State store 위치
state.dir 기본 = /tmp (재시작 시 삭제). 반드시 영구 경로.
3. num.standby.replicas 0
운영 환경에서는 state 재로드 부담이 커요. 1+ 권장.
4. Uncaught Exception Handler 미설정
기본값으로 두면 stream 이 그대로 죽고 자동 복구도 안 됩니다. REPLACE_THREAD 권장.
5. EOS V1 사용
EOS (Exactly Once Semantics, 메시지 정확히 한 번 처리 보장) 의 V1 은 deprecated 됐어요. V2 로 마이그레이션.
시험 직전 한 번 더 — Kafka Streams 작성·운영 함정 압축 노트
- State 6가지 = CREATED·REBALANCING·RUNNING·PENDING_SHUTDOWN·NOT_RUNNING·ERROR·PENDING_ERROR
- State Listener = state 전환 모니터링
- Spring Boot =
@EnableKafkaStreams+@Bean KStream - 자동 KafkaStreams 인스턴스·shutdown
StreamsBuilderFactoryBean= 세밀 제어- Error Handler 3가지 — Uncaught Exception · Deserialization Exception · Production Exception
- Uncaught =
REPLACE_THREAD(기본 권장) · SHUTDOWN_CLIENT · SHUTDOWN_APPLICATION - Deserialization =
LogAndContinue(운영) · LogAndFail (기본) - Production = Custom 으로 DLQ
- DLQ Pattern =
branch로 valid/invalid 분기 - Health Check = Spring Boot Actuator
/actuator/health/kafka-streams - Kubernetes liveness·readiness probe 연동
- Graceful Shutdown =
streams.close(Duration.ofSeconds(30)) - Rolling Restart =
num.standby.replicas ≥ 1+acceptable.recovery.lag - 운영 권장 — EOS V2 + RF 3 + min.insync 2 + standby 1 + Error Handler 명시 + Security + state.dir 영구 + log INFO
- 함정 —
application.idcollision - 함정 —
state.dir=/tmp데이터 손실 - 함정 —
num.standby.replicas=0(state 재로드 부담) - 함정 — Uncaught Exception Handler 미설정 (stream 죽음)
- 함정 — EOS V1 (deprecated)
공식 문서: Kafka Streams Developer Guide 에서 자세한 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 119편 — Kafka Connect Custom Connector 개발
- 120편 — Kafka Connect Config 종합
- 121편 — Kafka Streams 입문 (라이브러리 모델·왜)
- 122편 — Kafka Streams Quickstart (WordCount 5분)
- 123편 — Kafka Streams Core Concepts (Topology · Task · Thread)
다음 글: