백엔드 데이터 인프라 125편 — Kafka Streams DSL (변환·집계·Join·Window)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 125편. Kafka Streams DSL — filter·map·flatMap·branch·peek (stateless), groupBy·count·reduce·aggregate (stateful), KStream-KStream·KStream-KTable·KTable-KTable Join, tumbling·hopping·session Window 까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 125편 — Kafka Streams DSL (변환·집계·Join·Window)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 125편이에요. 124편 까지 운영 패턴 을 잡았다면, 이번 125편은 코드의 핵심DSL operation 종합.

DSL Operation 4가지 영역

DSL(선언적 스트림 API)이 다루는 영역은 네 갈래로 갈라져요. 상태가 없는 Stateless (filter·map·flatMap·branch·peek), 상태를 들고 가는 Stateful (groupBy·count·reduce·aggregate), 두 흐름을 잇는 Join (KStream-KStream·KStream-KTable·KTable-KTable), 그리고 시간으로 잘라내는 Window (tumbling·hopping·session·sliding) 까지. 이 네 영역의 조합으로 거의 모든 실시간 처리가 표현돼요.

1. Stateless Operations

filter / filterNot

input.filter((key, value) -> value.startsWith("ORDER"));
input.filterNot((key, value) -> value.isEmpty());

조건이 맞는 record 만 그대로 흘려보내요. 따로 들고 있는 상태가 없습니다.

map / mapValues

// key + value 둘 다 변환
input.map((key, value) -> KeyValue.pair(newKey(key), newValue(value)));

// value 만 변환 (key 보존, repartition X)
input.mapValues(value -> value.toUpperCase());

가능하면 mapValues 를 쓰세요. key 를 안 건드리니 repartition(키 기준으로 메시지를 다시 나누는 것) 이 안 일어나서 성능에서 이득이에요.

flatMap / flatMapValues

// 한 record → 여러 record
input.flatMapValues(line -> Arrays.asList(line.split(" ")));

// key + value 둘 다 변형
input.flatMap((key, value) -> ...);

한 record 를 여러 개로 쪼개야 할 때 (예: 한 줄을 단어 단위로) 쓰는 자리예요.

branch (Kafka 2.7+) — 여러 stream 으로 분리

Map<String, KStream<String, String>> branches = input.split()
    .branch((key, value) -> value.startsWith("A"), Branched.as("a-stream"))
    .branch((key, value) -> value.startsWith("B"), Branched.as("b-stream"))
    .defaultBranch(Branched.as("other"));

branches.get("CustomNameA-stream").to("a-topic");
branches.get("CustomNameB-stream").to("b-topic");
branches.get("CustomNameother").to("other-topic");

peek — Side effect (로그·메트릭)

input.peek((key, value) -> log.info("Saw: {}", value));

stream 자체는 바꾸지 않고 옆으로 새는 작업만 해요. 로깅이나 디버깅 자리에 자주 써요.

selectKey — Key 재설정

input.selectKey((key, value) -> extractNewKey(value));

key 를 바꾸는 순간 repartition 이 트리거돼요. 그래서 보통 groupBy·join 바로 앞에 둡니다.

2. Stateful Operations

groupByKey vs groupBy

// key 그대로 group
KGroupedStream<String, String> grouped = input.groupByKey();

// key 재설정 후 group
KGroupedStream<String, String> grouped = input.groupBy((key, value) -> value);

groupBy 는 key 를 바꾸니까 repartition 이 일어나요. 반대로 groupByKey 는 이미 같은 key 를 쓰고 있다면 repartition 없이 그대로 묶입니다.

count

KTable<String, Long> wordCounts = input
    .groupBy((key, word) -> word)
    .count();

122편에서 본 WordCount 가 바로 이 형태예요.

reduce

KTable<String, Long> total = input
    .groupByKey()
    .reduce((agg, newVal) -> agg + newVal);

각 key 마다 들어오는 value 를 누적해서 하나로 줄여요.

aggregate — 가장 유연

KTable<String, Long> result = input
    .groupByKey()
    .aggregate(
        () -> 0L,                           // initializer
        (key, value, agg) -> agg + 1L,      // adder
        Materialized.with(Serdes.String(), Serdes.Long())
    );

초기값(initializer) 과 각 record 를 받아 합치는 함수(adder) 를 따로 줘요. reduce 보다 자유로운 집계를 짤 수 있는 자리.

Materialized — State Store 명명

.aggregate(
    () -> 0L,
    (key, value, agg) -> agg + 1L,
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        .withRetention(Duration.ofDays(7))
)

as("my-store") 가 state store 에 이름을 붙여요. 이 이름이 있어야 Interactive Queries(저장된 상태를 외부에서 조회하는 API, 127편) 로 외부에서 query 할 수 있어요.

3. Window Operations

Tumbling Window — 겹치지 않는 고정 간격

KTable<Windowed<String>, Long> hourlyCounts = input
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count();
12:00~13:00 = window A
13:00~14:00 = window B
14:00~15:00 = window C

시간별·일별 집계를 낼 때 자주 쓰는 자리예요.

Hopping Window — 겹치는 고정 간격

KTable<Windowed<String>, Long> rolling = input
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
        .advanceBy(Duration.ofMinutes(1)))
    .count();
12:00~12:05 = window A
12:01~12:06 = window B
12:02~12:07 = window C
...

window 가 겹치니까 같은 메시지가 여러 window 에 들어가요. sliding 평균 같은 곳에 잘 맞아요.

Session Window — 활동 기반 가변 간격

KTable<Windowed<String>, Long> sessions = input
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();

30분 동안 활동이 없으면 새 session 이 시작돼요. 사용자 session 분석에서는 사실상 표준입니다.

Grace Period

TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(2))

window 가 닫힌 뒤에도 2분 동안은 늦게 도착한 메시지를 받아줘요 (out-of-order tolerance).

여기서 시험 함정이 하나 있어요 — grace period 가 끝난 뒤에 도착한 메시지는 그냥 drop 돼요. 너무 짧게 잡으면 데이터가 새고, 너무 길게 잡으면 메모리와 지연이 같이 늘어나요.

4. Join

KStream-KStream Join — 시간 기반

KStream<String, String> result = leftStream.join(
    rightStream,
    (left, right) -> left + " - " + right,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);

두 stream 에서 key 가 같으면서 5분 안에 들어온 메시지끼리 join 해요. 주문과 결제 같은 두 이벤트를 묶는 자리에 어울려요.

KStream-KTable Join — 상태 기반

KTable<String, UserProfile> profiles = builder.table("user-profiles");

KStream<String, EnrichedEvent> enriched = events.join(
    profiles,
    (event, profile) -> new EnrichedEvent(event, profile)
);

흐르는 stream 의 각 record 에 KTable 이 들고 있는 현재 상태를 붙여 줘요. 이게 enrichment(원본에 부가 정보를 덧붙이기) 패턴이에요.

KTable-KTable Join — 두 상태 결합

KTable<String, Result> combined = tableA.join(
    tableB,
    (a, b) -> combine(a, b)
);

두 KTable 의 최신 상태를 key 로 묶어요.

Left·Outer Join

// Left join — 왼쪽이 없으면 null
leftStream.leftJoin(rightTable, (left, right) -> {
    if (right == null) return defaultValue(left);
    return combine(left, right);
});

// Outer join (KStream-KTable 은 X, KTable-KTable 만)
tableA.outerJoin(tableB, ...);

5. Repartition 의 의미

groupBy·selectKey·join (다른 key 기준) 을 만나면 내부적으로 repartition topic 이 자동으로 생겨요.

[input] → selectKey → [repartition topic] → groupBy → count → [output]

비용은 두 갈래예요. 하나는 내부 topic 에 다시 쓰고 다시 읽는 네트워크·디스크 비용, 다른 하나는 RF(복제 인수, replication factor) 와 partition 이 자동으로 잡힌다는 점 (제어는 가능). 피하는 방법은 단순해요 — mapValues 로 key 를 안 건드리거나, 이미 같은 key 라면 groupByKey 를 그대로 쓰면 됩니다.

6. Aggregating Topics — 운영 패턴

일별 사용자 활동 카운트

KTable<Windowed<String>, Long> dailyActivity = events
    .groupBy((key, event) -> event.getUserId())
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
    .count(Materialized.as("daily-activity-store"));

매일 사용자별로 활동 수를 세는 자리.

Top-K 실시간

KStream<String, String> events = builder.stream("trending");

KTable<Windowed<String>, Long> counts = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
    .count();

// 10분 window 마다 Top 10 인기 검색어

Top-K(상위 K개 추출) 는 트렌딩 검색이나 실시간 leaderboard 에서 그대로 쓰는 패턴이에요.

7. SerDes — 모든 operation 의 필수

input
    .groupBy((k, v) -> v.getCategory(), Grouped.with(Serdes.String(), Serdes.String()))
    .count(Materialized.with(Serdes.String(), Serdes.Long()))
    .toStream()
    .to("output", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

operation 마다 입력·출력 SerDe(직렬화·역직렬화 짝) 를 명시해 줘야 해요. 안 적으면 default SerDe 로 떨어지는데, 타입이 안 맞으면 그대로 에러로 가요.

8. 종합 예제 — 주문 enrichment + 집계

// User profile 로 enrichment
KTable<String, UserProfile> profiles = builder.table("user-profiles");

KStream<String, Order> orders = builder.stream("orders");

// Join
KStream<String, EnrichedOrder> enriched = orders.join(
    profiles,
    (order, profile) -> new EnrichedOrder(order, profile)
);

// 카테고리별 매출 집계 (1시간 window)
KTable<Windowed<String>, Double> hourlyRevenue = enriched
    .groupBy((key, order) -> order.getCategory())
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .aggregate(
        () -> 0.0,
        (key, order, total) -> total + order.getAmount(),
        Materialized.with(Serdes.String(), Serdes.Double())
    );

hourlyRevenue.toStream().to("hourly-revenue");

다섯 줄로 enrichment 와 windowed aggregate, 그리고 sink 까지 다 담겨요. DSL 의 표현력이 이래서 좋아요.

한계·실무 함정

1. Repartition 폭증

selectKey·groupBy 를 자주 쓰면 repartition topic 부담이 같이 따라와요. 최소화가 답.

2. Window 메모리

window 가 크고 key 가 많으면 state store 가 폭증해요. retention 과 grace period 를 같이 조정해야 해요.

3. Join time window

KStream-KStream join 의 window 를 너무 길게 잡으면 state 가 부풀어요. 비즈니스 의미에서 적정선을 정확히 잡아야 해요.

4. Late data 처리

grace period 가 너무 짧으면 데이터 손실, 너무 길면 지연이 따라와요. 균형이 핵심.

5. SerDe 누락

default SerDe 와 실제 타입이 다르면 그대로 에러. 명시가 답이에요.

시험 직전 한 번 더 — Kafka Streams DSL 함정 압축 노트

  • 4가지 영역 = Stateless · Stateful · Join · Window
  • Stateless — filter · filterNot · map · mapValues · flatMap · flatMapValues · branch · peek · selectKey
  • mapValues 권장 = repartition X (key 보존)
  • selectKey = repartition 트리거
  • Stateful — groupByKey · groupBy · count · reduce · aggregate
  • groupByKey vs groupBy — groupByKey = repartition X (이미 같은 key)
  • aggregate = 가장 유연 (initializer + adder + Materialized)
  • Materialized.as("name") = state store 명명 (Interactive Queries 용)
  • Window 3가지Tumbling (겹치지 X) · Hopping (겹침) · Session (활동 gap)
  • Tumbling = 시간별·일별 집계
  • Hopping = sliding 평균
  • Session = 사용자 session 분석
  • grace = window 끝 후 늦은 메시지 허용 시간
  • Join 3가지KStream-KStream (시간 기반, JoinWindows) · KStream-KTable (enrichment) · KTable-KTable (상태 결합)
  • leftJoin·outerJoin 변형 가능 (outer 는 KStream-KStream·KTable-KTable 만)
  • Repartition = groupBy·selectKey·join 자동 생성, 비용 있음
  • SerDes 명시 = Grouped.with·Produced.with·Materialized.with
  • 운영 패턴 = enrichment (KStream-KTable join) + windowed aggregate + sink
  • 함정 — Repartition 폭증
  • 함정 — Window 메모리 폭증
  • 함정 — Join window 너무 김 = state 폭증
  • 함정 — Late data grace 균형
  • 함정 — SerDe 누락

공식 문서: Kafka Streams DSL API 에서 자세한 사양·예제를 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!