백엔드 데이터 인프라 126편. Kafka Streams Processor API — Topology 직접 구성, Custom Processor 작성, Punctuator (시간 기반 작업), State Store 직접 다루기, DSL ↔ Processor 변환 같은 low-level 제어 영역까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 126편이에요. 125편 에서 DSL (선언형 stream 처리 API) 의 풍부한 operation 을 잡았다면, 이번 126편은 DSL 로 안 풀리는 영역 — Processor API (저수준 stream 처리 API).
언제 Processor API 가 필요한가
DSL = 90% 환경 충분. Processor API 가 필요한 경우:
- Time-based 작업 (Punctuator — 주기적 호출 콜백) — 1초마다 무언가
- State Store (Kafka Streams 의 상태 저장소) 직접 조작 — 복잡한 read-modify-write
- Custom Aggregation (사용자 정의 집계) — DSL aggregate 로 안 풀림
- Per-record routing — 동적 sink 결정
- 외부 시스템 직접 호출 — DB·HTTP
대부분 = DSL + 일부 Processor API 결합.
Topology 직접 구성
DSL 안 쓰고 직접:
Topology topology = new Topology();
topology
.addSource("source-node", "input-topic")
.addProcessor("my-processor", MyProcessor::new, "source-node")
.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
Serdes.String(), Serdes.Long()
),
"my-processor"
)
.addSink("sink-node", "output-topic", "my-processor");
KafkaStreams streams = new KafkaStreams(topology, props);
핵심:
addSource= Kafka topic 에서 readaddProcessor= 사용자 로직 (Processor 클래스)addStateStore= 상태 저장소 (특정 processor 에 연결)addSink= Kafka topic 에 write
각 node = 이름 + 부모 노드들. DAG (방향성 비순환 그래프) 구성.
Processor 인터페이스 (Kafka 3.0+)
public class MyProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
this.store = context.getStateStore("my-store");
// Punctuator 등록 (10초마다)
context.schedule(
Duration.ofSeconds(10),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> {
// 주기적 작업
store.all().forEachRemaining(kv -> {
log.info("State: {} = {}", kv.key, kv.value);
});
}
);
}
@Override
public void process(Record<String, String> record) {
// Record 처리
Long count = store.get(record.key());
if (count == null) count = 0L;
count++;
store.put(record.key(), count);
// 다음 processor 또는 sink 로 forward
context.forward(record.withValue(record.value() + "-counted-" + count));
}
@Override
public void close() {
// 정리
}
}
핵심 method:
init(context)= 초기화·state store 가져오기·Punctuator 등록process(record)= 각 record 처리context.forward(record)= 다음 processor 로 전달close()= 정리
Punctuator — 시간 기반 작업
context.schedule(
Duration.ofSeconds(10),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> {
// 10초마다 호출
}
);
PunctuationType 2가지
STREAM_TIME — Event Time 기반
PunctuationType.STREAM_TIME
Stream 의 record timestamp 가 진행할 때만 호출. Event time 정확.
record A (t=100) 처리
record B (t=105) 처리
record C (t=115) 처리 → punctuator 호출 (10s 지났음)
장점은 out-of-order 안전 하고 deterministic 하다는 것, 단점은 record 가 안 오면 호출되지 않는다는 것.
WALL_CLOCK_TIME — Real Time
PunctuationType.WALL_CLOCK_TIME
실제 시계 시간 기준. Record 와 무관.
장점은 언제든 호출된다는 것, 단점은 out-of-order event 와 동기화되지 않는다는 것.
활용 사례
Punctuator 가 잘 들어맞는 자리는 오래된 entry 를 비우는 state store cleanup, 누적 데이터를 모아 한 번에 내보내는 external system flush, 주기적 상태 보고를 위한 alert·heartbeat, 그리고 늦게 도착한 event 를 정리하는 watermark 처리 정도.
State Store — 직접 다루기
Persistent KeyValueStore
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long()
);
topology.addStateStore(storeBuilder, "my-processor");
사용
KeyValueStore<String, Long> store = context.getStateStore("my-store");
store.put("key1", 100L);
Long value = store.get("key1");
store.delete("key1");
// Range query
KeyValueIterator<String, Long> iter = store.range("a", "z");
while (iter.hasNext()) {
KeyValue<String, Long> kv = iter.next();
// ...
}
iter.close();
Window Store
StoreBuilder<WindowStore<String, Long>> windowStore =
Stores.windowStoreBuilder(
Stores.persistentWindowStore("window-store",
Duration.ofHours(24), // retention
Duration.ofMinutes(5), // window size
false),
Serdes.String(),
Serdes.Long()
);
Window 별 state. 자주 쓰는 자리 = windowed aggregate (125편 DSL 의 내부).
Session Store
SessionStore<String, Long>
Session window 용.
DSL 안에서 Processor API 사용
DSL chain 안에서도 Processor API 호출 가능:
KStream<String, String> input = builder.stream("input");
input
.filter(...)
.process(MyProcessor::new, "my-state-store")
.to("output");
process (Kafka 3.0+) = Processor API 통합. State store 미리 등록 필요.
Transform vs Process
// Transform — 새 stream 반환 (downstream 계속)
KStream<String, String> result = input.transform(MyTransformer::new, "store");
result.to("output");
// Process — terminal (downstream 없음)
input.process(MyProcessor::new, "store");
transform = downstream 가능, process = terminal.
모니터링 + Metrics
Processor 안에서 custom metric 등록:
@Override
public void init(ProcessorContext<String, String> context) {
Sensor sensor = context.metrics().addSensor("my-processor-rate",
Sensor.RecordingLevel.INFO);
sensor.add(new MetricName("processed-rate", "my-group", "...", Map.of()),
new Rate());
}
@Override
public void process(Record<String, String> record) {
sensor.record(1.0);
// ...
}
JMX (자바 표준 모니터링 인터페이스) 로 노출.
운영 사례 — 외부 시스템 직접 호출
public class EnrichmentProcessor implements Processor<String, Order, String, EnrichedOrder> {
private ExternalApiClient apiClient;
@Override
public void init(ProcessorContext<String, EnrichedOrder> context) {
apiClient = new ExternalApiClient();
}
@Override
public void process(Record<String, Order> record) {
Order order = record.value();
UserProfile profile = apiClient.getUserProfile(order.getUserId());
EnrichedOrder enriched = new EnrichedOrder(order, profile);
context.forward(record.withValue(enriched));
}
}
외부 HTTP API 호출 같은 영역. DSL 로는 표현 어려움.
여기서 시험 함정이 하나 있어요 — 외부 호출은 latency·throughput 영향. async batch·cache·local DB lookup 같은 패턴으로 최적화.
운영 사례 — Punctuator + State Cleanup
public class StatefulCleanupProcessor implements Processor<String, Event, String, Event> {
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext<String, Event> context) {
this.store = context.getStateStore("event-store");
// 매 1시간 마다 오래된 entry 제거
context.schedule(
Duration.ofHours(1),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> {
KeyValueIterator<String, Long> iter = store.all();
long cutoff = timestamp - Duration.ofDays(7).toMillis();
while (iter.hasNext()) {
KeyValue<String, Long> kv = iter.next();
if (kv.value < cutoff) {
store.delete(kv.key);
}
}
iter.close();
}
);
}
}
DSL 로는 표현 어려운 주기적 cleanup 패턴.
DSL vs Processor API — 결정 가이드
| 시나리오 | 선택 |
|---|---|
| 표준 변환·집계·join·window | DSL |
| 주기적 작업 (Punctuator) | Processor API |
| Complex state manipulation | Processor API |
| 외부 시스템 직접 호출 | Processor API |
| 동적 routing | Processor API |
| Per-record sink 결정 | Processor API |
| 혼합 | DSL .process() 또는 .transform() |
대부분 환경 = DSL 위주 + 특수 영역만 Processor API.
한계·실무 함정
1. Topology 복잡도
직접 Topology 구성 = 코드 복잡·실수 위험. DSL 권장.
2. State Store 등록 누락
addStateStore 와 addProcessor 연결 누락 = processor cannot access state store 에러.
3. Punctuator 의 STREAM_TIME 함정
Record 가 안 오면 호출 X. 알람·heartbeat 는 WALL_CLOCK_TIME.
4. 외부 호출 blocking
외부 API 호출 = stream thread 블로킹. timeout 짧게 + async 검토.
5. State Store iter close
iterator.close() 누락 = resource leak. try-with-resources 또는 finally.
시험 직전 한 번 더 — Processor API 함정 압축 노트
- 언제 = Time-based 작업 · State 복잡 · 외부 시스템 호출 · 동적 routing
- Topology 직접 구성 —
addSource·addProcessor·addStateStore·addSink - Processor 인터페이스 (Kafka 3.0+) —
init·process·close init= state store + Punctuator 등록process= 각 record 처리context.forward= downstream 으로 전달- Punctuator =
context.schedule(Duration, PunctuationType, ...) STREAM_TIME= event time 기반, record 진행 시WALL_CLOCK_TIME= 실제 시계, 항상 호출- 활용 = state cleanup · external flush · alert · watermark
- State Store —
KeyValueStore·WindowStore·SessionStore Stores.keyValueStoreBuilder(...persistentKeyValueStore("name"), keySerde, valueSerde)- 사용 —
put·get·delete·range - DSL 안 통합 =
process(...)·transform(...) - transform = downstream, process = terminal
- Custom Metric =
context.metrics().addSensor() - 운영 사례 — 외부 enrichment, Punctuator state cleanup
- DSL 위주 + 특수 영역만 Processor API
- 함정 — Topology 복잡도
- 함정 — State Store 등록 누락
- 함정 — STREAM_TIME 의 record 안 오면 호출 X
- 함정 — 외부 호출 blocking
- 함정 — Iterator close 누락
공식 문서: Kafka Streams Processor API 에서 자세한 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 121편 — Kafka Streams 입문 (라이브러리 모델·왜)
- 122편 — Kafka Streams Quickstart (WordCount 5분)
- 123편 — Kafka Streams Core Concepts (Topology · Task · Thread)
- 124편 — Kafka Streams Write & Run (Spring Boot · 운영 패턴)
- 125편 — Kafka Streams DSL (변환·집계·Join·Window)
다음 글: