백엔드 데이터 인프라 118편 — Kafka Connect 운영 (REST · Status · Error Handler · DLQ)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 118편. Kafka Connect 운영 깊이 — REST API 자세한 사용, Connector·Task State 해석, 재시작 패턴, Error Handler (FAIL·LOG·RETRY)·Dead Letter Queue, SMT 운영 사례, Debezium·JDBC·S3 connector 실전 패턴까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 118편 — Kafka Connect 운영 (REST · Status · Error Handler · DLQ)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 118편이에요. 117편 에서 Connect 아키텍처를 잡았다면, 이번 118편은 운영을 다룹니다 — Connector 등록·관리·에러 처리·실전 패턴.

Connect 운영의 가장 자주 보는 4가지 영역

  1. REST API 로 connector lifecycle 관리
  2. State 해석 + 재시작 시점
  3. Error Handling + Dead Letter Queue
  4. 인기 Connector 실전 패턴 (Debezium·JDBC·S3)

1. REST API — 자세한 사용

Connector 등록

$ curl -X POST -H "Content-Type: application/json" \
    --data @connector-config.json \
    http://connect-1:8083/connectors

connector-config.json 예제 (JDBC Source):

{
  "name": "jdbc-source-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "4",
    "connection.url": "jdbc:postgresql://db:5432/mydb",
    "connection.user": "...",
    "connection.password": "...",
    "topic.prefix": "pg-",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "poll.interval.ms": "1000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Config 업데이트 (PUT)

$ curl -X PUT -H "Content-Type: application/json" \
    --data @updated-config.json \
    http://connect-1:8083/connectors/jdbc-source-orders/config

PUT 은 전체 config 를 교체하니까, 일부만 바꾸고 싶다면 기존 config 를 GET 으로 받아 수정한 뒤 PUT 으로 다시 올려야 해요.

상태 조회

$ curl http://connect-1:8083/connectors/jdbc-source-orders/status

응답:

{
  "name": "jdbc-source-orders",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect-worker-1:8083",
    "trace": null
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect-worker-1:8083"
    },
    {
      "id": 1,
      "state": "FAILED",
      "worker_id": "connect-worker-2:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: ..."
    }
  ]
}

trace 필드에 에러 stack trace 가 그대로 담겨 나오는 형태입니다.

2. State 해석

State 의미
UNASSIGNED 아직 worker 할당 안 됨 (시작 직후)
RUNNING 정상 동작 중
PAUSED 일시 정지 (수동 또는 자동)
STOPPED 정지 (Kafka 3.5+ 신규 state)
FAILED 에러로 종료, 수동 재시작 필요
RESTARTING 재시작 중
DESTROYED 삭제됨

재시작 — restart

# Connector 자체만
$ curl -X POST http://connect-1:8083/connectors/jdbc-source-orders/restart

# 모든 failed task 포함
$ curl -X POST "http://connect-1:8083/connectors/jdbc-source-orders/restart?includeTasks=true&onlyFailed=true"

includeTasks=true&onlyFailed=true 옵션은 failed task 만 골라 재시작하는 방식이라 운영에서 자주 씁니다.

특정 task 만

$ curl -X POST http://connect-1:8083/connectors/jdbc-source-orders/tasks/1/restart

일시정지·재개

$ curl -X PUT http://connect-1:8083/connectors/jdbc-source-orders/pause
$ curl -X PUT http://connect-1:8083/connectors/jdbc-source-orders/resume

유지보수나 디버깅할 때 쓰는 명령이에요.

정지 (Kafka 3.5+)

$ curl -X PUT http://connect-1:8083/connectors/jdbc-source-orders/stop

pause 와 달리 task 들이 아예 종료되고 리소스도 해제됩니다. 다시 돌릴 때는 Resume 으로 깨우면 돼요.

3. Error Handling

errors.tolerance

errors.tolerance=none      # 기본, 에러 시 task FAILED
errors.tolerance=all       # 에러 무시·계속 진행

none 으로 두면 메시지 하나만 잘못돼도 task 가 죽어버립니다. 안전한 대신 운영 부담이 큽니다. 반대로 all 은 에러를 무시하고 계속 진행하니까 DLQ (Dead Letter Queue, 실패 메시지 저장소) 와 함께 묶어 써야 합니다.

Error Logging

errors.log.enable=true
errors.log.include.messages=true     # message body 도 로그 (민감 데이터 주의)

에러가 났을 때 Connect 로그에 자세한 흔적이 남게 해주는 설정입니다.

Retry

errors.retry.timeout=600000          # 10분
errors.retry.delay.max.ms=60000      # 최대 1분 대기

일시적 에러는 자동으로 재시도해주니까, 외부 시스템이 잠깐 흔들렸을 때 알아서 회복됩니다.

4. Dead Letter Queue (DLQ)

처리하지 못한 메시지를 따로 만든 topic 에 모아두는 장치입니다. 나중에 분석하거나 재처리할 때 꺼내 씁니다.

errors.deadletterqueue.topic.name=dlq-jdbc-source-orders
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true

context.headers.enable=true 를 켜두면 어디서 왜 실패했는지 에러 컨텍스트가 headers 에 함께 들어갑니다.

DLQ 메시지의 Headers

__connect.errors.topic
__connect.errors.partition
__connect.errors.offset
__connect.errors.connector.name
__connect.errors.task.id
__connect.errors.stage
__connect.errors.class.name
__connect.errors.exception.class.name
__connect.errors.exception.message
__connect.errors.exception.stacktrace

DLQ consumer 는 이 headers 를 읽어 원인을 분석하는 식으로 동작합니다.

운영 패턴

Source/Sink Connector
   ├─ 정상 메시지 → 원본 topic
   └─ 에러 메시지 → DLQ
         ↓
   DLQ Consumer (분석·재처리)
   - Alert 발송
   - 데이터 보정 후 재전송
   - 영구 실패는 별도 저장

SMT (Single Message Transforms, 메시지 단위 변환) — 운영 사례

1. Schema 진화 — 필드 추가

"transforms": "AddTimestamp",
"transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddTimestamp.timestamp.field": "_ingested_at"

2. PII 마스킹

"transforms": "MaskEmail,MaskPhone",
"transforms.MaskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskEmail.fields": "email",
"transforms.MaskEmail.replacement": "***"

GDPR 같은 개인정보 규제를 의식한 처리예요. 데이터가 Kafka 에 들어가기 전 단계에서 미리 마스킹합니다.

3. Routing — 메시지 별 다른 topic

"transforms": "RouteByType",
"transforms.RouteByType.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.RouteByType.topic.format": "${topic}-${timestamp}",
"transforms.RouteByType.timestamp.format": "yyyyMMdd"

매일 별도 topic 으로 흘려보내는, 날짜 기준 partitioning 구조입니다.

4. Filter — 일부 메시지 drop

"transforms": "FilterTombstones",
"transforms.FilterTombstones.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms.FilterTombstones.negate": "true"

Tombstone, 그러니까 value 가 null 인 메시지는 빼고 처리하도록 거르는 설정이에요.

5. 인기 Connector 실전 — Debezium

CDC (Change Data Capture, DB 변경 캡처)

{
  "name": "debezium-postgres-orders",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "...",
    "database.dbname": "mydb",
    "database.server.name": "prod-db-1",
    "table.include.list": "public.orders,public.payments",
    "plugin.name": "pgoutput",
    "slot.name": "debezium",
    "publication.name": "debezium",
    "snapshot.mode": "initial"
  }
}

PostgreSQL 의 pgoutput plugin 을 타고 CDC 가 흐릅니다. 테이블에 일어난 INSERT·UPDATE·DELETE 가 전부 Kafka topic 으로 흘러들어가는 구조예요.

Snapshot Mode

  • initial = 처음 한 번 전체 snapshot + 이후 CDC
  • never = CDC 만 (snapshot X)
  • when_needed = 필요시만

큰 테이블에서 initial snapshot 을 한 번에 뜨려고 하면 시간 부담이 큽니다. ETL (Extract Transform Load, 데이터 적재 파이프라인) 로 한 번 채워둔 뒤 never 모드로 가는 방식도 충분히 검토할 만해요.

6. 인기 Connector 실전 — S3 Sink

{
  "name": "s3-sink-events",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "8",
    "topics": "events",
    "s3.bucket.name": "my-data-lake",
    "s3.region": "ap-northeast-2",
    "s3.part.size": "5242880",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "timestamp.extractor": "Record"
  }
}

특징:

  • Parquet (컬럼 기반 분석용 포맷) 형식 = column-oriented, big data 분석 효율
  • Time-based partition = year=2026/month=05/day=17/hour=12 디렉토리 구조
  • 10000 메시지 또는 1시간 마다 S3 업로드

쌓아둔 S3 데이터는 Athena (AWS 의 S3 SQL 쿼리 엔진)·Spark·Databricks 로 분석에 바로 씁니다.

7. 모니터링·운영 명령

모든 connector

$ curl http://connect-1:8083/connectors
["jdbc-source-orders", "s3-sink-events", "debezium-postgres-orders"]

상세 (expand)

$ curl "http://connect-1:8083/connectors?expand=status&expand=info"

각 connector 의 info 와 status 를 한 번에 받아볼 수 있는 호출이에요.

Worker 정보

$ curl http://connect-1:8083/
{
  "version": "4.0.0",
  "commit": "...",
  "kafka_cluster_id": "..."
}

$ curl http://connect-1:8083/connector-plugins
[
  { "class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source", "version": "..." },
  ...
]

지금 설치돼 있는 connector plugin 목록이 그대로 떨어집니다.

한계·실무 함정

1. Config password 평문

REST API 로 config 를 올릴 때 password 가 평문으로 흘러갑니다. 반드시 SSL 을 깔고, Connect 자체에도 ACL (Access Control List, 접근 권한 목록) 을 걸어둬야 해요.

2. Restart 만으로 안 풀림

config 가 틀렸거나 외부 시스템 자체가 망가진 영구 에러는 restart 를 무한 반복해도 안 풀립니다. trace 를 읽고 근본 원인을 찾아야 합니다.

3. DLQ 가득

DLQ 도 결국 topic 이라 retention 이 걸려 있습니다. 무한정 쌓아두면 메모리·디스크가 받쳐주지 못합니다.

4. SMT chain 너무 김

SMT 가 늘어날수록 CPU 도 같이 늘어납니다. 10개 넘게 엮으면 처리량이 떨어지니까 꼭 필요한 것만 골라 씁니다.

5. Connect cluster split-brain

group.id 가 서로 다른 worker 가 같이 돌면 두 개의 cluster 로 갈라집니다. 같은 cluster 라면 group.id 도 늘 같아야 해요.

시험 직전 한 번 더 — Connect 운영 함정 압축 노트

  • REST API = /connectors POST 등록·GET 조회·PUT config 변경·DELETE
  • 상태 조회 = /status (connector + tasks state + trace)
  • State 종류 = UNASSIGNED·RUNNING·PAUSED·STOPPED·FAILED·RESTARTING·DESTROYED
  • 재시작 = /restart?includeTasks=true&onlyFailed=true 운영 표준
  • 특정 task 재시작 = /tasks/N/restart
  • pause/resume = 일시 정지·재개
  • stop (Kafka 3.5+) = task 종료·리소스 해제
  • Error Handling = errors.tolerance=none/all
  • errors.log.enable=true 로 자세한 로그
  • errors.retry.timeout 으로 자동 retry
  • DLQ = errors.deadletterqueue.topic.name=... + context.headers.enable=true
  • DLQ 메시지 headers = 에러 컨텍스트 (__connect.errors.*)
  • SMT 운영 사례 — 필드 추가 (InsertField)·PII 마스킹 (MaskField)·라우팅 (TimestampRouter)·필터 (RecordIsTombstone)
  • Debezium = DB CDC, snapshot.mode=initial/never/when_needed
  • S3 Sink = Parquet + Time-based partition (year/month/day/hour)
  • 모니터링 = /connectors?expand=status&expand=info + Worker /·/connector-plugins
  • 함정 — Config password 평문 → SSL + ACL
  • 함정 — Restart 만으로 영구 에러 안 풀림 → trace 분석
  • 함정 — DLQ retention 관리
  • 함정 — SMT chain 너무 김 → 처리량 ↓
  • 함정 — group.id 다른 worker → split-brain

공식 문서: Kafka Connect User Guide 에서 자세한 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!