백엔드 데이터 인프라 128편. Kafka Streams Testing — TopologyTestDriver 로 Kafka 없이 빠른 단위 테스트, TestInputTopic·TestOutputTopic·time 제어·state store 검증, EmbeddedKafkaCluster 통합 테스트, Spring kafka-streams-test-utils 까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 128편이에요. 127편 까지 Stateful 처리와 IQ(Interactive Queries, 상태 조회 API) 를 잡았다면, 이번 128편은 코드 품질을 어떻게 지키느냐 — Testing 차례예요.
Stream Testing 의 두 가지 영역
- Unit Test — TopologyTestDriver(Kafka 없이 토폴로지만 돌리는 테스트 드라이버) 로 Kafka 없이 빠른 검증
- Integration Test — EmbeddedKafkaCluster(JVM 안에 띄우는 임베디드 카프카) 로 실제 Kafka 환경
실무에선 보통 unit 80%, integration 20% 비율로 가져가요.
TopologyTestDriver — Unit Test 의 핵심
의존성
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>4.0.0</version>
<scope>test</scope>
</dependency>
기본 사용
@Test
public void testWordCount() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");
input.flatMapValues(v -> Arrays.asList(v.split(" ")))
.groupBy((k, w) -> w)
.count(Materialized.as("word-counts"))
.toStream()
.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
// 입력
TestInputTopic<String, String> input = driver.createInputTopic(
"input-topic",
new StringSerializer(),
new StringSerializer()
);
TestOutputTopic<String, Long> output = driver.createOutputTopic(
"output-topic",
new StringDeserializer(),
new LongDeserializer()
);
input.pipeInput("key1", "hello world hello");
// 출력 검증
assertEquals(Long.valueOf(2), output.readKeyValuesToMap().get("hello"));
assertEquals(Long.valueOf(1), output.readKeyValuesToMap().get("world"));
}
}
Kafka 없이 모든 처리가 in-memory 로 끝나서 굉장히 빨라요.
TestInputTopic — 입력
TestInputTopic<String, String> input = driver.createInputTopic(...);
// 단일 record
input.pipeInput("key", "value");
// 여러 record
input.pipeKeyValueList(Arrays.asList(
new KeyValue<>("k1", "v1"),
new KeyValue<>("k2", "v2")
));
// Timestamp 명시
input.pipeInput("key", "value", Instant.parse("2026-05-17T12:00:00Z"));
TestOutputTopic — 출력
TestOutputTopic<String, Long> output = driver.createOutputTopic(...);
// 단일 record 읽기
TestRecord<String, Long> record = output.readRecord();
assertEquals("hello", record.key());
assertEquals(Long.valueOf(2), record.value());
// 모든 record 를 Map 으로
Map<String, Long> results = output.readKeyValuesToMap();
// List 로
List<KeyValue<String, Long>> all = output.readKeyValuesToList();
// 남은 record 수
int remaining = output.getQueueSize();
assertEquals(0, remaining); // 모두 처리됨
Time 제어 — Window 테스트
input.pipeInput("key", "value-1", Instant.parse("2026-05-17T12:00:00Z"));
input.pipeInput("key", "value-2", Instant.parse("2026-05-17T12:30:00Z"));
input.pipeInput("key", "value-3", Instant.parse("2026-05-17T13:30:00Z")); // 새 hourly window
Event time 을 자유롭게 밀어 넣을 수 있어서 window·session·grace period 까지 검증돼요.
Wall-clock time 진행
driver.advanceWallClockTime(Duration.ofMinutes(15));
WALL_CLOCK_TIME Punctuator(주기적 콜백 훅) 테스트에 써요.
State Store 검증
try (TopologyTestDriver driver = ...) {
// 입력 ...
// State store 직접 조회
KeyValueStore<String, Long> store = driver.getKeyValueStore("word-counts");
assertEquals(Long.valueOf(2), store.get("hello"));
// Window store
WindowStore<String, Long> windowStore = driver.getWindowStore("hourly-counts");
// ...
}
Stream 처리 결과는 output topic 과 state store 양쪽 모두 확인할 수 있어요.
운영 패턴 — Test 구성
Test Helper 메서드
public class StreamsTestHelper {
public static TopologyTestDriver createDriver(Topology topology) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-" + UUID.randomUUID());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// ... default config
return new TopologyTestDriver(topology, props);
}
}
Setup·TearDown
@BeforeEach
public void setup() {
driver = createDriver(buildTopology());
inputTopic = driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
outputTopic = driver.createOutputTopic("output", new StringDeserializer(), new LongDeserializer());
}
@AfterEach
public void tearDown() {
driver.close();
}
@Test
public void testFilter() {
inputTopic.pipeInput("k", "important-event");
inputTopic.pipeInput("k", "not-important");
assertEquals(1, outputTopic.getQueueSize());
}
Integration Test — EmbeddedKafkaCluster
실제 Kafka 환경에서 검증해요. 정확도는 올라가지만 속도는 떨어져요.
의존성
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<!-- Spring Kafka 사용 시 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
Spring Boot — @EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"input-topic", "output-topic"})
public class StreamsIntegrationTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void testStream() throws Exception {
// Producer
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Producer<String, String> producer = new KafkaProducer<>(producerProps, ...);
producer.send(new ProducerRecord<>("input-topic", "key", "hello world"));
producer.flush();
// Consumer
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "true", embeddedKafka);
Consumer<String, Long> consumer = new KafkaConsumer<>(consumerProps, ...);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic");
// 검증 — 약간 대기 후
ConsumerRecord<String, Long> record = KafkaTestUtils.getSingleRecord(consumer, "output-topic", Duration.ofSeconds(5));
assertEquals(Long.valueOf(1), record.value());
}
}
진짜 Kafka 가 떠 있으니 stream lifecycle·rebalance·EOS(Exactly-Once Semantics, 정확히 한 번 보장) 까지 통째로 검증돼요.
Testcontainers — 더 현실적 환경
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.20.0</version>
<scope>test</scope>
</dependency>
@Container
private static final KafkaContainer kafka = new KafkaContainer("apache/kafka:4.0.0");
@Test
public void testWithRealKafka() {
// kafka.getBootstrapServers() 로 연결
}
Docker 위에 진짜 Kafka container 가 뜨는 방식이라 CI/CD 파이프라인에 자연스럽게 녹아들어요.
선택 가이드
| 시나리오 | 권장 |
|---|---|
| Pure topology 로직·빠른 iteration | TopologyTestDriver |
| Window·time 테스트 | TopologyTestDriver (time 제어 자유) |
| Stream + Kafka rebalance | EmbeddedKafka |
| Stream + Schema Registry + Connect | Testcontainers |
| End-to-end | Testcontainers + 실제 dependencies |
80% 정도는 TopologyTestDriver 로 충분해요. 빠르고, 대부분의 로직을 그 안에서 다 잡을 수 있거든요.
한계·실무 함정
1. TopologyTestDriver 와 실제 Kafka 차이
TopologyTestDriver 는 single-threaded·in-memory 로 돌아가요. 반면 실제 Kafka 는 multi-threaded·multi-partition 환경이라, race 나 rebalance 처럼 스레드·파티션 경계에서 터지는 시나리오는 여기서 못 잡아요.
2. EmbeddedKafka 느림
테스트가 뜰 때마다 수 초씩 까먹어요. 모든 테스트에 EmbeddedKafka 를 박으면 빌드가 금방 무거워지니까 TopologyTestDriver 를 먼저 두는 게 좋아요.
3. Time-based 테스트의 정확성
advanceWallClockTime 은 Punctuator 에만 영향을 줘요. Event time window 는 record timestamp 로 직접 밀어줘야 의도대로 굴러가요.
4. State store 의 RocksDB
TopologyTestDriver 는 in-memory store 로 동작하지만 실제 운영은 RocksDB(임베디드 KV 스토어) 라서, 디스크·iterator 쪽 시나리오에선 결과가 살짝 어긋날 수 있어요.
5. SerDe 검증
테스트에서 가짜 직렬화기를 끼우지 말고 real serializer 를 그대로 쓰세요. byte 레벨 호환성까지 같이 검증돼요.
시험 직전 한 번 더 — Streams Testing 함정 압축 노트
- 두 영역 = Unit (TopologyTestDriver) · Integration (EmbeddedKafka·Testcontainers)
- 의존성 =
kafka-streams-test-utils - TopologyTestDriver = Kafka 없이 in-memory 빠른 테스트
- TestInputTopic =
pipeInput(key, value, timestamp)·pipeKeyValueList - TestOutputTopic =
readRecord·readKeyValuesToMap·readKeyValuesToList·getQueueSize - Time 제어 = Event time 자유 (timestamp 명시) +
advanceWallClockTime(Punctuator) - State Store 검증 =
driver.getKeyValueStore("name").get(...) - 운영 패턴 =
@BeforeEach setup+@AfterEach tearDown - Spring
@EmbeddedKafka=KafkaTestUtils.producerProps·consumerProps·getSingleRecord - Testcontainers = Docker 기반 진짜 Kafka, CI/CD 친화
- 80% = TopologyTestDriver
- Window·time 테스트 = TopologyTestDriver 가 가장 자유
- 함정 — TopologyTestDriver 와 실제 차이 (single vs multi-threaded)
- 함정 — EmbeddedKafka 느림 (필요 시만)
- 함정 —
advanceWallClockTime= Punctuator 만 - 함정 — State store RocksDB vs in-memory 차이
- 함정 — Real SerDe 사용 권장
공식 문서: Kafka Streams Testing 에서 자세한 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 123편 — Kafka Streams Core Concepts (Topology · Task · Thread)
- 124편 — Kafka Streams Write & Run (Spring Boot · 운영 패턴)
- 125편 — Kafka Streams DSL (변환·집계·Join·Window)
- 126편 — Kafka Streams Processor API (Low-Level)
- 127편 — Kafka Streams Stateful + Interactive Queries
다음 글: