백엔드 데이터 인프라 127편. Kafka Streams Stateful + Interactive Queries — Materialized state store, IQ v1·v2 API, HTTP endpoint 로 외부 query, Standby Replica·RocksDB tuning·Memory Management 까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 127편이에요. 126편 까지 Processor API 를 잡았다면, 이번 127편은 state 의 외부 접근 — Interactive Queries (IQ).
Interactive Queries 란
Kafka Streams 가 안에 들고 있는 state store (집계 결과를 담아 두는 내장 저장소) 를 HTTP API 같은 외부 경로로 조회하게 해 주는 기능이에요.
[Streams App with state store]
↑
│ HTTP REST query
│
[External Client]
활용:
- 실시간 dashboard — count·aggregate 결과 즉시 query
- Microservice 조회 API — 다른 서비스가 state 조회
- Lookup service — 사용자별 통계·세션 조회
State Store 명명 (Materialized)
KTable<String, Long> counts = input
.groupByKey()
.count(Materialized.as("word-counts-store"));
Materialized.as("word-counts-store") 로 붙인 이름이 바로 외부에서 이 store 에 접근할 때 쓰는 식별자예요.
IQ v1 — 옛 API (Kafka < 3.0)
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"word-counts-store",
QueryableStoreTypes.keyValueStore()
)
);
Long count = store.get("hello");
한계 — 이 instance 의 local state 만 보여요. 다른 instance 가 들고 있는 state 는 여기서 바로 못 꺼냅니다.
IQ v2 — 새 API (Kafka 3.0+)
KeyQuery<String, ValueAndTimestamp<Long>> query = KeyQuery.withKey("hello");
StateQueryRequest<ValueAndTimestamp<Long>> request = StateQueryRequest
.inStore("word-counts-store")
.withQuery(query);
StateQueryResult<ValueAndTimestamp<Long>> result = streams.query(request);
QueryResult<ValueAndTimestamp<Long>> queryResult = result.getOnlyPartitionResult();
Long count = queryResult.getResult().value();
더 풍부한 Query type:
KeyQuery— 단일 keyRangeQuery— rangeWindowKeyQuery— windowed storeWindowRangeQuery- Custom
여기서 시험 함정이 하나 있어요. State 가 여러 partition·instance 에 흩어져 있으니, key 가 어느 partition 에 들어갔는지부터 확인하고 그 instance 로 query 를 보내야 합니다.
Multi-Instance — 분산 State 처리
Instance A (partition 0, 1)
Instance B (partition 2, 3)
Key "hello" → partition 1 → Instance A
Key "world" → partition 3 → Instance B
이러면 외부 client 가 어느 instance 한테 query 를 던질지를 직접 정해 줘야 해요.
streams.metadataForKey()
KeyQueryMetadata metadata = streams.queryMetadataForKey(
"word-counts-store",
"hello",
Serdes.String().serializer()
);
HostInfo activeHost = metadata.activeHost();
// activeHost = Instance A
// HTTP redirect 또는 직접 query
이 metadata 만 있으면 올바른 instance 로 routing 할 수 있어요.
HTTP Endpoint — Spring Boot
@RestController
public class StateController {
@Autowired
private StreamsBuilderFactoryBean streamsFactory;
@GetMapping("/count/{word}")
public CountResponse getCount(@PathVariable String word) {
KafkaStreams streams = streamsFactory.getKafkaStreams();
KeyQueryMetadata metadata = streams.queryMetadataForKey(
"word-counts-store", word, Serdes.String().serializer()
);
if (metadata == null || metadata == KeyQueryMetadata.NOT_AVAILABLE) {
return new CountResponse(word, null, "not available");
}
if (isThisInstance(metadata.activeHost())) {
// Local query
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"word-counts-store",
QueryableStoreTypes.keyValueStore()
)
);
return new CountResponse(word, store.get(word), "local");
} else {
// Remote — HTTP redirect 또는 RestTemplate
String url = String.format("http://%s:%d/count/%s",
metadata.activeHost().host(), metadata.activeHost().port(), word);
return restTemplate.getForObject(url, CountResponse.class);
}
}
}
각 instance 는 자기가 들고 있는 partition 의 state 만 직접 조회하고, 다른 partition 으로 가야 하는 요청은 HTTP redirect 로 넘겨요.
Application Server Config
spring:
kafka:
streams:
properties:
application.server: streams-app-1.example.com:8080
이렇게 각 instance 가 자기 HTTP endpoint 를 등록해 두면, 다른 instance 의 metadataForKey 가 그 값을 보고 routing 합니다.
Standby Replicas — Fast Failover
spring:
kafka:
streams:
properties:
num.standby.replicas: 1
active task 마다 다른 instance 에 standby 를 하나씩 두고 state 를 미리 복구해 두는 구조예요. active instance 가 죽으면 standby 가 곧장 promote 됩니다.
IQ 가용성 향상 — active 가 내려가도 standby 가 그 자리에서 query 응답을 받아 줘요.
Standby 에서 query
KeyQueryMetadata metadata = streams.queryMetadataForKey(...);
HostInfo active = metadata.activeHost();
Set<HostInfo> standby = metadata.standbyHosts();
// active 가 down 이면 standby 시도
운영에서 이걸 그대로 쓰려면 stale read (커밋 직전 값을 약간 늦게 보는 것) 를 허용하는 환경이어야 해요.
RocksDB 튜닝
State store 의 기본 엔진은 RocksDB (embedded key-value DB) 입니다.
spring:
kafka:
streams:
properties:
rocksdb.config.setter: com.example.MyRocksDBConfig
public class MyRocksDBConfig implements RocksDBConfigSetter {
@Override
public void setConfig(String storeName, Options options, Map<String, Object> configs) {
options.setWriteBufferSize(64 * 1024 * 1024L); // 64MB
options.setMaxWriteBufferNumber(4);
options.setMaxBackgroundJobs(8);
// ... tuning
}
@Override
public void close(String storeName, Options options) { }
}
state store 가 커지면 RocksDB 튜닝은 거의 필수예요.
State Store Cache
spring:
kafka:
streams:
properties:
cache.max.bytes.buffering: 10485760 # 10MB
RocksDB 위에 in-memory cache 가 한 겹 얹혀 있어요. cache hit 이 잘 나면 downstream throughput 이 올라가는 대신, 버퍼링 때문에 latency 가 살짝 늘어납니다.
Memory Management
JVM Heap
↑
Kafka Streams (코드 자체)
+ RocksDB native memory (off-heap)
+ State store cache (in-heap)
RocksDB 는 off-heap (JVM heap 밖에서 OS 가 직접 잡는 메모리) 에 자리를 잡기 때문에, state 가 커지면 OS 메모리 점유가 갑자기 튀어 오를 수 있어요.
대략 총 메모리는 JVM heap + RocksDB write buffer × num_partitions × instances 로 잡아 두면 됩니다.
Common Patterns
Real-time Dashboard
Browser → HTTP → Streams App → state store → JSON 응답
dashboard 가 수 초 간격으로 polling 을 돌리고, 응답은 수십~수백 ms 안에 떨어지는 식이에요.
Microservice Lookup
Service A → HTTP → Streams App → state store → 사용자 통계 응답
다른 microservice 가 Kafka topic 을 거치지 않고 Streams app 한테 바로 query 를 던지는 패턴입니다.
Aggregate API
Streams: 1시간 window 매출 집계
↓ Materialized.as("hourly-revenue-store")
External: HTTP GET /revenue/2026-05-17/14:00
→ IQ WindowKeyQuery
→ "2026-05-17 14:00 매출 $5,200"
한계·실무 함정
1. State 일관성
Stream 처리 중 아직 commit 안 된 state 도 query 에 그대로 잡혀요. stale read 가 허용되는 환경에서만 안전하게 씁니다.
2. Network overhead
다른 instance 로 HTTP redirect 가 한 번 더 들어가면 그만큼 latency 가 붙고, query 가 대량으로 쏟아지면 부담도 커져요.
3. Standby 의 lag
standby 는 active 보다 항상 조금 뒤처져 있어요. 정확한 일관성이 필요하면 active 쪽으로만 query 를 보내야 합니다.
4. RocksDB 메모리
off-heap 메모리는 JVM heap 바깥, OS 가 잡는 영역이라 모니터링과 튜닝을 따로 챙겨야 해요.
5. State store 이름 변경
기존 state 와 이름이 달라지면 state 를 처음부터 다시 빌드해야 합니다.
시험 직전 한 번 더 — Stateful + IQ 함정 압축 노트
- Interactive Queries (IQ) = state store 외부 query
- Materialized.as("name") = state store 명명 (IQ 용)
- IQ v1 (옛) =
streams.store(StoreQueryParameters)— 단순 - IQ v2 (Kafka 3.0+) =
streams.query(StateQueryRequest)— 풍부한 Query type - Query Types —
KeyQuery·RangeQuery·WindowKeyQuery·WindowRangeQuery - State 분산 — Key 가 어느 partition·instance 인지 결정
streams.queryMetadataForKey()= active·standby host 정보- HTTP Endpoint = Spring Boot REST + local/remote routing
application.server= 각 instance 가 자기 endpoint 등록- Standby Replicas =
num.standby.replicas=1+= fast failover + standby query metadata.standbyHosts()= standby 도 query 가능 (stale read)- RocksDB 튜닝 =
RocksDBConfigSetter(write buffer·background jobs) - State Store Cache =
cache.max.bytes.buffering(in-heap) - cache hit = throughput ↑, latency ↑ (buffered)
- 메모리 = JVM heap + RocksDB off-heap (큰 state 면 OS 메모리 폭증)
- 패턴 — Real-time dashboard · Microservice lookup · Aggregate API
- 함정 — Stale read 허용 환경에만
- 함정 — 다른 instance HTTP redirect latency
- 함정 — Standby lag
- 함정 — RocksDB off-heap 메모리
- 함정 — State store 이름 변경 = 재빌드
공식 문서: Kafka Streams Interactive Queries 에서 자세한 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 122편 — Kafka Streams Quickstart (WordCount 5분)
- 123편 — Kafka Streams Core Concepts (Topology · Task · Thread)
- 124편 — Kafka Streams Write & Run (Spring Boot · 운영 패턴)
- 125편 — Kafka Streams DSL (변환·집계·Join·Window)
- 126편 — Kafka Streams Processor API (Low-Level)
다음 글: