백엔드 데이터 인프라 117편. Kafka Connect 종합 — 외부 시스템과 Kafka 의 통합 도구. Source vs Sink Connector, Worker (Standalone vs Distributed), Pre-built vs Custom, REST API, 자동 offset 관리·확장성·생태계까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 117편이에요. Part 5-8 Security 까지 끝났다면, 이번 117편부터는 Part 5-9 — Kafka Connect (4편). 첫 글은 Connect 의 정체성·아키텍처.
Kafka Connect가 어렵게 느껴지는 이유
90편에서 Connect 가 5가지 API 중 하나라는 큰 그림을 잡았다면, 이번 영역은 왜·어떻게·언제 에 답하는 자리예요. 첫째로 Producer/Consumer 와 어떻게 다른지 — 수동 코딩이 아니라 설정 기반으로 흐름이 갈리는 결정 지점. 둘째로 Worker·Connector·Task 라는 3계층 추상화가 처음 보면 복잡하게 느껴진다는 점. 셋째로 Pre-built 와 Custom 중 무엇을 쓸지, 직접 만들지 말고 가져다 쓰는 게 답인 자리가 의외로 많다는 점.
그래서 이 글에서 Connect 의 큰 그림을 잡고, 다음 글 (118 User · 119 Developer · 120 Config) 로 이어가요.
Kafka Connect 의 정체성
"Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems."
한 줄로 줄이면 외부 시스템과 Kafka 사이를 표준화된 방식으로 잇는 통합 도구예요.
자주 만나는 통합은 이런 것들 —
- PostgreSQL → Kafka (CDC — 변경 데이터 캡처, Debezium)
- Kafka → S3 (Archive, data lake)
- Kafka → Elasticsearch (Search index)
- Kafka → HDFS (Big data ETL — 추출·변환·적재)
- Salesforce → Kafka → Snowflake (Data warehouse)
- MongoDB → Kafka (Change streaming)
Source vs Sink Connector
┌─────────────────┐ ┌─────────────────┐
│ External System │ Source │ │ Sink │ External System │
│ (DB, API, etc) │ ────────→ │ Kafka │ ────────→ │ (S3, ES, etc) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Source Connector
외부에서 Kafka 로 데이터를 들여오는 쪽.
예시:
- JDBC Source — DB 의 row 변경을 Kafka topic 으로
- Debezium — DB transaction log 기반 CDC
- MQTT Source — IoT (사물인터넷) 디바이스 → Kafka
- File Source — 파일 변경 → Kafka
Sink Connector
반대로 Kafka 메시지를 외부 시스템으로 내보내는 쪽.
예시:
- JDBC Sink — Kafka → DB
- S3 Sink — Kafka → S3 (archive·data lake)
- Elasticsearch Sink — Kafka → ES
- HDFS Sink — Kafka → HDFS
- Snowflake Sink — Kafka → Snowflake
양방향
같은 connector 가 source 와 sink 둘 다 별도 모드로 제공하기도 해요.
Worker · Connector · Task — 3계층
Worker (Process)
Kafka Connect 의 실행 단위. Java 프로세스 1개가 곧 Worker 1개예요.
- Standalone — 1 worker, 단순, 개발용
- Distributed — N worker = cluster, 운영 표준
Connector (Logical)
어떤 외부 시스템과 통합할지를 정의하는 논리 단위예요.
"jdbc-source-orders" connector = "PostgreSQL DB → Kafka orders topic 으로 sync"
REST API 로 등록하고 관리해요.
Task (Execution)
Connector 가 실제 작업을 N 개 task 로 쪼개서 worker 들에 분산해요.
"jdbc-source-orders" connector
↓ tasks.max=4
Task 1 → Worker A (table-1, table-2 처리)
Task 2 → Worker B (table-3 처리)
Task 3 → Worker A (table-4 처리)
Task 4 → Worker C (table-5 처리)
이렇게 병렬로 돌려서 확장성을 얻는 구조.
Worker Distributed Mode
[Connect Worker 1] [Connect Worker 2] [Connect Worker 3]
│ │ │
└──────────────────┼──────────────────┘
↓ Connect 자체가 Consumer Group
[Internal Kafka Topics]
- connect-configs (connector config)
- connect-offsets (source connector offset)
- connect-status (worker·connector 상태)
Worker 들이 Kafka Consumer Group 패턴으로 작업을 나눠 가져요. 한 Worker 가 죽으면 task 가 다른 worker 로 자동으로 옮겨가요.
REST API — 표준 인터페이스
# Connector 생성
$ curl -X POST -H "Content-Type: application/json" \
--data '{...}' \
http://connect-1:8083/connectors
# 목록
$ curl http://connect-1:8083/connectors
["jdbc-source-orders", "s3-sink-events"]
# 상태
$ curl http://connect-1:8083/connectors/jdbc-source-orders/status
{
"name": "jdbc-source-orders",
"connector": { "state": "RUNNING", "worker_id": "..." },
"tasks": [
{ "id": 0, "state": "RUNNING", "worker_id": "..." },
{ "id": 1, "state": "RUNNING", "worker_id": "..." }
]
}
# 재시작
$ curl -X POST http://connect-1:8083/connectors/jdbc-source-orders/restart
# 일시 정지
$ curl -X PUT http://connect-1:8083/connectors/jdbc-source-orders/pause
# 삭제
$ curl -X DELETE http://connect-1:8083/connectors/jdbc-source-orders
언어 독립적인 REST API 라서, 어디서든 connector 를 관리할 수 있어요.
자동 Offset 관리
Source connector 는 외부 시스템의 어디까지 읽었는지를 스스로 추적해요.
Debezium MySQL source:
→ MySQL binlog position 자동 저장
→ connect-offsets topic 에 commit
→ 재시작 시 같은 position 부터 재개
Connector 개발자가 offset 관리 코드를 따로 짤 필요가 없어요. Connect framework 가 알아서 처리해요.
Sink connector 쪽은 Kafka consumer offset 과 같은 방식 (자동 commit).
확장성 — Worker 추가
초기: 3 worker, 10 task
↓ Worker 추가
신규: 5 worker, 10 task 재분배 (각 worker ~2 task)
Worker 를 Kafka Connect cluster 에 join 시키면 자동으로 rebalance 가 일어나요. Cooperative Sticky (점진적 리밸런스 전략) 덕에 다운타임도 없어요.
인기 Pre-built Connector
Source
- Debezium — MySQL·PostgreSQL·MongoDB·SQL Server·Oracle CDC
- JDBC Source — 일반 DB polling
- MQTT Source — IoT
- FilePulse — 파일 변경
- HTTP Source — REST API polling
- Salesforce·SAP 등 — 엔터프라이즈
Sink
- JDBC Sink — DB upsert
- S3 Sink — Avro·JSON·Parquet 변환 가능
- Elasticsearch Sink — 검색 인덱스
- HDFS Sink — Big data
- Snowflake / BigQuery / Redshift Sink — DW (data warehouse)
- Splunk·Datadog Sink — 모니터링
대부분 Confluent·Aiven·Strimzi 와 OSS (오픈소스) 커뮤니티가 제공해요.
Confluent Hub
Confluent Hub 는 공식 connector 카탈로그예요.
- 100+ connector 검색·다운로드
- Enterprise connector (지원 포함) 또는 Open Source
- 커뮤니티 ratings
$ confluent-hub install confluentinc/kafka-connect-jdbc:10.7.0
CLI 로 설치해요.
Standalone vs Distributed
Standalone
$ bin/connect-standalone.sh worker.properties source-connector.properties [sink-connector.properties ...]
- 1 process 만
- Config 가 properties file
- 학습·테스트만
- 재시작 시 manual
Distributed (운영)
# Worker (각각 별도 실행)
$ bin/connect-distributed.sh worker.properties
# Connector 등록 = REST API
$ curl -X POST .../connectors
- N process cluster
- Config 가 Kafka internal topic
- 자동 failover (한 worker 죽으면 task 이동)
- 운영 표준
Connect Worker 설정
# 클러스터 ID (모든 worker 동일)
group.id=connect-cluster
# Internal topic 들
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Replication factor
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# Internal topic 의 partition (offset 만 N개)
offset.storage.partitions=25
status.storage.partitions=5
# Plugin path (connector 들 jar)
plugin.path=/usr/share/kafka-connect-plugins
# REST API
rest.port=8083
# Converter (메시지 직렬화)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
Converter — Key/Value 직렬화
Source connector → Connect Record → Converter → Kafka 메시지 byte
↓
JSON / Avro / Protobuf / String / ByteArray
운영 표준은 대규모일 땐 Avro + Schema Registry, 단순할 땐 JSON 이에요.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
Transformations — SMT (Single Message Transforms)
각 메시지에 경량 변환을 거는 기능. 별도 Stream 처리를 안 걸어도 돼서 편해요.
{
"name": "jdbc-source-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"transforms": "RenameField,InsertTimestamp",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "id:order_id",
"transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTimestamp.timestamp.field": "ingested_at"
}
}
자주 쓰는 SMT 는 ReplaceField·InsertField·MaskField·TimestampConverter·ExtractField·Filter·... 가 있어요. 119편에서 깊이 다뤄요.
모니터링
JMX (자바 모니터링 인터페이스) 지표 중 핵심은 —
connector-status— 각 connector 상태task-status— 각 task 상태source-record-write-rate— source connector throughputsink-record-read-rate— sink throughputrecord-error-rate— 에러offset-commit-success-rate— offset commit
운영은 Grafana + Prometheus JMX Exporter 조합으로 가요.
한계·실무 함정
1. Standalone 운영
운영엔 절대 쓰지 않아요. Distributed 만.
2. Internal Topic 의 replication
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
3 으로 가는 게 좋아요. Worker 가 하나 죽어도 안전.
3. Worker 수와 task 수
tasks.max=10 인데 worker 가 2개면 각 worker 가 5 task 를 떠안아서 부담이 커요. 대략 worker 수 ≥ tasks.max / 5 정도로 잡아요.
4. Custom connector 작성
진짜 필요한 경우만. 대부분 Pre-built 가 이미 있어요. Custom 은 119편에서.
5. Connect 자체 모니터링 부담
Connect cluster 는 별도 인프라라서, 모니터링·alert 도 따로 챙겨야 해요.
시험 직전 한 번 더 — Kafka Connect 함정 압축 노트
- Kafka Connect = 외부 시스템 ↔ Kafka standardized 통합 도구
- Source Connector = 외부 → Kafka (Debezium·JDBC·MQTT)
- Sink Connector = Kafka → 외부 (JDBC·S3·Elasticsearch·HDFS)
- 3계층 = Worker (Process) · Connector (Logical) · Task (Execution)
- Standalone = 1 worker, 학습용
- Distributed = N worker cluster, 운영 표준
- Distributed = Kafka Consumer Group 패턴 (자동 rebalance·failover)
- REST API = connector 등록·관리·monitoring (
/connectors) - 자동 Offset 관리 =
connect-offsetstopic 에 commit, 재시작 시 재개 - 확장성 = worker 추가 → 자동 rebalance
- 인기 Pre-built — Debezium·JDBC·S3·Elasticsearch·Snowflake·...
- Confluent Hub = 공식 connector 카탈로그
- Internal Topics —
connect-configs·connect-offsets·connect-status - 각 RF 3 권장
- Worker 설정 =
group.id·storage topics·plugin.path·rest.port·converters - Converter — JSON·Avro (Schema Registry 권장)·Protobuf·String
- SMT (Single Message Transforms) = 경량 변환 (ReplaceField·InsertField·MaskField·TimestampConverter)
- 119편 깊이
- 모니터링 = JMX
connector-status·task-status·source/sink-record-rate·error-rate - 함정 — Standalone 운영
- 함정 — Internal topic RF 부족
- 함정 — Worker 수 < tasks 부담
- 함정 — Custom connector 신중히 (pre-built 우선)
- 함정 — Connect 별도 인프라 모니터링 필요
공식 문서: Kafka Connect Overview 에서 자세한 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 112편 — Kafka Transaction Protocol (EOS 내부 메커니즘)
- 113편 — Kafka Security Overview (3축 종합)
- 114편 — Kafka SSL/TLS (Keystore · mTLS · 인증서 운영)
- 115편 — Kafka SASL (SCRAM · PLAIN · OAuth · Kerberos)
- 116편 — Kafka ACL (Authorization 깊이)
다음 글: