백엔드 데이터 인프라 124편 — Kafka Streams Write & Run (Spring Boot · 운영 패턴)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 124편. Kafka Streams 작성·운영 — KafkaStreams 라이프사이클, Spring Boot @EnableKafkaStreams, Uncaught Exception Handler, Deserialization Exception Handler, Production Exception Handler, rolling restart, graceful shutdown, healthcheck 같은 운영 패턴까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 124편 — Kafka Streams Write & Run (Spring Boot · 운영 패턴)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 124편이에요. 123편 까지 개념 이었다면, 이번 124편은 실제 코드 작성·운영 — Spring Boot 통합·error handler·운영 패턴.

KafkaStreams Lifecycle

상태 (State) 6가지

CREATED → REBALANCING → RUNNING → REBALANCING → RUNNING → ...
                           ↓
                        ERROR (uncaught exception)
                           ↓
                        NOT_RUNNING (close)
State 의미
CREATED 생성됨, 아직 start() X
REBALANCING partition 할당 중
RUNNING 정상 처리 중
PENDING_SHUTDOWN close() 호출 후 정리 중
NOT_RUNNING 종료 완료
ERROR uncaught exception
PENDING_ERROR error 처리 중

State Listener

streams.setStateListener((newState, oldState) -> {
    log.info("State transition: {} -> {}", oldState, newState);
    if (newState == KafkaStreams.State.ERROR) {
        // Alert·재시작
    }
});

Vanilla Java — 기본 패턴

public class MyStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
        // ... other config

        StreamsBuilder builder = new StreamsBuilder();
        // ... topology 구성
        Topology topology = builder.build();

        KafkaStreams streams = new KafkaStreams(topology, props);

        // Error Handler
        streams.setUncaughtExceptionHandler(exception -> {
            log.error("Uncaught", exception);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });

        streams.setStateListener((newState, oldState) -> {
            log.info("{} -> {}", oldState, newState);
        });

        // Graceful shutdown
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(30));
            latch.countDown();
        }));

        streams.start();
        try { latch.await(); } catch (InterruptedException e) { }
    }
}

Spring Boot — @EnableKafkaStreams

spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092
    streams:
      application-id: my-streams-app
      properties:
        num.stream.threads: 4
        processing.guarantee: exactly_once_v2
        commit.interval.ms: 1000
        default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
@Configuration
@EnableKafkaStreams
public class StreamsConfig {

    @Bean
    public KStream<String, MyEvent> processStream(StreamsBuilder builder) {
        KStream<String, MyEvent> input = builder.stream("input-topic");
        input.filter(...).mapValues(...).to("output-topic");
        return input;
    }
}

Spring 이 KafkaStreams 인스턴스 생성과 관리, shutdown 까지 알아서 해주니까 코드가 훨씬 깔끔해져요.

StreamsBuilderFactoryBean

더 세밀하게 손대고 싶을 때 쓰는 방식이에요.

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return factoryBean -> {
        factoryBean.setStateListener((newState, oldState) -> log.info(...));
        factoryBean.setUncaughtExceptionHandler(handler);
    };
}

Error Handler 3가지

1. Uncaught Exception Handler

Stream thread 에서 미처 잡지 못한 exception 을 처리하는 핸들러예요.

streams.setUncaughtExceptionHandler(exception -> {
    log.error("Uncaught", exception);
    // 응답:
    return StreamThreadExceptionResponse.REPLACE_THREAD;     // thread 만 재시작
    // 또는
    return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;     // 이 instance 만 종료
    // 또는
    return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; // 모든 instance 종료
});

REPLACE_THREAD = 기본 권장 (다른 thread 영향 X).

2. Deserialization Exception Handler

Input message format 이 깨졌을 때 걸리는 핸들러예요.

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
  • LogAndFailExceptionHandler (기본) = stream 종료
  • LogAndContinueExceptionHandler = log 만 + 메시지 skip (운영 권장)
  • Custom = DLQ 로 보내기 등

3. Production Exception Handler

Output 을 write 하는 단계에서 실패할 때 — record 가 너무 크거나 serialization 이 깨지는 경우예요.

props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          MyProductionExceptionHandler.class);
public class MyProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(...) {
        log.error("Production failed", exception);
        return ProductionExceptionHandlerResponse.CONTINUE;   // 또는 FAIL
    }
}

DLQ Pattern

DLQ (Dead Letter Queue, 처리 실패 메시지를 따로 모으는 토픽) 로 빠진 메시지를 보내는 방식이에요.

streams.setUncaughtExceptionHandler(exception -> {
    log.error("Sending to DLQ", exception);
    // DLQ 로 메시지 발송 (별도 producer)
    dlqProducer.send(new ProducerRecord<>("dlq-topic", failedRecord));
    return REPLACE_THREAD;
});

또는 stream 안에서 branch 로 valid / invalid 를 갈라내는 방법도 있어요.

KStream<String, String> input = builder.stream("input");

KStream<String, String>[] branches = input.branch(
    (key, value) -> isValid(value),
    (key, value) -> true              // 나머지 = invalid
);

branches[0].to("output");
branches[1].to("dlq");

Health Check — Spring Boot Actuator

management:
  endpoints:
    web:
      exposure:
        include: health,info,kafka-streams
  health:
    kafka-streams:
      enabled: true

/actuator/health/kafka-streams 가 KafkaStreams state 반환:

{
  "status": "UP",
  "details": {
    "applicationId": "my-streams-app",
    "state": "RUNNING",
    "threads": [...]
  }
}

Kubernetes liveness·readiness probe (컨테이너 생존·트래픽 수용 가능 여부 점검) 와 그대로 엮어 쓸 수 있어요.

Graceful Shutdown

streams.close(Duration.ofSeconds(30));

30초 안에 모든 task 를 안전하게 종료하고 state 를 flush 한 뒤 offset 까지 commit 합니다. Spring Boot 에서는 @PreDestroy 가 알아서 처리해줘요.

Rolling Restart

instance 여러 개를 한 번에 한 대씩 재시작하는 방식이에요. 다른 instance 가 그 instance 의 task 를 이어받아서 무중단이 됩니다.

전제 조건:

  • num.standby.replicas ≥ 1 = state 즉시 복구
  • acceptable.recovery.lag 적절
spring:
  kafka:
    streams:
      properties:
        num.standby.replicas: 1
        acceptable.recovery.lag: 10000
        max.warmup.replicas: 4

운영 권장 설정

spring:
  kafka:
    bootstrap-servers: kafka1:9093,kafka2:9093,kafka3:9093
    streams:
      application-id: my-streams-app-prod
      properties:
        # Performance
        num.stream.threads: 4
        cache.max.bytes.buffering: 10485760        # 10MB
        commit.interval.ms: 1000

        # EOS
        processing.guarantee: exactly_once_v2

        # Reliability
        replication.factor: 3
        min.insync.replicas: 2
        num.standby.replicas: 1
        acceptable.recovery.lag: 10000

        # Error Handling
        default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
        default.production.exception.handler: ...MyProductionExceptionHandler

        # Security
        security.protocol: SASL_SSL
        sasl.mechanism: SCRAM-SHA-512
        sasl.jaas.config: ...
        ssl.truststore.location: ...

        # State Store
        state.dir: /var/kafka-streams-state

        # Metrics
        metrics.recording.level: INFO

Logging 권장

log4j.logger.org.apache.kafka.streams=INFO
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=INFO
log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractCoordinator=INFO

운영 환경에서 DEBUG 까지 켜면 로그 양이 부담스러워져요. INFO 면 충분합니다.

모니터링 (다시)

JMX (Java Management Extensions, 자바 런타임 지표 노출 표준) 는 123편에서 다뤘던 내용이에요.

  • stream-thread-metrics:process-rate·process-latency-avg
  • stream-state-metrics:put-rate·put-latency-avg
  • Consumer lag (consumer group <app-id> 으로 확인)

Grafana 대시보드 + Prometheus.

한계·실무 함정

1. application.id collision

같은 cluster 안의 다른 application 과 id 가 부딪히면 internal topic 까지 충돌나요. prefix + 환경 분리.

2. State store 위치

state.dir 기본 = /tmp (재시작 시 삭제). 반드시 영구 경로.

3. num.standby.replicas 0

운영 환경에서는 state 재로드 부담이 커요. 1+ 권장.

4. Uncaught Exception Handler 미설정

기본값으로 두면 stream 이 그대로 죽고 자동 복구도 안 됩니다. REPLACE_THREAD 권장.

5. EOS V1 사용

EOS (Exactly Once Semantics, 메시지 정확히 한 번 처리 보장) 의 V1 은 deprecated 됐어요. V2 로 마이그레이션.

시험 직전 한 번 더 — Kafka Streams 작성·운영 함정 압축 노트

  • State 6가지 = CREATED·REBALANCING·RUNNING·PENDING_SHUTDOWN·NOT_RUNNING·ERROR·PENDING_ERROR
  • State Listener = state 전환 모니터링
  • Spring Boot = @EnableKafkaStreams + @Bean KStream
  • 자동 KafkaStreams 인스턴스·shutdown
  • StreamsBuilderFactoryBean = 세밀 제어
  • Error Handler 3가지 — Uncaught Exception · Deserialization Exception · Production Exception
  • Uncaught = REPLACE_THREAD (기본 권장) · SHUTDOWN_CLIENT · SHUTDOWN_APPLICATION
  • Deserialization = LogAndContinue (운영) · LogAndFail (기본)
  • Production = Custom 으로 DLQ
  • DLQ Pattern = branch 로 valid/invalid 분기
  • Health Check = Spring Boot Actuator /actuator/health/kafka-streams
  • Kubernetes liveness·readiness probe 연동
  • Graceful Shutdown = streams.close(Duration.ofSeconds(30))
  • Rolling Restart = num.standby.replicas ≥ 1 + acceptable.recovery.lag
  • 운영 권장 — EOS V2 + RF 3 + min.insync 2 + standby 1 + Error Handler 명시 + Security + state.dir 영구 + log INFO
  • 함정 — application.id collision
  • 함정 — state.dir=/tmp 데이터 손실
  • 함정 — num.standby.replicas=0 (state 재로드 부담)
  • 함정 — Uncaught Exception Handler 미설정 (stream 죽음)
  • 함정 — EOS V1 (deprecated)

공식 문서: Kafka Streams Developer Guide 에서 자세한 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!