백엔드 데이터 인프라 122편 — Kafka Streams Quickstart (WordCount 5분)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 122편. Kafka Streams Quickstart — WordCount 예제로 5분 hands-on. Maven setup·KStream filter·flatMapValues·groupBy·count·to·실행·결과 확인까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 122편 — Kafka Streams Quickstart (WordCount 5분)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 122편이에요. 121편 에서 큰 그림 을 잡았다면, 이번 122편은 직접 만지는 WordCount Quickstart (공식 입문 예제).

왜 WordCount?

WordCount = Stream processing 의 Hello World. 단어 stream → 단어 별 count.

input:  "hello world hello kafka"
output: hello=2, world=1, kafka=1

핵심 모든 stream 개념 (filter·map·groupBy·aggregate·output) 한 예제에.

사전 준비

  • Java 17+
  • Maven (또는 Gradle)
  • Kafka 4.0+ 실행 중 (82편 quickstart)

Step 1: 입력·출력 Topic 생성

$ kafka-topics.sh --create --topic streams-input \
    --bootstrap-server localhost:9092 \
    --partitions 1 --replication-factor 1

$ kafka-topics.sh --create --topic streams-output \
    --bootstrap-server localhost:9092 \
    --partitions 1 --replication-factor 1 \
    --config cleanup.policy=compact

출력 topic = compacted (key 별 최신 값만 남기는 정책, key 별 최신 count). 107편 log compaction.

Step 2: Maven 의존성

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.9</version>
    </dependency>
</dependencies>

kafka-streams모든 필요 의존성 가져옴 (kafka-clients 포함).

Step 3: WordCount 코드

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = builder.stream("streams-input");

        KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .filter((key, word) -> word != null && !word.isEmpty())
            .groupBy((key, word) -> word)
            .count();

        wordCounts.toStream().to("streams-output",
            Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down...");
            streams.close();
        }));

        streams.start();
    }
}

각 step 의 의미를 짚어보면, flatMapValues"hello world" → ["hello", "world"] 처럼 한 record 를 여러 record 로 확장. 이어서 filter 로 빈 문자열·null 을 제거. groupBykey 재설정 — 같은 단어가 같은 key 로 묶이도록 ("anyKey", "hello") → ("hello", "hello") 식으로 다시 그룹화한다. 그 다음 countKGroupedStream (groupBy 결과 stream) 을 받아 집계해서 KTable<String, Long> (key 별 최신 값 테이블) 을 반환 — 각 key 의 최신 count 가 들어있다 (예: hello=2, world=1, kafka=1). 마지막으로 toStream().to(...) 가 KTable 을 다시 KStream 으로 변환해 streams-output topic 으로 write 하는데, 이때 value 가 Long 이므로 Serdes (serializer/deserializer 묶음) 를 명시한다.

Step 4: 실행

$ mvn package
$ java -cp target/wordcount-app.jar com.example.WordCountApp

로그:

INFO  - Kafka Streams started
INFO  - State transition: REBALANCING -> RUNNING

Step 5: 입력 + 출력 확인

새 터미널에서 producer:

$ kafka-console-producer.sh --topic streams-input --bootstrap-server localhost:9092

>hello world hello kafka
>kafka streams hello
>real time processing

또 다른 터미널에서 consumer:

$ kafka-console-consumer.sh --topic streams-output --from-beginning \
    --bootstrap-server localhost:9092 \
    --property print.key=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

출력:

hello    2
world    1
kafka    1
kafka    2
streams  1
hello    3
real     1
time     1
processing  1

같은 key 가 여러 번 등장 — 각 update 마다 새 record. KTable 의 자연스러운 동작 (changelog, 변경 이력 stream).

Application.id 의 중요성

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");

application.id = Kafka Streams 의 가장 중요한 설정.

  • Consumer Group ID 로 사용
  • Internal topic prefix 로 사용 (wordcount-app-KSTREAM-AGGREGATE-...)
  • State store 디렉토리 이름

Cluster 안 unique 해야. 같은 application.id 의 instance 들 = 같은 task 분담 (consumer group 처럼).

Internal Topics

WordCount 가 자동으로 생성하는 internal topic:

$ kafka-topics.sh --list --bootstrap-server localhost:9092

streams-input
streams-output
wordcount-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
wordcount-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
  • -changelog = state store 의 backup (재시작 시 복구)
  • -repartition = groupBy 후 재partition

이름이 자동 부여. 수동 관리 어려움 — Streams 가 자동.

State Store — RocksDB

count() = stateful 처리 → local state store 필요.

~/streams-state/
└── wordcount-app/
    └── 0_0/
        └── KSTREAM-AGGREGATE-STATE-STORE-0000000003/
            └── rocksdb files ...

기본 = RocksDB (앱 안에 박힌 embedded DB). 빠른 local key-value store.

State 가 changelog topic 에 backup → 재시작 시 changelog 부터 복구.

Spring Boot 통합

spring:
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: wordcount-app
      default-key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default-value-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
@EnableKafkaStreams
@Configuration
public class StreamsConfig {

    @Bean
    public KStream<String, String> wordCountStream(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("streams-input");

        input.flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+")))
             .filter((k, w) -> !w.isEmpty())
             .groupBy((k, w) -> w)
             .count()
             .toStream()
             .to("streams-output", Produced.with(Serdes.String(), Serdes.Long()));

        return input;
    }
}

@EnableKafkaStreams = Spring 이 자동으로 KafkaStreams 인스턴스 시작·관리. 훨씬 깔끔.

단계별 시각화

"hello world hello"
     ↓ flatMapValues
["hello", "world", "hello"]
     ↓ filter (non-empty)
["hello", "world", "hello"]
     ↓ groupBy (key = word)
(hello, hello), (world, world), (hello, hello)
     ↓ count
KTable: hello=2, world=1
     ↓ toStream().to()
streams-output:
  hello → 2
  world → 1

한계·실무 함정

1. application.id 변경

기존 state 와 새 application.id 가 다름 → state 다시 빌드 (느림). 신중히 결정.

2. State store 디스크 부담

큰 dataset = 큰 state store. 디스크 모니터링.

3. Changelog topic 의 RF

운영 환경 = replication.factor=3 권장. 안 그러면 broker 한 대 죽으면 state 복구 불가.

4. Serdes 명시

to(...) 호출 시 value 가 Long 이면 Serdes.Long() 명시. 안 하면 default (String) → ClassCastException.

5. Production 코드는 더 복잡

WordCount = 학습. 실제 = error handling·monitoring·EOS (exactly-once semantics, 정확히 한 번 처리)·graceful shutdown·...

시험 직전 한 번 더 — Kafka Streams Quickstart 함정 압축 노트

  • WordCount = Stream processing 의 Hello World
  • Maven 의존성 = kafka-streams 한 줄
  • application.id = 가장 중요한 설정, cluster unique
  • Consumer Group + Internal topic prefix + State 디렉토리 이름으로 사용
  • bootstrap.servers + 기본 Serdes 설정
  • WordCount 흐름 = stream → flatMapValues → filter → groupBy → count → toStream → to
  • flatMapValues = 한 record 를 여러 record (split)
  • groupBy = key 재설정
  • count = stateful 집계, KTable 반환
  • toStream().to() = KTable → KStream → topic
  • Serdes 명시 = value 가 Long 이면 Produced.with(Serdes.String(), Serdes.Long())
  • Internal Topics = <app-id>-...-changelog·-repartition 자동 생성
  • State Store = 기본 RocksDB (local key-value)
  • changelog topic 에 backup → 재시작 시 복구
  • Spring Boot = @EnableKafkaStreams + @Bean KStream
  • 출력 topic 권장 = compacted (key 별 최신)
  • 함정 — application.id 변경 = state 재빌드
  • 함정 — State store 디스크 부담
  • 함정 — Changelog RF 3 미설정
  • 함정 — Serdes 명시 누락 → ClassCastException

공식 문서: Kafka Streams Quickstart 에서 더 자세한 예제·tutorial 을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!