백엔드 데이터 인프라 126편 — Kafka Streams Processor API (Low-Level)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 126편. Kafka Streams Processor API — Topology 직접 구성, Custom Processor 작성, Punctuator (시간 기반 작업), State Store 직접 다루기, DSL ↔ Processor 변환 같은 low-level 제어 영역까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 126편 — Kafka Streams Processor API (Low-Level)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 126편이에요. 125편 에서 DSL (선언형 stream 처리 API) 의 풍부한 operation 을 잡았다면, 이번 126편은 DSL 로 안 풀리는 영역Processor API (저수준 stream 처리 API).

언제 Processor API 가 필요한가

DSL = 90% 환경 충분. Processor API 가 필요한 경우:

  • Time-based 작업 (Punctuator — 주기적 호출 콜백) — 1초마다 무언가
  • State Store (Kafka Streams 의 상태 저장소) 직접 조작 — 복잡한 read-modify-write
  • Custom Aggregation (사용자 정의 집계) — DSL aggregate 로 안 풀림
  • Per-record routing — 동적 sink 결정
  • 외부 시스템 직접 호출 — DB·HTTP

대부분 = DSL + 일부 Processor API 결합.

Topology 직접 구성

DSL 안 쓰고 직접:

Topology topology = new Topology();

topology
    .addSource("source-node", "input-topic")
    .addProcessor("my-processor", MyProcessor::new, "source-node")
    .addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"),
            Serdes.String(), Serdes.Long()
        ),
        "my-processor"
    )
    .addSink("sink-node", "output-topic", "my-processor");

KafkaStreams streams = new KafkaStreams(topology, props);

핵심:

  • addSource = Kafka topic 에서 read
  • addProcessor = 사용자 로직 (Processor 클래스)
  • addStateStore = 상태 저장소 (특정 processor 에 연결)
  • addSink = Kafka topic 에 write

각 node = 이름 + 부모 노드들. DAG (방향성 비순환 그래프) 구성.

Processor 인터페이스 (Kafka 3.0+)

public class MyProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore("my-store");

        // Punctuator 등록 (10초마다)
        context.schedule(
            Duration.ofSeconds(10),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                // 주기적 작업
                store.all().forEachRemaining(kv -> {
                    log.info("State: {} = {}", kv.key, kv.value);
                });
            }
        );
    }

    @Override
    public void process(Record<String, String> record) {
        // Record 처리
        Long count = store.get(record.key());
        if (count == null) count = 0L;
        count++;
        store.put(record.key(), count);

        // 다음 processor 또는 sink 로 forward
        context.forward(record.withValue(record.value() + "-counted-" + count));
    }

    @Override
    public void close() {
        // 정리
    }
}

핵심 method:

  • init(context) = 초기화·state store 가져오기·Punctuator 등록
  • process(record) = 각 record 처리
  • context.forward(record) = 다음 processor 로 전달
  • close() = 정리

Punctuator — 시간 기반 작업

context.schedule(
    Duration.ofSeconds(10),
    PunctuationType.WALL_CLOCK_TIME,
    timestamp -> {
        // 10초마다 호출
    }
);

PunctuationType 2가지

STREAM_TIME — Event Time 기반

PunctuationType.STREAM_TIME

Stream 의 record timestamp 가 진행할 때만 호출. Event time 정확.

record A (t=100) 처리
record B (t=105) 처리
record C (t=115) 처리  → punctuator 호출 (10s 지났음)

장점은 out-of-order 안전 하고 deterministic 하다는 것, 단점은 record 가 안 오면 호출되지 않는다는 것.

WALL_CLOCK_TIME — Real Time

PunctuationType.WALL_CLOCK_TIME

실제 시계 시간 기준. Record 와 무관.

장점은 언제든 호출된다는 것, 단점은 out-of-order event 와 동기화되지 않는다는 것.

활용 사례

Punctuator 가 잘 들어맞는 자리는 오래된 entry 를 비우는 state store cleanup, 누적 데이터를 모아 한 번에 내보내는 external system flush, 주기적 상태 보고를 위한 alert·heartbeat, 그리고 늦게 도착한 event 를 정리하는 watermark 처리 정도.

State Store — 직접 다루기

Persistent KeyValueStore

StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"),
        Serdes.String(),
        Serdes.Long()
    );

topology.addStateStore(storeBuilder, "my-processor");

사용

KeyValueStore<String, Long> store = context.getStateStore("my-store");

store.put("key1", 100L);
Long value = store.get("key1");
store.delete("key1");

// Range query
KeyValueIterator<String, Long> iter = store.range("a", "z");
while (iter.hasNext()) {
    KeyValue<String, Long> kv = iter.next();
    // ...
}
iter.close();

Window Store

StoreBuilder<WindowStore<String, Long>> windowStore =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore("window-store",
            Duration.ofHours(24),     // retention
            Duration.ofMinutes(5),     // window size
            false),
        Serdes.String(),
        Serdes.Long()
    );

Window 별 state. 자주 쓰는 자리 = windowed aggregate (125편 DSL 의 내부).

Session Store

SessionStore<String, Long>

Session window 용.

DSL 안에서 Processor API 사용

DSL chain 안에서도 Processor API 호출 가능:

KStream<String, String> input = builder.stream("input");

input
    .filter(...)
    .process(MyProcessor::new, "my-state-store")
    .to("output");

process (Kafka 3.0+) = Processor API 통합. State store 미리 등록 필요.

Transform vs Process

// Transform — 새 stream 반환 (downstream 계속)
KStream<String, String> result = input.transform(MyTransformer::new, "store");
result.to("output");

// Process — terminal (downstream 없음)
input.process(MyProcessor::new, "store");

transform = downstream 가능, process = terminal.

모니터링 + Metrics

Processor 안에서 custom metric 등록:

@Override
public void init(ProcessorContext<String, String> context) {
    Sensor sensor = context.metrics().addSensor("my-processor-rate",
        Sensor.RecordingLevel.INFO);
    sensor.add(new MetricName("processed-rate", "my-group", "...", Map.of()),
               new Rate());
}

@Override
public void process(Record<String, String> record) {
    sensor.record(1.0);
    // ...
}

JMX (자바 표준 모니터링 인터페이스) 로 노출.

운영 사례 — 외부 시스템 직접 호출

public class EnrichmentProcessor implements Processor<String, Order, String, EnrichedOrder> {

    private ExternalApiClient apiClient;

    @Override
    public void init(ProcessorContext<String, EnrichedOrder> context) {
        apiClient = new ExternalApiClient();
    }

    @Override
    public void process(Record<String, Order> record) {
        Order order = record.value();
        UserProfile profile = apiClient.getUserProfile(order.getUserId());

        EnrichedOrder enriched = new EnrichedOrder(order, profile);
        context.forward(record.withValue(enriched));
    }
}

외부 HTTP API 호출 같은 영역. DSL 로는 표현 어려움.

여기서 시험 함정이 하나 있어요 — 외부 호출은 latency·throughput 영향. async batch·cache·local DB lookup 같은 패턴으로 최적화.

운영 사례 — Punctuator + State Cleanup

public class StatefulCleanupProcessor implements Processor<String, Event, String, Event> {

    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Event> context) {
        this.store = context.getStateStore("event-store");

        // 매 1시간 마다 오래된 entry 제거
        context.schedule(
            Duration.ofHours(1),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                KeyValueIterator<String, Long> iter = store.all();
                long cutoff = timestamp - Duration.ofDays(7).toMillis();
                while (iter.hasNext()) {
                    KeyValue<String, Long> kv = iter.next();
                    if (kv.value < cutoff) {
                        store.delete(kv.key);
                    }
                }
                iter.close();
            }
        );
    }
}

DSL 로는 표현 어려운 주기적 cleanup 패턴.

DSL vs Processor API — 결정 가이드

시나리오 선택
표준 변환·집계·join·window DSL
주기적 작업 (Punctuator) Processor API
Complex state manipulation Processor API
외부 시스템 직접 호출 Processor API
동적 routing Processor API
Per-record sink 결정 Processor API
혼합 DSL .process() 또는 .transform()

대부분 환경 = DSL 위주 + 특수 영역만 Processor API.

한계·실무 함정

1. Topology 복잡도

직접 Topology 구성 = 코드 복잡·실수 위험. DSL 권장.

2. State Store 등록 누락

addStateStoreaddProcessor 연결 누락 = processor cannot access state store 에러.

3. Punctuator 의 STREAM_TIME 함정

Record 가 안 오면 호출 X. 알람·heartbeat 는 WALL_CLOCK_TIME.

4. 외부 호출 blocking

외부 API 호출 = stream thread 블로킹. timeout 짧게 + async 검토.

5. State Store iter close

iterator.close() 누락 = resource leak. try-with-resources 또는 finally.

시험 직전 한 번 더 — Processor API 함정 압축 노트

  • 언제 = Time-based 작업 · State 복잡 · 외부 시스템 호출 · 동적 routing
  • Topology 직접 구성addSource·addProcessor·addStateStore·addSink
  • Processor 인터페이스 (Kafka 3.0+) — init·process·close
  • init = state store + Punctuator 등록
  • process = 각 record 처리
  • context.forward = downstream 으로 전달
  • Punctuator = context.schedule(Duration, PunctuationType, ...)
  • STREAM_TIME = event time 기반, record 진행 시
  • WALL_CLOCK_TIME = 실제 시계, 항상 호출
  • 활용 = state cleanup · external flush · alert · watermark
  • State StoreKeyValueStore·WindowStore·SessionStore
  • Stores.keyValueStoreBuilder(...persistentKeyValueStore("name"), keySerde, valueSerde)
  • 사용 — put·get·delete·range
  • DSL 안 통합 = process(...) · transform(...)
  • transform = downstream, process = terminal
  • Custom Metric = context.metrics().addSensor()
  • 운영 사례 — 외부 enrichment, Punctuator state cleanup
  • DSL 위주 + 특수 영역만 Processor API
  • 함정 — Topology 복잡도
  • 함정 — State Store 등록 누락
  • 함정 — STREAM_TIME 의 record 안 오면 호출 X
  • 함정 — 외부 호출 blocking
  • 함정 — Iterator close 누락

공식 문서: Kafka Streams Processor API 에서 자세한 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!