백엔드 데이터 인프라 120편. Kafka Connect Config 종합 — Worker config (bootstrap·group·storage·converter·rest)·Connector config (class·tasks·error)·Converter (JSON·Avro·String)·SMT chain·운영 권장 조합까지 풀어쓴 학습 노트. Part 5-9 Connect 마무리.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 120편이에요. Part 5-9 Connect 의 마지막 글. 117~119편 으로 아키텍처·운영·개발 을 잡았다면, 이번 120편은 모든 Config 종합 + 운영 권장 조합.
Connect 의 Config 영역
Connect 의 config 는 3 계층:
- Worker Config (
worker.properties) — process 레벨 - Connector Config (REST API JSON) — connector 인스턴스 별
- SMT Config (Connector config 안) — SMT(Single Message Transform, 메시지 단위 변환) chain
각 영역을 하나씩 짚어볼게요.
1. Worker Config
Cluster Identity
group.id=connect-cluster-prod
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id = Connect cluster 식별. 같은 group.id 의 worker = 같은 cluster.
Internal Topics
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
각 internal topic 의 RF(Replication Factor, 복제본 수)·partition. 운영 환경 RF 3 필수.
Plugin Path
plugin.path=/usr/share/kafka-connect-plugins,/usr/local/share/kafka-connect-plugins
Connector·SMT jar 들을 두는 위치예요. 여러 경로는 콤마로 이어 붙이면 됩니다.
REST API
rest.port=8083
rest.host.name=connect-1.example.com
rest.advertised.host.name=connect-1.example.com
rest.advertised.port=8083
rest.advertised.* = 다른 worker·client 에 알려주는 주소. Docker·K8s(Kubernetes 줄임) 환경에서 중요.
Converters (default)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Avro 권장
# value.converter=io.confluent.connect.avro.AvroConverter
# value.converter.schema.registry.url=http://schema-registry:8081
Worker 의 default 값이고, Connector 별로 override 할 수 있어요.
Internal Converters (옛 옵션)
Kafka 3.0+ 부터 deprecated. 무시.
Security
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=...
ssl.truststore.location=...
# Internal topic 용
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=...
producer.ssl.truststore.location=...
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=...
consumer.ssl.truststore.location=...
Connect 가 Kafka 에 producer·consumer 로 접근 하니 그 부분도 보안.
Distributed Worker 추가 설정
# Heartbeat·session (consumer group 패턴)
heartbeat.interval.ms=3000
session.timeout.ms=10000
# Task 회복
rebalance.timeout.ms=60000
# Internal topic 자동 생성
config.storage.replication.factor=-1 # cluster default 사용
2. Connector Config
REST API JSON 형식.
필수
{
"name": "my-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "4",
"topics": "...",
"topic.prefix": "..."
}
}
Common Options (모든 connector 공통)
{
"config": {
"name": "my-connector",
"connector.class": "...",
"tasks.max": "4",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "InsertTime,MaskEmail",
"transforms.InsertTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTime.timestamp.field": "_ingested_at",
"transforms.MaskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskEmail.fields": "email",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "false",
"errors.deadletterqueue.topic.name": "dlq-my-connector",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true"
}
}
Sink 전용
{
"config": {
"topics": "orders,payments",
"topics.regex": "events\\..*",
"consumer.override.auto.offset.reset": "earliest",
"consumer.override.max.poll.records": "1000",
"consumer.override.fetch.min.bytes": "10240"
}
}
topicsvstopics.regex— 명시 list vs patternconsumer.override.*= consumer config override
Source 전용
{
"config": {
"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "3",
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.groups": "high-throughput",
"topic.creation.high-throughput.include": "events.*",
"topic.creation.high-throughput.partitions": "10"
}
}
Auto Topic Creation (Kafka 2.6+) = Connector 가 자동으로 topic 생성.
3. Converter 상세
JsonConverter
{
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
schemas.enable=true= JSON 안에 schema 정보 포함 (큼)schemas.enable=false= 단순 JSON (대부분)
AvroConverter — 운영 표준
{
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "user:password"
}
- 스키마 진화 지원
- Schema Registry(스키마 중앙 저장소) 통합
- 대규모 환경 표준
StringConverter
{
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
값을 단순 문자열로 다뤄요.
ByteArrayConverter
{
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
raw bytes 그대로 전달해요. 이미 직렬화된 데이터에 어울려요 (PB(Protobuf, 구글 직렬화 포맷)·MsgPack(MessagePack, 바이너리 JSON) 등).
Protobuf·JSON Schema Converter
{
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "..."
}
Confluent 가 제공하는 converter 예요.
4. SMT Chain — 운영 사례
다중 변환
{
"transforms": "RemoveField,RenameField,InsertTimestamp,MaskPII",
"transforms.RemoveField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RemoveField.exclude": "internal_id,debug_info",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "user_id:userId,created_at:createdAt",
"transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTimestamp.timestamp.field": "_ingestedAt",
"transforms.MaskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskPII.fields": "email,phone",
"transforms.MaskPII.replacement": "***"
}
Chain 순서대로 실행. RemoveField → RenameField → InsertTimestamp → MaskPII.
Conditional SMT — Predicates (Kafka 2.6+)
{
"transforms": "FilterOnlyOrders",
"transforms.FilterOnlyOrders.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.FilterOnlyOrders.predicate": "IsOrder",
"predicates": "IsOrder",
"predicates.IsOrder.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsOrder.pattern": "orders.*"
}
특정 topic 에만 변환을 거는 식이고, conditional logic 도 짤 수 있어요.
5. 운영 환경 권장 조합
Worker Config
# Cluster
group.id=connect-prod
bootstrap.servers=kafka-1:9093,kafka-2:9093,kafka-3:9093
# Internal Topics (RF 3, 운영 표준)
config.storage.topic=connect-prod-configs
config.storage.replication.factor=3
offset.storage.topic=connect-prod-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-prod-status
status.storage.replication.factor=3
status.storage.partitions=5
# Plugin
plugin.path=/usr/share/kafka-connect-plugins
# REST
rest.port=8083
rest.advertised.host.name=connect-prod-1.example.com
# Default Converter (Avro 표준)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# Security
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=...
ssl.truststore.location=/etc/kafka/ssl/truststore.jks
# Producer / Consumer 자식도 동일
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=...
producer.ssl.truststore.location=...
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=...
consumer.ssl.truststore.location=...
# Distributed
heartbeat.interval.ms=3000
session.timeout.ms=30000
rebalance.timeout.ms=60000
Connector Config
{
"name": "jdbc-source-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "4",
"connection.url": "jdbc:postgresql://db:5432/mydb",
"connection.user": "${file:/etc/kafka-connect/secrets:db_user}",
"connection.password": "${file:/etc/kafka-connect/secrets:db_password}",
"topic.prefix": "pg-",
"mode": "incrementing",
"incrementing.column.name": "id",
"poll.interval.ms": "1000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "AddIngestedAt",
"transforms.AddIngestedAt.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddIngestedAt.timestamp.field": "_ingestedAt",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.deadletterqueue.topic.name": "dlq-jdbc-source-orders",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true"
}
}
${file:...} = ConfigProvider(설정값 외부 주입 인터페이스) 로 비밀번호 외부 파일 분리 (REST API 노출 X).
6. ConfigProvider — Secret 분리
# Worker config
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
Connector config 에서:
{
"connection.password": "${file:/etc/kafka-connect/secrets.properties:db_password}"
}
Secret 이 REST API·log 에 노출 X. 운영 환경 필수.
다른 ConfigProvider:
- HashiCorp Vault
- AWS Secrets Manager
- Kubernetes Secrets
엔터프라이즈 환경에서 권장하는 선택지예요.
7. Spring Boot 통합
Spring Boot 가 Connect 자체 관리는 X — Connect 는 별도 인프라. 단 Spring 애플리케이션에서 Connect REST API 호출:
@Service
public class ConnectAdminService {
private final RestTemplate restTemplate;
public void deployConnector(String name, Map<String, String> config) {
String url = "http://connect:8083/connectors";
Map<String, Object> body = Map.of("name", name, "config", config);
restTemplate.postForObject(url, body, String.class);
}
}
CI/CD 에서 Spring 애플리케이션이 connector 자동 등록.
Part 5-9 Connect 마무리
4편 (117~120):
- 117 Overview — Source/Sink·Worker·Pre-built vs Custom
- 118 User Guide — REST·Status·Error·DLQ(Dead Letter Queue, 실패 메시지 격리 topic)·SMT 실전
- 119 Developer Guide — Custom Connector 작성
- 120 Config — Worker·Connector·Converter·SMT 종합
Connect = Kafka 생태계의 통합 도구. 대부분 환경 = pre-built 만으로 충분. 개발 비용 낮은 도구.
시험 직전 한 번 더 — Kafka Connect Config 함정 압축 노트
- 3 계층 = Worker Config (process)·Connector Config (REST JSON)·SMT Config (변환 chain)
- Worker Config 필수 =
group.id·bootstrap.servers·storage topics + RF·plugin.path·rest.port·converters·security - Internal Topics —
config.storage·offset.storage·status.storage - 운영 표준 RF = 3
- REST 설정 —
rest.advertised.*Docker·K8s 환경 중요 - Converters — JsonConverter·AvroConverter (운영 표준)·StringConverter·ByteArrayConverter·Protobuf·JSON Schema
- AvroConverter = Schema Registry 통합
- Security = producer·consumer 자식도 같은 config
- Connector Config 공통 —
name·connector.class·tasks.max·converters·transforms·errors.* - Sink 전용 —
topicsvstopics.regex·consumer.override.* - Source 전용 —
topic.creation.*(Auto Topic Creation, Kafka 2.6+) - Error Handling =
errors.tolerance·errors.log.*·errors.deadletterqueue.* - SMT Chain =
transforms=A,B,C+ 각 type·config - Predicates (Kafka 2.6+) = conditional SMT (TopicNameMatches·RecordIsTombstone)
- ConfigProvider = secret 외부 파일·Vault·AWS Secrets·K8s Secrets
${file:path:key}= config 안 secret reference- Spring = REST API 직접 호출 (CI/CD 자동화)
- 운영 권장 조합 = RF 3·SASL_SSL·SCRAM-SHA-512·AvroConverter·DLQ·ConfigProvider
- Part 5-9 Connect 4편 = Overview·User·Developer·Config
공식 문서: Kafka Connect Configs 에서 모든 설정 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 115편 — Kafka SASL (SCRAM · PLAIN · OAuth · Kerberos)
- 116편 — Kafka ACL (Authorization 깊이)
- 117편 — Kafka Connect (Source · Sink · Worker 아키텍처)
- 118편 — Kafka Connect 운영 (REST · Status · Error Handler · DLQ)
- 119편 — Kafka Connect Custom Connector 개발
다음 글: