백엔드 데이터 인프라 122편. Kafka Streams Quickstart — WordCount 예제로 5분 hands-on. Maven setup·KStream filter·flatMapValues·groupBy·count·to·실행·결과 확인까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 122편이에요. 121편 에서 큰 그림 을 잡았다면, 이번 122편은 직접 만지는 WordCount Quickstart (공식 입문 예제).
왜 WordCount?
WordCount = Stream processing 의 Hello World. 단어 stream → 단어 별 count.
input: "hello world hello kafka"
output: hello=2, world=1, kafka=1
핵심 모든 stream 개념 (filter·map·groupBy·aggregate·output) 한 예제에.
사전 준비
- Java 17+
- Maven (또는 Gradle)
- Kafka 4.0+ 실행 중 (82편 quickstart)
Step 1: 입력·출력 Topic 생성
$ kafka-topics.sh --create --topic streams-input \
--bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1
$ kafka-topics.sh --create --topic streams-output \
--bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1 \
--config cleanup.policy=compact
출력 topic = compacted (key 별 최신 값만 남기는 정책, key 별 최신 count). 107편 log compaction.
Step 2: Maven 의존성
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
kafka-streams 가 모든 필요 의존성 가져옴 (kafka-clients 포함).
Step 3: WordCount 코드
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("streams-input");
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.filter((key, word) -> word != null && !word.isEmpty())
.groupBy((key, word) -> word)
.count();
wordCounts.toStream().to("streams-output",
Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down...");
streams.close();
}));
streams.start();
}
}
각 step 의 의미를 짚어보면, flatMapValues 는 "hello world" → ["hello", "world"] 처럼 한 record 를 여러 record 로 확장. 이어서 filter 로 빈 문자열·null 을 제거. groupBy 는 key 재설정 — 같은 단어가 같은 key 로 묶이도록 ("anyKey", "hello") → ("hello", "hello") 식으로 다시 그룹화한다. 그 다음 count 가 KGroupedStream (groupBy 결과 stream) 을 받아 집계해서 KTable<String, Long> (key 별 최신 값 테이블) 을 반환 — 각 key 의 최신 count 가 들어있다 (예: hello=2, world=1, kafka=1). 마지막으로 toStream().to(...) 가 KTable 을 다시 KStream 으로 변환해 streams-output topic 으로 write 하는데, 이때 value 가 Long 이므로 Serdes (serializer/deserializer 묶음) 를 명시한다.
Step 4: 실행
$ mvn package
$ java -cp target/wordcount-app.jar com.example.WordCountApp
로그:
INFO - Kafka Streams started
INFO - State transition: REBALANCING -> RUNNING
Step 5: 입력 + 출력 확인
새 터미널에서 producer:
$ kafka-console-producer.sh --topic streams-input --bootstrap-server localhost:9092
>hello world hello kafka
>kafka streams hello
>real time processing
또 다른 터미널에서 consumer:
$ kafka-console-consumer.sh --topic streams-output --from-beginning \
--bootstrap-server localhost:9092 \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
출력:
hello 2
world 1
kafka 1
kafka 2
streams 1
hello 3
real 1
time 1
processing 1
같은 key 가 여러 번 등장 — 각 update 마다 새 record. KTable 의 자연스러운 동작 (changelog, 변경 이력 stream).
Application.id 의 중요성
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
application.id = Kafka Streams 의 가장 중요한 설정.
- Consumer Group ID 로 사용
- Internal topic prefix 로 사용 (
wordcount-app-KSTREAM-AGGREGATE-...) - State store 디렉토리 이름
Cluster 안 unique 해야. 같은 application.id 의 instance 들 = 같은 task 분담 (consumer group 처럼).
Internal Topics
WordCount 가 자동으로 생성하는 internal topic:
$ kafka-topics.sh --list --bootstrap-server localhost:9092
streams-input
streams-output
wordcount-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
wordcount-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
-changelog= state store 의 backup (재시작 시 복구)-repartition= groupBy 후 재partition
이름이 자동 부여. 수동 관리 어려움 — Streams 가 자동.
State Store — RocksDB
count() = stateful 처리 → local state store 필요.
~/streams-state/
└── wordcount-app/
└── 0_0/
└── KSTREAM-AGGREGATE-STATE-STORE-0000000003/
└── rocksdb files ...
기본 = RocksDB (앱 안에 박힌 embedded DB). 빠른 local key-value store.
State 가 changelog topic 에 backup → 재시작 시 changelog 부터 복구.
Spring Boot 통합
spring:
kafka:
bootstrap-servers: localhost:9092
streams:
application-id: wordcount-app
default-key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default-value-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
@EnableKafkaStreams
@Configuration
public class StreamsConfig {
@Bean
public KStream<String, String> wordCountStream(StreamsBuilder builder) {
KStream<String, String> input = builder.stream("streams-input");
input.flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+")))
.filter((k, w) -> !w.isEmpty())
.groupBy((k, w) -> w)
.count()
.toStream()
.to("streams-output", Produced.with(Serdes.String(), Serdes.Long()));
return input;
}
}
@EnableKafkaStreams = Spring 이 자동으로 KafkaStreams 인스턴스 시작·관리. 훨씬 깔끔.
단계별 시각화
"hello world hello"
↓ flatMapValues
["hello", "world", "hello"]
↓ filter (non-empty)
["hello", "world", "hello"]
↓ groupBy (key = word)
(hello, hello), (world, world), (hello, hello)
↓ count
KTable: hello=2, world=1
↓ toStream().to()
streams-output:
hello → 2
world → 1
한계·실무 함정
1. application.id 변경
기존 state 와 새 application.id 가 다름 → state 다시 빌드 (느림). 신중히 결정.
2. State store 디스크 부담
큰 dataset = 큰 state store. 디스크 모니터링.
3. Changelog topic 의 RF
운영 환경 = replication.factor=3 권장. 안 그러면 broker 한 대 죽으면 state 복구 불가.
4. Serdes 명시
to(...) 호출 시 value 가 Long 이면 Serdes.Long() 명시. 안 하면 default (String) → ClassCastException.
5. Production 코드는 더 복잡
WordCount = 학습. 실제 = error handling·monitoring·EOS (exactly-once semantics, 정확히 한 번 처리)·graceful shutdown·...
시험 직전 한 번 더 — Kafka Streams Quickstart 함정 압축 노트
- WordCount = Stream processing 의 Hello World
- Maven 의존성 =
kafka-streams한 줄 application.id= 가장 중요한 설정, cluster unique- Consumer Group + Internal topic prefix + State 디렉토리 이름으로 사용
bootstrap.servers+ 기본 Serdes 설정- WordCount 흐름 =
stream → flatMapValues → filter → groupBy → count → toStream → to flatMapValues= 한 record 를 여러 record (split)groupBy= key 재설정count= stateful 집계, KTable 반환toStream().to()= KTable → KStream → topic- Serdes 명시 = value 가 Long 이면
Produced.with(Serdes.String(), Serdes.Long()) - Internal Topics =
<app-id>-...-changelog·-repartition자동 생성 - State Store = 기본 RocksDB (local key-value)
- changelog topic 에 backup → 재시작 시 복구
- Spring Boot =
@EnableKafkaStreams+@Bean KStream - 출력 topic 권장 = compacted (key 별 최신)
- 함정 —
application.id변경 = state 재빌드 - 함정 — State store 디스크 부담
- 함정 — Changelog RF 3 미설정
- 함정 — Serdes 명시 누락 → ClassCastException
공식 문서: Kafka Streams Quickstart 에서 더 자세한 예제·tutorial 을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 117편 — Kafka Connect (Source · Sink · Worker 아키텍처)
- 118편 — Kafka Connect 운영 (REST · Status · Error Handler · DLQ)
- 119편 — Kafka Connect Custom Connector 개발
- 120편 — Kafka Connect Config 종합
- 121편 — Kafka Streams 입문 (라이브러리 모델·왜)
다음 글: