백엔드 데이터 인프라 128편 — Kafka Streams Testing (TopologyTestDriver)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 128편. Kafka Streams Testing — TopologyTestDriver 로 Kafka 없이 빠른 단위 테스트, TestInputTopic·TestOutputTopic·time 제어·state store 검증, EmbeddedKafkaCluster 통합 테스트, Spring kafka-streams-test-utils 까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 128편 — Kafka Streams Testing (TopologyTestDriver)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 128편이에요. 127편 까지 Stateful 처리와 IQ(Interactive Queries, 상태 조회 API) 를 잡았다면, 이번 128편은 코드 품질을 어떻게 지키느냐 — Testing 차례예요.

Stream Testing 의 두 가지 영역

  1. Unit Test — TopologyTestDriver(Kafka 없이 토폴로지만 돌리는 테스트 드라이버) 로 Kafka 없이 빠른 검증
  2. Integration Test — EmbeddedKafkaCluster(JVM 안에 띄우는 임베디드 카프카) 로 실제 Kafka 환경

실무에선 보통 unit 80%, integration 20% 비율로 가져가요.

TopologyTestDriver — Unit Test 의 핵심

의존성

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>4.0.0</version>
    <scope>test</scope>
</dependency>

기본 사용

@Test
public void testWordCount() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input = builder.stream("input-topic");
    input.flatMapValues(v -> Arrays.asList(v.split(" ")))
         .groupBy((k, w) -> w)
         .count(Materialized.as("word-counts"))
         .toStream()
         .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

    Topology topology = builder.build();

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
        // 입력
        TestInputTopic<String, String> input = driver.createInputTopic(
            "input-topic",
            new StringSerializer(),
            new StringSerializer()
        );

        TestOutputTopic<String, Long> output = driver.createOutputTopic(
            "output-topic",
            new StringDeserializer(),
            new LongDeserializer()
        );

        input.pipeInput("key1", "hello world hello");

        // 출력 검증
        assertEquals(Long.valueOf(2), output.readKeyValuesToMap().get("hello"));
        assertEquals(Long.valueOf(1), output.readKeyValuesToMap().get("world"));
    }
}

Kafka 없이 모든 처리가 in-memory 로 끝나서 굉장히 빨라요.

TestInputTopic — 입력

TestInputTopic<String, String> input = driver.createInputTopic(...);

// 단일 record
input.pipeInput("key", "value");

// 여러 record
input.pipeKeyValueList(Arrays.asList(
    new KeyValue<>("k1", "v1"),
    new KeyValue<>("k2", "v2")
));

// Timestamp 명시
input.pipeInput("key", "value", Instant.parse("2026-05-17T12:00:00Z"));

TestOutputTopic — 출력

TestOutputTopic<String, Long> output = driver.createOutputTopic(...);

// 단일 record 읽기
TestRecord<String, Long> record = output.readRecord();
assertEquals("hello", record.key());
assertEquals(Long.valueOf(2), record.value());

// 모든 record 를 Map 으로
Map<String, Long> results = output.readKeyValuesToMap();

// List 로
List<KeyValue<String, Long>> all = output.readKeyValuesToList();

// 남은 record 수
int remaining = output.getQueueSize();
assertEquals(0, remaining);    // 모두 처리됨

Time 제어 — Window 테스트

input.pipeInput("key", "value-1", Instant.parse("2026-05-17T12:00:00Z"));
input.pipeInput("key", "value-2", Instant.parse("2026-05-17T12:30:00Z"));
input.pipeInput("key", "value-3", Instant.parse("2026-05-17T13:30:00Z"));   // 새 hourly window

Event time 을 자유롭게 밀어 넣을 수 있어서 window·session·grace period 까지 검증돼요.

Wall-clock time 진행

driver.advanceWallClockTime(Duration.ofMinutes(15));

WALL_CLOCK_TIME Punctuator(주기적 콜백 훅) 테스트에 써요.

State Store 검증

try (TopologyTestDriver driver = ...) {
    // 입력 ...

    // State store 직접 조회
    KeyValueStore<String, Long> store = driver.getKeyValueStore("word-counts");
    assertEquals(Long.valueOf(2), store.get("hello"));

    // Window store
    WindowStore<String, Long> windowStore = driver.getWindowStore("hourly-counts");
    // ...
}

Stream 처리 결과는 output topic 과 state store 양쪽 모두 확인할 수 있어요.

운영 패턴 — Test 구성

Test Helper 메서드

public class StreamsTestHelper {
    public static TopologyTestDriver createDriver(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-" + UUID.randomUUID());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        // ... default config
        return new TopologyTestDriver(topology, props);
    }
}

Setup·TearDown

@BeforeEach
public void setup() {
    driver = createDriver(buildTopology());
    inputTopic = driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
    outputTopic = driver.createOutputTopic("output", new StringDeserializer(), new LongDeserializer());
}

@AfterEach
public void tearDown() {
    driver.close();
}

@Test
public void testFilter() {
    inputTopic.pipeInput("k", "important-event");
    inputTopic.pipeInput("k", "not-important");
    assertEquals(1, outputTopic.getQueueSize());
}

Integration Test — EmbeddedKafkaCluster

실제 Kafka 환경에서 검증해요. 정확도는 올라가지만 속도는 떨어져요.

의존성

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

<!-- Spring Kafka 사용 시 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Spring Boot — @EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"input-topic", "output-topic"})
public class StreamsIntegrationTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void testStream() throws Exception {
        // Producer
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        Producer<String, String> producer = new KafkaProducer<>(producerProps, ...);
        producer.send(new ProducerRecord<>("input-topic", "key", "hello world"));
        producer.flush();

        // Consumer
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "true", embeddedKafka);
        Consumer<String, Long> consumer = new KafkaConsumer<>(consumerProps, ...);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic");

        // 검증 — 약간 대기 후
        ConsumerRecord<String, Long> record = KafkaTestUtils.getSingleRecord(consumer, "output-topic", Duration.ofSeconds(5));
        assertEquals(Long.valueOf(1), record.value());
    }
}

진짜 Kafka 가 떠 있으니 stream lifecycle·rebalance·EOS(Exactly-Once Semantics, 정확히 한 번 보장) 까지 통째로 검증돼요.

Testcontainers — 더 현실적 환경

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.20.0</version>
    <scope>test</scope>
</dependency>
@Container
private static final KafkaContainer kafka = new KafkaContainer("apache/kafka:4.0.0");

@Test
public void testWithRealKafka() {
    // kafka.getBootstrapServers() 로 연결
}

Docker 위에 진짜 Kafka container 가 뜨는 방식이라 CI/CD 파이프라인에 자연스럽게 녹아들어요.

선택 가이드

시나리오 권장
Pure topology 로직·빠른 iteration TopologyTestDriver
Window·time 테스트 TopologyTestDriver (time 제어 자유)
Stream + Kafka rebalance EmbeddedKafka
Stream + Schema Registry + Connect Testcontainers
End-to-end Testcontainers + 실제 dependencies

80% 정도는 TopologyTestDriver 로 충분해요. 빠르고, 대부분의 로직을 그 안에서 다 잡을 수 있거든요.

한계·실무 함정

1. TopologyTestDriver 와 실제 Kafka 차이

TopologyTestDriver 는 single-threaded·in-memory 로 돌아가요. 반면 실제 Kafka 는 multi-threaded·multi-partition 환경이라, race 나 rebalance 처럼 스레드·파티션 경계에서 터지는 시나리오는 여기서 못 잡아요.

2. EmbeddedKafka 느림

테스트가 뜰 때마다 수 초씩 까먹어요. 모든 테스트에 EmbeddedKafka 를 박으면 빌드가 금방 무거워지니까 TopologyTestDriver 를 먼저 두는 게 좋아요.

3. Time-based 테스트의 정확성

advanceWallClockTime 은 Punctuator 에만 영향을 줘요. Event time window 는 record timestamp 로 직접 밀어줘야 의도대로 굴러가요.

4. State store 의 RocksDB

TopologyTestDriver 는 in-memory store 로 동작하지만 실제 운영은 RocksDB(임베디드 KV 스토어) 라서, 디스크·iterator 쪽 시나리오에선 결과가 살짝 어긋날 수 있어요.

5. SerDe 검증

테스트에서 가짜 직렬화기를 끼우지 말고 real serializer 를 그대로 쓰세요. byte 레벨 호환성까지 같이 검증돼요.

시험 직전 한 번 더 — Streams Testing 함정 압축 노트

  • 두 영역 = Unit (TopologyTestDriver) · Integration (EmbeddedKafka·Testcontainers)
  • 의존성 = kafka-streams-test-utils
  • TopologyTestDriver = Kafka 없이 in-memory 빠른 테스트
  • TestInputTopic = pipeInput(key, value, timestamp)·pipeKeyValueList
  • TestOutputTopic = readRecord·readKeyValuesToMap·readKeyValuesToList·getQueueSize
  • Time 제어 = Event time 자유 (timestamp 명시) + advanceWallClockTime (Punctuator)
  • State Store 검증 = driver.getKeyValueStore("name").get(...)
  • 운영 패턴 = @BeforeEach setup + @AfterEach tearDown
  • Spring @EmbeddedKafka = KafkaTestUtils.producerProps·consumerProps·getSingleRecord
  • Testcontainers = Docker 기반 진짜 Kafka, CI/CD 친화
  • 80% = TopologyTestDriver
  • Window·time 테스트 = TopologyTestDriver 가 가장 자유
  • 함정 — TopologyTestDriver 와 실제 차이 (single vs multi-threaded)
  • 함정 — EmbeddedKafka 느림 (필요 시만)
  • 함정 — advanceWallClockTime = Punctuator 만
  • 함정 — State store RocksDB vs in-memory 차이
  • 함정 — Real SerDe 사용 권장

공식 문서: Kafka Streams Testing 에서 자세한 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!