백엔드 데이터 인프라 93편. Kafka Admin Client API — 운영 자동화의 핵심. AdminClient 생성·Topic 생성/삭제/describe·Partition 추가·ACL 관리·Consumer Group 관리·Cluster 메타데이터·CI/CD provisioning 패턴까지 풀어쓴 학습 노트. Part 5-3 API 마무리.
이 글은 백엔드 데이터 인프라 시리즈 130편 중 93편이에요. Part 5-3 API 의 마지막 글. 91·92편 으로 데이터 흐름(Producer/Consumer)을 잡았다면, 이번 93편은 운영 자동화 쪽인 Admin Client API 차례. Topic 생성·ACL(접근 제어 목록) 관리·Consumer Group 모니터링처럼 DevOps 영역에 가까운 일을 다뤄요.
Admin API가 어렵게 느껴지는 이유
Admin API 는 Producer/Consumer 보다 덜 쓰이다 보니 자료가 적고 문서도 흩어져 있어요.
첫째, CLI 도구(kafka-topics.sh 등) 와의 관계가 헷갈려요. 같은 작업을 CLI 와 Admin API 둘 다 할 수 있는데, 언제 어느 쪽을 써야 할지 감이 안 잡히죠.
둘째, 모든 메서드가 KafkaFuture(비동기 결과 객체) 를 반환해요. .get() 으로 동기 변환하는 게 표준이지만 비동기 chain 도 가능합니다.
이 글에서는 Admin API 의 핵심 작업과 운영 자동화 패턴을 정리합니다.
AdminClient 생성
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
try (AdminClient admin = AdminClient.create(props)) {
// 작업
}
kafka-clients 의존성은 91·92편과 같아요. ACL 환경에서는 security.protocol·sasl.*·ssl.* 를 추가합니다.
Topic 작업
Topic 생성
NewTopic topic = new NewTopic("orders", 3, (short) 2); // 3 partitions, RF=2
// 추가 설정
topic.configs(Map.of(
"retention.ms", "604800000", // 7일
"compression.type", "zstd",
"min.insync.replicas", "2"
));
CreateTopicsResult result = admin.createTopics(Collections.singletonList(topic));
result.all().get(); // 동기 대기 + 에러 시 throw
여기서 RF(Replication Factor, 복제본 개수)는 2 로 잡았습니다.
여러 Topic 한 번에
List<NewTopic> topics = Arrays.asList(
new NewTopic("orders", 3, (short) 2),
new NewTopic("payments", 5, (short) 3),
new NewTopic("notifications", 1, (short) 2)
);
admin.createTopics(topics).all().get();
Topic 목록
ListTopicsResult result = admin.listTopics();
Set<String> names = result.names().get();
Topic 정보 (describe)
DescribeTopicsResult result = admin.describeTopics(Collections.singletonList("orders"));
Map<String, TopicDescription> topics = result.allTopicNames().get();
TopicDescription orders = topics.get("orders");
System.out.println("Partitions: " + orders.partitions().size());
for (TopicPartitionInfo partition : orders.partitions()) {
System.out.println(" P" + partition.partition() +
" leader=" + partition.leader().id() +
" replicas=" + partition.replicas() +
" ISR=" + partition.isr());
}
89편 replication 의 ISR(In-Sync Replica, 동기화된 복제본) 와 leader 정보를 그대로 모니터링할 수 있습니다.
Topic 설정 조회·변경
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
// 조회
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(resource)).all().get();
Config orderConfig = configs.get(resource);
for (ConfigEntry entry : orderConfig.entries()) {
System.out.println(entry.name() + " = " + entry.value());
}
// 변경 (incremental)
Map<ConfigResource, Collection<AlterConfigOp>> updates = Map.of(
resource,
Arrays.asList(new AlterConfigOp(
new ConfigEntry("retention.ms", "1209600000"), // 14일
AlterConfigOp.OpType.SET
))
);
admin.incrementalAlterConfigs(updates).all().get();
Topic 삭제
admin.deleteTopics(Collections.singletonList("orders")).all().get();
여기서 시험 함정이 하나 있어요. delete topic 은 비동기여서 명령은 즉시 반환되지만 실제 삭제는 broker 가 백그라운드에서 처리합니다. 큰 topic 은 수 분이 걸리기도 해요.
Partition 추가
admin.createPartitions(Map.of(
"orders", NewPartitions.increaseTo(10) // 3 → 10 으로 늘림
)).all().get();
Partition 수는 늘릴 수만 있고 줄이는 건 불가능해요.
그리고 추가하는 순간 key 기반 partition 매핑이 깨집니다. 기존 메시지는 원래 partition 에 그대로 있고 새 메시지만 재해시되니까, 순서 보장에 영향을 줘요.
Consumer Group 작업
그룹 목록
ListConsumerGroupsResult result = admin.listConsumerGroups();
Collection<ConsumerGroupListing> groups = result.all().get();
for (ConsumerGroupListing group : groups) {
System.out.println(group.groupId());
}
그룹 상세
DescribeConsumerGroupsResult result = admin.describeConsumerGroups(
Collections.singletonList("order-workers")
);
ConsumerGroupDescription desc = result.all().get().get("order-workers");
System.out.println("State: " + desc.state());
System.out.println("Members: " + desc.members().size());
for (MemberDescription member : desc.members()) {
System.out.println(" " + member.consumerId() +
" host=" + member.host() +
" assigned=" + member.assignment().topicPartitions());
}
Offset Lag 조회
Map<TopicPartition, OffsetAndMetadata> offsets = admin
.listConsumerGroupOffsets("order-workers")
.partitionsToOffsetAndMetadata()
.get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
TopicPartition tp = entry.getKey();
long committed = entry.getValue().offset();
// log end offset 조회는 KafkaConsumer 의 endOffsets() 사용 (Admin 별도)
}
Offset Reset (재처리·skip)
Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
resetOffsets.put(
new TopicPartition("orders", 0),
new OffsetAndMetadata(0L) // 처음부터
);
admin.alterConsumerGroupOffsets("order-workers", resetOffsets).all().get();
운영에서 강력한 도구예요. consumer group 을 특정 offset 으로 강제로 옮기는 작업이라, 재처리나 incidents 복구에 씁니다.
그룹 삭제
admin.deleteConsumerGroups(Collections.singletonList("old-workers")).all().get();
ACL 작업
// ACL 추가
AclBinding binding = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
new AccessControlEntry("User:app-user", "*", AclOperation.READ, AclPermissionType.ALLOW)
);
admin.createAcls(Collections.singletonList(binding)).all().get();
// ACL 조회
AclBindingFilter filter = new AclBindingFilter(
ResourcePatternFilter.ANY,
AccessControlEntryFilter.ANY
);
Collection<AclBinding> acls = admin.describeAcls(filter).values().get();
// ACL 삭제
admin.deleteAcls(Collections.singletonList(filter)).all().get();
ACL 자체의 깊이는 116편에서 다룹니다.
Cluster 메타데이터
DescribeClusterResult result = admin.describeCluster();
String clusterId = result.clusterId().get();
Collection<Node> nodes = result.nodes().get();
Node controller = result.controller().get();
System.out.println("Cluster: " + clusterId);
System.out.println("Brokers: " + nodes.size());
System.out.println("Controller: " + controller.id());
운영 모니터링에서 broker 수와 controller 를 확인하는 용도예요.
KafkaFuture — 비동기 패턴
모든 Admin 메서드는 KafkaFuture 를 반환합니다. 처리 방법은 3가지예요.
1. 동기 (.get())
admin.createTopics(...).all().get(); // 블로킹 + 에러 throw
가장 단순해서 운영 스크립트 표준으로 쓰입니다.
2. 비동기 callback
admin.createTopics(...).all().whenComplete((result, exception) -> {
if (exception != null) log.error("failed", exception);
});
3. Chain
admin.createTopics(...).all()
.thenApply(v -> "Topic created")
.thenApply(msg -> log.info(msg));
Spring Boot — KafkaAdmin + TopicBuilder
@Configuration
public class KafkaTopicConfig {
@Bean
public KafkaAdmin kafkaAdmin() {
return new KafkaAdmin(Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092"
));
}
@Bean
public NewTopic ordersTopic() {
return TopicBuilder.name("orders")
.partitions(3)
.replicas(2)
.config("retention.ms", "604800000")
.build();
}
@Bean
public NewTopic paymentsTopic() {
return TopicBuilder.name("payments")
.partitions(5)
.replicas(3)
.build();
}
}
@Bean NewTopic 을 선언해두면 애플리케이션 시작 시 topic 이 자동 생성되고, 이미 있으면 skip 합니다. CI/CD 환경에서 가장 깔끔한 provisioning(자원 사전 준비) 패턴이에요.
CLI vs Admin API — 언제 어느 걸?
| 시나리오 | 선택 |
|---|---|
| 일회성 작업·디버깅 | CLI (kafka-topics.sh 등) |
| Shell script 자동화 | CLI 또는 API |
| Java/Spring 애플리케이션 시작 시 provisioning | Admin API + Spring TopicBuilder |
| 다른 언어 (Python·Go) | 해당 언어 client |
| CI/CD 파이프라인 | Terraform Kafka provider 또는 Admin API |
| GitOps 스타일 | Strimzi (Kubernetes) 또는 Terraform |
GitOps(Git 저장소를 운영 단일 진실원으로 삼는 방식) 라인은 Strimzi(Kubernetes 위 Kafka 오퍼레이터) 나 Terraform 쪽으로 가는 게 일반적입니다.
운영 자동화 패턴
Topic 자동 provisioning
@Bean
public NewTopic ordersTopic() {
return TopicBuilder.name("orders")
.partitions(3)
.replicas(2)
.build();
}
애플리케이션이 자기가 쓸 topic 을 직접 선언하니까 DevOps 부담이 줄어요.
Lag 모니터링 스크립트
public class LagMonitor {
public void check(String groupId) throws Exception {
Map<TopicPartition, OffsetAndMetadata> committed = admin
.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata()
.get();
// KafkaConsumer 로 end offset 조회
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
Map<TopicPartition, Long> ends = consumer.endOffsets(committed.keySet());
for (TopicPartition tp : committed.keySet()) {
long lag = ends.get(tp) - committed.get(tp).offset();
if (lag > 10000) {
alert(groupId, tp, lag);
}
}
}
}
}
Topic Config Drift 검사
public boolean isExpected(String topic, Map<String, String> expected) {
Config actual = admin.describeConfigs(
Collections.singletonList(new ConfigResource(ConfigResource.Type.TOPIC, topic))
).all().get().values().iterator().next();
for (Map.Entry<String, String> exp : expected.entrySet()) {
ConfigEntry entry = actual.get(exp.getKey());
if (entry == null || !entry.value().equals(exp.getValue())) {
return false;
}
}
return true;
}
CI/CD 에서 실제 broker 설정과 기대 설정을 비교해 drift(설정 어긋남) 를 잡아내는 흐름입니다.
한계·실무 함정
1. .get() 의 블로킹
운영 스크립트에서는 괜찮지만 애플리케이션 hot path 에서는 쓰면 안 됩니다. 비동기 chain 으로 풀거나 별도 thread 로 빼야 해요.
2. Partition 수 줄이기 불가
신중히 정해야 합니다. 과한 partition 보다 부족한 partition 쪽이 더 안전해요.
3. ACL 시 권한 부족
Cluster:Describe·Topic:Create 같은 권한이 없으면 AuthorizationException 이 떨어집니다. 별도 admin 계정으로 분리해 두세요.
4. deleteTopics 비동기
명령은 즉시 반환되지만 실제 삭제는 백그라운드에서 일어납니다. 큰 topic 은 수 분 뒤에 사라져요.
5. Spring NewTopic Bean = 시작 시만
런타임에 동적으로 topic 을 만들고 싶다면 AdminClient 를 직접 써야 합니다.
시험 직전 한 번 더 — Kafka Admin API 함정 압축 노트
AdminClient.create(props)—kafka-clients의존성- ACL 환경 =
security.protocol·sasl.*·ssl.*추가 - Topic 생성 =
NewTopic(name, partitions, replication)+createTopics() - 여러 topic 한 번에 가능
- Topic 정보 =
describeTopics→TopicDescription(partitions·leader·ISR) - Config 조회·변경 =
describeConfigs·incrementalAlterConfigs - Topic 삭제 =
deleteTopics(비동기, 실제 삭제는 백그라운드) - Partition 추가 =
createPartitions(NewPartitions.increaseTo(N)) - Partition 줄이기 불가
- Partition 추가 = key 기반 매핑 깨짐
- Consumer Group 목록 =
listConsumerGroups - 그룹 상세 =
describeConsumerGroups→ state·members·assignment - Offset 조회 =
listConsumerGroupOffsets - Offset Reset =
alterConsumerGroupOffsets(재처리·incidents 복구) - 그룹 삭제 =
deleteConsumerGroups - ACL CRUD =
createAcls·describeAcls·deleteAcls - Cluster 메타데이터 =
describeCluster→ clusterId·nodes·controller - 모든 메서드 =
KafkaFuture반환 - 처리 3가지 =
.get()(동기) ·whenComplete(callback) · chain - 운영 스크립트 =
.get()표준 - Spring =
KafkaAdmin+TopicBuilder.name(...).partitions(...).replicas(...).build() @Bean NewTopic= 애플리케이션 시작 시 자동 생성 (이미 있으면 skip)- CI/CD provisioning 표준 패턴
- CLI vs API — 일회성=CLI / 자동화=API / GitOps=Terraform/Strimzi
- 자동화 패턴 — topic provisioning · lag 모니터링 · config drift 검사
- 함정 —
.get()블로킹은 운영 스크립트만 - 함정 —
deleteTopics비동기 - 함정 — Partition 줄이기 불가
- 함정 — ACL 권한 부족 = AuthorizationException
- 함정 — Spring
NewTopic= 시작 시만, 런타임 동적은 AdminClient 직접
공식 문서: AdminClient Javadoc 에서 자세한 API 사양을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 88편 — Kafka Message Delivery Semantics (at-most·at-least·exactly-once)
- 89편 — Kafka Replication (ISR · Leader Election · Unclean)
- 90편 — Kafka API 5종 종합 (Producer · Consumer · Streams · Connect · Admin)
- 91편 — Kafka Producer API 깊이 (Serializer · Callback · Interceptor)
- 92편 — Kafka Consumer API 깊이 (Commit · Rebalance · Seek)
다음 글: