Distributed Mode·REST API 설정 가이드

2026-05-02AWS SAA-C03 스터디

Apache Kafka Connect 실전 정리 3편. Standalone vs Distributed Mode의 결정적 차이부터 — Worker 설정 파일 구조, offset/config/status 토픽, plugin.path, key/value Converter, 10가지 핵심 REST API 호출, 디버깅 로그 확인까지 1인 사무실 vs 여러 일꾼 사무소 비유로 친절하게 정리.

📚 Apache Kafka Connect 실전 정리 · 3편 / 14편 — Distributed Mode·REST API 설정 가이드

이 글은 Apache Kafka Connect 실전 정리 시리즈의 세 번째 편입니다. 1편에서 Connector·Task·Worker 3층 구조를 잡고, 2편에서 Source·Sink 커넥터 종류를 훑었다면 — 이번 3편은 "실제로 어떻게 설정하고, 어떻게 띄우고, 어떻게 운영하느냐" 의 실무 단계예요. 핵심 포커스는 Distributed ModeREST API 둘입니다.

본문 흐름은 1·2편에서 잡은 비유를 그대로 이어 갑니다. Standalone Mode는 1인 사무실, Distributed Mode 는 여러 일꾼이 일하는 사무소 + 공용 캐비닛(Kafka 토픽)에 상태를 보관하는 식이에요. 그리고 REST API는 그 사무소 입구에 있는 안내 데스크 — 10가지 표준 요청으로 모든 운영을 처리합니다. 공식 문서는 Kafka Connect 공식 문서Connect REST 문서를 곁에 두고 보면 좋아요.

왜 Kafka Connect 설정이 처음엔 어렵게 느껴질까요

이유는 세 가지예요.

첫째, 설정 파일이 두 종류(Worker Properties + Connector Properties)라는 점. 한 종류만 있으면 좋겠는데 둘이 따로 있고, 거기에 모드(Standalone·Distributed)에 따라 또 형태가 달라요.

둘째, Distributed Mode는 "보이지 않는 곳"에 상태를 저장합니다. Worker 설정 파일 어디에도 "현재 어떤 커넥터가 떠 있는지"가 안 적혀 있어요. 그건 Kafka 내부 토픽 — connect-offsets, connect-configs, connect-status — 에 들어가 있거든요. "그럼 설정 파일은 도대체 뭘 담는 거야?" 싶은 거죠.

셋째, REST API 호출이 10가지나 됩니다. GET·POST·PUT·DELETE가 섞여 있고, 같은 PUT인데 어떤 건 일시 정지·어떤 건 설정 업데이트라 헷갈려요.

해결법은 한 가지예요. "Standalone = 1인 사무실 / Distributed Mode = 여러 일꾼 사무소 + 공용 캐비닛" 비유를 잡고, REST API 10가지를 "안내 데스크 표준 요청표" 로 정리하면 갑자기 명확해집니다. 이 글은 그 비유를 따라 풀어 갑니다.

Standalone Mode — 1인 사무실 다시 펼쳐 보기

먼저 가벼운 쪽부터. Standalone Mode 는 단일 JVM 프로세스에서 모든 커넥터와 Task를 실행하는 모드예요. 회사 비유로 — 한 일꾼이 혼자 다 처리하는 1인 사무실입니다. 개발·테스트·실험에 적합하고, 프로덕션엔 절대 안 써요.

Standalone 실행 명령

실행 흐름은 이렇게 됩니다.

# 기본 실행 명령어
connect-standalone.sh [worker.properties] [connector1.properties] [connector2.properties] ...

# 예시: 파일 스트림 커넥터 실행
connect-standalone.sh \
  /path/to/worker.properties \
  /path/to/filestream-demo-standalone.properties

여기서 시험 함정이 하나 있어요. Standalone은 명령줄에 Connector Properties 파일을 같이 넘겨야 합니다. Worker 설정 파일만 넘기면 커넥터 없이 빈 Worker가 뜨고 끝나요. 즉 Standalone은 "Worker + 커넥터"가 한 묶음으로 동시에 시작되는 모드예요.

Standalone Worker Properties 파일

Worker 설정 파일은 이렇게 생겼어요.

# worker.properties (독립 실행형 전용)

# Kafka 클러스터 연결
bootstrap.servers=localhost:9092

# 메시지 직렬화/역직렬화 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# 내부 처리용 컨버터 (일반적으로 변경하지 않음)
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false

# REST API 엔드포인트 (충돌 방지를 위해 설정)
rest.port=8083
rest.host.name=localhost

# 오프셋 저장 파일 경로
offset.storage.file.filename=standalone.offsets

# 오프셋 커밋 주기 (밀리초)
offset.flush.interval.ms=10000

각 항목을 비유로 풀면:

  • bootstrap.servers — 사무실이 통신할 Kafka 본부 주소
  • key.converter / value.converter케이블 양쪽 끝의 자료 변환 어댑터. 들어오는 데이터를 어떤 형식으로 직렬화·역직렬화할지 정함
  • rest.port — 1인 사무실 입구에 달린 안내 데스크 포트 번호 (기본 8083)
  • offset.storage.file.filename — 일꾼이 "지금까지 어디까지 일했는지"를 적어두는 개인 메모장 파일

Standalone에서 오프셋이 동작하는 방식

Standalone 모드의 정체성은 오프셋이 로컬 파일에 저장된다는 점이에요.

# 커넥터 실행 후 오프셋 파일 생성 확인
ls -la standalone.offsets
# -rw-r--r-- 1 user group 123 Jan 1 00:00 standalone.offsets

# 파일 내용은 바이너리 형식 (직접 읽기 불가)

이 메모장 덕분에:

  • 커넥터를 Ctrl+C 로 멈췄다가 재시작해도 마지막 읽은 위치부터 이어서 읽음
  • 오프셋 파일을 삭제하면 처음부터 다시 읽음

여기서 시험 함정이 하나 있어요. 이 메모장은 그 일꾼만 갖고 있어요. 다른 사무소(다른 서버)에서는 못 봅니다. 그래서 서버가 죽으면 그 메모장도 같이 사라질 위험이 있고, 다른 머신에 옮겨 띄우려면 파일도 같이 옮겨야 해요. 이게 Standalone이 프로덕션에 안 맞는 결정적 이유 중 하나입니다.

Distributed Mode — 여러 일꾼 사무소 + 공용 캐비닛

이제 본 게임. Distributed Mode 는 여러 Worker 프로세스가 협력해서 커넥터와 Task를 실행하는 모드예요. 회사 비유로 — 여러 일꾼이 함께 일하는 사무소입니다. 그리고 결정적 차이가 한 가지 있어요. 일꾼 개인 메모장 대신 공용 캐비닛(Kafka 토픽) 을 씁니다.

Distributed Mode 아키텍처

큰 그림을 한 장에 펼쳐 봅시다.

[REST API / UI]  ←→  [Connect Worker 1]
                      [Connect Worker 2]
                      [Connect Worker 3]
                      [Connect Worker 4]
                             ↓
                      [Kafka Cluster]
                      (내부 토픽: connect-offsets,
                                  connect-configs,
                                  connect-status)

핵심은 두 가지예요.

  1. REST API 안내 데스크 가 모든 Worker 앞에 있음 — 어떤 Worker에 요청을 보내도 결과는 같음
  2. 공용 캐비닛(내부 토픽 3개) 에 상태가 보관됨 — Worker가 죽어도 다른 Worker가 캐비닛을 열어 이어받음

공용 캐비닛 — offset·config·status 토픽 세 칸

Distributed Mode의 정체성을 만드는 게 바로 이 세 개의 내부 Kafka 토픽입니다. 회사 비유로 — 공용 캐비닛의 세 칸 이에요.

내부 토픽캐비닛 칸용도
connect-offsets진행 상황 칸소스 커넥터의 현재 오프셋 저장
connect-configs설정 칸어떤 커넥터가 어떤 설정으로 떠 있는지 저장
connect-status상태 칸커넥터·Task의 현재 상태 (RUNNING·FAILED 등) 저장

이 캐비닛 덕분에:

  • Worker가 죽어도 다른 Worker가 진행 상황 칸에서 오프셋을 이어받아 중복 없이 처리
  • 새 Worker를 추가하면 설정 칸을 보고 자동으로 기존 클러스터에 합류
  • 운영자는 상태 칸에서 어떤 커넥터가 죽었는지 한눈에 확인

여기서 시험 함정이 하나 있어요. Distributed Mode에서는 Connector Properties 파일이 사라집니다. Standalone처럼 명령줄에 properties 파일을 같이 안 넘겨요. 커넥터는 모두 REST API로 등록하고, 그 설정은 connect-configs 토픽에 자동 저장돼 클러스터 전체에 공유됩니다. Worker Properties 파일은 한 번 정해두면 거의 안 바꿔요.

Distributed Worker Properties 파일

Distributed용 Worker 설정 파일은 이렇게 생겼어요. Standalone과 결정적 차이가 몇 줄에 있어요 — 잘 보세요.

# connect-distributed.properties (분산 모드용)

# Kafka 클러스터 연결
bootstrap.servers=localhost:9092

# 클러스터 그룹 ID (같은 그룹 = 같은 Connect 클러스터)
group.id=connect-cluster

# 메시지 컨버터 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

# 내부 컨버터 설정
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false

# 내부 Kafka 토픽 설정
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
# 개발 환경 (단일 브로커): replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

# 오프셋 플러시 주기
offset.flush.interval.ms=10000

# REST API 설정
rest.port=8083
# 두 번째 Worker는 다른 포트 사용 (같은 호스트일 경우)
# rest.port=8084

# 커넥터(JAR) 경로
plugin.path=/usr/share/java,/path/to/connectors

Standalone과의 결정적 차이를 짚어 봅시다.

  • group.id 가 새로 생김 — 같은 group.id 를 가진 Worker들은 한 클러스터로 묶임. 카프카 Consumer Group과 같은 사상이에요
  • offset.storage.file.filename 이 사라지고 offset.storage.topic, config.storage.topic, status.storage.topic 셋이 등장 — 개인 메모장 대신 공용 캐비닛 세 칸을 가리킴
  • plugin.path 가 등장 — 커넥터 JAR 파일이 어디 들어 있는지 가리키는 경로 (Standalone에도 쓸 수 있지만 Distributed에선 거의 필수)

여기서 시험 함정이 하나 있어요. group.id 는 카프카 Consumer Group ID와 절대 충돌하면 안 됩니다. 같은 Kafka 클러스터에 떠 있는 Consumer Group과 동일한 ID를 Connect group.id 로 쓰면 충돌이 나요. 그래서 보통 connect- 접두어를 붙이거나 (connect-cluster, connect-cluster-prod) 클러스터별로 고유한 이름을 줍니다.

plugin.path — JAR 파일을 어디서 찾아?

plugin.path 는 회사 비유로 — 사무소 창고 위치예요. 여기에 커넥터 JAR 파일이 들어 있어야 Worker가 시작 시점에 스캔해 로드합니다.

# connectors 디렉터리 생성
mkdir /opt/kafka/connectors

# 다운로드한 커넥터 JAR을 여기에 복사
cp debezium-connector-postgres/*.jar /opt/kafka/connectors/

# Worker Properties에서 가리키기
plugin.path=/opt/kafka/connectors

그리고 Worker를 재시작하면 새 커넥터가 로드돼요. REST API로 확인할 수 있습니다.

curl http://localhost:8083/connector-plugins | jq .

여기서 시험 함정이 하나 있어요. plugin.path 는 콤마로 여러 경로를 적을 수 있습니다. 단일 경로라고 생각하기 쉬운데 실제로는 리스트예요. plugin.path=/usr/share/java,/path/to/connectors 처럼 쓰면 두 위치 모두 스캔합니다.

key.converter / value.converter — 케이블 양쪽 끝 어댑터

Converter 는 회사 비유로 — 케이블 양쪽 끝에 붙어 자료 형식을 변환하는 어댑터예요. 들어오는 데이터를 카프카 토픽에 저장할 때 어떤 형식으로 직렬화할지, 또 토픽에서 꺼낼 때 어떻게 역직렬화할지를 정합니다.

자주 쓰는 Converter 종류:

  • JsonConverter — JSON 형식 (가장 많이 씀)
  • StringConverter — 단순 문자열
  • AvroConverter — Avro 스키마 (Schema Registry 필요)
  • ByteArrayConverter — 바이너리 그대로
# JSON 사용 (스키마 비활성)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

여기서 시험 함정이 하나 있어요. schemas.enable=true 로 두면 메시지마다 스키마 정보가 같이 실립니다. 메시지 크기가 커지고 처리 비용이 늘어나는 대신, 데이터 구조가 명확해져요. 단순 텍스트 데이터라면 false 로 둬도 충분하고, 스키마 검증이 중요하다면 true 또는 Avro Converter를 씁니다. 개발 환경은 false, 프로덕션 정형 데이터는 Avro 가 일반적인 패턴이에요.

Distributed Mode 실행 명령

실행 흐름은 이렇게 됩니다.

# 첫 번째 Worker 시작
bin/connect-distributed.sh config/connect-distributed.properties

# 두 번째 Worker 시작 (다른 포트)
# 먼저 설정 파일 복사 후 REST 포트 변경
cp config/connect-distributed.properties \
   config/connect-distributed-worker2.properties
# rest.port=8084로 변경

bin/connect-distributed.sh config/connect-distributed-worker2.properties

같은 호스트에 여러 Worker를 띄우려면 rest.port 만 다르게 주면 돼요. 다른 호스트라면 같은 포트로 OK입니다. 같은 group.id 만 같으면 자동으로 한 클러스터로 묶여요.

Distributed에서 커넥터 배포는 어떻게?

여기가 Standalone과 결정적으로 다른 부분. Distributed Mode에서는 명령줄로 커넥터를 못 띄웁니다. Worker만 띄워두고, 커넥터는 REST API의 POST /connectors 호출로 동적 등록해요. 등록한 커넥터 설정은 자동으로 connect-configs 토픽에 저장되고, 클러스터의 모든 Worker가 그 설정을 공유합니다.

> 한 줄 정리 — Standalone = 명령줄로 커넥터 같이 띄움 / Distributed = Worker만 띄워두고 REST API로 커넥터 등록.

Standalone vs Distributed Mode 비교표 한 번 더

여기까지가 두 모드의 큰 그림이에요. 한 표에 압축해 둡시다.

항목Standalone ModeDistributed Mode
목적개발·테스트프로덕션
Worker 수1개 (1인 사무실)여러 개 (사무소)
장애 허용없음자동 재균형
오프셋 저장로컬 파일 (개인 메모장)Kafka 토픽 (공용 캐비닛)
설정 저장로컬 properties 파일Kafka 토픽
배포 방법명령줄 인수REST API / UI
확장불가Worker 추가로 확장
재시작 복구수동자동

여기서 시험 함정이 하나 있어요. "Standalone Mode가 더 단순하니까 작은 회사도 그걸로 가면 되지 않나?" — 답은 NO. 한 번 장애가 나면 데이터 누락이 발생할 수 있고, 복구도 수동이라 운영 비용이 결국 더 큽니다. 개발 환경 외엔 무조건 Distributed Mode 가 답이에요.

REST API — 사무소 안내 데스크 10가지 표준 요청

Distributed Mode의 모든 운영은 REST API를 거쳐요. UI가 있는 도구(Kafka Connect UI, Confluent Control Center 등)도 내부적으론 다 이 REST API를 호출합니다. 회사 비유로 — 사무소 입구에 있는 안내 데스크예요. 표준 요청 양식이 10가지 있고, 그 외 모든 게 이 10가지의 조합입니다.

기본 엔드포인트

http://localhost:8083/  (Standalone 또는 첫 번째 Worker)
http://localhost:8084/  (두 번째 Worker)

여기서 시험 함정이 하나 있어요. Distributed Mode에서는 어느 Worker에 요청을 보내도 결과가 같습니다. 안내 데스크가 사무소 입구마다 있어서, 어느 데스크에 가도 같은 캐비닛을 보고 답해주는 식이에요. 그래서 운영 자동화 스크립트는 보통 첫 번째 Worker(8083)를 고정으로 부르거나, 로드밸런서 뒤에 두고 분산 호출합니다.

1. Worker 정보 확인 — GET /

가장 가볍게 "이 Worker 살아 있어?" 를 물어보는 호출.

curl -s http://localhost:8083/ | jq .

응답:

{
  "version": "2.6.0",
  "commit": "abc123",
  "kafka_cluster_id": "xyz456"
}

kafka_cluster_id 값이 다른 Worker와 같으면 같은 Kafka 클러스터에 붙어 있다 는 뜻이에요. 두 Worker가 같은 Connect 클러스터인지 확인할 때 자주 씁니다.

2. 사용 가능한 커넥터 플러그인 목록 — GET /connector-plugins

창고에 어떤 부품이 들어 있나 를 보는 호출이에요. plugin.path 에서 로드된 모든 커넥터가 나옵니다.

curl -s http://localhost:8083/connector-plugins | jq .

응답:

[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "11.0.0"
  },
  {
    "class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    "type": "source",
    "version": "0.3.33"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.6.0"
  }
]

새 커넥터 JAR을 추가하고 Worker 재시작 후 여기에 등장하면 정상 로드된 거예요.

3. 실행 중인 커넥터 목록 — GET /connectors

현재 사무소에서 돌아가는 작업표.

curl -s http://localhost:8083/connectors | jq .

응답:

[
  "filestream-demo-distributed",
  "source-twitter-distributed",
  "sink-elastic-twitter-distributed",
  "sink-postgres-twitter-distributed"
]

이름만 나옵니다. 상세 정보를 보려면 다음 호출.

4. 특정 커넥터의 작업과 설정 정보 — GET /connectors/{name}

특정 부품의 상세 명세서.

curl -s http://localhost:8083/connectors/source-twitter-distributed | jq .

응답:

{
  "name": "source-twitter-distributed",
  "config": {
    "name": "source-twitter-distributed",
    "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    "tasks.max": "1",
    "topic": "tweets",
    "filter.keywords": "programming,java,scala,kafka"
  },
  "tasks": [
    {
      "connector": "source-twitter-distributed",
      "task": 0
    }
  ],
  "type": "source"
}

설정값과 현재 떠 있는 Task 목록을 같이 줘요. "이 커넥터가 정확히 어떤 설정으로 돌고 있지?" 가 궁금할 때 씁니다.

5. 커넥터 상태 확인 — GET /connectors/{name}/status

작업이 정상 돌고 있나 를 보는 호출. 운영 모니터링에서 가장 자주 부릅니다.

curl -s http://localhost:8083/connectors/source-twitter-distributed/status | jq .

응답:

{
  "name": "source-twitter-distributed",
  "connector": {
    "state": "RUNNING",
    "worker_id": "worker1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "worker1:8083"
    }
  ],
  "type": "source"
}

가능한 상태값:

  • RUNNING — 정상 실행 중
  • PAUSED — 일시 정지
  • FAILED — 오류 발생
  • UNASSIGNED — 아직 Worker에 할당되지 않음

여기서 시험 함정이 하나 있어요. Connector 상태와 Task 상태는 따로 봅니다. Connector는 RUNNING 인데 Task가 FAILED 인 경우가 흔해요. 모니터링 스크립트는 두 상태를 모두 검사해야 합니다.

6. 커넥터 일시 정지 / 재개 — PUT /connectors/{name}/pause · /resume

# 커넥터 일시 정지
curl -s -X PUT http://localhost:8083/connectors/source-twitter-distributed/pause

# 커넥터 재개
curl -s -X PUT http://localhost:8083/connectors/source-twitter-distributed/resume

응답 본문이 없어요 (HTTP 204 No Content). 정지된 커넥터는 PAUSED 상태로 바뀌고, 재개하면 다시 RUNNING 으로 돌아갑니다. 회사 비유로 — 일꾼한테 "잠깐 손 놓고 있어" 라고 지시하는 것. Task와 진행 상황은 그대로 두고 잠시 처리만 멈춥니다.

7. 커넥터 삭제 — DELETE /connectors/{name}

curl -s -X DELETE http://localhost:8083/connectors/filestream-demo-distributed

# 삭제 후 다시 삭제 시도하면 404 오류
curl -s -X DELETE http://localhost:8083/connectors/filestream-demo-distributed
# {"error_code":404,"message":"Connector filestream-demo-distributed not found"}

삭제하면 connect-configs 토픽에서 설정이 빠지고, 모든 Task가 종료됩니다.

여기서 시험 함정이 하나 있어요. 삭제는 즉시 반영되지만, 같은 이름으로 바로 다시 등록할 때 가끔 잠깐 충돌이 납니다. Kafka 토픽 동기화 시간이 필요해서 그래요. 자동화 스크립트라면 삭제 후 1~2초 대기하거나, 등록 실패 시 재시도 로직을 두는 게 안전합니다.

8. 새 커넥터 생성 — POST /connectors

가장 자주 부르는 호출. 새 부품을 사무소에 등록하는 표준 요청표.

curl -s -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "filestream-demo-new",
    "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "tasks.max": "1",
      "file": "/tmp/demo-file.txt",
      "topic": "demo-topic-new",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
  }' \
  http://localhost:8083/connectors | jq .

응답:

{
  "name": "filestream-demo-new",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/demo-file.txt",
    "topic": "demo-topic-new"
  },
  "tasks": [],
  "type": "source"
}

요청 본문 구조를 잘 보세요. 최상위에 nameconfig 두 개가 있고, config 안에 모든 설정이 들어갑니다.

9. 커넥터 설정 업데이트 — PUT /connectors/{name}/config

기존 커넥터의 설정만 바꿀 때 씁니다. POST와 다른 점은 — 새로 만드는 게 아니라 기존 것을 갱신한다는 점, 그리고 요청 본문 구조가 다르다는 점이에요.

curl -s -X PUT \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "2",
    "file": "/tmp/demo-file.txt",
    "topic": "demo-topic-new"
  }' \
  http://localhost:8083/connectors/filestream-demo-new/config | jq .

여기서 시험 함정이 하나 있어요. PUT 업데이트 시 본문 객체에 name 을 포함하지 않습니다. POST는 {name, config} 구조였지만 PUT은 config 객체 자체 만 보내요. 또 PUT은 멱등(idempotent) 이라 — 커넥터가 없으면 새로 만들고, 있으면 업데이트해요. 자동화 스크립트에서 배포할 때 POST 대신 PUT을 쓰는 패턴이 흔한 이유입니다.

10. 커넥터 설정 조회 — GET /connectors/{name}/config

curl -s http://localhost:8083/connectors/filestream-demo-new/config | jq .

응답:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "tasks.max": "2",
  "file": "/tmp/demo-file.txt",
  "topic": "demo-topic-new"
}

순수 설정 객체만 줍니다. (4번 호출은 name·config·tasks·type 을 모두 포함했지만, 10번은 config 만.)

10가지 호출 한 표로 정리

┌──┬────────┬──────────────────────────────┬──────────────────────────┐
│ #│ 메서드  │ 엔드포인트                    │ 설명                     │
├──┼────────┼──────────────────────────────┼──────────────────────────┤
│ 1│ GET    │ /                            │ Worker 버전·정보         │
│ 2│ GET    │ /connector-plugins           │ 사용 가능한 플러그인 목록│
│ 3│ GET    │ /connectors                  │ 실행 중 커넥터 목록      │
│ 4│ GET    │ /connectors/{name}           │ 커넥터 상세 + Task       │
│ 5│ GET    │ /connectors/{name}/status    │ 상태 확인                │
│6a│ PUT    │ /connectors/{name}/pause     │ 일시 정지                │
│6b│ PUT    │ /connectors/{name}/resume    │ 재개                     │
│ 7│ DELETE │ /connectors/{name}           │ 삭제                     │
│ 8│ POST   │ /connectors                  │ 새 커넥터 생성           │
│ 9│ PUT    │ /connectors/{name}/config    │ 설정 업데이트            │
│10│ GET    │ /connectors/{name}/config    │ 설정 조회                │
└──┴────────┴──────────────────────────────┴──────────────────────────┘

이 10가지가 사무소 안내 데스크의 표준 요청표 전부예요. 실무는 거의 다 이 안에서 끝납니다.

jq 유틸리티 — JSON을 사람이 읽게

REST API 응답이 죄다 JSON이라 그냥 보면 한 줄로 와요. jq 를 같이 쓰면 사람이 읽기 좋게 포맷팅됩니다.

# jq 설치 (Alpine Linux)
apk update && apk add jq

# jq 설치 (Ubuntu)
apt-get install jq

# 사용 방법
curl -s http://localhost:8083/connectors | jq .
# jq .       → 전체 JSON 예쁘게 출력
# jq '.[0]'  → 첫 번째 요소만
# jq '.name' → name 필드만 추출

운영 자동화 스크립트에서 특정 필드만 꺼내 쓸 때도 유용해요. curl ... | jq -r '.connector.state' 처럼 쓰면 상태 문자열 하나만 깔끔히 추출됩니다.

모드 전환·디버깅 — 실제로 부딪히는 상황들

여기까지가 설정·REST API의 표준 흐름이고, 여기서부턴 실무에서 자주 부딪히는 상황을 풀어 갑니다.

Standalone에서 Distributed로 어떻게 옮기지?

개발은 Standalone으로 시작하고 프로덕션을 Distributed로 가는 게 정석. 옮기는 흐름은:

  1. Worker Properties를 Distributed용으로 새로 작성group.id, offset.storage.topic, config.storage.topic, status.storage.topic 추가
  2. offset.storage.file.filename 줄 삭제 — 더 이상 로컬 파일 안 씀
  3. 내부 토픽 미리 생성replication.factor 값에 맞는 클러스터인지 확인
  4. Worker 띄우고 — REST API로 커넥터 등록 — 명령줄에 properties 파일 안 넘김
  5. 상태 확인 — GET /connectors/{name}/statusRUNNING 인지

여기서 시험 함정이 하나 있어요. Standalone에서 쓰던 standalone.offsets 파일은 Distributed로 가져갈 수 없습니다. 형식이 다릅니다. Distributed는 처음부터 다시 시작하거나, 소스 시스템 쪽에서 이전 위치를 알아서 잡아주는 방식으로 마이그레이션해야 해요. 그래서 마이그레이션은 보통 유지 보수 시간 에 합니다.

디버깅 — 로그를 어디서 보지?

Standalone Mode 는 간단해요. 명령줄을 띄운 터미널에 그대로 로그가 쏟아집니다.

[INFO] Created connector filestream-demo-standalone
[INFO] Task connector source-0 is starting
[WARN] ... (보통 무시 가능)
[ERROR] ... (조치 필요)

INFO 는 정상, WARN 은 경고, ERROR 가 보이면 그 줄 위아래로 원인을 찾아 갑니다.

Distributed Mode 는 Worker마다 별도 로그 파일이 있어요. 보통 logs/connect.log 또는 logs/connectDistributed.out 같은 위치에 쌓입니다. 컨테이너로 띄웠다면 docker-compose logs -f 또는 컨테이너에 들어가 tail -f 로 따라봐요.

# 실시간 로그 스트리밍
docker-compose logs -f kafka

# 마지막 100줄만
docker logs --tail 100 [container-id]

자주 나오는 오류와 해결

실무에서 가장 자주 만나는 네 가지를 정리해 둘게요.

1. 포트 충돌

Error: Address already in use: 8083

같은 호스트에서 두 번째 Worker를 띄울 때 자주 나요. rest.port=8084 로 바꾸거나 기존 프로세스를 종료합니다.

2. Kafka 연결 실패

Error: TimeoutException: Timeout of 60000ms expired

bootstrap.servers 주소가 틀렸거나 Kafka가 안 떠 있을 때. 컨테이너 환경에서 호스트와 컨테이너 네트워크 차이로 이 오류가 자주 납니다. Docker라면 컨테이너 이름·내부 네트워크 주소를 잘 확인하세요.

3. 커넥터 클래스를 찾을 수 없음

Error: ClassNotFoundException: com.example.MyConnector

plugin.path 가 잘못 가리키고 있거나 JAR 파일이 그 경로에 없을 때. JAR을 올린 후 Worker 재시작 했는지도 확인. 재시작 없이 추가하면 로드 안 됩니다.

4. 메모리 부족 (특히 Sink 대상)

Error: OutOfMemoryError

Elasticsearch·Postgres 같은 Sink 대상 시스템이 충분한 메모리를 못 받았을 때. Docker라면 Docker Desktop의 메모리 할당을 4GB 이상으로 올려요.

디버깅 7단계 — 침착하게 하나씩

오류가 나면 이 순서대로:

  1. 숨 쉬고 침착하게 — 오류는 학습의 일부
  2. 설정 복사 확인 — 한 글자 차이로 안 되는 경우가 절반
  3. 문서 순서대로 따르기 — 단계 건너뛰지 않기
  4. 로그 확인ERROR 또는 WARN 메시지 위주로
  5. 검색 — 같은 오류 메시지 그대로 검색
  6. 공식 문서 재확인 — 버전이 바뀌어 항목 이름이 바뀌었을 수도 있음
  7. 커뮤니티 질문 — OS·Kafka 버전·실행 명령·로그 전체 포함해 정확히

토픽 미리 생성 — 작은 디테일이 큰 차이

마지막으로 한 가지 — 커넥터를 띄울 때 대상 토픽을 미리 만들어 두는 게 좋아요. 이유는:

  • 파티션 수를 원하는 대로 설정 가능
  • 자동 생성에 맡기면 파티션이 보통 1개만 만들어짐
  • 병렬 처리를 위해 적절한 파티션 수가 필요
# 권장: 3개 파티션으로 미리 생성
kafka-topics.sh --create \
  --topic tweets \
  --partitions 3 \
  --replication-factor 1 \
  --zookeeper localhost:2181

여기서 시험 함정이 하나 있어요. tasks.max 값보다 토픽 파티션 수가 적으면 Task가 놀게 됩니다. Source 커넥터는 보통 자체 분할 단위(테이블·파티션 등)에 따라 Task 수가 정해지지만, Sink 커넥터는 토픽 파티션 수가 사실상 최대 병렬도 예요. 토픽을 1 파티션으로 만들어 두고 tasks.max=10 으로 줘도 실제 일하는 Task는 1개입니다.

프로덕션용 설정 — 한 단계 더 올라가기

개발 환경에서 시작했다가 프로덕션으로 가는 단계라면 몇 가지 값을 정리해 줍니다.

# 프로덕션 환경 설정 (요점만)
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# 새 클러스터마다 고유한 group.id
group.id=connect-cluster-prod

# 컨버터 — 정형 데이터면 schemas.enable=true 또는 Avro
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

# 내부 토픽 — replication.factor=3 + 충분한 파티션
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

rest.port=8083
plugin.path=/opt/kafka/connectors
offset.flush.interval.ms=60000

핵심 포인트:

  • bootstrap.servers 에 다중 브로커 — 한 브로커가 죽어도 클러스터에 붙을 수 있게
  • 내부 토픽 replication.factor=3 — 캐비닛도 복제본 3개. 한 브로커 장애에 견딤
  • offset.storage.partitions=25, status.storage.partitions=5 — 충분한 파티션
  • group.id 는 환경별로 분리 (connect-cluster-prod, connect-cluster-stage 등)

여기서 시험 함정이 하나 있어요. 개발 환경(단일 브로커)에서 replication.factor=3 으로 두면 토픽 생성이 실패합니다. 브로커가 1개인데 복제본 3개를 요구하면 만들 수 없어요. 개발은 replication.factor=1, 프로덕션은 3 이 정석입니다.

Distributed Mode·REST API 압축 노트

여기까지가 3편의 핵심이에요. 시험 직전 또는 실무 디버깅 때 다시 펼칠 수 있게 압축 노트로 마무리할게요.

  • Standalone Mode = 1인 사무실 — 단일 JVM, 개발·테스트용, 죽으면 멈춤
  • Standalone은 Worker properties + Connector properties 명령줄로 같이 띄움
  • Standalone의 오프셋은 로컬 파일 (offset.storage.file.filename) — 개인 메모장
  • Distributed Mode = 여러 일꾼 사무소 + 공용 캐비닛 — 프로덕션 정석
  • 공용 캐비닛 세 칸 — connect-offsets(진행 상황) / connect-configs(설정) / connect-status(상태)
  • Distributed에선 Connector properties 파일이 사라지고 REST API로 등록
  • group.id 같으면 한 클러스터 — Consumer Group ID와 절대 충돌 X
  • plugin.path = 커넥터 JAR이 든 창고 (콤마로 다중 경로 가능)
  • key.converter / value.converter = 케이블 양쪽 끝 자료 변환 어댑터
  • schemas.enable=true 면 메시지마다 스키마 동봉 (개발은 false, 정형 데이터는 Avro)
  • 같은 호스트 다중 Worker는 rest.port 만 다르게
  • REST API 10가지 — Worker 정보·플러그인 목록·커넥터 목록·상세·상태·일시정지·재개·삭제·생성·설정 업데이트·설정 조회
  • POST /connectors 본문 = {name, config}, PUT /config 본문 = config만 (name X)
  • PUT /config멱등 — 없으면 생성·있으면 업데이트, 자동화에서 자주 사용
  • Connector 상태와 Task 상태는 따로 모니터링
  • 같은 Kafka 클러스터인지 확인 = kafka_cluster_id 비교
  • jq는 JSON 응답 가독·필드 추출에 필수
  • Standalone → Distributed 마이그레이션 시 standalone.offsets 파일은 이전 불가
  • 토픽은 미리 생성해 적절한 파티션 수 확보
  • 프로덕션 내부 토픽 replication.factor=3, 개발은 1
  • 디버깅 — INFO·WARN·ERROR 순으로 살피고, ERROR 위아래 줄 원인 추적
  • 자주 만나는 오류 4종 — 포트 충돌·Kafka 연결 실패·ClassNotFoundException·OOM
  • 프로덕션은 무조건 Distributed Mode — Standalone은 개발 한정

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!