백엔드 데이터 인프라 125편. Kafka Streams DSL — filter·map·flatMap·branch·peek (stateless), groupBy·count·reduce·aggregate (stateful), KStream-KStream·KStream-KTable·KTable-KTable Join, tumbling·hopping·session Window 까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 125편이에요. 124편 까지 운영 패턴 을 잡았다면, 이번 125편은 코드의 핵심 — DSL operation 종합.
DSL Operation 4가지 영역
DSL(선언적 스트림 API)이 다루는 영역은 네 갈래로 갈라져요. 상태가 없는 Stateless (filter·map·flatMap·branch·peek), 상태를 들고 가는 Stateful (groupBy·count·reduce·aggregate), 두 흐름을 잇는 Join (KStream-KStream·KStream-KTable·KTable-KTable), 그리고 시간으로 잘라내는 Window (tumbling·hopping·session·sliding) 까지. 이 네 영역의 조합으로 거의 모든 실시간 처리가 표현돼요.
1. Stateless Operations
filter / filterNot
input.filter((key, value) -> value.startsWith("ORDER"));
input.filterNot((key, value) -> value.isEmpty());
조건이 맞는 record 만 그대로 흘려보내요. 따로 들고 있는 상태가 없습니다.
map / mapValues
// key + value 둘 다 변환
input.map((key, value) -> KeyValue.pair(newKey(key), newValue(value)));
// value 만 변환 (key 보존, repartition X)
input.mapValues(value -> value.toUpperCase());
가능하면 mapValues 를 쓰세요. key 를 안 건드리니 repartition(키 기준으로 메시지를 다시 나누는 것) 이 안 일어나서 성능에서 이득이에요.
flatMap / flatMapValues
// 한 record → 여러 record
input.flatMapValues(line -> Arrays.asList(line.split(" ")));
// key + value 둘 다 변형
input.flatMap((key, value) -> ...);
한 record 를 여러 개로 쪼개야 할 때 (예: 한 줄을 단어 단위로) 쓰는 자리예요.
branch (Kafka 2.7+) — 여러 stream 으로 분리
Map<String, KStream<String, String>> branches = input.split()
.branch((key, value) -> value.startsWith("A"), Branched.as("a-stream"))
.branch((key, value) -> value.startsWith("B"), Branched.as("b-stream"))
.defaultBranch(Branched.as("other"));
branches.get("CustomNameA-stream").to("a-topic");
branches.get("CustomNameB-stream").to("b-topic");
branches.get("CustomNameother").to("other-topic");
peek — Side effect (로그·메트릭)
input.peek((key, value) -> log.info("Saw: {}", value));
stream 자체는 바꾸지 않고 옆으로 새는 작업만 해요. 로깅이나 디버깅 자리에 자주 써요.
selectKey — Key 재설정
input.selectKey((key, value) -> extractNewKey(value));
key 를 바꾸는 순간 repartition 이 트리거돼요. 그래서 보통 groupBy·join 바로 앞에 둡니다.
2. Stateful Operations
groupByKey vs groupBy
// key 그대로 group
KGroupedStream<String, String> grouped = input.groupByKey();
// key 재설정 후 group
KGroupedStream<String, String> grouped = input.groupBy((key, value) -> value);
groupBy 는 key 를 바꾸니까 repartition 이 일어나요. 반대로 groupByKey 는 이미 같은 key 를 쓰고 있다면 repartition 없이 그대로 묶입니다.
count
KTable<String, Long> wordCounts = input
.groupBy((key, word) -> word)
.count();
122편에서 본 WordCount 가 바로 이 형태예요.
reduce
KTable<String, Long> total = input
.groupByKey()
.reduce((agg, newVal) -> agg + newVal);
각 key 마다 들어오는 value 를 누적해서 하나로 줄여요.
aggregate — 가장 유연
KTable<String, Long> result = input
.groupByKey()
.aggregate(
() -> 0L, // initializer
(key, value, agg) -> agg + 1L, // adder
Materialized.with(Serdes.String(), Serdes.Long())
);
초기값(initializer) 과 각 record 를 받아 합치는 함수(adder) 를 따로 줘요. reduce 보다 자유로운 집계를 짤 수 있는 자리.
Materialized — State Store 명명
.aggregate(
() -> 0L,
(key, value, agg) -> agg + 1L,
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
.withRetention(Duration.ofDays(7))
)
as("my-store") 가 state store 에 이름을 붙여요. 이 이름이 있어야 Interactive Queries(저장된 상태를 외부에서 조회하는 API, 127편) 로 외부에서 query 할 수 있어요.
3. Window Operations
Tumbling Window — 겹치지 않는 고정 간격
KTable<Windowed<String>, Long> hourlyCounts = input
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count();
12:00~13:00 = window A
13:00~14:00 = window B
14:00~15:00 = window C
시간별·일별 집계를 낼 때 자주 쓰는 자리예요.
Hopping Window — 겹치는 고정 간격
KTable<Windowed<String>, Long> rolling = input
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1)))
.count();
12:00~12:05 = window A
12:01~12:06 = window B
12:02~12:07 = window C
...
window 가 겹치니까 같은 메시지가 여러 window 에 들어가요. sliding 평균 같은 곳에 잘 맞아요.
Session Window — 활동 기반 가변 간격
KTable<Windowed<String>, Long> sessions = input
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count();
30분 동안 활동이 없으면 새 session 이 시작돼요. 사용자 session 분석에서는 사실상 표준입니다.
Grace Period
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(2))
window 가 닫힌 뒤에도 2분 동안은 늦게 도착한 메시지를 받아줘요 (out-of-order tolerance).
여기서 시험 함정이 하나 있어요 — grace period 가 끝난 뒤에 도착한 메시지는 그냥 drop 돼요. 너무 짧게 잡으면 데이터가 새고, 너무 길게 잡으면 메모리와 지연이 같이 늘어나요.
4. Join
KStream-KStream Join — 시간 기반
KStream<String, String> result = leftStream.join(
rightStream,
(left, right) -> left + " - " + right,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);
두 stream 에서 key 가 같으면서 5분 안에 들어온 메시지끼리 join 해요. 주문과 결제 같은 두 이벤트를 묶는 자리에 어울려요.
KStream-KTable Join — 상태 기반
KTable<String, UserProfile> profiles = builder.table("user-profiles");
KStream<String, EnrichedEvent> enriched = events.join(
profiles,
(event, profile) -> new EnrichedEvent(event, profile)
);
흐르는 stream 의 각 record 에 KTable 이 들고 있는 현재 상태를 붙여 줘요. 이게 enrichment(원본에 부가 정보를 덧붙이기) 패턴이에요.
KTable-KTable Join — 두 상태 결합
KTable<String, Result> combined = tableA.join(
tableB,
(a, b) -> combine(a, b)
);
두 KTable 의 최신 상태를 key 로 묶어요.
Left·Outer Join
// Left join — 왼쪽이 없으면 null
leftStream.leftJoin(rightTable, (left, right) -> {
if (right == null) return defaultValue(left);
return combine(left, right);
});
// Outer join (KStream-KTable 은 X, KTable-KTable 만)
tableA.outerJoin(tableB, ...);
5. Repartition 의 의미
groupBy·selectKey·join (다른 key 기준) 을 만나면 내부적으로 repartition topic 이 자동으로 생겨요.
[input] → selectKey → [repartition topic] → groupBy → count → [output]
비용은 두 갈래예요. 하나는 내부 topic 에 다시 쓰고 다시 읽는 네트워크·디스크 비용, 다른 하나는 RF(복제 인수, replication factor) 와 partition 이 자동으로 잡힌다는 점 (제어는 가능). 피하는 방법은 단순해요 — mapValues 로 key 를 안 건드리거나, 이미 같은 key 라면 groupByKey 를 그대로 쓰면 됩니다.
6. Aggregating Topics — 운영 패턴
일별 사용자 활동 카운트
KTable<Windowed<String>, Long> dailyActivity = events
.groupBy((key, event) -> event.getUserId())
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
.count(Materialized.as("daily-activity-store"));
매일 사용자별로 활동 수를 세는 자리.
Top-K 실시간
KStream<String, String> events = builder.stream("trending");
KTable<Windowed<String>, Long> counts = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
.count();
// 10분 window 마다 Top 10 인기 검색어
Top-K(상위 K개 추출) 는 트렌딩 검색이나 실시간 leaderboard 에서 그대로 쓰는 패턴이에요.
7. SerDes — 모든 operation 의 필수
input
.groupBy((k, v) -> v.getCategory(), Grouped.with(Serdes.String(), Serdes.String()))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.toStream()
.to("output", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));
operation 마다 입력·출력 SerDe(직렬화·역직렬화 짝) 를 명시해 줘야 해요. 안 적으면 default SerDe 로 떨어지는데, 타입이 안 맞으면 그대로 에러로 가요.
8. 종합 예제 — 주문 enrichment + 집계
// User profile 로 enrichment
KTable<String, UserProfile> profiles = builder.table("user-profiles");
KStream<String, Order> orders = builder.stream("orders");
// Join
KStream<String, EnrichedOrder> enriched = orders.join(
profiles,
(order, profile) -> new EnrichedOrder(order, profile)
);
// 카테고리별 매출 집계 (1시간 window)
KTable<Windowed<String>, Double> hourlyRevenue = enriched
.groupBy((key, order) -> order.getCategory())
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.aggregate(
() -> 0.0,
(key, order, total) -> total + order.getAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
hourlyRevenue.toStream().to("hourly-revenue");
다섯 줄로 enrichment 와 windowed aggregate, 그리고 sink 까지 다 담겨요. DSL 의 표현력이 이래서 좋아요.
한계·실무 함정
1. Repartition 폭증
selectKey·groupBy 를 자주 쓰면 repartition topic 부담이 같이 따라와요. 최소화가 답.
2. Window 메모리
window 가 크고 key 가 많으면 state store 가 폭증해요. retention 과 grace period 를 같이 조정해야 해요.
3. Join time window
KStream-KStream join 의 window 를 너무 길게 잡으면 state 가 부풀어요. 비즈니스 의미에서 적정선을 정확히 잡아야 해요.
4. Late data 처리
grace period 가 너무 짧으면 데이터 손실, 너무 길면 지연이 따라와요. 균형이 핵심.
5. SerDe 누락
default SerDe 와 실제 타입이 다르면 그대로 에러. 명시가 답이에요.
시험 직전 한 번 더 — Kafka Streams DSL 함정 압축 노트
- 4가지 영역 = Stateless · Stateful · Join · Window
- Stateless — filter · filterNot · map · mapValues · flatMap · flatMapValues · branch · peek · selectKey
mapValues권장 = repartition X (key 보존)selectKey= repartition 트리거- Stateful — groupByKey · groupBy · count · reduce · aggregate
groupByKeyvsgroupBy— groupByKey = repartition X (이미 같은 key)aggregate= 가장 유연 (initializer + adder + Materialized)Materialized.as("name")= state store 명명 (Interactive Queries 용)- Window 3가지 — Tumbling (겹치지 X) · Hopping (겹침) · Session (활동 gap)
- Tumbling = 시간별·일별 집계
- Hopping = sliding 평균
- Session = 사용자 session 분석
grace= window 끝 후 늦은 메시지 허용 시간- Join 3가지 — KStream-KStream (시간 기반, JoinWindows) · KStream-KTable (enrichment) · KTable-KTable (상태 결합)
- leftJoin·outerJoin 변형 가능 (outer 는 KStream-KStream·KTable-KTable 만)
- Repartition =
groupBy·selectKey·join자동 생성, 비용 있음 - SerDes 명시 =
Grouped.with·Produced.with·Materialized.with - 운영 패턴 = enrichment (KStream-KTable join) + windowed aggregate + sink
- 함정 — Repartition 폭증
- 함정 — Window 메모리 폭증
- 함정 — Join window 너무 김 = state 폭증
- 함정 — Late data grace 균형
- 함정 — SerDe 누락
공식 문서: Kafka Streams DSL API 에서 자세한 사양·예제를 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 120편 — Kafka Connect Config 종합
- 121편 — Kafka Streams 입문 (라이브러리 모델·왜)
- 122편 — Kafka Streams Quickstart (WordCount 5분)
- 123편 — Kafka Streams Core Concepts (Topology · Task · Thread)
- 124편 — Kafka Streams Write & Run (Spring Boot · 운영 패턴)
다음 글: