백엔드 데이터 인프라 127편 — Kafka Streams Stateful + Interactive Queries

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 127편. Kafka Streams Stateful + Interactive Queries — Materialized state store, IQ v1·v2 API, HTTP endpoint 로 외부 query, Standby Replica·RocksDB tuning·Memory Management 까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 127편 — Kafka Streams Stateful + Interactive Queries

이 글은 백엔드 데이터 인프라 시리즈 130편 중 127편이에요. 126편 까지 Processor API 를 잡았다면, 이번 127편은 state 의 외부 접근Interactive Queries (IQ).

Interactive Queries 란

Kafka Streams 가 안에 들고 있는 state store (집계 결과를 담아 두는 내장 저장소) 를 HTTP API 같은 외부 경로로 조회하게 해 주는 기능이에요.

[Streams App with state store]
        ↑
        │ HTTP REST query
        │
[External Client]

활용:

  • 실시간 dashboard — count·aggregate 결과 즉시 query
  • Microservice 조회 API — 다른 서비스가 state 조회
  • Lookup service — 사용자별 통계·세션 조회

State Store 명명 (Materialized)

KTable<String, Long> counts = input
    .groupByKey()
    .count(Materialized.as("word-counts-store"));

Materialized.as("word-counts-store") 로 붙인 이름이 바로 외부에서 이 store 에 접근할 때 쓰는 식별자예요.

IQ v1 — 옛 API (Kafka < 3.0)

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "word-counts-store",
        QueryableStoreTypes.keyValueStore()
    )
);

Long count = store.get("hello");

한계 — 이 instance 의 local state 만 보여요. 다른 instance 가 들고 있는 state 는 여기서 바로 못 꺼냅니다.

IQ v2 — 새 API (Kafka 3.0+)

KeyQuery<String, ValueAndTimestamp<Long>> query = KeyQuery.withKey("hello");

StateQueryRequest<ValueAndTimestamp<Long>> request = StateQueryRequest
    .inStore("word-counts-store")
    .withQuery(query);

StateQueryResult<ValueAndTimestamp<Long>> result = streams.query(request);
QueryResult<ValueAndTimestamp<Long>> queryResult = result.getOnlyPartitionResult();
Long count = queryResult.getResult().value();

더 풍부한 Query type:

  • KeyQuery — 단일 key
  • RangeQuery — range
  • WindowKeyQuery — windowed store
  • WindowRangeQuery
  • Custom

여기서 시험 함정이 하나 있어요. State 가 여러 partition·instance 에 흩어져 있으니, key 가 어느 partition 에 들어갔는지부터 확인하고 그 instance 로 query 를 보내야 합니다.

Multi-Instance — 분산 State 처리

Instance A (partition 0, 1)
Instance B (partition 2, 3)

Key "hello" → partition 1 → Instance A
Key "world" → partition 3 → Instance B

이러면 외부 client 가 어느 instance 한테 query 를 던질지를 직접 정해 줘야 해요.

streams.metadataForKey()

KeyQueryMetadata metadata = streams.queryMetadataForKey(
    "word-counts-store",
    "hello",
    Serdes.String().serializer()
);

HostInfo activeHost = metadata.activeHost();
// activeHost = Instance A
// HTTP redirect 또는 직접 query

이 metadata 만 있으면 올바른 instance 로 routing 할 수 있어요.

HTTP Endpoint — Spring Boot

@RestController
public class StateController {

    @Autowired
    private StreamsBuilderFactoryBean streamsFactory;

    @GetMapping("/count/{word}")
    public CountResponse getCount(@PathVariable String word) {
        KafkaStreams streams = streamsFactory.getKafkaStreams();

        KeyQueryMetadata metadata = streams.queryMetadataForKey(
            "word-counts-store", word, Serdes.String().serializer()
        );

        if (metadata == null || metadata == KeyQueryMetadata.NOT_AVAILABLE) {
            return new CountResponse(word, null, "not available");
        }

        if (isThisInstance(metadata.activeHost())) {
            // Local query
            ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                    "word-counts-store",
                    QueryableStoreTypes.keyValueStore()
                )
            );
            return new CountResponse(word, store.get(word), "local");
        } else {
            // Remote — HTTP redirect 또는 RestTemplate
            String url = String.format("http://%s:%d/count/%s",
                metadata.activeHost().host(), metadata.activeHost().port(), word);
            return restTemplate.getForObject(url, CountResponse.class);
        }
    }
}

각 instance 는 자기가 들고 있는 partition 의 state 만 직접 조회하고, 다른 partition 으로 가야 하는 요청은 HTTP redirect 로 넘겨요.

Application Server Config

spring:
  kafka:
    streams:
      properties:
        application.server: streams-app-1.example.com:8080

이렇게 각 instance 가 자기 HTTP endpoint 를 등록해 두면, 다른 instance 의 metadataForKey 가 그 값을 보고 routing 합니다.

Standby Replicas — Fast Failover

spring:
  kafka:
    streams:
      properties:
        num.standby.replicas: 1

active task 마다 다른 instance 에 standby 를 하나씩 두고 state 를 미리 복구해 두는 구조예요. active instance 가 죽으면 standby 가 곧장 promote 됩니다.

IQ 가용성 향상 — active 가 내려가도 standby 가 그 자리에서 query 응답을 받아 줘요.

Standby 에서 query

KeyQueryMetadata metadata = streams.queryMetadataForKey(...);

HostInfo active = metadata.activeHost();
Set<HostInfo> standby = metadata.standbyHosts();

// active 가 down 이면 standby 시도

운영에서 이걸 그대로 쓰려면 stale read (커밋 직전 값을 약간 늦게 보는 것) 를 허용하는 환경이어야 해요.

RocksDB 튜닝

State store 의 기본 엔진은 RocksDB (embedded key-value DB) 입니다.

spring:
  kafka:
    streams:
      properties:
        rocksdb.config.setter: com.example.MyRocksDBConfig
public class MyRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        options.setWriteBufferSize(64 * 1024 * 1024L);       // 64MB
        options.setMaxWriteBufferNumber(4);
        options.setMaxBackgroundJobs(8);
        // ... tuning
    }

    @Override
    public void close(String storeName, Options options) { }
}

state store 가 커지면 RocksDB 튜닝은 거의 필수예요.

State Store Cache

spring:
  kafka:
    streams:
      properties:
        cache.max.bytes.buffering: 10485760     # 10MB

RocksDB 위에 in-memory cache 가 한 겹 얹혀 있어요. cache hit 이 잘 나면 downstream throughput 이 올라가는 대신, 버퍼링 때문에 latency 가 살짝 늘어납니다.

Memory Management

JVM Heap
  ↑
Kafka Streams (코드 자체)
  + RocksDB native memory (off-heap)
  + State store cache (in-heap)

RocksDB 는 off-heap (JVM heap 밖에서 OS 가 직접 잡는 메모리) 에 자리를 잡기 때문에, state 가 커지면 OS 메모리 점유가 갑자기 튀어 오를 수 있어요.

대략 총 메모리는 JVM heap + RocksDB write buffer × num_partitions × instances 로 잡아 두면 됩니다.

Common Patterns

Real-time Dashboard

Browser → HTTP → Streams App → state store → JSON 응답

dashboard 가 수 초 간격으로 polling 을 돌리고, 응답은 수십~수백 ms 안에 떨어지는 식이에요.

Microservice Lookup

Service A → HTTP → Streams App → state store → 사용자 통계 응답

다른 microservice 가 Kafka topic 을 거치지 않고 Streams app 한테 바로 query 를 던지는 패턴입니다.

Aggregate API

Streams: 1시간 window 매출 집계
   ↓ Materialized.as("hourly-revenue-store")

External: HTTP GET /revenue/2026-05-17/14:00
   → IQ WindowKeyQuery
   → "2026-05-17 14:00 매출 $5,200"

한계·실무 함정

1. State 일관성

Stream 처리 중 아직 commit 안 된 state 도 query 에 그대로 잡혀요. stale read 가 허용되는 환경에서만 안전하게 씁니다.

2. Network overhead

다른 instance 로 HTTP redirect 가 한 번 더 들어가면 그만큼 latency 가 붙고, query 가 대량으로 쏟아지면 부담도 커져요.

3. Standby 의 lag

standby 는 active 보다 항상 조금 뒤처져 있어요. 정확한 일관성이 필요하면 active 쪽으로만 query 를 보내야 합니다.

4. RocksDB 메모리

off-heap 메모리는 JVM heap 바깥, OS 가 잡는 영역이라 모니터링과 튜닝을 따로 챙겨야 해요.

5. State store 이름 변경

기존 state 와 이름이 달라지면 state 를 처음부터 다시 빌드해야 합니다.

시험 직전 한 번 더 — Stateful + IQ 함정 압축 노트

  • Interactive Queries (IQ) = state store 외부 query
  • Materialized.as("name") = state store 명명 (IQ 용)
  • IQ v1 (옛) = streams.store(StoreQueryParameters) — 단순
  • IQ v2 (Kafka 3.0+) = streams.query(StateQueryRequest) — 풍부한 Query type
  • Query TypesKeyQuery·RangeQuery·WindowKeyQuery·WindowRangeQuery
  • State 분산 — Key 가 어느 partition·instance 인지 결정
  • streams.queryMetadataForKey() = active·standby host 정보
  • HTTP Endpoint = Spring Boot REST + local/remote routing
  • application.server = 각 instance 가 자기 endpoint 등록
  • Standby Replicas = num.standby.replicas=1+ = fast failover + standby query
  • metadata.standbyHosts() = standby 도 query 가능 (stale read)
  • RocksDB 튜닝 = RocksDBConfigSetter (write buffer·background jobs)
  • State Store Cache = cache.max.bytes.buffering (in-heap)
  • cache hit = throughput ↑, latency ↑ (buffered)
  • 메모리 = JVM heap + RocksDB off-heap (큰 state 면 OS 메모리 폭증)
  • 패턴 — Real-time dashboard · Microservice lookup · Aggregate API
  • 함정 — Stale read 허용 환경에만
  • 함정 — 다른 instance HTTP redirect latency
  • 함정 — Standby lag
  • 함정 — RocksDB off-heap 메모리
  • 함정 — State store 이름 변경 = 재빌드

공식 문서: Kafka Streams Interactive Queries 에서 자세한 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!