Apache Kafka Connect 실전 정리 2편. Source·Sink Connector의 자기 자리부터 — JDBC·MongoDB·Twitter·Elasticsearch·S3 같은 대표 커넥터, 설정 파일 구조, 핵심 옵션(connector.class·tasks.max·topics), Twitter→ES·JDBC 실습 흐름, Confluent Hub 설치까지 표준 케이블 비유로 친절하게 정리.
이 글은 Apache Kafka Connect 실전 정리 시리즈의 두 번째 편입니다. 1편에서는 Kafka Connect가 왜 만들어졌고, Connector·Task·Worker 3층 구조가 어떻게 맞물려 돌아가는지를 큰 그림으로 잡아 봤어요. 이번 2편에서는 한 칸 더 깊이 들어갑니다 — 실제 커넥터를 어떻게 고르고, 어떻게 설정해서, 어떻게 돌리는가 하는 손맛 단계예요.
특히 Source Connector 와 Sink Connector의 자리를 명확히 가르고, JDBC·MongoDB·Twitter·Elasticsearch·S3 같은 대표 커넥터를 자기 자리에 꽂아 두는 게 이번 단원의 목표입니다. 외운다기보다 "어떤 자리에 어떤 케이블이 어울리는지"를 한 번 그림으로 잡으면 시험과 실무 모두 편해져요.
비유는 1편과 그대로입니다. Source Connector = 외부에서 카프카로 자료 끌고 오는 케이블, Sink Connector = 카프카에서 외부로 자료 내보내는 케이블, 커넥터 설정 = 케이블 양 끝의 주소 적기, tasks.max = 케이블에서 빼낼 동시 작업 줄 개수, Confluent Hub = 케이블 부품 마켓플레이스. 부품 카탈로그가 궁금하면 Confluent Hub와 Kafka Connect 공식 문서를 곁에 두고 보면 좋아요.
왜 이 단원이 처음엔 어렵게 느껴질까요
이유는 두 가지예요.
첫째, 커넥터 종류가 너무 많아서 압도되기 쉽습니다. JDBC·MongoDB·Twitter·Elasticsearch·S3·HDFS·Cassandra·DynamoDB·FTP·Salesforce·GitHub API…. 한 번에 다 외우려 하면 머릿속이 엉켜요.
둘째, 설정 파일에 들어가는 옵션이 비슷비슷해 보여서 어느 게 필수고 어느 게 옵션인지, 어느 게 Source용이고 어느 게 Sink용인지가 한눈에 안 잡힙니다. topic이랑 topics처럼 한 글자 차이로 의미가 달라지는 함정도 있고요.
해결법은 한 가지예요. 커넥터 종류는 "Source 줄"과 "Sink 줄" 두 통으로 나눠 통째로 묶고, 설정 옵션은 "모든 커넥터 공통 3가지 + Source 전용 + Sink 전용" 으로 분류해서 머릿속에 정리해 두면 끝입니다. 이 글이 그 정리 작업을 같이 합니다.
Source Connector 자리부터 — 외부에서 카프카로 끌고 오는 케이블
Source Connector 는 외부 시스템에서 데이터를 끌어와 카프카 토픽으로 넣어주는 커넥터예요. 회사 비유로 — 외부 창고에서 우리 회사 보관함(=카프카 토픽)으로 자료를 옮겨 주는 케이블입니다. 한쪽 끝은 외부 시스템(MySQL·MongoDB·Twitter API·파일 등)에 꽂혀 있고, 다른 한쪽 끝은 카프카 토픽에 꽂혀요.
[외부 소스] → [Source Connector] → [Kafka Topic]
예시:
MySQL DB → JDBC Source Connector → topic: mysql-users
Twitter API → Twitter Source Connector → topic: tweets
파일 → FileStream Source Connector → topic: demo-1-standalone
GitHub API → GitHub Source Connector → topic: github-issues
전통적으로는 Kafka Producer API 로 직접 코드를 짜야 했던 영역이에요. 매번 연결 관리·재시도·오프셋 추적·장애 복구·병렬 처리를 처음부터 짜야 했죠. Source Connector를 쓰면 이걸 다 프레임워크가 알아서 처리합니다 — 우리는 설정 파일만 채우면 돼요.
여기서 시험 함정이 하나 있어요. "Source Connector도 결국 내부에선 Producer를 쓰는 거 아닌가?"라는 질문에 답은 "맞다, 하지만 그게 핵심이 아니다" 입니다. Source Connector의 본질은 Producer를 한 겹 더 감싼 게 아니라 — 연결 관리·오프셋 추적·재시도·장애 복구·병렬 처리까지 한 묶음으로 표준화한 프레임워크라는 점이에요. "그래서 직접 Producer 짜는 거랑 뭐가 달라요?" 라고 묻는다면, 답은 "운영 기능이 통째로 들어 있다" 입니다.
대표 Source Connector 카탈로그
자기 자리에 어떤 케이블이 있는지 한 번 훑어 봅시다. 거의 모든 시스템용 Source Connector가 이미 누군가 만들어 두었어요.
| Source Connector | 자리 | 대표 용도 |
|---|---|---|
| JDBC Source | 관계형 DB → Kafka | MySQL·PostgreSQL·Oracle 변경분 가져오기 |
| MongoDB Source | NoSQL → Kafka | 문서 DB의 변경 스트림 |
| Twitter Source | API → Kafka | 키워드 기반 트윗 실시간 수집 |
| FileStream Source | 파일 → Kafka | 로그 파일 추적, 학습용 기본 커넥터 |
| Debezium (CDC) | DB 로그 → Kafka | DB 변경 이벤트 캡처 |
| GitHub Source | API → Kafka | 이슈·PR 이벤트 수집 |
| DynamoDB Source | AWS NoSQL → Kafka | 변경 스트림 |
| Cassandra Source | NoSQL → Kafka | 와이드 컬럼 DB 데이터 |
| FTP/SFTP Source | 파일서버 → Kafka | 외부 파일 드롭 처리 |
| Salesforce Source | SaaS → Kafka | CRM 객체 이벤트 |
여기서 시험 함정이 하나 있어요. "JDBC Source면 무조건 모든 DB가 다 되겠지?" 라고 생각하기 쉬운데, 정확히 말하면 JDBC Source는 JDBC 드라이버가 존재하는 DB만 됩니다. PostgreSQL·MySQL·Oracle은 표준이지만, 일부 신생 DB는 별도 드라이버를 따로 넣어줘야 해요. 그리고 JDBC Source는 기본적으로 풀링(polling) 기반 이라 "변경 즉시" 잡는 데 한계가 있고, 진짜 실시간 변경 추적이 필요하면 Debezium 같은 CDC 커넥터가 정석입니다.
File Stream Source Connector — 가장 작은 실습 케이블
가장 단순한 Source Connector를 손에 익혀 봅시다. File Stream Source Connector 는 카프카에 기본 내장된 학습용 커넥터예요. 회사 비유로 — "파일 한 줄 들어오면 토픽 한 메시지로 보내주는 1인용 케이블" 같은 거예요.
Standalone 모드일 때 설정 파일은 두 개로 나뉩니다 — Worker 설정 파일과 Connector 설정 파일.
# worker.properties — Standalone Worker 설정
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
rest.port=8083
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000
# filestream-demo-standalone.properties — Connector 설정
name=filestream-demo-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/demo-file.txt
topic=demo-1-standalone
실행은 두 파일을 같이 넘기면 돼요.
# Standalone 모드 실행 (Worker + Connector 동시 기동)
connect-standalone.sh worker.properties filestream-demo-standalone.properties
# 파일에 데이터 추가하면 자동으로 토픽에 흘러 들어감
echo "Hello World" >> /tmp/demo-file.txt
echo "Second Line" >> /tmp/demo-file.txt
내부 동작은 이렇게 굴러갑니다.
- 커넥터가
demo-file.txt를 모니터링 - 새 줄이 추가되면 자동으로 카프카 토픽에 전송
standalone.offsets파일에 어디까지 읽었는지(=오프셋) 저장- 커넥터가 죽었다 살아나도 마지막 읽은 위치부터 이어서 읽음
여기서 시험 함정이 하나 있어요. "Standalone 모드라서 죽으면 오프셋이 다 날아가는 거 아닌가?" 가 자주 헷갈리는 지점인데, 답은 "아니다" 입니다. Standalone도 offset.storage.file.filename 으로 지정한 로컬 파일에 오프셋을 저장하기 때문에, 프로세스가 재시작돼도 마지막 위치부터 이어서 읽어요. 다만 그 파일을 저장한 서버 자체가 죽으면 복구가 어렵죠. Distributed 모드는 오프셋을 카프카 토픽에 저장해서 이 문제를 해결합니다.
File Stream Source — Distributed 모드 버전
Distributed 모드에선 Worker 설정 파일이 필요 없어요. 워커 클러스터는 이미 떠 있고, 우리는 Connector 설정만 클러스터에 던져 주면 되거든요.
# filestream-demo-distributed.properties
name=filestream-demo-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/demo-file.txt
topic=demo-2-distributed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
스키마(Schema)를 켜면(schemas.enable=true) 메시지 형식이 이렇게 바뀝니다.
{
"schema": {
"type": "string",
"optional": false
},
"payload": "Hello World"
}
스키마를 끄면(schemas.enable=false) 본문만 그대로 들어가요.
"Hello World"
여기서 시험 함정이 하나 있어요. "스키마를 켜면 좋은 거 아닌가? 무조건 켜자" 가 단순화 함정입니다. 스키마를 켜면 메시지가 통째로 부풀어요(payload + schema). 단순한 문자열 한 줄 보낼 때 스키마까지 같이 보내면 트래픽이 크게 늘죠. 반대로 JDBC Sink처럼 스키마 정보가 반드시 필요한 Sink 와 짝지을 때는 켜야 하고요. 결론은 "Sink가 요구하면 켜고, 아니면 끈다" — 짝꿍 따라 결정하는 게 정석입니다.
Distributed 모드 배포는 두 가지 방법이 있어요.
- Kafka Connect UI 에서 New → FileStreamSource 선택 후 설정 붙여넣기
- REST API 로 JSON 형태로 POST
REST API 방식은 3편에서 자세히 풀어요. 지금은 "설정만 던지면 클러스터가 알아서 돌린다"는 그림만 잡아 두면 충분합니다.
Twitter Source Connector — 실전 케이블의 첫 맛
학습용 파일 케이블에서 한 단계 올라간 게 Twitter Source Connector예요. 키워드 기반으로 트윗을 실시간 수집해 카프카 토픽에 흘려 줍니다.
먼저 사전 조건이 있어요.
- Twitter 계정 생성
- Twitter Developer Portal(developer.twitter.com)에서 개발자 계정 신청
- 앱 생성 후 API Key·API Secret Key·Access Token·Access Token Secret 획득
설정 파일은 이렇게 생겼어요.
# source-twitter-distributed.properties
name=source-twitter-distributed
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
topic=tweets
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# Twitter 인증 정보 (개인 키로 교체)
twitter.oauth.consumerKey=YOUR_API_KEY
twitter.oauth.consumerSecret=YOUR_API_SECRET
twitter.oauth.accessToken=YOUR_ACCESS_TOKEN
twitter.oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
# 추적할 키워드 (쉼표 구분)
filter.keywords=programming,java,scala,kafka
twitter.debug=false
수집된 트윗 한 건이 토픽에 어떻게 적재되는지 한 번 봅시다.
{
"schema": {...},
"payload": {
"CreatedAt": 1234567890,
"Id": 9876543210,
"Text": "Learning Apache Kafka Connect! #kafka #programming",
"User": {
"Id": 111222333,
"Name": "johndoe",
"ScreenName": "johndoe",
"FollowersCount": 212
},
"IsRetweet": false,
"HashtagEntities": [
{"Text": "kafka"},
{"Text": "programming"}
]
}
}
여기서 시험 함정이 하나 있어요. 키워드 선택에서 처음 실수하기 쉽습니다. USA·music 같이 트래픽이 폭발적인 키워드를 넣으면 트윗이 초당 수백 건씩 쏟아져서 로컬 머신이 못 버팁니다. 반대로 너무 특이한 키워드(예: 본인 이름)를 넣으면 며칠을 기다려도 한 건도 못 받죠. 학습용으로는 programming,java,scala,kafka 처럼 적당히 활발한 기술 키워드가 정석입니다. 이 함정 한 번 빠지면 노트북이 굉음을 내요.
> 한 줄 정리 — Source Connector = 외부에서 카프카로 끌고 오는 케이블. JDBC·MongoDB·Twitter·File·Debezium이 대표 부품. topic (단수) 으로 쓸 토픽 하나를 지정.
Sink Connector 자리 — 카프카에서 외부로 내보내는 케이블
방향을 반대로 돌리면 Sink Connector 가 됩니다. 카프카 토픽에 쌓인 데이터를 읽어 외부 저장소(Elasticsearch·DB·S3 등)에 적재해 주는 커넥터예요. 회사 비유로 — 우리 회사 보관함(=카프카 토픽)에서 외부 창고로 자료를 옮기는 케이블입니다.
[Kafka Topic] → [Sink Connector] → [외부 저장소]
예시:
tweets → Elasticsearch Sink → Elasticsearch Index
tweets → JDBC Sink → PostgreSQL Table
s3-logs → S3 Sink → AWS S3 Bucket
전통적으로는 Kafka Consumer API 로 직접 코드를 짜야 했던 영역이에요. Sink Connector를 쓰면 컨슈머 그룹·오프셋 커밋·재시도·장애 복구가 다 알아서 굴러갑니다.
여기서 시험 함정이 하나 있어요. Sink Connector의 오프셋은 일반 Consumer 그룹과 같은 방식으로 카프카에 저장됩니다. Source Connector의 오프셋은 별도 토픽(connect-offsets)에 저장되지만, Sink는 그냥 평범한 Consumer Group 오프셋이에요. 그래서 Sink 커넥터 이름이 그대로 컨슈머 그룹 ID로 쓰입니다 — connect-<커넥터이름> 형식. 이름을 함부로 바꾸면 그룹이 새로 만들어져 처음부터 다시 읽기 시작해요.
대표 Sink Connector 카탈로그
| Sink Connector | 자리 | 대표 용도 |
|---|---|---|
| Elasticsearch Sink | Kafka → 검색엔진 | 실시간 검색·로그 분석 |
| JDBC Sink | Kafka → 관계형 DB | PostgreSQL·MySQL·Oracle 적재 |
| S3 Sink | Kafka → AWS S3 | 데이터 레이크·아카이브 |
| HDFS Sink | Kafka → Hadoop | 빅데이터 적재 |
| Cassandra Sink | Kafka → NoSQL | 와이드 컬럼 DB |
| DynamoDB Sink | Kafka → AWS NoSQL | 키-값 저장 |
| MongoDB Sink | Kafka → NoSQL 문서 | JSON 그대로 저장 |
| GCS Sink | Kafka → Google Cloud Storage | 클라우드 아카이브 |
| Redis Sink | Kafka → 인메모리 | 캐시 동기화 |
이 표 한 장만 머릿속에 박아 두면 "어떤 시스템에 적재할 때 어떤 Sink를 쓸까?" 라는 질문에 90% 이상 답할 수 있어요. 특수한 자리(예: BigQuery·Snowflake)는 Confluent Hub에서 검색하면 거의 다 있습니다.
Elasticsearch Sink Connector — 검색엔진으로 흘려 보내기
가장 자주 쓰는 Sink 중 하나가 Elasticsearch Sink예요. 카프카 토픽에 쌓인 JSON 메시지를 그대로 Elasticsearch 인덱스로 적재합니다. 검색·대시보드·로그 분석의 표준 조합이에요.
먼저 Elasticsearch가 떠 있어야 합니다.
# docker-compose.yml의 elasticsearch 섹션
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:6.8.12
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
확인은 curl 한 줄이면 끝.
curl http://localhost:9200
# 정상 응답
{
"name" : "NebulaNode",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "abc123",
"version" : {
"number" : "6.8.12"
},
"tagline" : "You Know, for Search"
}
이제 Sink Connector 설정.
# sink-elastic-twitter-distributed.properties
name=sink-elastic-twitter-distributed
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=2
topics=tweets
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# Elasticsearch 연결
connection.url=http://elasticsearch:9200
# 문서 타입 (ES의 document type)
type.name=kafka-connect
# Twitter 메시지는 키가 null
key.ignore=true
핵심 옵션을 표로 정리하면 이렇습니다.
| 옵션 | 설명 | 예시 |
|---|---|---|
topics | 읽을 카프카 토픽 (S 붙음, 복수 가능) | tweets |
connection.url | Elasticsearch 주소 | http://elasticsearch:9200 |
type.name | ES 문서 타입 | kafka-connect |
key.ignore | 키가 null이면 무시 | true |
tasks.max | 동시 처리 줄 개수 | 2 (보통 파티션 수만큼) |
여기서 시험 함정이 하나 있어요. Source의 topic 과 Sink의 topics 의 차이입니다. 한 글자 차이지만 의미가 완전히 달라요.
- Source Connector:
topic=tweets— 단수, 쓸 토픽 하나 지정 - Sink Connector:
topics=tweets— 복수, 읽을 토픽 여러 개 지정 가능
이걸 헷갈려서 topic 으로 적었다가 검증 단계에서 빨간 줄이 뜨는 게 입문자 1순위 실수예요. 외울 때 "Source는 한 군데에 쓰니까 단수, Sink는 여러 토픽에서 모아 받을 수 있으니까 복수" 로 묶어 두면 잊히지 않습니다.
적재된 트윗은 Elasticsearch 인덱스에서 이렇게 보여요.
{
"_id": "9876543210",
"_source": {
"CreatedAt": 1234567890,
"Id": 9876543210,
"Text": "Learning Kafka Connect! #kafka",
"User": {
"Name": "johndoe",
"FollowersCount": 212
},
"IsRetweet": false
}
}
쿼리 두 개 정도 손에 익히면 검색 감이 잡혀요.
// 리트윗만 조회
{
"query": {
"match": {
"IsRetweet": true
}
}
}
// 팔로워 500명 이상 사용자의 트윗
{
"query": {
"range": {
"User.FollowersCount": {
"gte": 500
}
}
}
}
JDBC Sink Connector — PostgreSQL로 흘려 보내기
다음 케이블은 JDBC Sink. 카프카 토픽 메시지를 관계형 DB 테이블에 적재해 줍니다. 회사 비유로 — 카프카 토픽이라는 흐름판에서 표 양식 보관함(=RDB 테이블)으로 자료를 옮기는 케이블이에요.
PostgreSQL을 도커로 띄워 두고:
# docker-compose.yml의 postgres 섹션
postgres:
image: postgres:10
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
ports:
- "5432:5432"
JDBC Sink 설정은 이렇게.
# sink-postgres-twitter-distributed.properties
name=sink-postgres-twitter-distributed
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# 읽을 토픽
topics=tweets
# 컨버터 (스키마 켜야 컬럼 매핑 가능)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# JDBC 연결
connection.url=jdbc:postgresql://postgres:5432/postgres
connection.user=postgres
connection.password=postgres
# 삽입 모드: upsert (INSERT + UPDATE 동시)
insert.mode=upsert
# Primary Key
pk.mode=kafka
pk.fields=__connect_topic,__connect_partition,__connect_offset
# 테이블·스키마 자동 관리
auto.create=true
auto.evolve=true
# 저장할 필드 화이트리스트 (최상위 필드만)
fields.whitelist=id,createdat,text,lang,isretweet,retweetcount
핵심 옵션 표.
| 옵션 | 설명 | 주의 |
|---|---|---|
insert.mode | 적재 방식 | upsert 가 가장 안전 (있으면 갱신, 없으면 삽입) |
pk.mode | Primary Key 방식 | kafka 로 두면 토픽·파티션·오프셋이 자동 PK |
auto.create | 테이블 자동 생성 | true 면 테이블 없어도 자동 생성 |
auto.evolve | 스키마 자동 진화 | true 면 새 필드 추가 시 컬럼 자동 추가 |
fields.whitelist | 적재할 필드 목록 | 중첩 JSON 필드는 포함 불가 |
여기서 시험 함정이 하나 있어요. fields.whitelist 에 중첩된 JSON 필드를 넣으면 무조건 오류입니다. Twitter 메시지를 예로 들면:
Twitter JSON 구조:
{
"id": 123456, ← 최상위 (화이트리스트 가능)
"text": "...", ← 최상위 (화이트리스트 가능)
"user": { ← 중첩 (화이트리스트 불가)
"name": "johndoe",
"followers_count": 212
},
"entities": {...} ← 중첩 (화이트리스트 불가)
}
user.name 같이 점(.) 표기로 끌고 오려고 해도 안 돼요. 중첩 필드를 RDB 컬럼에 넣으려면 Single Message Transform(SMT) 으로 평탄화(flatten)를 먼저 거쳐야 합니다. SMT는 4편에서 풀어요.
PostgreSQL에서 적재 결과를 확인할 때 자주 쓰는 SQL 몇 개.
-- 적재된 트윗 조회
SELECT * FROM "tweets" LIMIT 20;
-- 특정 오프셋 이후만
SELECT * FROM "tweets"
WHERE __connect_offset = 50;
-- 총 트윗 수
SELECT COUNT(*) FROM "tweets";
-- 리트윗만
SELECT * FROM "tweets"
WHERE isretweet = true;
SQL 클라이언트로는 SQLectron(https://sqlectron.github.io/)이 가벼워요. Mac·Windows·Linux 모두 지원합니다.
> 한 줄 정리 — Sink Connector = 카프카에서 외부로 내보내는 케이블. ES·JDBC·S3가 대표 부품. topics (복수) 로 읽을 토픽들을 지정.
Source vs Sink — 한 장 정리
자주 헷갈리는 둘의 차이를 한 장으로 묶어 두면 시험·실무 모두 편해져요.
| 항목 | Source Connector | Sink Connector |
|---|---|---|
| 방향 | 외부 → Kafka | Kafka → 외부 |
| 역할 | Producer 대체 | Consumer 대체 |
| 토픽 설정 키 | topic (단수) | topics (복수) |
| 대표 예시 | JDBC Source, MongoDB, Twitter | Elasticsearch, JDBC Sink, S3 |
| 오프셋 관리 | Connect가 별도 토픽에 관리 | Consumer Group으로 관리 |
tasks.max 의미 | 외부에서 읽을 동시 줄 | 토픽에서 읽을 동시 줄 (보통 파티션 수와 매칭) |
여기서 시험 함정이 하나 있어요. tasks.max 를 무조건 크게 잡으면 빨라지는 게 아닙니다. Source Connector의 tasks.max 는 외부 시스템이 병렬 읽기를 지원해야 의미가 있고(예: JDBC Source는 테이블별로 분할), Sink Connector의 tasks.max 는 토픽 파티션 수를 넘으면 의미가 없어요. 파티션 4개 토픽을 tasks.max=10 으로 잡아 봐야 6개 Task는 놀고 있게 됩니다. 정석은 "파티션 수와 같거나 작게" 두는 거예요.
모든 커넥터 공통 — 필수 설정 3가지
종류는 달라도 모든 커넥터에 반드시 들어가야 하는 설정이 딱 세 개예요. 이 셋만 외워 두면 어떤 커넥터든 "어디부터 채울지" 가 보입니다.
# 1. 커넥터 이름 (클러스터 내 고유)
name=my-unique-connector-name
# 2. 커넥터 클래스 (실행할 JAR의 클래스명)
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
# 3. 최대 Task 수
tasks.max=1
커넥터 이름 — 그룹 ID로 쓰이는 신분증
name 은 단순 라벨이 아니에요. 같은 Connect 클러스터 안에서 고유 해야 하고, 오프셋·컨슈머 그룹 ID에 그대로 쓰입니다. 그러니 명확하고 설명적인 이름이 정석.
좋은 예:
- source-twitter-distributed
- sink-elasticsearch-twitter
- jdbc-mysql-users-source
나쁜 예:
- connector1
- test
- my-connector
여기서 시험 함정이 하나 있어요. 운영 중인 Sink 커넥터의 name 을 살짝 바꾸면 어떻게 될까요? 답은 "새 컨슈머 그룹이 생겨서 토픽을 처음부터 다시 읽기 시작한다" 입니다. 며칠치 데이터를 다시 적재하느라 외부 시스템이 폭주할 수 있어요. 이름은 처음 정할 때 한 번에 잘 정하고, 운영 중엔 절대 안 건드리는 게 정석입니다.
connector.class — 어떤 케이블 부품을 끼울지
connector.class 는 실행할 JAR 안의 정확한 클래스명이에요. 풀 패키지 경로로 적어야 합니다. 오타 한 글자에 즉시 실패하니 공식 문서나 Confluent Hub의 설명을 그대로 복사해 쓰는 게 안전해요.
자주 쓰는 클래스명 몇 개.
File Stream Source: org.apache.kafka.connect.file.FileStreamSourceConnector
File Stream Sink: org.apache.kafka.connect.file.FileStreamSinkConnector
JDBC Source: io.confluent.connect.jdbc.JdbcSourceConnector
JDBC Sink: io.confluent.connect.jdbc.JdbcSinkConnector
ES Sink: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
S3 Sink: io.confluent.connect.s3.S3SinkConnector
Twitter Source: com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
tasks.max — 동시 작업 줄 개수
tasks.max 는 이 케이블에서 빼낼 동시 작업 줄 개수예요. 1편에서 잡았던 비유 그대로입니다. 너무 작으면 처리량이 안 늘고, 너무 크면 자원만 먹어요.
가이드라인:
- Source: 외부 시스템이 병렬 읽기를 지원하는 만큼만 (JDBC Source는 테이블·쿼리 단위 분할 가능)
- Sink: 보통 읽을 토픽의 파티션 수와 같거나 작게. 파티션 4면
tasks.max=4가 상한.
Converter — 바이트와 객체 사이의 통역사
커넥터 설정에서 빠질 수 없는 게 Converter예요. 카프카는 메시지를 바이트로 저장하지만, 커넥터 안에서는 객체로 다뤄야 하니 그 사이 통역이 필요합니다. 그 통역사가 Converter예요.
# JSON Converter — 가장 일반적
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
스키마 켜고 끌 때 메시지 모양이 어떻게 달라지는지 다시 한 번 짚어 봅시다.
schemas.enable=false (스키마 없음)
"Hello World"
schemas.enable=true (스키마 포함)
{
"schema": {
"type": "string",
"optional": false
},
"payload": "Hello World"
}
여기서 시험 함정이 하나 있어요. Source Connector의 Converter 설정과 Sink Connector의 Converter 설정이 일치해야 합니다. Source가 schemas.enable=true 로 메시지를 발행했는데 Sink가 false 로 읽으면 — 메시지 안의 schema·payload 구조를 그대로 본문으로 받아들여서 데이터가 깨져요. 짝꿍 둘이 약속한 통역 방식을 맞춰야 메시지가 깔끔하게 흐릅니다.
다른 Converter도 자리에 따라 골라 쓸 수 있어요.
- JsonConverter — 가장 일반적, 사람이 읽기 쉬움
- AvroConverter — 스키마 레지스트리와 함께 사용, 압축률 좋음
- StringConverter — 단순 문자열만 다룰 때
- ByteArrayConverter — 바이트 그대로
Kafka Connect UI — 케이블 배전반 보기
설정 파일을 매번 손으로 던지는 건 불편해요. Kafka Connect UI(예: Landoop)는 클러스터 안 커넥터들을 한눈에 보여주는 배전반 같은 도구입니다.
URL 예시: http://localhost:3030/kafka-connect-ui
UI에서 할 수 있는 것:
- 왼쪽: 실행 중인 커넥터 목록
- 중앙(Connect Topology): 토픽-커넥터 연결 시각화
- New 버튼: 새 커넥터 생성
- 상태 확인: 클릭 한 번으로 RUNNING / PAUSED / FAILED 확인
- Task 목록: 각 Task가 어느 Worker에 붙어 있는지
새 커넥터 만드는 흐름은 이래요.
1. UI에서 "New" 클릭
2. 사용 가능한 커넥터 목록에서 선택 (예: FileStreamSourceConnector)
3. 설정값 입력 또는 properties 파일 붙여넣기
4. "Validate" 클릭으로 유효성 검사
5. "Create" 클릭으로 커넥터 생성
Connect Topology 시각화는 이런 그림으로 보여요.
Source Twitter (분산)
↓
tweets → Sink Elasticsearch (분산)
→ Sink PostgreSQL (분산)
여기서 시험 함정이 하나 있어요. UI는 편의 도구일 뿐, 본질은 REST API 입니다. UI가 백엔드로 호출하는 건 결국 Connect 클러스터의 REST API예요. 운영 자동화·CI/CD·인프라 코드화 같은 자리에선 REST API를 직접 호출하는 게 정석이고, UI는 "지금 뭐가 돌고 있나" 빠르게 보는 데 씁니다. REST API의 자세한 동작은 3편에서 풀어요.
커넥터 설치 — Confluent Hub와 수동 두 가지 길
마지막으로 커넥터를 어떻게 클러스터에 끼워 넣는지 정리합시다. 길은 두 갈래예요.
1. Confluent Hub — 케이블 부품 마켓플레이스
Confluent Hub 는 Kafka Connect 커넥터의 공식 마켓플레이스예요. 거의 모든 시스템용 커넥터가 모여 있고, 다운로드 수·평점·문서를 한 자리에서 봅니다.
설치는 두 방식.
# 1. Confluent Hub CLI (가장 깔끔)
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
# 2. ZIP 다운로드 후 plugin.path에 풀기
unzip kafka-connect-elasticsearch.zip -d /usr/share/kafka/plugins/
설치 후엔 Connect Worker를 재시작해야 새 커넥터가 인식돼요. Distributed 모드에선 모든 Worker에 같은 커넥터를 똑같이 깔아야 합니다 — 한 노드에만 깔면 그 노드로 Task가 갈 때만 동작해 어디선 되고 어디선 안 되는 환장의 상황이 벌어져요.
2. 수동 설치 — GitHub·자체 빌드
오픈 소스 커넥터를 직접 빌드해서 쓰거나, 사내 커스텀 커넥터를 배포할 때는 수동 설치예요. JAR 파일을 plugin.path 로 지정된 디렉토리에 넣고 워커를 재시작합니다.
# worker 설정에 plugin.path 지정
plugin.path=/usr/share/kafka/plugins,/opt/custom-connectors
여기서 시험 함정이 하나 있어요. plugin.path 의 각 경로는 "커넥터 디렉토리들이 들어 있는 부모 디렉토리" 예요. JAR 파일이 직접 놓인 경로가 아닙니다. 구조는 이렇게 생겼어야 해요.
/usr/share/kafka/plugins/
├── kafka-connect-elasticsearch/
│ ├── kafka-connect-elasticsearch-*.jar
│ └── (의존성 JAR들)
├── kafka-connect-jdbc/
│ ├── kafka-connect-jdbc-*.jar
│ └── (의존성 JAR들)
└── kafka-connect-twitter/
└── ...
이 구조를 안 맞추면 워커 시작 시 ClassNotFoundException 이 뜨거나 커넥터 목록에 안 잡혀요. 입문자 단골 함정.
설정 검증과 디버깅 — 빨간 줄 만났을 때
설정 파일을 던졌는데 안 돈다면 어디부터 봐야 할까요. 순서를 잡아 두면 30분이 5분으로 줄어요.
name중복 여부 — 클러스터 안에 같은 이름이 있으면 거절connector.class오타 — 풀 패키지 경로 정확히- JAR이
plugin.path에 있는가 — 모든 워커에 같이 깔렸는지 topicvstopics— Source는 단수, Sink는 복수- Converter 짝 맞춤 — Source·Sink의
schemas.enable일치 - Connect Worker 로그 — 가장 정확한 에러 단서.
ERROR·Caused by키워드로 검색 - REST API 상태 —
GET /connectors/로 RUNNING / FAILED 확인/status - Task별 상태 —
GET /connectors/로 어느 Task가 죽었는지/tasks
여기서 시험 함정이 하나 있어요. "커넥터 자체는 RUNNING인데 Task가 FAILED" 인 경우가 자주 있습니다. 커넥터 상태와 Task 상태는 따로 관리되거든요. status 만 보고 안심하면 안 되고, tasks 도 같이 확인하는 게 정석. 이건 3편에서 REST API와 함께 자세히 풀어요.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 Connect 2편의 핵심입니다. 시험 직전이나 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Source Connector = 외부 → Kafka 케이블, Sink Connector = Kafka → 외부 케이블
- Source 대표: JDBC·MongoDB·Twitter·File·Debezium·GitHub·DynamoDB
- Sink 대표: Elasticsearch·JDBC·S3·HDFS·Cassandra·MongoDB·GCS·Redis
- 모든 커넥터 공통 필수 3가지 —
name·connector.class·tasks.max name은 클러스터 내 고유, 컨슈머 그룹 ID 로 쓰이니 운영 중엔 절대 변경 금지connector.class는 풀 패키지 경로 (오타 한 글자에 즉시 실패)topic(단수) = Source /topics(복수) = Sink — 한 글자 차이 1순위 함정tasks.max는 무조건 크게 잡으면 안 됨. Sink는 파티션 수가 상한- File Stream Source = 학습용 가장 작은 케이블, Standalone에선 Worker + Connector 두 파일
- Standalone 오프셋은 로컬 파일(
offset.storage.file.filename), Distributed는 카프카 토픽 - Twitter Source 키워드 — 너무 인기·너무 특이 둘 다 X.
programming,java,scala,kafka정도가 정석 - Elasticsearch Sink —
key.ignore=true가 트위터처럼 키 없는 메시지의 정석 - JDBC Sink —
insert.mode=upsert·auto.create=true·auto.evolve=true가 안전 기본값 - JDBC Sink
fields.whitelist에 중첩 JSON 필드 못 넣음 — 평탄화는 SMT 자리(4편) pk.mode=kafka+pk.fields=__connect_topic,__connect_partition,__connect_offset가 안전 PK 패턴- Source·Sink 의 Converter
schemas.enable은 짝꿍끼리 일치 해야 - 스키마 켜기는 트래픽 부풀림. Sink가 요구할 때만 켠다 가 정석
- Sink 오프셋 = Consumer Group 오프셋, Source 오프셋 =
connect-offsets토픽 - Connect UI 는 편의 도구, 본질은 REST API (3편에서 자세히)
- Confluent Hub 가 부품 마켓플레이스 정석. CLI 한 줄로 설치
plugin.path는 "커넥터 디렉토리들의 부모 디렉토리". JAR 직접 놓는 곳 아님- Distributed 모드에선 모든 Worker에 같은 커넥터를 똑같이 설치 — 안 그러면 환장
- 디버깅 순서 — 이름·클래스·plugin.path·topic 단복수·Converter 짝·로그·REST status·Task status
- 커넥터 RUNNING ≠ Task RUNNING. 둘 다 따로 확인
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.
- 1편 — Kafka Connect 입문
- 2편 — Source · Sink Connector (현재 글)
- 3편 — Distributed Mode · REST API
- 4편 — SMT · 커스텀 커넥터
- 5편 — 운영 · 프로덕션 (완)