백엔드 데이터 인프라 123편 — Kafka Streams Core Concepts (Topology · Task · Thread)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 123편. Kafka Streams Core Concepts — Topology·SubTopology·Task·Thread·Instance 의 정확한 매핑. Partition 과의 관계, num.stream.threads, scaling 모델, Time semantics, EOS V2 까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 123편 — Kafka Streams Core Concepts (Topology · Task · Thread)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 123편이에요. 122편 에서 직접 만져 봤다면, 이번 123편은 그 내부가 어떻게 돌아가는지 살펴볼 차례 — Core Concepts.

Core Concepts가 어렵게 느껴지는 이유

Kafka Streams 의 4계층 추상화 — Topology(처리 그래프)·Task(실행 단위)·Thread·Instance — 가 partition(토픽을 나눈 단위) 과 어떻게 맞물려 돌아가는지가 헷갈리는 지점이에요.

이 글에서 4계층 + scaling 모델 + Time semantics + EOS 까지 한 번에 정리해요.

1. Topology — Processing Graph

Topology = Source nodes → Processor nodes → Sink nodes
[Source: "input-topic"]
        ↓
[Processor: filter, map]
        ↓
[Processor: groupByKey, count]
        ↓
[Sink: "output-topic"]

DSL(고수준 선언형 API) 코드가 곧 Topology 정의이고, builder.build() 가 topology 를 만들어요.

Topology topology = builder.build();
System.out.println(topology.describe());   // Topology 시각화

2. SubTopology — Repartition 단위

Topology 는 repartition(키 재분배) 지점에서 SubTopology 로 쪼개져요.

SubTopology 0:
  Source → filter → mapValues → groupBy → KafkaSink (repartition topic)

SubTopology 1:
  KafkaSource (repartition topic) → count → KafkaSink (output)

groupBy 같은 key 재설정이 들어가면 내부 repartition topic 으로 write 하고, 새 SubTopology 에서 다시 read 해요.

왜 분할?

각 SubTopology 는 독립적인 task 들로 이뤄지고, 별도 thread 에서 실행 가능해요. Parallelism(병렬 처리) 의 기본 단위예요.

3. Task — 실행 단위

Task = SubTopology + Partition assignments.

SubTopology 0 + Partition 0 = Task 0_0
SubTopology 0 + Partition 1 = Task 0_1
SubTopology 0 + Partition 2 = Task 0_2

SubTopology 1 + Partition 0 = Task 1_0
SubTopology 1 + Partition 1 = Task 1_1
SubTopology 1 + Partition 2 = Task 1_2

총 task 수는 subtopologies × max(partitions) 로 계산해요.

Task 의 격리

각 task 는 독립이에요. 자기 partition 과 자기 state store 를 가지고, 한 task 가 실패해도 다른 task 에는 영향이 가지 않아요.

4. Thread — num.stream.threads

props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

한 instance 안의 N 개 thread 가 task 들을 나눠 처리하고, 기본값은 1이에요.

Instance A (4 threads):
  Thread 1 → Task 0_0, Task 1_0
  Thread 2 → Task 0_1, Task 1_1
  Thread 3 → Task 0_2
  Thread 4 → Task 1_2

각 thread 는 여러 task 를 round-robin(돌아가며 할당) 으로 처리할 수 있어요.

권장

num.stream.threadsCPU core 수 정도가 적당해요.

5. Instance — Application Process

같은 application.id 를 가진 여러 process 가 같은 application 의 instance 가 돼요.

Instance A (host-1)
   Thread 1: Task 0_0, Task 0_1
   Thread 2: Task 1_0, Task 1_1

Instance B (host-2)
   Thread 1: Task 0_2
   Thread 2: Task 1_2

Instance 들이 task 를 나눠 처리하는 방식이 Kafka consumer group 과 동일한 패턴이에요.

매핑 종합

Topology (1)
   ↓ split by repartition
SubTopology (N)
   ↓ × input partitions
Task (N × P)
   ↓ assigned to
Thread (per instance, ×T)
   ↓ run on
Instance (cluster, ×I)

총 task = N × P (subtopology × partition). 총 thread = T × I (thread per instance × instance). Task ≤ Thread 권장 (남는 thread = idle).

Scaling — Instance 추가

초기: 1 instance, 4 thread → 모든 task 처리
   ↓ instance 추가
이후: 2 instance, 4 thread each → task 절반씩
   ↓ instance 또 추가
이후: 4 instance, 4 thread each → task 1/4 씩

Kafka consumer group rebalance(재분배) 가 자동으로 일어나서 task 가 재분배돼요.

한계 — Partition 수

Partition 8 = 최대 8 instance 까지 의미 있음
9 번째 instance = idle

스케일링 한계는 max partition 수예요. 그러니 처음에 충분한 partition 으로 만들어 두는 게 좋아요.

6. Time Semantics

Event Time     = 메시지가 *실제 발생* 한 시간 (외부 시스템 timestamp)
Ingestion Time = Kafka broker 가 *받은* 시간 (LogAppendTime)
Processing Time = Stream 처리 시간 (now)

Stream 처리는 정확한 windowing 을 위해 Event Time 기반이 일반적이에요.

Timestamp Extractor

public class MyTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        MyEvent event = (MyEvent) record.value();
        return event.getOccurredAt();   // 비즈니스 시각
    }
}

props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          MyTimestampExtractor.class);

기본값은 FailOnInvalidTimestamp (record 의 timestamp 를 그대로 사용) 이고, Custom Extractor 로 value 안의 시각을 직접 뽑아낼 수 있어요.

Stream Time

각 task 의 현재 처리 중인 메시지 timestamp 가 stream time 이에요. Windowing 과 grace period(지연 허용 구간) 가 이 값으로 결정돼요.

7. EOS — Exactly-Once V2

props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
          StreamsConfig.EXACTLY_ONCE_V2);

한 줄 설정으로 다음이 자동으로 들어가요:

  • Transactional Producer
  • Transactional Consumer (read_committed)
  • State store + changelog 의 atomic 보장

자세한 transaction protocol 은 88·112편에서 다뤘어요.

EOS V1 vs V2

항목 V1 V2
Producer per task 별도 (메모리 큼) 공유 (효율적)
성능 보통 빠름
권장 deprecated 표준 (Kafka 3.0+)

V2 = 기본 권장.

8. State Store

stateful 처리 (count·aggregate·join) 에는 local state store(로컬 상태 저장소) 가 필요해요.

  • RocksDB(임베디드 키-값 DB) (기본) = embedded key-value DB, disk 기반
  • In-Memory = Stores.inMemoryKeyValueStore(), fast 하지만 메모리만

state store 는 changelog topic(상태 변경 로그) 에 backup 돼요:

state store: my-count-store
changelog topic: <app-id>-my-count-store-changelog

재시작 시에는 changelog 부터 replay(재생) 해서 복구해요.

9. Standby Replicas

props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

각 task 의 standby replica(예비 복제본) 가 다른 instance 에서 state 를 미리 복구해 둬요. 한 instance 가 죽으면 standby 가 즉시 활성화돼서 fast failover 가 가능해져요.

기본값은 0이고, 운영 환경에서는 1~2를 권장해요.

10. Cooperative Rebalance

props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "...");
// 또는 (Kafka 2.4+ 기본)

111편에서 본 Cooperative Sticky(점진적 고정 재분배) 가 Streams 의 기본이에요. Stop-the-world(전체 정지) 없이 점진적으로 rebalance 가 일어나요.

11. Topology Description

System.out.println(topology.describe());

출력:

Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000002
      <-- KSTREAM-SOURCE-0000000000
    ...

운영과 디버깅에 매우 유용해요.

12. Streams DSL → Topology 변환

KStream<String, String> input = builder.stream("input");
input.filter(...).mapValues(...).to("output");

DSL 이 자동으로 processor node graph 를 생성해 줘서, 사용자가 topology 자체를 손으로 짤 일은 없어요.

세밀한 제어가 필요하면 Processor API (126편) 로 내려가요.

13. 모니터링 핵심

JMX(Java 모니터링 표준):

  • stream-thread-metrics:poll-rate — poll 빈도
  • stream-thread-metrics:process-rate — 처리 throughput
  • stream-task-metrics:process-latency-avg — 처리 latency
  • stream-state-metrics:put-rate — state store write rate
  • stream-task-metrics:commit-rate — commit rate

가장 중요한 지표는 lag(consumer group lag 와 같은 개념) 이에요.

한계·실무 함정

1. Partition 수 과소

스케일링 한계가 곧 partition 수예요. 그래서 처음에 충분한 partition 으로 잡아 두는 게 좋아요 (예: instance 의 4~8배).

2. num.stream.threads 너무 많음

CPU 수보다 많으면 context switch(문맥 전환) 부담이 커져요. CPU core 수 정도로 맞춰요.

3. State store 디스크 폭증

큰 dataset 은 큰 state store 로 이어져요. 모니터링과 retention(보관 기간) 검토가 필요해요.

4. Standby replicas 미설정

instance 가 죽으면 state 를 재로드하는 데 수 분에서 수 시간이 걸려요. num.standby.replicas=1+ 를 권장해요.

5. application.id 변경

state 가 다시 빌드되니까 신중히 다뤄야 해요.

6. Time semantics 오해

Processing Time 기반 windowing 은 out-of-order event(순서 어긋난 이벤트) 를 잘못 처리해요. Event Time 기반이 권장이에요.

시험 직전 한 번 더 — Kafka Streams Core Concepts 함정 압축 노트

  • 4계층 = Topology → SubTopology → Task → Thread → Instance
  • Topology = Processing graph (Source → Processor → Sink)
  • SubTopology = repartition 지점에서 분할
  • Task = SubTopology + Partition assignment = 실행 단위
  • Thread = num.stream.threads, instance 안 여러 thread
  • Instance = application process, 같은 application.id = 같은 application
  • 총 task = N × max(partitions)
  • 총 thread = T × I
  • Scaling = instance 추가 → 자동 rebalance (Cooperative)
  • 한계 = max partition 수 (그 이상 instance = idle)
  • Time Semantics 3가지 — Event Time (정확) · Ingestion Time · Processing Time
  • Timestamp Extractor = custom 으로 비즈니스 시각 추출
  • Stream Time = task 의 현재 처리 timestamp
  • EOS V2 (Kafka 3.0+) = PROCESSING_GUARANTEE_CONFIG=EXACTLY_ONCE_V2 한 줄
  • V2 = producer 공유 (V1 보다 효율)
  • State Store = RocksDB (기본) or In-Memory
  • Changelog topic 으로 backup, 재시작 시 복구
  • Standby Replicas = num.standby.replicas=1+ = fast failover
  • Cooperative Rebalance = Kafka 2.4+ 기본, stop-the-world 없음
  • Topology Description = topology.describe() 운영 도구
  • 모니터링 = JMX poll-rate·process-rate·process-latency·put-rate·commit-rate
  • 가장 중요 = lag
  • 함정 — Partition 과소 (스케일링 한계)
  • 함정 — num.stream.threads 너무 많음 (context switch)
  • 함정 — State store 디스크 폭증
  • 함정 — Standby 미설정 (state 재로드 부담)
  • 함정 — application.id 변경 = state 재빌드
  • 함정 — Time semantics 오해 (Event Time 권장)

공식 문서: Kafka Streams Core Concepts 에서 자세한 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!