백엔드 데이터 인프라 120편 — Kafka Connect Config 종합

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 120편. Kafka Connect Config 종합 — Worker config (bootstrap·group·storage·converter·rest)·Connector config (class·tasks·error)·Converter (JSON·Avro·String)·SMT chain·운영 권장 조합까지 풀어쓴 학습 노트. Part 5-9 Connect 마무리.

📚 백엔드 데이터 인프라 · 120편 — Kafka Connect Config 종합

이 글은 백엔드 데이터 인프라 시리즈 130편 중 120편이에요. Part 5-9 Connect 의 마지막 글. 117~119편 으로 아키텍처·운영·개발 을 잡았다면, 이번 120편은 모든 Config 종합 + 운영 권장 조합.

Connect 의 Config 영역

Connect 의 config 는 3 계층:

  1. Worker Config (worker.properties) — process 레벨
  2. Connector Config (REST API JSON) — connector 인스턴스 별
  3. SMT Config (Connector config 안) — SMT(Single Message Transform, 메시지 단위 변환) chain

각 영역을 하나씩 짚어볼게요.

1. Worker Config

Cluster Identity

group.id=connect-cluster-prod
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

group.id = Connect cluster 식별. 같은 group.id 의 worker = 같은 cluster.

Internal Topics

config.storage.topic=connect-configs
config.storage.replication.factor=3

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

각 internal topic 의 RF(Replication Factor, 복제본 수)·partition. 운영 환경 RF 3 필수.

Plugin Path

plugin.path=/usr/share/kafka-connect-plugins,/usr/local/share/kafka-connect-plugins

Connector·SMT jar 들을 두는 위치예요. 여러 경로는 콤마로 이어 붙이면 됩니다.

REST API

rest.port=8083
rest.host.name=connect-1.example.com
rest.advertised.host.name=connect-1.example.com
rest.advertised.port=8083

rest.advertised.* = 다른 worker·client 에 알려주는 주소. Docker·K8s(Kubernetes 줄임) 환경에서 중요.

Converters (default)

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Avro 권장
# value.converter=io.confluent.connect.avro.AvroConverter
# value.converter.schema.registry.url=http://schema-registry:8081

Worker 의 default 값이고, Connector 별로 override 할 수 있어요.

Internal Converters (옛 옵션)

Kafka 3.0+ 부터 deprecated. 무시.

Security

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=...
ssl.truststore.location=...

# Internal topic 용
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=...
producer.ssl.truststore.location=...

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=...
consumer.ssl.truststore.location=...

Connect 가 Kafka 에 producer·consumer 로 접근 하니 그 부분도 보안.

Distributed Worker 추가 설정

# Heartbeat·session (consumer group 패턴)
heartbeat.interval.ms=3000
session.timeout.ms=10000

# Task 회복
rebalance.timeout.ms=60000

# Internal topic 자동 생성
config.storage.replication.factor=-1     # cluster default 사용

2. Connector Config

REST API JSON 형식.

필수

{
  "name": "my-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "4",
    "topics": "...",
    "topic.prefix": "..."
  }
}

Common Options (모든 connector 공통)

{
  "config": {
    "name": "my-connector",
    "connector.class": "...",
    "tasks.max": "4",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "transforms": "InsertTime,MaskEmail",
    "transforms.InsertTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTime.timestamp.field": "_ingested_at",
    "transforms.MaskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.MaskEmail.fields": "email",

    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "false",
    "errors.deadletterqueue.topic.name": "dlq-my-connector",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}

Sink 전용

{
  "config": {
    "topics": "orders,payments",
    "topics.regex": "events\\..*",

    "consumer.override.auto.offset.reset": "earliest",
    "consumer.override.max.poll.records": "1000",
    "consumer.override.fetch.min.bytes": "10240"
  }
}
  • topics vs topics.regex — 명시 list vs pattern
  • consumer.override.* = consumer config override

Source 전용

{
  "config": {
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "3",
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.groups": "high-throughput",
    "topic.creation.high-throughput.include": "events.*",
    "topic.creation.high-throughput.partitions": "10"
  }
}

Auto Topic Creation (Kafka 2.6+) = Connector 가 자동으로 topic 생성.

3. Converter 상세

JsonConverter

{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
  • schemas.enable=true = JSON 안에 schema 정보 포함 (큼)
  • schemas.enable=false = 단순 JSON (대부분)

AvroConverter — 운영 표준

{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.basic.auth.user.info": "user:password"
}
  • 스키마 진화 지원
  • Schema Registry(스키마 중앙 저장소) 통합
  • 대규모 환경 표준

StringConverter

{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}

값을 단순 문자열로 다뤄요.

ByteArrayConverter

{
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}

raw bytes 그대로 전달해요. 이미 직렬화된 데이터에 어울려요 (PB(Protobuf, 구글 직렬화 포맷)·MsgPack(MessagePack, 바이너리 JSON) 등).

Protobuf·JSON Schema Converter

{
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "..."
}

Confluent 가 제공하는 converter 예요.

4. SMT Chain — 운영 사례

다중 변환

{
  "transforms": "RemoveField,RenameField,InsertTimestamp,MaskPII",

  "transforms.RemoveField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RemoveField.exclude": "internal_id,debug_info",

  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameField.renames": "user_id:userId,created_at:createdAt",

  "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTimestamp.timestamp.field": "_ingestedAt",

  "transforms.MaskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.MaskPII.fields": "email,phone",
  "transforms.MaskPII.replacement": "***"
}

Chain 순서대로 실행. RemoveField → RenameField → InsertTimestamp → MaskPII.

Conditional SMT — Predicates (Kafka 2.6+)

{
  "transforms": "FilterOnlyOrders",
  "transforms.FilterOnlyOrders.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.FilterOnlyOrders.predicate": "IsOrder",

  "predicates": "IsOrder",
  "predicates.IsOrder.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsOrder.pattern": "orders.*"
}

특정 topic 에만 변환을 거는 식이고, conditional logic 도 짤 수 있어요.

5. 운영 환경 권장 조합

Worker Config

# Cluster
group.id=connect-prod
bootstrap.servers=kafka-1:9093,kafka-2:9093,kafka-3:9093

# Internal Topics (RF 3, 운영 표준)
config.storage.topic=connect-prod-configs
config.storage.replication.factor=3
offset.storage.topic=connect-prod-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-prod-status
status.storage.replication.factor=3
status.storage.partitions=5

# Plugin
plugin.path=/usr/share/kafka-connect-plugins

# REST
rest.port=8083
rest.advertised.host.name=connect-prod-1.example.com

# Default Converter (Avro 표준)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# Security
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=...
ssl.truststore.location=/etc/kafka/ssl/truststore.jks

# Producer / Consumer 자식도 동일
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=...
producer.ssl.truststore.location=...

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=...
consumer.ssl.truststore.location=...

# Distributed
heartbeat.interval.ms=3000
session.timeout.ms=30000
rebalance.timeout.ms=60000

Connector Config

{
  "name": "jdbc-source-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "4",
    "connection.url": "jdbc:postgresql://db:5432/mydb",
    "connection.user": "${file:/etc/kafka-connect/secrets:db_user}",
    "connection.password": "${file:/etc/kafka-connect/secrets:db_password}",
    "topic.prefix": "pg-",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "poll.interval.ms": "1000",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "transforms": "AddIngestedAt",
    "transforms.AddIngestedAt.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.AddIngestedAt.timestamp.field": "_ingestedAt",

    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.deadletterqueue.topic.name": "dlq-jdbc-source-orders",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}

${file:...} = ConfigProvider(설정값 외부 주입 인터페이스) 로 비밀번호 외부 파일 분리 (REST API 노출 X).

6. ConfigProvider — Secret 분리

# Worker config
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

Connector config 에서:

{
  "connection.password": "${file:/etc/kafka-connect/secrets.properties:db_password}"
}

Secret 이 REST API·log 에 노출 X. 운영 환경 필수.

다른 ConfigProvider:

  • HashiCorp Vault
  • AWS Secrets Manager
  • Kubernetes Secrets

엔터프라이즈 환경에서 권장하는 선택지예요.

7. Spring Boot 통합

Spring Boot 가 Connect 자체 관리는 X — Connect 는 별도 인프라. 단 Spring 애플리케이션에서 Connect REST API 호출:

@Service
public class ConnectAdminService {
    private final RestTemplate restTemplate;

    public void deployConnector(String name, Map<String, String> config) {
        String url = "http://connect:8083/connectors";
        Map<String, Object> body = Map.of("name", name, "config", config);
        restTemplate.postForObject(url, body, String.class);
    }
}

CI/CD 에서 Spring 애플리케이션이 connector 자동 등록.

Part 5-9 Connect 마무리

4편 (117~120):

  • 117 Overview — Source/Sink·Worker·Pre-built vs Custom
  • 118 User Guide — REST·Status·Error·DLQ(Dead Letter Queue, 실패 메시지 격리 topic)·SMT 실전
  • 119 Developer Guide — Custom Connector 작성
  • 120 Config — Worker·Connector·Converter·SMT 종합

Connect = Kafka 생태계의 통합 도구. 대부분 환경 = pre-built 만으로 충분. 개발 비용 낮은 도구.

시험 직전 한 번 더 — Kafka Connect Config 함정 압축 노트

  • 3 계층 = Worker Config (process)·Connector Config (REST JSON)·SMT Config (변환 chain)
  • Worker Config 필수 = group.id·bootstrap.servers·storage topics + RF·plugin.path·rest.port·converters·security
  • Internal Topicsconfig.storage·offset.storage·status.storage
  • 운영 표준 RF = 3
  • REST 설정rest.advertised.* Docker·K8s 환경 중요
  • Converters — JsonConverter·AvroConverter (운영 표준)·StringConverter·ByteArrayConverter·Protobuf·JSON Schema
  • AvroConverter = Schema Registry 통합
  • Security = producer·consumer 자식도 같은 config
  • Connector Config 공통name·connector.class·tasks.max·converters·transforms·errors.*
  • Sink 전용topics vs topics.regex·consumer.override.*
  • Source 전용topic.creation.* (Auto Topic Creation, Kafka 2.6+)
  • Error Handling = errors.tolerance·errors.log.*·errors.deadletterqueue.*
  • SMT Chain = transforms=A,B,C + 각 type·config
  • Predicates (Kafka 2.6+) = conditional SMT (TopicNameMatches·RecordIsTombstone)
  • ConfigProvider = secret 외부 파일·Vault·AWS Secrets·K8s Secrets
  • ${file:path:key} = config 안 secret reference
  • Spring = REST API 직접 호출 (CI/CD 자동화)
  • 운영 권장 조합 = RF 3·SASL_SSL·SCRAM-SHA-512·AvroConverter·DLQ·ConfigProvider
  • Part 5-9 Connect 4편 = Overview·User·Developer·Config

공식 문서: Kafka Connect Configs 에서 모든 설정 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!