백엔드 데이터 인프라 93편 — Kafka Admin Client API (Topic·ACL·Consumer Group 관리)

2026-05-17백엔드 데이터 인프라

백엔드 데이터 인프라 93편. Kafka Admin Client API — 운영 자동화의 핵심. AdminClient 생성·Topic 생성/삭제/describe·Partition 추가·ACL 관리·Consumer Group 관리·Cluster 메타데이터·CI/CD provisioning 패턴까지 풀어쓴 학습 노트. Part 5-3 API 마무리.

📚 백엔드 데이터 인프라 · 93편 — Kafka Admin Client API (Topic·ACL·Consumer Group 관리)

이 글은 백엔드 데이터 인프라 시리즈 130편 중 93편이에요. Part 5-3 API 의 마지막 글. 91·92편 으로 데이터 흐름(Producer/Consumer)을 잡았다면, 이번 93편은 운영 자동화 쪽인 Admin Client API 차례. Topic 생성·ACL(접근 제어 목록) 관리·Consumer Group 모니터링처럼 DevOps 영역에 가까운 일을 다뤄요.

Admin API가 어렵게 느껴지는 이유

Admin API 는 Producer/Consumer 보다 덜 쓰이다 보니 자료가 적고 문서도 흩어져 있어요.

첫째, CLI 도구(kafka-topics.sh 등) 와의 관계가 헷갈려요. 같은 작업을 CLI 와 Admin API 둘 다 할 수 있는데, 언제 어느 쪽을 써야 할지 감이 안 잡히죠.

둘째, 모든 메서드가 KafkaFuture(비동기 결과 객체) 를 반환해요. .get() 으로 동기 변환하는 게 표준이지만 비동기 chain 도 가능합니다.

이 글에서는 Admin API 의 핵심 작업과 운영 자동화 패턴을 정리합니다.

AdminClient 생성

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // 작업
}

kafka-clients 의존성은 91·92편과 같아요. ACL 환경에서는 security.protocol·sasl.*·ssl.* 를 추가합니다.

Topic 작업

Topic 생성

NewTopic topic = new NewTopic("orders", 3, (short) 2);   // 3 partitions, RF=2

// 추가 설정
topic.configs(Map.of(
    "retention.ms", "604800000",       // 7일
    "compression.type", "zstd",
    "min.insync.replicas", "2"
));

CreateTopicsResult result = admin.createTopics(Collections.singletonList(topic));
result.all().get();   // 동기 대기 + 에러 시 throw

여기서 RF(Replication Factor, 복제본 개수)는 2 로 잡았습니다.

여러 Topic 한 번에

List<NewTopic> topics = Arrays.asList(
    new NewTopic("orders", 3, (short) 2),
    new NewTopic("payments", 5, (short) 3),
    new NewTopic("notifications", 1, (short) 2)
);
admin.createTopics(topics).all().get();

Topic 목록

ListTopicsResult result = admin.listTopics();
Set<String> names = result.names().get();

Topic 정보 (describe)

DescribeTopicsResult result = admin.describeTopics(Collections.singletonList("orders"));
Map<String, TopicDescription> topics = result.allTopicNames().get();
TopicDescription orders = topics.get("orders");

System.out.println("Partitions: " + orders.partitions().size());
for (TopicPartitionInfo partition : orders.partitions()) {
    System.out.println("  P" + partition.partition() +
                       " leader=" + partition.leader().id() +
                       " replicas=" + partition.replicas() +
                       " ISR=" + partition.isr());
}

89편 replication 의 ISR(In-Sync Replica, 동기화된 복제본) 와 leader 정보를 그대로 모니터링할 수 있습니다.

Topic 설정 조회·변경

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "orders");

// 조회
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(resource)).all().get();
Config orderConfig = configs.get(resource);
for (ConfigEntry entry : orderConfig.entries()) {
    System.out.println(entry.name() + " = " + entry.value());
}

// 변경 (incremental)
Map<ConfigResource, Collection<AlterConfigOp>> updates = Map.of(
    resource,
    Arrays.asList(new AlterConfigOp(
        new ConfigEntry("retention.ms", "1209600000"),   // 14일
        AlterConfigOp.OpType.SET
    ))
);
admin.incrementalAlterConfigs(updates).all().get();

Topic 삭제

admin.deleteTopics(Collections.singletonList("orders")).all().get();

여기서 시험 함정이 하나 있어요. delete topic 은 비동기여서 명령은 즉시 반환되지만 실제 삭제는 broker 가 백그라운드에서 처리합니다. 큰 topic 은 수 분이 걸리기도 해요.

Partition 추가

admin.createPartitions(Map.of(
    "orders", NewPartitions.increaseTo(10)   // 3 → 10 으로 늘림
)).all().get();

Partition 수는 늘릴 수만 있고 줄이는 건 불가능해요.

그리고 추가하는 순간 key 기반 partition 매핑이 깨집니다. 기존 메시지는 원래 partition 에 그대로 있고 새 메시지만 재해시되니까, 순서 보장에 영향을 줘요.

Consumer Group 작업

그룹 목록

ListConsumerGroupsResult result = admin.listConsumerGroups();
Collection<ConsumerGroupListing> groups = result.all().get();
for (ConsumerGroupListing group : groups) {
    System.out.println(group.groupId());
}

그룹 상세

DescribeConsumerGroupsResult result = admin.describeConsumerGroups(
    Collections.singletonList("order-workers")
);
ConsumerGroupDescription desc = result.all().get().get("order-workers");

System.out.println("State: " + desc.state());
System.out.println("Members: " + desc.members().size());
for (MemberDescription member : desc.members()) {
    System.out.println("  " + member.consumerId() +
                       " host=" + member.host() +
                       " assigned=" + member.assignment().topicPartitions());
}

Offset Lag 조회

Map<TopicPartition, OffsetAndMetadata> offsets = admin
    .listConsumerGroupOffsets("order-workers")
    .partitionsToOffsetAndMetadata()
    .get();

for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
    TopicPartition tp = entry.getKey();
    long committed = entry.getValue().offset();
    // log end offset 조회는 KafkaConsumer 의 endOffsets() 사용 (Admin 별도)
}

Offset Reset (재처리·skip)

Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
resetOffsets.put(
    new TopicPartition("orders", 0),
    new OffsetAndMetadata(0L)   // 처음부터
);
admin.alterConsumerGroupOffsets("order-workers", resetOffsets).all().get();

운영에서 강력한 도구예요. consumer group 을 특정 offset 으로 강제로 옮기는 작업이라, 재처리나 incidents 복구에 씁니다.

그룹 삭제

admin.deleteConsumerGroups(Collections.singletonList("old-workers")).all().get();

ACL 작업

// ACL 추가
AclBinding binding = new AclBinding(
    new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
    new AccessControlEntry("User:app-user", "*", AclOperation.READ, AclPermissionType.ALLOW)
);
admin.createAcls(Collections.singletonList(binding)).all().get();

// ACL 조회
AclBindingFilter filter = new AclBindingFilter(
    ResourcePatternFilter.ANY,
    AccessControlEntryFilter.ANY
);
Collection<AclBinding> acls = admin.describeAcls(filter).values().get();

// ACL 삭제
admin.deleteAcls(Collections.singletonList(filter)).all().get();

ACL 자체의 깊이는 116편에서 다룹니다.

Cluster 메타데이터

DescribeClusterResult result = admin.describeCluster();
String clusterId = result.clusterId().get();
Collection<Node> nodes = result.nodes().get();
Node controller = result.controller().get();

System.out.println("Cluster: " + clusterId);
System.out.println("Brokers: " + nodes.size());
System.out.println("Controller: " + controller.id());

운영 모니터링에서 broker 수와 controller 를 확인하는 용도예요.

KafkaFuture — 비동기 패턴

모든 Admin 메서드는 KafkaFuture 를 반환합니다. 처리 방법은 3가지예요.

1. 동기 (.get())

admin.createTopics(...).all().get();   // 블로킹 + 에러 throw

가장 단순해서 운영 스크립트 표준으로 쓰입니다.

2. 비동기 callback

admin.createTopics(...).all().whenComplete((result, exception) -> {
    if (exception != null) log.error("failed", exception);
});

3. Chain

admin.createTopics(...).all()
     .thenApply(v -> "Topic created")
     .thenApply(msg -> log.info(msg));

Spring Boot — KafkaAdmin + TopicBuilder

@Configuration
public class KafkaTopicConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092"
        ));
    }

    @Bean
    public NewTopic ordersTopic() {
        return TopicBuilder.name("orders")
                .partitions(3)
                .replicas(2)
                .config("retention.ms", "604800000")
                .build();
    }

    @Bean
    public NewTopic paymentsTopic() {
        return TopicBuilder.name("payments")
                .partitions(5)
                .replicas(3)
                .build();
    }
}

@Bean NewTopic 을 선언해두면 애플리케이션 시작 시 topic 이 자동 생성되고, 이미 있으면 skip 합니다. CI/CD 환경에서 가장 깔끔한 provisioning(자원 사전 준비) 패턴이에요.

CLI vs Admin API — 언제 어느 걸?

시나리오 선택
일회성 작업·디버깅 CLI (kafka-topics.sh 등)
Shell script 자동화 CLI 또는 API
Java/Spring 애플리케이션 시작 시 provisioning Admin API + Spring TopicBuilder
다른 언어 (Python·Go) 해당 언어 client
CI/CD 파이프라인 Terraform Kafka provider 또는 Admin API
GitOps 스타일 Strimzi (Kubernetes) 또는 Terraform

GitOps(Git 저장소를 운영 단일 진실원으로 삼는 방식) 라인은 Strimzi(Kubernetes 위 Kafka 오퍼레이터) 나 Terraform 쪽으로 가는 게 일반적입니다.

운영 자동화 패턴

Topic 자동 provisioning

@Bean
public NewTopic ordersTopic() {
    return TopicBuilder.name("orders")
            .partitions(3)
            .replicas(2)
            .build();
}

애플리케이션이 자기가 쓸 topic 을 직접 선언하니까 DevOps 부담이 줄어요.

Lag 모니터링 스크립트

public class LagMonitor {
    public void check(String groupId) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> committed = admin
            .listConsumerGroupOffsets(groupId)
            .partitionsToOffsetAndMetadata()
            .get();

        // KafkaConsumer 로 end offset 조회
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            Map<TopicPartition, Long> ends = consumer.endOffsets(committed.keySet());

            for (TopicPartition tp : committed.keySet()) {
                long lag = ends.get(tp) - committed.get(tp).offset();
                if (lag > 10000) {
                    alert(groupId, tp, lag);
                }
            }
        }
    }
}

Topic Config Drift 검사

public boolean isExpected(String topic, Map<String, String> expected) {
    Config actual = admin.describeConfigs(
        Collections.singletonList(new ConfigResource(ConfigResource.Type.TOPIC, topic))
    ).all().get().values().iterator().next();

    for (Map.Entry<String, String> exp : expected.entrySet()) {
        ConfigEntry entry = actual.get(exp.getKey());
        if (entry == null || !entry.value().equals(exp.getValue())) {
            return false;
        }
    }
    return true;
}

CI/CD 에서 실제 broker 설정과 기대 설정을 비교해 drift(설정 어긋남) 를 잡아내는 흐름입니다.

한계·실무 함정

1. .get() 의 블로킹

운영 스크립트에서는 괜찮지만 애플리케이션 hot path 에서는 쓰면 안 됩니다. 비동기 chain 으로 풀거나 별도 thread 로 빼야 해요.

2. Partition 수 줄이기 불가

신중히 정해야 합니다. 과한 partition 보다 부족한 partition 쪽이 더 안전해요.

3. ACL 시 권한 부족

Cluster:Describe·Topic:Create 같은 권한이 없으면 AuthorizationException 이 떨어집니다. 별도 admin 계정으로 분리해 두세요.

4. deleteTopics 비동기

명령은 즉시 반환되지만 실제 삭제는 백그라운드에서 일어납니다. 큰 topic 은 수 분 뒤에 사라져요.

5. Spring NewTopic Bean = 시작 시만

런타임에 동적으로 topic 을 만들고 싶다면 AdminClient 를 직접 써야 합니다.

시험 직전 한 번 더 — Kafka Admin API 함정 압축 노트

  • AdminClient.create(props)kafka-clients 의존성
  • ACL 환경 = security.protocol·sasl.*·ssl.* 추가
  • Topic 생성 = NewTopic(name, partitions, replication) + createTopics()
  • 여러 topic 한 번에 가능
  • Topic 정보 = describeTopicsTopicDescription (partitions·leader·ISR)
  • Config 조회·변경 = describeConfigs·incrementalAlterConfigs
  • Topic 삭제 = deleteTopics (비동기, 실제 삭제는 백그라운드)
  • Partition 추가 = createPartitions(NewPartitions.increaseTo(N))
  • Partition 줄이기 불가
  • Partition 추가 = key 기반 매핑 깨짐
  • Consumer Group 목록 = listConsumerGroups
  • 그룹 상세 = describeConsumerGroups → state·members·assignment
  • Offset 조회 = listConsumerGroupOffsets
  • Offset Reset = alterConsumerGroupOffsets (재처리·incidents 복구)
  • 그룹 삭제 = deleteConsumerGroups
  • ACL CRUD = createAcls·describeAcls·deleteAcls
  • Cluster 메타데이터 = describeCluster → clusterId·nodes·controller
  • 모든 메서드 = KafkaFuture 반환
  • 처리 3가지 = .get() (동기) · whenComplete (callback) · chain
  • 운영 스크립트 = .get() 표준
  • Spring = KafkaAdmin + TopicBuilder.name(...).partitions(...).replicas(...).build()
  • @Bean NewTopic = 애플리케이션 시작 시 자동 생성 (이미 있으면 skip)
  • CI/CD provisioning 표준 패턴
  • CLI vs API — 일회성=CLI / 자동화=API / GitOps=Terraform/Strimzi
  • 자동화 패턴 — topic provisioning · lag 모니터링 · config drift 검사
  • 함정 — .get() 블로킹은 운영 스크립트만
  • 함정 — deleteTopics 비동기
  • 함정 — Partition 줄이기 불가
  • 함정 — ACL 권한 부족 = AuthorizationException
  • 함정 — Spring NewTopic = 시작 시만, 런타임 동적은 AdminClient 직접

공식 문서: AdminClient Javadoc 에서 자세한 API 사양을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!