백엔드 데이터 인프라 119편 — Kafka Connect Custom Connector 개발

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 119편. Kafka Custom Connector 개발 — SourceConnector·SourceTask·SinkConnector·SinkTask 구현, ConfigDef·SchemaBuilder, Offset 관리, 재시작 안전성, Single Message Transform (SMT) 작성, 배포·테스트 패턴까지 풀어쓴 학습 노트.

📚 백엔드 데이터 인프라 · 119편 — Kafka Connect Custom Connector 개발

이 글은 백엔드 데이터 인프라 시리즈 130편 중 119편이에요. 117·118편 에서 Pre-built Connector(이미 만들어진 커넥터) 사용 영역을 잡았다면, 이번 119편은 직접 만들기Custom Connector(직접 구현하는 커넥터) 개발.

언제 Custom Connector 가 필요한가

대부분의 경우엔 pre-built 로 끝나요. Custom 이 필요한 자리는 Pre-built 가 없는 외부 시스템(특수 SaaS·legacy), 비즈니스 로직이 복잡한 통합, 내부 자체 시스템 정도예요. 가능하면 pre-built 가 있을 때 그걸 쓰는 게 낫습니다. Custom 은 개발과 운영 양쪽에서 비용이 크게 붙어요.

SourceConnector 구조

public class MySourceConnector extends SourceConnector {

    private Map<String, String> configProps;

    @Override
    public void start(Map<String, String> props) {
        this.configProps = props;
        // 외부 시스템 초기화·credential 검증
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // 작업을 maxTasks 개로 분할
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(configProps);
            taskConfig.put("task.id", String.valueOf(i));
            // 각 task 가 처리할 partition·shard 정보 분할
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override
    public void stop() {
        // 정리
    }

    @Override
    public ConfigDef config() {
        return MyConfig.config();
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}

핵심:

  • start() = 초기화
  • taskClass() = 실제 처리 Task 클래스
  • taskConfigs(int) = 작업을 N task 로 분할
  • config() = 설정 schema 정의

SourceTask 구현

public class MySourceTask extends SourceTask {

    private MyExternalSource source;

    @Override
    public void start(Map<String, String> props) {
        source = new MyExternalSource(props.get("api.endpoint"));
        // 마지막 처리 offset 복구
        Map<String, Object> lastOffset = context.offsetStorageReader()
            .offset(Collections.singletonMap("partition", props.get("task.id")));
        if (lastOffset != null) {
            source.resumeFrom((Long) lastOffset.get("position"));
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // 외부에서 데이터 가져옴
        List<MyData> data = source.fetchNext(100);

        return data.stream().map(item -> new SourceRecord(
            Collections.singletonMap("partition", item.partition()),    // source partition
            Collections.singletonMap("position", item.position()),       // source offset
            "my-topic",                                                   // Kafka topic
            null,                                                         // Kafka partition (null = 자동)
            Schema.STRING_SCHEMA,                                         // key schema
            item.key(),                                                   // key
            buildValueSchema(),                                           // value schema
            buildValue(item)                                              // value
        )).collect(Collectors.toList());
    }

    @Override
    public void stop() {
        source.close();
    }
}

핵심:

  • poll() = 핵심 method, 메시지 batch 반환
  • SourceRecord(소스 메시지 단위) = source partition + source offset + Kafka record
  • Offset 관리 = context.offsetStorageReader() 로 마지막 처리 위치 복구

Offset 관리

// poll() 안에서
new SourceRecord(
    Collections.singletonMap("source-id", "shard-1"),    // partition (어디서 왔나)
    Collections.singletonMap("position", 12345L),         // offset (어디까지 왔나)
    ...
);

// Connect framework 가 자동으로 connect-offsets topic 에 commit

Source partition 과 source offset 을 명시해 두면 framework 가 알아서 관리해요.

SinkConnector 구조

public class MySinkConnector extends SinkConnector {

    @Override
    public void start(Map<String, String> props) { ... }

    @Override
    public Class<? extends Task> taskClass() {
        return MySinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // SinkConnector 는 보통 모든 task 가 같은 config (Kafka partition 자동 분담)
        List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) configs.add(configProps);
        return configs;
    }

    @Override public void stop() { }
    @Override public ConfigDef config() { return MyConfig.config(); }
    @Override public String version() { return "1.0.0"; }
}

SinkTask 구현

public class MySinkTask extends SinkTask {

    private MyExternalSink sink;

    @Override
    public void start(Map<String, String> props) {
        sink = new MyExternalSink(props.get("endpoint"));
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Kafka 에서 받은 메시지를 외부 시스템에 write
        List<MyData> batch = records.stream()
            .map(this::convert)
            .collect(Collectors.toList());

        sink.writeBatch(batch);
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 모든 in-flight 데이터 외부 시스템에 flush
        sink.flush();
    }

    @Override
    public void stop() {
        sink.close();
    }
}

핵심:

  • put() = 핵심 method, Kafka 메시지를 외부에 write
  • flush() = 모든 in-flight 처리 강제 완료
  • Kafka offset commit 은 framework 가 자동 (flush 성공 후만)

ConfigDef — 설정 schema

public class MyConfig {
    public static ConfigDef config() {
        return new ConfigDef()
            .define("api.endpoint",
                    ConfigDef.Type.STRING,
                    ConfigDef.Importance.HIGH,
                    "External API endpoint URL")
            .define("api.key",
                    ConfigDef.Type.PASSWORD,             // PASSWORD type = 로그·UI 에 마스킹
                    ConfigDef.Importance.HIGH,
                    "API authentication key")
            .define("poll.interval.ms",
                    ConfigDef.Type.INT,
                    1000,                                  // default
                    ConfigDef.Range.atLeast(100),         // 검증
                    ConfigDef.Importance.MEDIUM,
                    "Polling interval in milliseconds")
            .define("batch.size",
                    ConfigDef.Type.INT,
                    100,
                    ConfigDef.Importance.MEDIUM,
                    "Records per batch");
    }
}

ConfigDef(설정 스키마 선언 클래스) 만 정의해 두면 REST API 가 알아서 config validation 까지 해줘요.

여기서 시험 함정이 하나 있어요 — PASSWORD type 필수. API key·password 는 반드시 PASSWORD type 으로 잡아둬야 로그와 UI 양쪽에서 마스킹돼요.

Schema 정의

private Schema buildValueSchema() {
    return SchemaBuilder.struct()
        .field("id", Schema.INT64_SCHEMA)
        .field("name", Schema.STRING_SCHEMA)
        .field("email", Schema.OPTIONAL_STRING_SCHEMA)
        .field("created_at", Timestamp.SCHEMA)
        .build();
}

private Struct buildValue(MyData item) {
    return new Struct(buildValueSchema())
        .put("id", item.id())
        .put("name", item.name())
        .put("email", item.email())
        .put("created_at", item.createdAt());
}

Connect 의 Schema 는 Avro(직렬화 포맷) 와 호환돼서 AvroConverter 가 그대로 변환을 맡아요.

재시작 안전성 — Source Connector

핵심은 재시작 후 같은 메시지를 두 번 보내지 않는 것.

@Override
public void start(Map<String, String> props) {
    // 마지막 처리 offset 복구
    Map<String, Object> partition = Collections.singletonMap("source-id", "shard-1");
    Map<String, Object> offset = context.offsetStorageReader().offset(partition);
    long lastPosition = offset != null ? (Long) offset.get("position") : 0L;
    source.resumeFrom(lastPosition);
}

Idempotent Source

@Override
public List<SourceRecord> poll() {
    // 같은 source position = 같은 메시지 (중복 제거 가능)
    return records.stream().map(r -> new SourceRecord(
        ...
        Collections.singletonMap("source-id", "shard-1"),
        Collections.singletonMap("position", r.position()),
        ...
    )).toList();
}

같은 source position 으로 들어온 메시지는 framework 가 commit 한 offset 이전 것을 자동으로 skip 해줘요.

Commit 알림 — commitRecord()

@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
    // Kafka 에 successfully written
    // 외부 시스템에 ack 보내거나, 자체 상태 update
}

source connector 가 Kafka write 성공 후 외부 시스템에도 ack 를 보내야 하는 경우에 이 콜백을 씁니다.

SMT 작성

SMT(Single Message Transform, 메시지 단위 변환) 는 다음처럼 구현해요.

public class MyTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        Struct value = (Struct) record.value();
        Struct newValue = new Struct(value.schema());
        for (Field field : value.schema().fields()) {
            Object v = value.get(field);
            // 변환 로직
            newValue.put(field.name(), transform(v));
        }
        return record.newRecord(
            record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            value.schema(), newValue,
            record.timestamp()
        );
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

설정:

transforms=MyTransform
transforms.MyTransform.type=com.example.MyTransform

배포 — Plugin Path

# /usr/share/kafka-connect-plugins/my-connector/
├── my-connector-1.0.0.jar
├── dependency-1.jar
├── dependency-2.jar
└── ...

# worker.properties
plugin.path=/usr/share/kafka-connect-plugins

각 connector 는 별도 디렉토리 에 두는 게 원칙이에요 (classloader 격리 때문).

# Worker 시작 시 plugin 자동 로드
$ bin/connect-distributed.sh worker.properties

테스트

Unit Test

@Test
public void testTaskPoll() throws Exception {
    MySourceTask task = new MySourceTask();
    Map<String, String> props = Map.of("api.endpoint", "http://test");
    task.initialize(mockContext());
    task.start(props);

    List<SourceRecord> records = task.poll();
    assertEquals(100, records.size());
    assertEquals("test-key", records.get(0).key());

    task.stop();
}

mockContext() 는 테스트용 SourceTaskContext 를 끼워 넣는 헬퍼예요.

Integration Test

Confluent 가 제공하는 test 유틸 EmbeddedKafkaCluster(내장 Kafka 클러스터) + EmbeddedConnectCluster(내장 Connect 클러스터) 를 묶어 씁니다.

EmbeddedConnectCluster connectCluster = new EmbeddedConnectCluster.Builder()
    .numWorkers(2)
    .brokerProps(kafkaProps)
    .build();
connectCluster.start();

connectCluster.configureConnector("my-connector", connectorProps);
// 실제 Kafka topic 으로 메시지 흐름 검증

Confluent CCS (Connector Compatibility Standard)

Confluent 가 정의한 connector 품질 기준 으로, 다음 항목을 요구해요.

  • Schema 사용
  • Idempotent 처리
  • Error 처리 (DLQ(Dead Letter Queue, 실패 메시지 적재) 지원)
  • Metric 노출
  • SMT 호환

엔터프라이즈 인증을 받으려면 CCS 를 준수해야 합니다.

한계·실무 함정

1. Pre-built 우선

대부분의 외부 시스템에는 이미 pre-built 가 있어요. Custom 은 진짜 없을 때만 손대는 게 맞습니다.

2. Offset 관리 복잡

Source 쪽은 외부 시스템의 어디까지 처리했는지 추적하는 게 핵심이라, 재시작 안전성을 보장하기가 까다로워요.

3. Idempotent / Exactly-once

Sink 의 EOS(Exactly-Once Semantics, 정확히 한 번 보장) 는 외부 시스템 협력이 있어야 가능해요 (88편 transactional outbox 참고).

4. Schema 진화

Avro + Schema Registry(스키마 중앙 저장소) 권장. Schema 를 바꾸면 consumer 와 sink 의 호환성에 영향이 가요.

5. Plugin classloader 격리

Worker 가 connector 별로 별도 classloader 를 띄우기 때문에, plugin path 안에 모든 dependency 를 포함시켜야 합니다.

6. 운영 부담

Custom connector 는 직접 maintain·debugging·upgrade 까지 떠안는 부담이에요. 잘 모르는 system 통합이라면 Pre-built 도입 부터 검토하는 편이 낫습니다.

시험 직전 한 번 더 — Custom Connector 함정 압축 노트

  • 언제 = Pre-built 없는 경우만
  • SourceConnector = start·taskClass·taskConfigs·stop·config·version
  • SourceTask = start·poll·stop·commitRecord
  • poll() = 메시지 batch 반환 (SourceRecord)
  • SourceRecord = source partition + source offset + Kafka topic·key·value·schema
  • Offset 관리 = context.offsetStorageReader().offset(partition) 으로 복구
  • SinkConnector = start·taskClass·taskConfigs·stop·config
  • SinkTask = start·put·flush·stop
  • put() = Kafka 메시지를 외부에 write
  • flush() = in-flight 강제 완료 (offset commit 전)
  • ConfigDef = 설정 schema 정의
  • Type.PASSWORD = 로그·UI 마스킹 (반드시)
  • Range·default·Importance 옵션
  • Schema = SchemaBuilder.struct().field()
  • Avro 호환
  • 재시작 안전성 = 같은 source position = 같은 메시지 → framework 자동 skip
  • commitRecord = Kafka write 성공 후 callback (외부 ack 등)
  • SMT 작성 = Transformation<R> 구현
  • 배포 = plugin.path 디렉토리에 connector + dependencies
  • 테스트 = Unit + EmbeddedConnectCluster integration
  • Confluent CCS = 품질 기준 (schema·idempotent·error·metric·SMT)
  • 함정 — Pre-built 우선
  • 함정 — Offset 복잡
  • 함정 — Idempotent / EOS 어려움 (외부 시스템 협력)
  • 함정 — Schema 진화 호환성
  • 함정 — Plugin classloader 격리 (dependency 다 포함)
  • 함정 — 운영 부담

공식 문서: Kafka Connect Developer Guide 에서 자세한 API·예제를 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!