백엔드 데이터 인프라 119편. Kafka Custom Connector 개발 — SourceConnector·SourceTask·SinkConnector·SinkTask 구현, ConfigDef·SchemaBuilder, Offset 관리, 재시작 안전성, Single Message Transform (SMT) 작성, 배포·테스트 패턴까지 풀어쓴 학습 노트.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 119편이에요. 117·118편 에서 Pre-built Connector(이미 만들어진 커넥터) 사용 영역을 잡았다면, 이번 119편은 직접 만들기 — Custom Connector(직접 구현하는 커넥터) 개발.
언제 Custom Connector 가 필요한가
대부분의 경우엔 pre-built 로 끝나요. Custom 이 필요한 자리는 Pre-built 가 없는 외부 시스템(특수 SaaS·legacy), 비즈니스 로직이 복잡한 통합, 내부 자체 시스템 정도예요. 가능하면 pre-built 가 있을 때 그걸 쓰는 게 낫습니다. Custom 은 개발과 운영 양쪽에서 비용이 크게 붙어요.
SourceConnector 구조
public class MySourceConnector extends SourceConnector {
private Map<String, String> configProps;
@Override
public void start(Map<String, String> props) {
this.configProps = props;
// 외부 시스템 초기화·credential 검증
}
@Override
public Class<? extends Task> taskClass() {
return MySourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// 작업을 maxTasks 개로 분할
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskConfig = new HashMap<>(configProps);
taskConfig.put("task.id", String.valueOf(i));
// 각 task 가 처리할 partition·shard 정보 분할
taskConfigs.add(taskConfig);
}
return taskConfigs;
}
@Override
public void stop() {
// 정리
}
@Override
public ConfigDef config() {
return MyConfig.config();
}
@Override
public String version() {
return "1.0.0";
}
}
핵심:
start()= 초기화taskClass()= 실제 처리 Task 클래스taskConfigs(int)= 작업을 N task 로 분할config()= 설정 schema 정의
SourceTask 구현
public class MySourceTask extends SourceTask {
private MyExternalSource source;
@Override
public void start(Map<String, String> props) {
source = new MyExternalSource(props.get("api.endpoint"));
// 마지막 처리 offset 복구
Map<String, Object> lastOffset = context.offsetStorageReader()
.offset(Collections.singletonMap("partition", props.get("task.id")));
if (lastOffset != null) {
source.resumeFrom((Long) lastOffset.get("position"));
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// 외부에서 데이터 가져옴
List<MyData> data = source.fetchNext(100);
return data.stream().map(item -> new SourceRecord(
Collections.singletonMap("partition", item.partition()), // source partition
Collections.singletonMap("position", item.position()), // source offset
"my-topic", // Kafka topic
null, // Kafka partition (null = 자동)
Schema.STRING_SCHEMA, // key schema
item.key(), // key
buildValueSchema(), // value schema
buildValue(item) // value
)).collect(Collectors.toList());
}
@Override
public void stop() {
source.close();
}
}
핵심:
poll()= 핵심 method, 메시지 batch 반환SourceRecord(소스 메시지 단위) = source partition + source offset + Kafka record- Offset 관리 =
context.offsetStorageReader()로 마지막 처리 위치 복구
Offset 관리
// poll() 안에서
new SourceRecord(
Collections.singletonMap("source-id", "shard-1"), // partition (어디서 왔나)
Collections.singletonMap("position", 12345L), // offset (어디까지 왔나)
...
);
// Connect framework 가 자동으로 connect-offsets topic 에 commit
Source partition 과 source offset 을 명시해 두면 framework 가 알아서 관리해요.
SinkConnector 구조
public class MySinkConnector extends SinkConnector {
@Override
public void start(Map<String, String> props) { ... }
@Override
public Class<? extends Task> taskClass() {
return MySinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// SinkConnector 는 보통 모든 task 가 같은 config (Kafka partition 자동 분담)
List<Map<String, String>> configs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; i++) configs.add(configProps);
return configs;
}
@Override public void stop() { }
@Override public ConfigDef config() { return MyConfig.config(); }
@Override public String version() { return "1.0.0"; }
}
SinkTask 구현
public class MySinkTask extends SinkTask {
private MyExternalSink sink;
@Override
public void start(Map<String, String> props) {
sink = new MyExternalSink(props.get("endpoint"));
}
@Override
public void put(Collection<SinkRecord> records) {
// Kafka 에서 받은 메시지를 외부 시스템에 write
List<MyData> batch = records.stream()
.map(this::convert)
.collect(Collectors.toList());
sink.writeBatch(batch);
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
// 모든 in-flight 데이터 외부 시스템에 flush
sink.flush();
}
@Override
public void stop() {
sink.close();
}
}
핵심:
put()= 핵심 method, Kafka 메시지를 외부에 writeflush()= 모든 in-flight 처리 강제 완료- Kafka offset commit 은 framework 가 자동 (flush 성공 후만)
ConfigDef — 설정 schema
public class MyConfig {
public static ConfigDef config() {
return new ConfigDef()
.define("api.endpoint",
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"External API endpoint URL")
.define("api.key",
ConfigDef.Type.PASSWORD, // PASSWORD type = 로그·UI 에 마스킹
ConfigDef.Importance.HIGH,
"API authentication key")
.define("poll.interval.ms",
ConfigDef.Type.INT,
1000, // default
ConfigDef.Range.atLeast(100), // 검증
ConfigDef.Importance.MEDIUM,
"Polling interval in milliseconds")
.define("batch.size",
ConfigDef.Type.INT,
100,
ConfigDef.Importance.MEDIUM,
"Records per batch");
}
}
ConfigDef(설정 스키마 선언 클래스) 만 정의해 두면 REST API 가 알아서 config validation 까지 해줘요.
여기서 시험 함정이 하나 있어요 — PASSWORD type 필수. API key·password 는 반드시 PASSWORD type 으로 잡아둬야 로그와 UI 양쪽에서 마스킹돼요.
Schema 정의
private Schema buildValueSchema() {
return SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("email", Schema.OPTIONAL_STRING_SCHEMA)
.field("created_at", Timestamp.SCHEMA)
.build();
}
private Struct buildValue(MyData item) {
return new Struct(buildValueSchema())
.put("id", item.id())
.put("name", item.name())
.put("email", item.email())
.put("created_at", item.createdAt());
}
Connect 의 Schema 는 Avro(직렬화 포맷) 와 호환돼서 AvroConverter 가 그대로 변환을 맡아요.
재시작 안전성 — Source Connector
핵심은 재시작 후 같은 메시지를 두 번 보내지 않는 것.
@Override
public void start(Map<String, String> props) {
// 마지막 처리 offset 복구
Map<String, Object> partition = Collections.singletonMap("source-id", "shard-1");
Map<String, Object> offset = context.offsetStorageReader().offset(partition);
long lastPosition = offset != null ? (Long) offset.get("position") : 0L;
source.resumeFrom(lastPosition);
}
Idempotent Source
@Override
public List<SourceRecord> poll() {
// 같은 source position = 같은 메시지 (중복 제거 가능)
return records.stream().map(r -> new SourceRecord(
...
Collections.singletonMap("source-id", "shard-1"),
Collections.singletonMap("position", r.position()),
...
)).toList();
}
같은 source position 으로 들어온 메시지는 framework 가 commit 한 offset 이전 것을 자동으로 skip 해줘요.
Commit 알림 — commitRecord()
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
// Kafka 에 successfully written
// 외부 시스템에 ack 보내거나, 자체 상태 update
}
source connector 가 Kafka write 성공 후 외부 시스템에도 ack 를 보내야 하는 경우에 이 콜백을 씁니다.
SMT 작성
SMT(Single Message Transform, 메시지 단위 변환) 는 다음처럼 구현해요.
public class MyTransform<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R record) {
Struct value = (Struct) record.value();
Struct newValue = new Struct(value.schema());
for (Field field : value.schema().fields()) {
Object v = value.get(field);
// 변환 로직
newValue.put(field.name(), transform(v));
}
return record.newRecord(
record.topic(), record.kafkaPartition(),
record.keySchema(), record.key(),
value.schema(), newValue,
record.timestamp()
);
}
@Override
public ConfigDef config() { return new ConfigDef(); }
@Override
public void configure(Map<String, ?> configs) { }
@Override
public void close() { }
}
설정:
transforms=MyTransform
transforms.MyTransform.type=com.example.MyTransform
배포 — Plugin Path
# /usr/share/kafka-connect-plugins/my-connector/
├── my-connector-1.0.0.jar
├── dependency-1.jar
├── dependency-2.jar
└── ...
# worker.properties
plugin.path=/usr/share/kafka-connect-plugins
각 connector 는 별도 디렉토리 에 두는 게 원칙이에요 (classloader 격리 때문).
# Worker 시작 시 plugin 자동 로드
$ bin/connect-distributed.sh worker.properties
테스트
Unit Test
@Test
public void testTaskPoll() throws Exception {
MySourceTask task = new MySourceTask();
Map<String, String> props = Map.of("api.endpoint", "http://test");
task.initialize(mockContext());
task.start(props);
List<SourceRecord> records = task.poll();
assertEquals(100, records.size());
assertEquals("test-key", records.get(0).key());
task.stop();
}
mockContext() 는 테스트용 SourceTaskContext 를 끼워 넣는 헬퍼예요.
Integration Test
Confluent 가 제공하는 test 유틸 EmbeddedKafkaCluster(내장 Kafka 클러스터) + EmbeddedConnectCluster(내장 Connect 클러스터) 를 묶어 씁니다.
EmbeddedConnectCluster connectCluster = new EmbeddedConnectCluster.Builder()
.numWorkers(2)
.brokerProps(kafkaProps)
.build();
connectCluster.start();
connectCluster.configureConnector("my-connector", connectorProps);
// 실제 Kafka topic 으로 메시지 흐름 검증
Confluent CCS (Connector Compatibility Standard)
Confluent 가 정의한 connector 품질 기준 으로, 다음 항목을 요구해요.
- Schema 사용
- Idempotent 처리
- Error 처리 (DLQ(Dead Letter Queue, 실패 메시지 적재) 지원)
- Metric 노출
- SMT 호환
엔터프라이즈 인증을 받으려면 CCS 를 준수해야 합니다.
한계·실무 함정
1. Pre-built 우선
대부분의 외부 시스템에는 이미 pre-built 가 있어요. Custom 은 진짜 없을 때만 손대는 게 맞습니다.
2. Offset 관리 복잡
Source 쪽은 외부 시스템의 어디까지 처리했는지 추적하는 게 핵심이라, 재시작 안전성을 보장하기가 까다로워요.
3. Idempotent / Exactly-once
Sink 의 EOS(Exactly-Once Semantics, 정확히 한 번 보장) 는 외부 시스템 협력이 있어야 가능해요 (88편 transactional outbox 참고).
4. Schema 진화
Avro + Schema Registry(스키마 중앙 저장소) 권장. Schema 를 바꾸면 consumer 와 sink 의 호환성에 영향이 가요.
5. Plugin classloader 격리
Worker 가 connector 별로 별도 classloader 를 띄우기 때문에, plugin path 안에 모든 dependency 를 포함시켜야 합니다.
6. 운영 부담
Custom connector 는 직접 maintain·debugging·upgrade 까지 떠안는 부담이에요. 잘 모르는 system 통합이라면 Pre-built 도입 부터 검토하는 편이 낫습니다.
시험 직전 한 번 더 — Custom Connector 함정 압축 노트
- 언제 = Pre-built 없는 경우만
- SourceConnector =
start·taskClass·taskConfigs·stop·config·version - SourceTask =
start·poll·stop·commitRecord poll()= 메시지 batch 반환 (SourceRecord)- SourceRecord = source partition + source offset + Kafka topic·key·value·schema
- Offset 관리 =
context.offsetStorageReader().offset(partition)으로 복구 - SinkConnector =
start·taskClass·taskConfigs·stop·config - SinkTask =
start·put·flush·stop put()= Kafka 메시지를 외부에 writeflush()= in-flight 강제 완료 (offset commit 전)- ConfigDef = 설정 schema 정의
Type.PASSWORD= 로그·UI 마스킹 (반드시)Range·default·Importance옵션- Schema =
SchemaBuilder.struct().field() - Avro 호환
- 재시작 안전성 = 같은 source position = 같은 메시지 → framework 자동 skip
commitRecord= Kafka write 성공 후 callback (외부 ack 등)- SMT 작성 =
Transformation<R>구현 - 배포 =
plugin.path디렉토리에 connector + dependencies - 테스트 = Unit +
EmbeddedConnectClusterintegration - Confluent CCS = 품질 기준 (schema·idempotent·error·metric·SMT)
- 함정 — Pre-built 우선
- 함정 — Offset 복잡
- 함정 — Idempotent / EOS 어려움 (외부 시스템 협력)
- 함정 — Schema 진화 호환성
- 함정 — Plugin classloader 격리 (dependency 다 포함)
- 함정 — 운영 부담
공식 문서: Kafka Connect Developer Guide 에서 자세한 API·예제를 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 114편 — Kafka SSL/TLS (Keystore · mTLS · 인증서 운영)
- 115편 — Kafka SASL (SCRAM · PLAIN · OAuth · Kerberos)
- 116편 — Kafka ACL (Authorization 깊이)
- 117편 — Kafka Connect (Source · Sink · Worker 아키텍처)
- 118편 — Kafka Connect 운영 (REST · Status · Error Handler · DLQ)
다음 글: