Apache Kafka Connect 실전 정리 4편. 스키마 개념(Schemaless·Avro·JSON Schema), Converter 3종(JsonConverter·AvroConverter·StringConverter), Single Message Transform(SMT)의 자기 자리, SMT vs Kafka Streams 차이, Java로 GitHub Source 같은 커스텀 커넥터 작성까지 가공기 비유로 친절하게 정리.
이 글은 Apache Kafka Connect 실전 정리 시리즈의 네 번째 편입니다. 1편에서 표준 케이블 시스템의 큰 그림을, 2편에서 Source·Sink 커넥터의 종류를, 3편에서 Distributed Mode·REST API 설정까지 풀었어요. 이제는 자연스럽게 다음 질문이 따라옵니다 — "데이터를 그대로 옮기지 말고 중간에 살짝 가공하고 싶을 땐 어떻게 하지?", 그리고 "커뮤니티 커넥터에 없는 시스템과 카프카를 잇고 싶으면 어떻게 하지?"
이 두 질문에 답해주는 게 4편의 두 주인공이에요. 하나는 케이블 중간에 박는 작은 가공기 — SMT(Single Message Transform), 다른 하나는 표준 부품에 없는 케이블을 직접 깎는 커스텀 커넥터. 둘 다 데이터를 가공한다는 점에선 같지만 자기 자리가 분명히 다릅니다. 공식 사양은 Kafka Connect 공식 문서와 Single Message Transforms 문서를 곁에 두고 보면 좋아요.
왜 이 단원이 처음엔 어렵게 느껴질까요
이유는 세 가지예요.
첫째, "가공"이라는 말이 너무 막연합니다. 메시지를 가공한다는 게 필드 하나 바꾸는 건지, 토픽을 갈아끼우는 건지, 통째로 새 메시지를 만드는 건지 — 범위가 안 잡혀요.
둘째, SMT와 Kafka Streams가 비슷해 보입니다. 둘 다 데이터를 변환하는데 왜 따로 있어야 하는지, 어디까지가 SMT 자리고 어디부터 Kafka Streams 자리인지 헷갈려요.
셋째, 커스텀 커넥터를 직접 만드는 건 너무 거창해 보입니다. SourceConnector·SourceTask·Schema·Struct·SourceRecord — 이름만 들어도 머리가 아프죠.
해결법은 한 가지예요. SMT는 케이블 중간에 박는 작은 가공기, 커스텀 커넥터는 표준 부품에 없는 케이블을 직접 깎는 작업으로 잡고 들어가면 됩니다. 이 비유 하나만 머리에 두면 코드 예시가 갑자기 친근해져요.
스키마(Schema) — 케이블 양 끝의 자료 표준 양식
가공 얘기 전에 먼저 풀고 갈 게 있어요. 메시지에 스키마가 있느냐 없느냐. 이게 SMT·Converter·커스텀 커넥터 모든 곳에서 결정적인 출발점이에요.
스키마(Schema) 는 데이터의 구조를 정의하는 메타데이터입니다. 회사 비유로 — 케이블 양 끝에서 주고받는 자료의 표준 양식이에요. "이 자리엔 정수, 저 자리엔 문자열, 이건 필수, 저건 선택" 같은 약속이 적힌 종이예요.
스키마 없이 보내면 그냥 문자열이 흘러갑니다.
"Hello World"
스키마를 함께 보내면 메시지가 두 부분으로 나뉘어요 — schema(양식 종이) 와 payload(실제 자료).
{
"schema": {
"type": "string",
"optional": false
},
"payload": "Hello World"
}
구조화된 트윗 같은 데이터라면 스키마가 좀 더 풍성해져요. 필드 하나하나가 어떤 타입이고 필수인지 적혀 있죠.
{
"schema": {
"type": "struct",
"fields": [
{"field": "id", "type": "int64", "optional": false},
{"field": "text", "type": "string", "optional": true},
{"field": "user", "type": "struct", "fields": [
{"field": "name", "type": "string"},
{"field": "followers_count", "type": "int32"}
]},
{"field": "is_retweet", "type": "boolean", "optional": false}
]
},
"payload": {
"id": 9876543210,
"text": "Learning Kafka Connect! #kafka",
"user": {"name": "johndoe", "followers_count": 212},
"is_retweet": false
}
}
여기서 시험 함정이 하나 있어요. 스키마는 "메시지에 같이 실어 보낼 수도 있고, 별도 스키마 레지스트리에 따로 둘 수도 있다" 는 겁니다. JSON Converter는 메시지에 같이 싣고, Avro Converter는 별도 레지스트리에 보관해요. 이 차이가 뒤에서 이야기할 Converter 선택의 핵심이에요.
스키마를 켜야 할 때 vs 꺼도 될 때
| 상황 | 권장 설정 |
|---|---|
| 단순 텍스트 파일 처리 | schemas.enable=false |
| 구조화된 데이터 처리 | schemas.enable=true |
| Elasticsearch Sink 연동 | schemas.enable=true |
| JDBC Sink 연동 | schemas.enable=true (사실상 필수) |
| 빠른 프로토타이핑 | schemas.enable=false |
> 한 줄 정리 — 스키마는 케이블 양 끝의 자료 표준 양식. JDBC·Elasticsearch 같은 구조화된 싱크에 꽂을 거면 무조건 켜라.
Converter — 변환 어댑터 (자료 형태를 바꿔주는 부품)
스키마 얘기를 했으니 이제 그걸 실제로 어떤 형태(JSON·Avro·String)로 직렬화해 토픽에 실을지 정해야 해요. 그 일을 맡는 부품이 Converter 예요.
회사 비유로 — Converter는 케이블 양 끝에 끼우는 변환 어댑터예요. 우리 데이터 포맷(자바 객체·Struct)을 카프카 토픽이 이해하는 형태(바이트 배열)로 바꿔주고, 반대 방향에선 다시 풀어줘요.
데이터 흐름에서 Converter의 자리
[소스]
↓
[Source Task] → Object 생성
↓
[Converter] → 직렬화 (Object → bytes) ★ Converter 자리
↓
[Kafka Topic] → bytes 보관
↓
[Converter] → 역직렬화 (bytes → Object) ★ Converter 자리
↓
[Sink Task]
↓
[목적지]
Source 쪽 Converter와 Sink 쪽 Converter가 따로 있어요. 둘 다 같은 종류여야 양 끝이 맞아 떨어집니다.
Converter 3종(+1) — 자기 자리 비교표
| Converter | 클래스 | 자기 자리 | 특징 |
|---|---|---|---|
| JsonConverter | org.apache.kafka.connect.json.JsonConverter | 사람이 읽고 디버깅 | 사람이 보기 쉬움, 크기 큼 |
| AvroConverter | io.confluent.connect.avro.AvroConverter | 프로덕션 표준 | 스키마 레지스트리 필요, 압축 효율 높음 |
| StringConverter | org.apache.kafka.connect.storage.StringConverter | 단순 문자열 | 스키마 없음, 가장 가벼움 |
| ByteArrayConverter | org.apache.kafka.connect.converters.ByteArrayConverter | 바이너리 그대로 | 이미지·파일 같은 raw 바이트 |
JsonConverter 설정 예시
가장 흔히 처음 만나는 Converter예요. 키와 값 양쪽에 따로 지정합니다.
# 키 변환 어댑터 — JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
# 값 변환 어댑터 — JSON
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
schemas.enable=true 면 스키마와 payload가 통째로 메시지에 실립니다. false 로 두면 payload만 실려서 가벼워지는 대신 양 끝이 형태를 미리 알고 있어야 해요.
AvroConverter 설정 예시 — 프로덕션 표준
프로덕션 환경에서는 거의 AvroConverter 가 정석이에요. 이유는 두 가지 — 압축 효율이 압도적이고, 스키마 레지스트리에서 스키마 진화(evolution) 를 관리할 수 있어서예요. 자세한 사양은 Avro 스키마 문서를 참고하면 좋습니다.
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
여기서 시험 함정이 하나 있어요. AvroConverter는 그 자체로 카프카에 들어 있지 않습니다. Confluent의 Schema Registry와 짝을 이루는 별도 패키지예요. 그래서 클러스터에 Schema Registry를 띄우거나 Confluent Platform을 쓰는 환경이어야 자연스럽게 사용할 수 있어요.
StringConverter — 단순 문자열
키나 값이 그냥 단순 문자열일 때 쓰는 가장 가벼운 어댑터예요. 로그 한 줄, 간단한 ID, 아주 단순한 텍스트 메시지를 그대로 흘려보낼 때 적합합니다.
value.converter=org.apache.kafka.connect.storage.StringConverter
> 한 줄 정리 — JsonConverter는 디버깅용·읽기 좋음, AvroConverter는 프로덕션 표준·압축 좋음, StringConverter는 단순 문자열용.
Single Message Transform(SMT) — 케이블 중간에 박는 작은 가공기
이제 4편의 첫 주인공이에요. SMT(Single Message Transform) 는 메시지가 한 줄씩 흘러갈 때 그 줄을 살짝 가공해주는 작은 부품입니다.
회사 비유로 — SMT는 케이블 중간에 박는 작은 가공기예요. 케이블을 통째로 갈아끼우는 게 아니고, 케이블이 흐르는 도중에 메시지 한 줄씩만 살짝 손봐주는 자리예요.
SMT의 핵심 원칙 — "한 메시지를 한 메시지로"
SMT는 이름 그대로 한 메시지를 받아서 한 메시지로 내보내는 변환만 합니다. 두 메시지를 합치거나(join), 일정 시간 동안 모아서 평균을 내거나(aggregate) 같은 일은 SMT의 자리가 아니에요. 그건 Kafka Streams의 자리입니다.
[Source Task]
↓ SourceRecord 한 줄 생성
[SMT 1] (필드 추가)
↓
[SMT 2] (토픽 라우팅)
↓
[SMT 3] (필드 마스킹)
↓
[Converter]
↓
[Kafka Topic]
SMT는 여러 개를 체인으로 엮어 쓸 수 있어요. 한 줄 메시지가 어댑터를 만나기 전에 여러 가공기를 차례로 통과하는 식입니다.
자주 쓰는 SMT 종류
| SMT 이름 | 자기 자리 |
|---|---|
InsertField | 필드 추가 (예: 처리 시각·서버 호스트명) |
ReplaceField | 필드 이름 변경·삭제·필터링 |
MaskField | 민감 필드 마스킹 (비밀번호·주민번호) |
ValueToKey | 값에서 키를 만들어 메시지 키로 승격 |
ExtractField | Struct에서 한 필드만 뽑아 통째로 값으로 |
Cast | 필드 타입 캐스팅 (int → string 등) |
TimestampConverter | 타임스탬프 형식 변환 |
RegexRouter | 토픽 이름을 정규식으로 변경 |
Filter | 조건 맞는 메시지만 통과 |
Flatten | 중첩 구조를 평탄화 |
HoistField | 단일 값을 Struct 안 한 필드로 감쌈 |
SMT 적용 예시 — 처리 시각 필드 추가
설정 파일 한 줄로 SMT를 끼워 넣을 수 있어요. 예를 들어 모든 메시지에 processed_at 이라는 처리 시각 필드를 추가하고 싶다면.
# 커넥터 설정에 SMT 추가
transforms=insertTimestamp
transforms.insertTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertTimestamp.timestamp.field=processed_at
transforms 키에 가공기 이름들을 콤마로 나열하고, 각 이름 밑에 type 과 옵션을 적습니다. 케이블 중간에 작은 가공기 하나 박는 셈이죠.
SMT 체인 — 여러 가공기 차례로
가공기 여러 개를 차례로 박을 수도 있어요. 예를 들어 "민감 필드 마스킹 → 토픽 이름 변경 → 처리 시각 추가" 순서.
transforms=maskPassword,routeTopic,insertTimestamp
# 1번 가공기 — password 필드 마스킹
transforms.maskPassword.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskPassword.fields=password
# 2번 가공기 — 토픽 접두어 추가
transforms.routeTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.routeTopic.regex=(.*)
transforms.routeTopic.replacement=secured_$1
# 3번 가공기 — 처리 시각 필드 추가
transforms.insertTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertTimestamp.timestamp.field=processed_at
순서가 중요해요. 위 설정에선 마스킹 → 라우팅 → 시각 삽입 순서로 한 메시지가 통과합니다.
여기서 시험 함정이 하나 있어요. SMT는 메시지 단위로만 동작합니다. "이전 메시지와 비교해서 변화량을 계산해라" 같은 상태(state) 가 필요한 작업은 SMT로 못해요. SMT는 한 메시지가 들어오면 한 메시지가 나가는, 메모리 없는 가공기예요. 상태가 필요하면 그건 Kafka Streams의 자리입니다.
Source SMT vs Sink SMT
SMT는 양방향 모두에 박을 수 있어요.
Source 방향:
[소스] → [Source Task] → [SMT] → [Converter] → [Kafka]
↑
Source SMT 자리
Sink 방향:
[Kafka] → [Converter] → [SMT] → [Sink Task] → [목적지]
↑
Sink SMT 자리
같은 SMT 종류를 양쪽에 모두 쓸 수 있지만, 자기 자리가 분명해요. 저장 전에 가공할 일은 Source SMT, 적재 직전에 가공할 일은 Sink SMT.
> 한 줄 정리 — SMT는 케이블 중간에 박는 작은 가공기. 한 메시지를 한 메시지로 변환하는 자리.
SMT vs Kafka Streams — 케이블 안 작은 가공기 vs 별도 가공 라인
여기가 4편 전체에서 가장 자주 헷갈리는 자리예요. 둘 다 데이터를 변환하는데 왜 따로 있을까요?
회사 비유로 풀면 명확해져요. SMT는 케이블 안에 박는 작은 가공기, Kafka Streams는 케이블 옆에 따로 짓는 별도 가공 라인이에요. 작은 손질은 케이블 안에서, 본격적인 가공은 별도 라인에서.
비교표
| 항목 | SMT | Kafka Streams |
|---|---|---|
| 자기 자리 | 커넥터 안 (케이블 중간) | 별도 애플리케이션 (별도 라인) |
| 변환 단위 | 한 메시지 → 한 메시지 | 여러 메시지 결합·집계 가능 |
| 상태(state) | 없음 (stateless) | 있음 (stateful) |
| 윈도우(window) | 불가 | 가능 (시간·세션 윈도우) |
| 조인(join) | 불가 | 가능 (스트림·테이블 조인) |
| 집계(aggregate) | 불가 | 가능 (count·sum·평균) |
| 코드 작성 | 설정 파일만 | Java 애플리케이션 작성 |
| 운영 부담 | 낮음 (Connect 안에서 동작) | 별도 앱 배포·운영 |
| 적합한 작업 | 필드 추가·마스킹·라우팅 | 집계·조인·복잡한 변환 |
어디까지가 SMT 자리인가
SMT로 풀 만한 일은 보통 이 정도예요.
- 필드 추가 — 처리 시각·서버 호스트명·환경 태그
- 필드 제거 — 필요 없는 필드 빼기
- 필드 이름 변경 — 케이스 통일·접두어 추가
- 필드 마스킹 — 비밀번호·주민번호·카드번호 가리기
- 타입 캐스팅 — int를 string으로
- 토픽 라우팅 — 토픽 이름 정규식으로 바꾸기
- 단순 필터링 — 특정 조건 메시지만 통과
어디부터가 Kafka Streams 자리인가
이 정도부터는 SMT로 안 됩니다.
- 여러 메시지 집계 — 1분 단위 평균·합계·카운트
- 윈도우 연산 — 5분 윈도우 안 사용자별 활동 집계
- 조인 — 사용자 토픽과 주문 토픽 합치기
- 상태 기반 변환 — 이전 값과 비교한 차분 계산
- 복잡한 분기 로직 — 여러 조건에 따라 토픽 분리
여기서 시험 함정이 하나 있어요. "그럼 Kafka Streams가 만능인데 왜 SMT를 쓰지?"라는 의문이 들 수 있는데 — 운영 부담이 다르기 때문이에요. SMT는 커넥터 설정 한 줄로 끝나지만, Kafka Streams는 별도 자바 애플리케이션을 만들어 배포·운영해야 해요. 단순한 변환이면 SMT로 끝내는 게 정석입니다.
> 한 줄 정리 — 한 메시지짜리 단순 가공이면 SMT, 여러 메시지·상태·집계가 필요하면 Kafka Streams.
커스텀 커넥터 — 표준 부품에 없는 특수한 케이블을 직접 깎기
이제 4편의 두 번째 주인공입니다. 커스텀 커넥터 — 직접 자바로 새로 깎는 케이블이에요.
회사 비유로 — 표준 부품 카탈로그에 없는 특수한 시스템과 카프카를 잇고 싶을 때, 표준 부품을 본떠 직접 깎아 쓰는 작업이에요. 1편에서 봤듯이 Confluent Hub에 거의 모든 시스템용 커넥터가 있지만, 어떤 사내 자체 API나 특수한 외부 서비스는 직접 만들어야 할 때가 와요.
언제 커스텀 커넥터가 필요한가
- 회사 내부 자체 API가 있는데 표준 커넥터가 없을 때
- 매우 특수한 외부 서비스 (예: 사내 메시징·자체 SaaS)
- 기존 커넥터로는 충족 안 되는 비즈니스 로직이 필요할 때
- 특정 인증·암호화 방식이 표준 커넥터에서 안 될 때
이 글에서는 GitHub Source Connector 를 예시로 들어요. GitHub Issues API를 호출해 새 이슈를 카프카 토픽으로 흘려 넣는 커넥터예요.
커스텀 Source Connector의 3대 클래스
자바로 커스텀 커넥터를 만든다는 건 결국 다음 3개 클래스를 짜는 일이에요.
1. SourceConnectorConfig — 설정 파라미터 정의 (어떤 옵션을 받을지)
2. SourceConnector — 커넥터 메타데이터, Task 생성 로직
3. SourceTask — 실제 데이터 가져오는 일꾼
여기에 부가적으로 Schema 정의 와 HTTP 클라이언트 같은 도우미 클래스를 더 두는 게 표준이에요.
src/main/java/
├── GitHubSourceConnector.java # 메인 커넥터 클래스
├── GitHubSourceConnectorConfig.java # 설정 정의
├── GitHubSourceTask.java # 실제 데이터 가져오는 클래스
├── GitHubSchemas.java # Schema 정의
└── GitHubAPIHttpClient.java # GitHub API 호출 헬퍼
개발 환경 — Java JDK·Maven·IDE
기본 도구는 단순합니다.
- Java JDK 8 이상
- Maven (의존성 관리·빌드)
- IntelliJ IDEA Community Edition (또는 다른 자바 IDE)
Maven에서 Kafka Connect API를 의존성으로 추가해요. provided 스코프가 핵심이에요 — 런타임에는 Connect 자체가 라이브러리를 제공하니까 우리 JAR에는 같이 안 묶입니다.
<dependencies>
<!-- Kafka Connect API 의존성 (provided!) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.6.0</version>
<scope>provided</scope>
</dependency>
<!-- HTTP 클라이언트 (REST API 호출용) -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.0</version>
</dependency>
<!-- JSON 파싱 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
</dependencies>
여기서 시험 함정이 하나 있어요. connect-api 의존성을 provided 가 아닌 compile 로 두면 우리 JAR에 카프카 커넥트 라이브러리가 통째로 묶여 들어가 충돌 가능성이 커집니다. Connect 클러스터의 라이브러리와 우리 JAR의 라이브러리가 두 벌이 되는 거예요. 반드시 provided 로.
1단계 — Configuration 정의
먼저 우리 커넥터가 받을 설정 옵션을 정의해요. AbstractConfig 를 상속해 ConfigDef 를 작성합니다.
public class GitHubSourceConnectorConfig extends AbstractConfig {
// 설정 파라미터 이름 상수
public static final String TOPIC_CONFIG = "topic";
public static final String OWNER_CONFIG = "github.owner";
public static final String REPO_CONFIG = "github.repo";
public static final String SINCE_TIMESTAMP_CONFIG = "since.timestamp";
public static final String BATCH_SIZE_CONFIG = "batch.size";
public static final String AUTH_USERNAME_CONFIG = "auth.username";
public static final String AUTH_PASSWORD_CONFIG = "auth.password";
// ConfigDef — 모든 설정 파라미터를 한 자리에 모아 정의
public static ConfigDef config() {
return new ConfigDef()
.define(TOPIC_CONFIG,
Type.STRING,
Importance.HIGH,
"쓸 Kafka 토픽")
.define(OWNER_CONFIG,
Type.STRING,
Importance.HIGH,
"GitHub 저장소 소유자")
.define(REPO_CONFIG,
Type.STRING,
Importance.HIGH,
"GitHub 저장소 이름")
.define(SINCE_TIMESTAMP_CONFIG,
Type.STRING,
ZonedDateTime.now().minusYears(1)
.format(DateTimeFormatter.ISO_DATE_TIME),
Importance.HIGH,
"이 시각 이후 업데이트된 이슈만 가져옴")
.define(BATCH_SIZE_CONFIG,
Type.INT,
100,
new ConfigDef.Range(1, 100), // 1~100 범위 강제
Importance.LOW,
"한 번에 가져올 이슈 개수")
.define(AUTH_USERNAME_CONFIG,
Type.STRING,
"",
Importance.HIGH,
"GitHub 인증 사용자명 (선택)")
.define(AUTH_PASSWORD_CONFIG,
Type.PASSWORD, // PASSWORD 타입은 로그에 안 찍힘
"",
Importance.HIGH,
"GitHub 인증 토큰 (선택)");
}
public GitHubSourceConnectorConfig(Map<?, ?> originals) {
super(config(), originals);
}
public String getTopic() { return getString(TOPIC_CONFIG); }
public String getOwner() { return getString(OWNER_CONFIG); }
public String getRepo() { return getString(REPO_CONFIG); }
public Integer getBatchSize() { return getInt(BATCH_SIZE_CONFIG); }
public String getAuthUsername() { return getString(AUTH_USERNAME_CONFIG); }
public Password getAuthPassword() { return getPassword(AUTH_PASSWORD_CONFIG); }
}
여기서 시험 함정이 하나 있어요. 비밀번호·토큰 같은 민감 값은 반드시 Type.PASSWORD 로 정의해야 합니다. 그래야 Connect가 로그·REST API 응답에 그 값을 노출하지 않아요. Type.STRING 으로 두면 로그에 그대로 찍혀 보안 사고가 나기 쉽습니다.
2단계 — SourceConnector 클래스
SourceConnector 를 상속해 메인 커넥터 클래스를 만듭니다. 여기서는 Task를 어떻게 생성할지만 정의해요.
public class GitHubSourceConnector extends SourceConnector {
private Map<String, String> config;
@Override
public String version() {
return "0.1";
}
@Override
public void start(Map<String, String> props) {
config = props;
// 연결 유효성 검사 등 초기화
}
@Override
public Class<? extends Task> taskClass() {
// 어떤 Task 클래스를 쓸지 알려주는 자리
return GitHubSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// GitHub API는 보통 Task 1개로 충분
ArrayList<Map<String, String>> configs = new ArrayList<>();
configs.add(config);
return configs;
}
@Override
public void stop() {
// 리소스 정리
}
@Override
public ConfigDef config() {
return GitHubSourceConnectorConfig.config();
}
}
taskConfigs(int maxTasks) 가 핵심이에요. 이 메서드는 "내가 몇 개의 Task를 어떤 설정으로 띄울지" 를 반환합니다. GitHub처럼 단일 API 엔드포인트면 보통 Task 1개로 충분하지만, 여러 저장소를 동시에 추적한다면 저장소 수만큼 Task를 분배할 수도 있어요.
3단계 — Schema 정의
이슈 한 건이 어떤 구조를 가질지 미리 스키마로 정의해둬요. SchemaBuilder 로 중첩 구조까지 표현할 수 있어요.
public class GitHubSchemas {
// User 스키마 (중첩)
public static final Schema USER_SCHEMA = SchemaBuilder.struct()
.name("user")
.field("id", Schema.INT64_SCHEMA)
.field("login", Schema.STRING_SCHEMA)
.field("url", Schema.STRING_SCHEMA)
.optional()
.build();
// Pull Request 스키마 (중첩, nullable)
public static final Schema PULL_REQUEST_SCHEMA = SchemaBuilder.struct()
.name("pull_request")
.field("url", Schema.STRING_SCHEMA)
.optional()
.build();
// Issue 스키마 (메인)
public static final Schema ISSUE_SCHEMA = SchemaBuilder.struct()
.name("Issue")
.field("url", Schema.STRING_SCHEMA)
.field("id", Schema.INT64_SCHEMA)
.field("title", Schema.STRING_SCHEMA)
.field("state", Schema.STRING_SCHEMA)
.field("number", Schema.INT32_SCHEMA)
.field("created_at", Timestamp.SCHEMA)
.field("updated_at", Timestamp.SCHEMA)
.field("user", USER_SCHEMA)
.field("pull_request", PULL_REQUEST_SCHEMA)
.build();
// Key 스키마 (메시지 키 — 이슈를 고유 식별)
public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
.name("IssueKey")
.field("owner", Schema.STRING_SCHEMA)
.field("repository", Schema.STRING_SCHEMA)
.field("number", Schema.INT32_SCHEMA)
.build();
}
키 스키마와 값 스키마를 별도로 정의하는 게 표준이에요. 키는 "어느 메시지인가"를 식별하는 자리, 값은 "그 메시지의 내용" 이에요. GitHub 이슈는 owner/repo/number 셋이 유일하니 그 셋을 키로 묶었어요.
4단계 — SourceTask 클래스 (실제 데이터 가져오기)
여기가 커스텀 커넥터의 심장이에요. SourceTask 의 poll() 메서드가 주기적으로 호출되면서 새 데이터를 카프카로 흘려 보냅니다.
public class GitHubSourceTask extends SourceTask {
private GitHubSourceConnectorConfig config;
private GitHubAPIHttpClient gitHubHttpClient;
private Map<String, String> sourcePartition;
private Map<String, String> sourceOffset;
private Integer lastIssueNumber;
private ZonedDateTime nextQuerySince;
@Override
public String version() {
return "0.1";
}
@Override
public void start(Map<String, String> props) {
config = new GitHubSourceConnectorConfig(props);
// 소스 파티션 — 어떤 저장소를 읽고 있는지
sourcePartition = Collections.singletonMap(
"repository",
config.getOwner() + "/" + config.getRepo()
);
// 이전 오프셋 복원 (재시작 시 이어서 읽기)
Map<String, Object> offset = context.offsetStorageReader()
.offset(sourcePartition);
if (offset != null) {
lastIssueNumber = (Integer) offset.get("last_issue_number");
nextQuerySince = ZonedDateTime.parse(
(String) offset.get("next_query_since")
);
} else {
lastIssueNumber = -1;
nextQuerySince = config.getSinceTimestamp();
}
gitHubHttpClient = new GitHubAPIHttpClient(config);
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();
try {
JSONArray issues = gitHubHttpClient.getNextIssues(
lastIssueNumber, nextQuerySince
);
if (issues.isEmpty()) {
Thread.sleep(5000); // 새 이슈 없으면 잠시 대기
return records;
}
for (Object issue : issues) {
JSONObject issueJson = (JSONObject) issue;
Struct value = buildIssueStruct(issueJson);
Struct key = buildKeyStruct(issueJson);
// 오프셋 갱신
lastIssueNumber = issueJson.getInteger("number");
sourceOffset = new HashMap<>();
sourceOffset.put("last_issue_number", lastIssueNumber.toString());
sourceOffset.put("next_query_since", nextQuerySince.toString());
// SourceRecord 한 줄 생성해 records에 담기
records.add(new SourceRecord(
sourcePartition,
sourceOffset,
config.getTopic(),
null, // 카프카 파티션 자동 배정
GitHubSchemas.KEY_SCHEMA, key,
GitHubSchemas.ISSUE_SCHEMA, value
));
}
} catch (InterruptedException e) {
throw e; // 인터럽트는 반드시 다시 throw
} catch (Exception e) {
throw new RuntimeException(e);
}
return records;
}
@Override
public void stop() {
// HTTP 클라이언트 등 리소스 정리
}
}
여기서 핵심 개념 두 가지를 짚고 가요 — Source Partition 과 Source Offset.
Source Partition — "어디서 읽고 있는가"
Source Partition 은 우리가 어느 소스를 읽고 있는지 고유하게 식별하는 맵이에요. 같은 커넥터가 여러 소스(예: 여러 저장소·여러 테이블)를 동시에 추적할 때 각 소스를 구분하는 자리예요.
// GitHub의 경우 — 저장소로 식별
sourcePartition.put("repository", "kubernetes/kubernetes");
// JDBC의 경우 — 테이블로 식별
sourcePartition.put("table", "users");
// File의 경우 — 파일 경로로 식별
sourcePartition.put("filename", "/var/log/app.log");
Source Offset — "어디까지 읽었는가"
Source Offset 은 그 소스에서 마지막으로 읽은 위치예요. Connect가 이걸 내부 토픽 connect-offsets 에 자동으로 저장해 둡니다. Worker가 죽었다 살아나도 마지막 지점부터 이어서 읽을 수 있는 비결이에요.
// GitHub의 경우 — 마지막 이슈 번호
sourceOffset.put("last_issue_number", 5000);
sourceOffset.put("next_query_since", "2021-01-01T00:00:00Z");
// File의 경우 — 파일에서 읽은 바이트 위치
sourceOffset.put("position", 1024L);
여기서 시험 함정이 하나 있어요. Source Offset은 카프카 토픽의 offset과는 완전히 다른 개념입니다. 카프카 토픽 offset은 "토픽 안에서 어디까지 소비했는가"고, Source Offset은 "외부 소스에서 어디까지 가져왔는가"예요. 이름만 같지 자기 자리가 완전히 달라요.
5단계 — Struct 만들기
스키마는 양식 종이고, 실제 데이터는 Struct 에 담아요. 양식에 칸칸이 값을 채워 넣는 셈이에요.
private Struct buildIssueStruct(JSONObject issueJson) {
Struct value = new Struct(GitHubSchemas.ISSUE_SCHEMA)
.put("url", issueJson.getString("url"))
.put("id", issueJson.getLong("id"))
.put("title", issueJson.getString("title"))
.put("state", issueJson.getString("state"))
.put("number", issueJson.getInteger("number"))
.put("created_at", Date.from(
Instant.parse(issueJson.getString("created_at"))))
.put("updated_at", Date.from(
Instant.parse(issueJson.getString("updated_at"))));
// 중첩 user
JSONObject userJson = issueJson.getJSONObject("user");
if (userJson != null) {
Struct user = new Struct(GitHubSchemas.USER_SCHEMA)
.put("id", userJson.getLong("id"))
.put("login", userJson.getString("login"))
.put("url", userJson.getString("url"));
value.put("user", user);
}
// 중첩 pull_request (있을 때만)
JSONObject prJson = issueJson.getJSONObject("pull_request");
if (prJson != null) {
Struct pr = new Struct(GitHubSchemas.PULL_REQUEST_SCHEMA)
.put("url", prJson.getString("url"));
value.put("pull_request", pr);
}
return value;
}
Struct 는 빌더 패턴처럼 .put(필드, 값) 으로 채워 가는 게 표준이에요. 중첩된 필드는 또 다른 Struct 를 만들어 통째로 put 합니다.
SourceRecord — 카프카에 흘려 보내는 한 줄 메시지
Struct 까지 만들면 이제 그걸 SourceRecord 한 줄로 포장해 카프카로 흘려 보내요. SourceRecord는 한 줄짜리 메시지의 모든 정보(키·값·스키마·토픽·파티션·오프셋)를 한 번에 담는 자료 구조예요.
SourceRecord record = new SourceRecord(
sourcePartition, // 소스의 어느 파티션 (예: 어느 저장소)
sourceOffset, // 소스의 어느 위치까지 (예: 어느 이슈 번호)
topic, // 보낼 카프카 토픽
partition, // 카프카 파티션 (null = 자동)
keySchema, key, // 메시지 키 + 키 스키마
valueSchema, value // 메시지 값 + 값 스키마
);
이 SourceRecord 한 줄이 Converter를 통과해 바이트 배열로 직렬화되고, 카프카 토픽에 한 메시지로 적재됩니다.
HTTP 클라이언트 — GitHub API 호출 헬퍼
마지막으로 GitHub API를 실제로 두드리는 도우미 클래스. OkHttp 같은 흔한 HTTP 라이브러리를 쓰면 단순해요.
public class GitHubAPIHttpClient {
private GitHubSourceConnectorConfig config;
private OkHttpClient client;
private static final String BASE_URL = "https://api.github.com";
public GitHubAPIHttpClient(GitHubSourceConnectorConfig config) {
this.config = config;
this.client = new OkHttpClient();
}
public JSONArray getNextIssues(int lastIssueNumber,
ZonedDateTime since) throws Exception {
Request.Builder builder = new Request.Builder()
.url(BASE_URL + "/repos/" + config.getOwner() + "/"
+ config.getRepo() + "/issues"
+ "?state=all"
+ "&per_page=" + config.getBatchSize()
+ "&since=" + since.format(DateTimeFormatter.ISO_INSTANT)
+ "&direction=asc&sort=updated");
// 인증 헤더 추가 (있을 때만)
String username = config.getAuthUsername();
String password = config.getAuthPassword().value();
if (!username.isEmpty() && !password.isEmpty()) {
builder.addHeader("Authorization",
Credentials.basic(username, password));
}
try (Response response = client.newCall(builder.build()).execute()) {
if (response.code() == 200) {
return new JSONArray(response.body().string());
} else if (response.code() == 401) {
throw new RuntimeException("Authentication failed");
} else {
throw new RuntimeException("GitHub API error: " + response.code());
}
}
}
}
여기서 시험 함정이 하나 있어요. 외부 API에는 거의 다 Rate Limiting이 걸려 있어요. GitHub API는 인증 없이 시간당 60회, 인증 후 시간당 5,000회 제한이에요. 커스텀 커넥터를 잘못 짜면 1분 안에 한도를 다 써버립니다. X-RateLimit-Remaining 헤더를 보고 한도가 임박하면 Thread.sleep 으로 쉬어주는 게 정석이에요.
String remaining = response.header("X-RateLimit-Remaining");
String resetTime = response.header("X-RateLimit-Reset");
if (Integer.parseInt(remaining) < 10) {
long sleepMs = (Long.parseLong(resetTime) * 1000) - System.currentTimeMillis();
if (sleepMs > 0) Thread.sleep(sleepMs);
}
커스텀 커넥터 패키징 — Maven으로 JAR 만들기
코드를 다 짰으면 Maven으로 JAR을 만들어요.
# 빌드 명령
mvn clean package
# 성공 시 출력 예시
# [INFO] Tests run: 12, Failures: 0
# [INFO] BUILD SUCCESS
# [INFO] target/kafka-connect-github-0.1-SNAPSHOT-jar-with-dependencies.jar
빌드 결과물:
target/
├── kafka-connect-github-0.1-SNAPSHOT.jar # 의존성 미포함
└── kafka-connect-github-0.1-SNAPSHOT-jar-with-dependencies.jar # 의존성 포함
여기서 시험 함정이 하나 있어요. Connect에 배포할 JAR은 jar-with-dependencies 가 들어간 통짜 JAR(uber JAR) 이에요. 의존성이 빠진 기본 JAR을 올리면 OkHttp·Gson 같은 라이브러리를 못 찾아 런타임 에러가 납니다. uber JAR을 쓰는 게 표준.
커스텀 커넥터 배포 — Standalone Mode
테스트용으로는 Standalone Mode가 가장 단순해요. JAR 경로를 CLASSPATH에 추가하고 connect-standalone.sh 를 실행합니다.
#!/bin/bash
export CLASSPATH=/path/to/kafka-connect-github-0.1-SNAPSHOT-jar-with-dependencies.jar
connect-standalone.sh \
worker.properties \
github-source.properties
# github-source.properties
name=github-source-demo
connector.class=com.example.GitHubSourceConnector
tasks.max=1
topic=github-issues
# GitHub 설정
github.owner=kubernetes
github.repo=kubernetes
since.timestamp=2020-01-01T00:00:00Z
batch.size=100
# 선택 — 인증
# auth.username=your_username
# auth.password=your_personal_access_token
커스텀 커넥터 배포 — Distributed Mode (plugin.path)
프로덕션 환경에선 Distributed Mode에 올리는 게 정석이에요. 3편에서 배운 plugin.path 디렉터리에 우리 JAR을 떨궈 놓으면 Connect가 시작 시 자동으로 로드해요.
# 1. 빌드
mvn clean package
# 2. plugin.path 디렉터리에 JAR 복사
cp target/kafka-connect-github-0.1-SNAPSHOT-jar-with-dependencies.jar \
/opt/kafka/connectors/
# 3. 모든 Worker 재시작 (또는 Connect 재기동)
systemctl restart kafka-connect
# 4. REST API로 우리 커넥터 인식되는지 확인
curl http://localhost:8083/connector-plugins | jq .
# → "com.example.GitHubSourceConnector" 가 목록에 보여야 함
Docker 환경이면 호스트 디렉터리를 컨테이너의 plugin.path 로 볼륨 마운트하면 돼요.
# docker-compose.yml 일부
volumes:
- ./target:/opt/kafka/connectors
REST API로 커스텀 커넥터 띄우기
3편에서 배운 REST API를 그대로 써요. 커넥터 클래스 이름만 우리 클래스로 바꾸면 됩니다.
curl -s -X POST \
-H "Content-Type: application/json" \
-d '{
"name": "github-source-connector",
"config": {
"connector.class": "com.example.GitHubSourceConnector",
"tasks.max": "1",
"topic": "github-issues",
"github.owner": "kubernetes",
"github.repo": "kubernetes",
"batch.size": "100"
}
}' \
http://localhost:8083/connectors | jq .
> 한 줄 정리 — 커스텀 커넥터는 uber JAR로 만들어 plugin.path 에 떨구고, REST API로 띄우는 게 표준 흐름.
Sink Connector 개발은 어떻게 다른가
이 글에서는 Source 쪽만 풀었지만, Sink Connector 구조도 거의 비슷해요. 차이를 한 번에 짚으면 다음과 같습니다.
public class MySinkConnector extends SinkConnector {
@Override
public Class<? extends Task> taskClass() {
return MySinkTask.class;
}
}
public class MySinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
// Source의 poll() 대신 put()이 핵심
for (SinkRecord record : records) {
Struct value = (Struct) record.value();
// ... 데이터베이스에 저장하는 로직
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
// 데이터 커밋
}
}
핵심 차이 두 가지.
- Source는
poll()이 주기적으로 호출돼 데이터를 가져오고, Sink는put()이 주기적으로 호출돼 데이터를 받아 적재해요. 방향이 반대. - Sink는
flush()가 추가로 있어요. Connect가 "여기까지 커밋 가능한 자리야"라고 알려줄 때 호출되는 자리예요.
여기서 시험 함정이 하나 있어요. Source Connector는 tasks.max 를 자기 마음대로 정할 수 있지만, Sink Connector는 토픽 파티션 수를 넘을 수 없어요. Sink Task는 결국 카프카 컨슈머인데, 컨슈머 그룹은 한 파티션을 한 컨슈머만 읽기 때문이에요. 토픽이 3 파티션이면 Sink Task는 최대 3개.
Task 수 설정 — 자기 자리에 맞게
Task 수는 커넥터 종류에 따라 적정값이 달라요.
# Source — 파일 기반: Task 1개로 충분
tasks.max=1
# Source — DB 여러 테이블 동시 추적: 테이블 수만큼
tasks.max=5
# Sink — 카프카 파티션 수만큼 (절대 그 이상 X)
# 토픽이 3 파티션이면
tasks.max=3
커스텀 커넥터 개발 모범 사례
마지막으로 커스텀 커넥터 짤 때 자주 빠뜨리는 자리를 모아둘게요.
1. 설정 유효성 검사
ConfigDef.Range 같은 유효성 검사기를 적극 써요. 설정이 잘못 들어왔을 때 런타임에 죽지 말고 설정 단계에서 즉시 거부하는 게 정석이에요.
.define(BATCH_SIZE_CONFIG,
Type.INT,
100,
new ConfigDef.Range(1, 100), // 1~100 범위 강제
Importance.LOW,
"Number of records per batch")
2. 인터럽트 예외는 반드시 다시 throw
poll() 안에서 InterruptedException 을 잡으면 반드시 다시 throw 해야 해요. 안 그러면 Connect가 Task를 종료시킬 수 없어 좀비가 되거든요.
try {
// ...
} catch (InterruptedException e) {
throw e; // ★ 반드시 다시 throw
} catch (Exception e) {
log.error("Error", e);
}
3. 외부 API Rate Limiting 처리
위에서 본 X-RateLimit-Remaining 헤더 체크. 외부 API를 두드리는 모든 커넥터는 한도에 부딪히기 전에 스스로 멈춰야 해요.
4. 비밀번호·토큰은 Type.PASSWORD
설정 파라미터 정의에서 민감 값은 무조건 Type.PASSWORD. 로그·REST API에 안 노출되도록.
5. uber JAR로 패키징
maven-shade-plugin 같은 걸로 uber JAR을 만들어 plugin.path 에 올리는 게 표준. 의존성이 빠진 일반 JAR은 런타임에 ClassNotFoundException으로 죽어요.
시험 직전 한 번 더 — 4편 압축 노트
여기까지가 4편의 핵심입니다. 시험 직전·실무에서 펼쳐 볼 압축 노트로 마무리할게요.
- 스키마(Schema) = 케이블 양 끝의 자료 표준 양식 (필드·타입·필수 여부)
- 메시지에는
schema(양식)와payload(실제 자료)가 같이 실릴 수 있다 schemas.enable=true면 양식까지,false면 payload만- JDBC·Elasticsearch Sink 는
schemas.enable=true거의 필수 - Converter = 변환 어댑터 (객체 ↔ 바이트)
- JsonConverter = 디버깅용·읽기 좋음·크기 큼
- AvroConverter = 프로덕션 표준·압축 좋음·Schema Registry 필요
- StringConverter = 단순 문자열, 가장 가벼움
- 키와 값에 각각 Converter를 따로 지정한다
- SMT(Single Message Transform) = 케이블 중간에 박는 작은 가공기
- SMT는 한 메시지 → 한 메시지, 상태 없음, 윈도우·조인 불가
- 자주 쓰는 SMT — InsertField·ReplaceField·MaskField·ValueToKey·RegexRouter
- SMT는
transforms=...설정으로 체인 엮기 가능, 순서 중요 - SMT vs Kafka Streams = 케이블 안 작은 가공기 vs 별도 가공 라인
- 단순 필드 가공·라우팅이면 SMT, 집계·조인·윈도우면 Kafka Streams
- 커스텀 커넥터 = 표준 부품에 없는 케이블을 직접 깎기
- 자바 3대 클래스 — Config·Connector·Task
connect-api의존성은 반드시provided스코프Type.PASSWORD로 민감 값 숨기기,ConfigDef.Range로 유효성 검사- SourceConnector —
taskClass()·taskConfigs()로 Task 분배 - SourceTask —
poll()이 주기 호출,SourceRecord리스트 반환 - Source Partition = 어디서 읽는가 (저장소·테이블·파일 경로)
- Source Offset = 어디까지 읽었는가,
connect-offsets토픽에 자동 저장 - Source Offset ≠ Kafka Topic Offset, 자기 자리 다름
- Schema·Struct — 양식 종이와 그 양식에 채운 값
- SourceRecord — 한 줄 메시지의 모든 정보(키·값·스키마·토픽·오프셋) 묶음
- 패키징은 uber JAR(
jar-with-dependencies),plugin.path에 떨구기 - 외부 API는 Rate Limiting 반드시 처리, 인터럽트 예외 반드시 재throw
- Sink Task =
poll()대신put()·flush() - Sink Task 수는 토픽 파티션 수를 못 넘는다 (컨슈머 그룹 원리)
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.
- 1편 — Kafka Connect 입문
- 2편 — Source · Sink Connector
- 3편 — Distributed Mode · REST API
- 4편 — SMT · 커스텀 커넥터 (현재 글)
- 5편 — 운영 · 프로덕션 (완)