카프카 마스터 노트 시리즈 8편 (마지막). Spring Kafka의 @KafkaListener 어노테이션 패턴, ReactiveKafkaConsumerTemplate/ProducerTemplate 리액티브 통합, 통합 테스트 도구 비교(Embedded Kafka vs Testcontainers), Kafka 보안 3축(인증·인가·암호화), SASL/PLAIN과 JAAS 설정, ACL로 토픽별 권한 제어까지 — 실무 통합 마무리.
이 글은 카프카 마스터 노트 시리즈의 여덟 번째 편입니다. 1~7편이 Kafka 자체였다면, 이번엔 그것을 Spring 애플리케이션에 통합 — Spring Kafka·테스트·보안.
Spring Kafka의 @KafkaListener 한 줄로 컨슈머 끝. 통합 테스트는 Embedded vs Testcontainers의 결정. 운영 보안은 SASL+SSL+ACL 3축. 시리즈 마무리.
처음 Spring·테스트·보안이 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, Spring Kafka와 Reactor Kafka의 관계가 모호합니다. 둘은 어떻게 다른가? 둘째, 보안 옵션이 너무 많습니다. SASL·SSL·ACL·PLAIN·SCRAM·OAUTHBEARER… 한 번에 안 들어옵니다.
해결법은 한 가지예요. Spring Kafka = Spring 친화 / Reactor Kafka = WebFlux 친화로 구분. 보안 = "누구냐(인증) + 뭘 할 수 있냐(인가) + 안전하게 통신(암호화)" 3축. 이 그림이 잡히면 디테일은 채우기만 하면 됩니다.
Spring Kafka — @KafkaListener 패턴
의존성
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
설정 (application.yml)
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order-processor
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.example"
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Listener 어노테이션
@Component
public class OrderListener {
@KafkaListener(topics = "order-events", groupId = "order-processor")
public void onOrder(OrderEvent event) {
log.info("Received: {}", event);
// 처리
}
// 메타데이터 함께
@KafkaListener(topics = "order-events")
public void onOrderWithMetadata(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset
) {
// ...
}
// 배치
@KafkaListener(topics = "order-events", containerFactory = "batchFactory")
public void onBatch(List<OrderEvent> events) {
// ...
}
}
KafkaTemplate — 발행
@Service
public class OrderPublisher {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void publish(OrderEvent event) {
kafkaTemplate.send("order-events", event.getOrderId(), event);
}
}
여기서 정말 중요한 시험 함정 — Spring Kafka는 전통 Java Client 위 추상화. Spring 어노테이션 친화. Reactor Kafka는 별개 (Project Reactor 위).
Reactive Spring Kafka — WebFlux 통합
@Bean
ReactiveKafkaConsumerTemplate<String, OrderEvent> reactiveConsumer(
ReceiverOptions<String, OrderEvent> options) {
return new ReactiveKafkaConsumerTemplate<>(options);
}
@Service
public class OrderProcessor {
@Autowired
private ReactiveKafkaConsumerTemplate<String, OrderEvent> consumer;
@PostConstruct
public void start() {
consumer.receive()
.flatMap(record -> processAsync(record), 8)
.subscribe();
}
}
ReactiveKafkaProducerTemplate도 비슷.
Spring Cloud Stream
더 추상화된 메시징 프레임워크. 카프카·RabbitMQ 코드 변경 없이 전환. **심화편 (9~15편)**에서 본격 다룸.
@Bean
public Function<Flux<OrderEvent>, Flux<PaymentEvent>> processOrder() {
return orders -> orders.map(this::toPayment);
}
통합 테스트 — Embedded vs Testcontainers
Embedded Kafka
@EmbeddedKafka(partitions = 3, topics = "order-events")
@SpringBootTest
class OrderTest {
@Test
void test() {
// ...
}
}
장점 — 빠름·간단·CI 가벼움. 단점 — 실제 Kafka 동작과 미세 차이.
Testcontainers
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
@DynamicPropertySource
static void props(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers",
kafka::getBootstrapServers);
}
장점 — 실제 Kafka 도커 컨테이너 = 운영 동작과 일치. 단점 — 무거움 (도커 띄움)·CI 시간 증가.
여기서 정말 중요한 시험 함정 — 단위 테스트 = Embedded / E2E·통합 테스트 = Testcontainers. 실제 동작 검증은 Testcontainers가 신뢰성 ↑.
테스트 패턴
StepVerifier (Reactive)
StepVerifier.create(consumer.receive().take(3))
.expectNextMatches(r -> r.value().getOrderId().equals("1"))
.expectNextMatches(r -> r.value().getOrderId().equals("2"))
.expectNextMatches(r -> r.value().getOrderId().equals("3"))
.verifyComplete();
Sinks
테스트 메시지 발행:
Sinks.Many<OrderEvent> sink = Sinks.many().multicast().onBackpressureBuffer();
producer.send(sink.asFlux().map(this::toRecord))
.subscribe();
sink.tryEmitNext(testEvent);
@DirtiesContext
각 테스트마다 새 Spring 컨텍스트:
@SpringBootTest
@DirtiesContext(classMode = ClassMode.BEFORE_EACH_TEST_METHOD)
Kafka 컨슈머 그룹 격리.
여기서 시험 함정이 하나 있어요. auto.offset.reset=earliest 테스트에서 명시. 기본 latest로 두면 테스트 메시지 못 받음.
Kafka 보안 3축
1. Authentication (인증) — 너 누구냐
2. Authorization (인가) — 뭘 할 수 있냐
3. Encryption (암호화) — 안전하게 통신
1. SSL/TLS — 암호화
브로커-클라이언트 통신 암호화.
ssl.keystore.location=/path/keystore.jks
ssl.keystore.password=secret
ssl.truststore.location=/path/truststore.jks
ssl.truststore.password=secret
security.protocol=SSL
2. SASL — 인증
다양한 메커니즘:
| 메커니즘 | 설명 |
|---|---|
| PLAIN | username·password (간단, 평문 위험) |
| SCRAM | challenge-response (안전, 권장) |
| GSSAPI (Kerberos) | 엔터프라이즈 환경 |
| OAUTHBEARER | OAuth 토큰 |
SASL/PLAIN 예시
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="alice" password="secret";
여기서 시험 함정이 하나 있어요. PLAIN은 SSL과 함께만. 평문 비밀번호라 SSL 없으면 노출. SASL_SSL = SASL + SSL 결합.
JAAS 설정
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="secret";
};
-Djava.security.auth.login.config=/path/jaas.conf로 적용.
3. ACL — 인가
토픽별·작업별 권한:
# alice에게 order-events 토픽 read 권한
kafka-acls --bootstrap-server localhost:9092 \
--add \
--allow-principal User:alice \
--operation Read \
--topic order-events
# alice에게 그룹 권한
kafka-acls --add \
--allow-principal User:alice \
--operation Read \
--group order-processor
# 확인
kafka-acls --list --topic order-events
여기서 정말 중요한 시험 함정 — ACL은 기본 disabled. authorizer.class.name=kafka.security.authorizer.AclAuthorizer 설정 필요. 활성화 시 명시 권한 없는 모든 작업 거부.
Spring Kafka에서 보안 설정
spring:
kafka:
security:
protocol: SASL_SSL
properties:
sasl.mechanism: PLAIN
sasl.jaas.config: |
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="secret";
ssl.truststore.location: /path/truststore.jks
ssl.truststore.password: secret
시리즈 마무리 — 15편 종합
1편부터 8편까지의 흐름:
| 편 | 주제 | 한 줄 |
|---|---|---|
| 1 | 기초·EDA·KRaft | EDA 패러다임, Kafka가 분산 로그 |
| 2 | Topic·Partition·Offset | 4계층 데이터 구조 |
| 3 | Producer·Consumer | acks·auto.offset.reset 핵심 옵션 |
| 4 | Consumer Group | 그룹·리밸런싱·할당 전략 4종 |
| 5 | Reactor Kafka | 리액티브 + 백프레셔 |
| 6 | Cluster·HA | ISR·복제·운영 Best Practices |
| 7 | 배치·에러·TX | 멱등·DLQ·Exactly-once |
| 8 | Spring·Test·보안 | @KafkaListener·Testcontainers·SASL+SSL+ACL |
기초편은 여기서 마무리. **심화편 (9~15편)**에서는 Spring Cloud Stream·Saga 패턴·Transactional Outbox 같은 마이크로서비스 패턴을 깊게 다룹니다.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 8편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Spring Kafka =
@KafkaListener어노테이션 + KafkaTemplate - 전통 Java Client 위 Spring 추상화
JsonDeserializer사용 시spring.json.trusted.packages설정 필수- Reactive —
ReactiveKafkaConsumerTemplate / ProducerTemplate - 더 추상화 = Spring Cloud Stream (9~15편)
- 통합 테스트 — Embedded Kafka (빠름) vs Testcontainers (실제 동작)
- 단위 = Embedded / E2E = Testcontainers
- StepVerifier로 Reactive 검증
- Sinks로 테스트 메시지 발행
@DirtiesContext로 컨텍스트 격리- 테스트는
auto.offset.reset=earliest명시 - 보안 3축 — 인증 / 인가 / 암호화
- SSL/TLS = 암호화 (truststore·keystore)
- SASL = 인증 (PLAIN / SCRAM / GSSAPI / OAUTHBEARER)
- PLAIN은 SSL과 함께만 (SASL_SSL)
- SCRAM 권장 (운영 표준)
- JAAS 설정 = sasl.jaas.config 또는 jaas.conf 파일
- ACL = 토픽·그룹·작업별 권한
- 기본 disabled —
authorizer.class.name설정 필요 - 활성화 후 명시 권한 없으면 거부
- 운영 — SASL_SSL + SCRAM + ACL 표준
- 심화편 (9~15편) = Spring Cloud Stream·Saga·Outbox
시리즈 다른 편 (시리즈 마지막)
- 1편 — EDA·Kafka 기초·KRaft
- 2편 — Topic·Partition·Offset
- 3편 — Producer·Consumer 동작
- 4편 — Consumer Group·리밸런싱
- 5편 — Reactor Kafka
- 6편 — Cluster·HA·Best Practices
- 7편 — 배치·에러·트랜잭션
- 8편 — Spring Kafka·테스트·보안 (현재 글)
- 9편 — Spring Cloud Stream 기초
- 10편 — StreamBridge 동적 라우팅
- 11편 — Fan-Out / Fan-In
- 12편 — SCS Tips & Tricks
- 13편 — Saga 코레오그래피
- 14편 — Saga 오케스트레이터
- 15편 — Transactional Outbox
공식 문서: Spring Kafka Reference / Kafka Security 에서 더 깊이.
기초편은 여기서 마무리. 다음은 심화편 — Spring Cloud Stream·Saga편 (별도 시리즈) 으로 마이크로서비스 통합 패턴을 풀어 갑니다.