백엔드 데이터 인프라 118편. Kafka Connect 운영 깊이 — REST API 자세한 사용, Connector·Task State 해석, 재시작 패턴, Error Handler (FAIL·LOG·RETRY)·Dead Letter Queue, SMT 운영 사례, Debezium·JDBC·S3 connector 실전 패턴까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 118편이에요. 117편 에서 Connect 아키텍처를 잡았다면, 이번 118편은 운영을 다룹니다 — Connector 등록·관리·에러 처리·실전 패턴.
Connect 운영의 가장 자주 보는 4가지 영역
- REST API 로 connector lifecycle 관리
- State 해석 + 재시작 시점
- Error Handling + Dead Letter Queue
- 인기 Connector 실전 패턴 (Debezium·JDBC·S3)
1. REST API — 자세한 사용
Connector 등록
$ curl -X POST -H "Content-Type: application/json" \
--data @connector-config.json \
http://connect-1:8083/connectors
connector-config.json 예제 (JDBC Source):
{
"name": "jdbc-source-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "4",
"connection.url": "jdbc:postgresql://db:5432/mydb",
"connection.user": "...",
"connection.password": "...",
"topic.prefix": "pg-",
"mode": "incrementing",
"incrementing.column.name": "id",
"poll.interval.ms": "1000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Config 업데이트 (PUT)
$ curl -X PUT -H "Content-Type: application/json" \
--data @updated-config.json \
http://connect-1:8083/connectors/jdbc-source-orders/config
PUT 은 전체 config 를 교체하니까, 일부만 바꾸고 싶다면 기존 config 를 GET 으로 받아 수정한 뒤 PUT 으로 다시 올려야 해요.
상태 조회
$ curl http://connect-1:8083/connectors/jdbc-source-orders/status
응답:
{
"name": "jdbc-source-orders",
"connector": {
"state": "RUNNING",
"worker_id": "connect-worker-1:8083",
"trace": null
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-worker-1:8083"
},
{
"id": 1,
"state": "FAILED",
"worker_id": "connect-worker-2:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: ..."
}
]
}
trace 필드에 에러 stack trace 가 그대로 담겨 나오는 형태입니다.
2. State 해석
| State | 의미 |
|---|---|
| UNASSIGNED | 아직 worker 할당 안 됨 (시작 직후) |
| RUNNING | 정상 동작 중 |
| PAUSED | 일시 정지 (수동 또는 자동) |
| STOPPED | 정지 (Kafka 3.5+ 신규 state) |
| FAILED | 에러로 종료, 수동 재시작 필요 |
| RESTARTING | 재시작 중 |
| DESTROYED | 삭제됨 |
재시작 — restart
# Connector 자체만
$ curl -X POST http://connect-1:8083/connectors/jdbc-source-orders/restart
# 모든 failed task 포함
$ curl -X POST "http://connect-1:8083/connectors/jdbc-source-orders/restart?includeTasks=true&onlyFailed=true"
includeTasks=true&onlyFailed=true 옵션은 failed task 만 골라 재시작하는 방식이라 운영에서 자주 씁니다.
특정 task 만
$ curl -X POST http://connect-1:8083/connectors/jdbc-source-orders/tasks/1/restart
일시정지·재개
$ curl -X PUT http://connect-1:8083/connectors/jdbc-source-orders/pause
$ curl -X PUT http://connect-1:8083/connectors/jdbc-source-orders/resume
유지보수나 디버깅할 때 쓰는 명령이에요.
정지 (Kafka 3.5+)
$ curl -X PUT http://connect-1:8083/connectors/jdbc-source-orders/stop
pause 와 달리 task 들이 아예 종료되고 리소스도 해제됩니다. 다시 돌릴 때는 Resume 으로 깨우면 돼요.
3. Error Handling
errors.tolerance
errors.tolerance=none # 기본, 에러 시 task FAILED
errors.tolerance=all # 에러 무시·계속 진행
none 으로 두면 메시지 하나만 잘못돼도 task 가 죽어버립니다. 안전한 대신 운영 부담이 큽니다. 반대로 all 은 에러를 무시하고 계속 진행하니까 DLQ (Dead Letter Queue, 실패 메시지 저장소) 와 함께 묶어 써야 합니다.
Error Logging
errors.log.enable=true
errors.log.include.messages=true # message body 도 로그 (민감 데이터 주의)
에러가 났을 때 Connect 로그에 자세한 흔적이 남게 해주는 설정입니다.
Retry
errors.retry.timeout=600000 # 10분
errors.retry.delay.max.ms=60000 # 최대 1분 대기
일시적 에러는 자동으로 재시도해주니까, 외부 시스템이 잠깐 흔들렸을 때 알아서 회복됩니다.
4. Dead Letter Queue (DLQ)
처리하지 못한 메시지를 따로 만든 topic 에 모아두는 장치입니다. 나중에 분석하거나 재처리할 때 꺼내 씁니다.
errors.deadletterqueue.topic.name=dlq-jdbc-source-orders
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
context.headers.enable=true 를 켜두면 어디서 왜 실패했는지 에러 컨텍스트가 headers 에 함께 들어갑니다.
DLQ 메시지의 Headers
__connect.errors.topic
__connect.errors.partition
__connect.errors.offset
__connect.errors.connector.name
__connect.errors.task.id
__connect.errors.stage
__connect.errors.class.name
__connect.errors.exception.class.name
__connect.errors.exception.message
__connect.errors.exception.stacktrace
DLQ consumer 는 이 headers 를 읽어 원인을 분석하는 식으로 동작합니다.
운영 패턴
Source/Sink Connector
├─ 정상 메시지 → 원본 topic
└─ 에러 메시지 → DLQ
↓
DLQ Consumer (분석·재처리)
- Alert 발송
- 데이터 보정 후 재전송
- 영구 실패는 별도 저장
SMT (Single Message Transforms, 메시지 단위 변환) — 운영 사례
1. Schema 진화 — 필드 추가
"transforms": "AddTimestamp",
"transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddTimestamp.timestamp.field": "_ingested_at"
2. PII 마스킹
"transforms": "MaskEmail,MaskPhone",
"transforms.MaskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskEmail.fields": "email",
"transforms.MaskEmail.replacement": "***"
GDPR 같은 개인정보 규제를 의식한 처리예요. 데이터가 Kafka 에 들어가기 전 단계에서 미리 마스킹합니다.
3. Routing — 메시지 별 다른 topic
"transforms": "RouteByType",
"transforms.RouteByType.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.RouteByType.topic.format": "${topic}-${timestamp}",
"transforms.RouteByType.timestamp.format": "yyyyMMdd"
매일 별도 topic 으로 흘려보내는, 날짜 기준 partitioning 구조입니다.
4. Filter — 일부 메시지 drop
"transforms": "FilterTombstones",
"transforms.FilterTombstones.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms.FilterTombstones.negate": "true"
Tombstone, 그러니까 value 가 null 인 메시지는 빼고 처리하도록 거르는 설정이에요.
5. 인기 Connector 실전 — Debezium
CDC (Change Data Capture, DB 변경 캡처)
{
"name": "debezium-postgres-orders",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "...",
"database.dbname": "mydb",
"database.server.name": "prod-db-1",
"table.include.list": "public.orders,public.payments",
"plugin.name": "pgoutput",
"slot.name": "debezium",
"publication.name": "debezium",
"snapshot.mode": "initial"
}
}
PostgreSQL 의 pgoutput plugin 을 타고 CDC 가 흐릅니다. 테이블에 일어난 INSERT·UPDATE·DELETE 가 전부 Kafka topic 으로 흘러들어가는 구조예요.
Snapshot Mode
initial= 처음 한 번 전체 snapshot + 이후 CDCnever= CDC 만 (snapshot X)when_needed= 필요시만
큰 테이블에서 initial snapshot 을 한 번에 뜨려고 하면 시간 부담이 큽니다. ETL (Extract Transform Load, 데이터 적재 파이프라인) 로 한 번 채워둔 뒤 never 모드로 가는 방식도 충분히 검토할 만해요.
6. 인기 Connector 실전 — S3 Sink
{
"name": "s3-sink-events",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "8",
"topics": "events",
"s3.bucket.name": "my-data-lake",
"s3.region": "ap-northeast-2",
"s3.part.size": "5242880",
"flush.size": "10000",
"rotate.interval.ms": "3600000",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "Record"
}
}
특징:
- Parquet (컬럼 기반 분석용 포맷) 형식 = column-oriented, big data 분석 효율
- Time-based partition =
year=2026/month=05/day=17/hour=12디렉토리 구조 - 10000 메시지 또는 1시간 마다 S3 업로드
쌓아둔 S3 데이터는 Athena (AWS 의 S3 SQL 쿼리 엔진)·Spark·Databricks 로 분석에 바로 씁니다.
7. 모니터링·운영 명령
모든 connector
$ curl http://connect-1:8083/connectors
["jdbc-source-orders", "s3-sink-events", "debezium-postgres-orders"]
상세 (expand)
$ curl "http://connect-1:8083/connectors?expand=status&expand=info"
각 connector 의 info 와 status 를 한 번에 받아볼 수 있는 호출이에요.
Worker 정보
$ curl http://connect-1:8083/
{
"version": "4.0.0",
"commit": "...",
"kafka_cluster_id": "..."
}
$ curl http://connect-1:8083/connector-plugins
[
{ "class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source", "version": "..." },
...
]
지금 설치돼 있는 connector plugin 목록이 그대로 떨어집니다.
한계·실무 함정
1. Config password 평문
REST API 로 config 를 올릴 때 password 가 평문으로 흘러갑니다. 반드시 SSL 을 깔고, Connect 자체에도 ACL (Access Control List, 접근 권한 목록) 을 걸어둬야 해요.
2. Restart 만으로 안 풀림
config 가 틀렸거나 외부 시스템 자체가 망가진 영구 에러는 restart 를 무한 반복해도 안 풀립니다. trace 를 읽고 근본 원인을 찾아야 합니다.
3. DLQ 가득
DLQ 도 결국 topic 이라 retention 이 걸려 있습니다. 무한정 쌓아두면 메모리·디스크가 받쳐주지 못합니다.
4. SMT chain 너무 김
SMT 가 늘어날수록 CPU 도 같이 늘어납니다. 10개 넘게 엮으면 처리량이 떨어지니까 꼭 필요한 것만 골라 씁니다.
5. Connect cluster split-brain
group.id 가 서로 다른 worker 가 같이 돌면 두 개의 cluster 로 갈라집니다. 같은 cluster 라면 group.id 도 늘 같아야 해요.
시험 직전 한 번 더 — Connect 운영 함정 압축 노트
- REST API =
/connectorsPOST 등록·GET 조회·PUT config 변경·DELETE - 상태 조회 =
/status(connector + tasks state + trace) - State 종류 = UNASSIGNED·RUNNING·PAUSED·STOPPED·FAILED·RESTARTING·DESTROYED
- 재시작 =
/restart?includeTasks=true&onlyFailed=true운영 표준 - 특정 task 재시작 =
/tasks/N/restart pause/resume= 일시 정지·재개stop(Kafka 3.5+) = task 종료·리소스 해제- Error Handling =
errors.tolerance=none/all errors.log.enable=true로 자세한 로그errors.retry.timeout으로 자동 retry- DLQ =
errors.deadletterqueue.topic.name=...+context.headers.enable=true - DLQ 메시지 headers = 에러 컨텍스트 (
__connect.errors.*) - SMT 운영 사례 — 필드 추가 (InsertField)·PII 마스킹 (MaskField)·라우팅 (TimestampRouter)·필터 (RecordIsTombstone)
- Debezium = DB CDC,
snapshot.mode=initial/never/when_needed - S3 Sink = Parquet + Time-based partition (year/month/day/hour)
- 모니터링 =
/connectors?expand=status&expand=info+ Worker/·/connector-plugins - 함정 — Config password 평문 → SSL + ACL
- 함정 — Restart 만으로 영구 에러 안 풀림 → trace 분석
- 함정 — DLQ retention 관리
- 함정 — SMT chain 너무 김 → 처리량 ↓
- 함정 —
group.id다른 worker → split-brain
공식 문서: Kafka Connect User Guide 에서 자세한 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 113편 — Kafka Security Overview (3축 종합)
- 114편 — Kafka SSL/TLS (Keystore · mTLS · 인증서 운영)
- 115편 — Kafka SASL (SCRAM · PLAIN · OAuth · Kerberos)
- 116편 — Kafka ACL (Authorization 깊이)
- 117편 — Kafka Connect (Source · Sink · Worker 아키텍처)
다음 글: