Spring Batch 입문 44편. 비동기·분산 처리 deep dive — AsyncItemProcessor + AsyncItemWriter (fork-join), ChunkMessageChannelItemWriter (Remote Chunking 의 manager 측), ChunkProcessorChunkHandler (worker 측), MessageChannelPartitionHandler (Remote Partitioning), @EnableBatchIntegration + RemoteChunkingManagerStepBuilderFactory · RemoteChunkingWorkerBuilder · RemotePartitioningManagerStepBuilderFactory · RemotePartitioningWorkerStepBuilderFactory 까지 정리한 학습 노트.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 44편이에요. 43편 의 외부 → batch 결합 다음 — batch 내부의 비동기·분산 처리 deep dive. 42편 결합점 4·5·6 의 구체화.
두 갈래 — Local Async vs Remote Distributed
| 전략 | 분류 | scope |
|---|---|---|
| AsyncItemProcessor + AsyncItemWriter | local (1 JVM) | item 단위 비동기 |
| Remote Chunking | multi-process | chunk 단위 분산 |
| Remote Partitioning | multi-process | Step 단위 분산 |
JVM(자바 실행 환경) 하나 안에서 끝낼지, 여러 process 로 분산할지 — 그리고 어디가 병목이고 어디까지 분산할지 에 따라 갈린다.
AsyncItemProcessor — Item 단위 fork
@Bean
public AsyncItemProcessor<Customer, EnrichedCustomer> asyncProcessor(
ItemProcessor<Customer, EnrichedCustomer> delegate,
TaskExecutor taskExecutor) {
AsyncItemProcessor<Customer, EnrichedCustomer> async = new AsyncItemProcessor<>();
async.setDelegate(delegate);
async.setTaskExecutor(taskExecutor);
return async;
}
작동 흐름은 이렇다.
Reader.read() → Customer (item)
↓
AsyncItemProcessor.process(Customer)
↓
TaskExecutor.execute(() -> delegate.process(item)) ← background thread
↓ (즉시 반환)
Future<EnrichedCustomer>
process 는 background thread 로 빠지고 read 는 멈추지 않는다. 작업을 나눠 던지고 결과를 모으는 fork-join(분기·합류) 패턴이다.
효과 — 외부 API 호출 등 expensive process
- direct — 1 chunk (100 items) × API 100ms = 10초
- async — 100 thread 동시 호출 = 약 100ms (API 만 worst case)
process bound 구간에서 throughput 이 수십 배까지 뛴다.
AsyncItemWriter — Join
AsyncItemProcessor 가 Future 를 반환하므로 일반 Writer 로는 받을 수 없다. 그래서 AsyncItemWriter 가 쌍으로 따라붙어야 한다.
@Bean
public AsyncItemWriter<EnrichedCustomer> asyncWriter(
ItemWriter<EnrichedCustomer> delegate) {
AsyncItemWriter<EnrichedCustomer> async = new AsyncItemWriter<>();
async.setDelegate(delegate);
return async;
}
chunk 안의 Future 가 전부 끝날 때까지 기다렸다가 resolve 한 뒤 delegate.write 를 호출한다. fork-join 의 join(합류) 단계가 여기다.
Step 구성
@Bean
public Step asyncStep(JobRepository repo, PlatformTransactionManager tx,
ItemReader<Customer> reader,
AsyncItemProcessor<Customer, EnrichedCustomer> processor,
AsyncItemWriter<EnrichedCustomer> writer) {
return new StepBuilder("asyncStep", repo)
.<Customer, Future<EnrichedCustomer>>chunk(100, tx) // ★ chunk type
.reader(reader)
.processor((ItemProcessor) processor)
.writer((ItemWriter) writer)
.build();
}
핵심은 chunk type 이 <Customer, Future<EnrichedCustomer>> 라는 점이다. processor 가 Future 를 반환하고 writer 가 Future 를 받는 구조다.
함정 — Async Pair 필수
// ❌ 잘못된 조합
.processor(asyncProcessor)
.writer(regularWriter) // ItemWriter<EnrichedCustomer>
regular writer 는 Future 를 EnrichedCustomer 로 캐스팅하지 못한다. 반드시 AsyncItemWriter 가 와야 한다.
// ❌ 또 다른 잘못된 조합
.processor(regularProcessor) // 일반 Processor
.writer(asyncWriter)
이 조합에서는 AsyncItemWriter 가 Future 가 아닌 plain item 을 받는다. 동작은 하지만 비동기 효과는 사라져서 의미가 없다.
규칙은 단순하다 — AsyncItemProcessor 와 AsyncItemWriter 는 항상 pair.
Skip · Retry 와의 결합
AsyncItemProcessor 안에서 터진 예외는 Future 안에 캡슐화된다. AsyncItemWriter 가 Future.get() 을 호출할 때 비로소 예외가 드러난다.
그래서 Step 의 skip·retry 로직이 그 시점에 정상으로 작동한다. 다만 예외 검출이 약간 늦어지는 셈이고, Processor 단계가 아니라 Writer 단계에서 발견된다.
Multi-threaded Step vs AsyncItemProcessor 비교
| 항목 | Multi-threaded Step (37편) | AsyncItemProcessor |
|---|---|---|
| 병렬 단위 | chunk | item |
| read·process·write | 모두 thread 분배 | read·write serial, process 만 비동기 |
| Reader thread-safety | 필수 | X |
| 사용 case | 전체 throughput | process 단계만 무거움 |
외부 API 호출이나 무거운 변환처럼 process 가 진짜 병목이라면 AsyncItemProcessor 쪽이 맞다.
Remote Chunking — 다른 머신에서 process · write
Spring Batch can also use Spring Integration internally. By using this approach, you can delegate the processing of items or even chunks to outside processes. — 공식 reference
37편의 Remote Chunking 을 Spring Batch Integration 으로 실제 구현한 모습이다.
전체 구조
[Manager Process]
Reader → items 수집
↓
chunk 100 items
↓
ChunkMessageChannelItemWriter
↓ (JMS message)
═════════════════════════════════════════
[Worker Process 1] [Worker Process 2]
JMS message 수신 JMS message 수신
↓ ↓
ChunkProcessor ChunkProcessor
(process+write) (process+write)
↓ ↓
reply reply
═════════════════════════════════════════
[Manager Process]
reply 수집
여기서 JMS(자바 메시징 표준) 가 manager 와 worker 사이의 전송 채널이다.
Manager — ChunkMessageChannelItemWriter
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory cf) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(cf).destination("requests"))
.get();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory cf) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(cf).destination("replies"))
.channel(replies())
.get();
}
@Bean
public ChunkMessageChannelItemWriter<Person> itemWriter() {
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(requests());
template.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Person> writer = new ChunkMessageChannelItemWriter<>();
writer.setMessagingOperations(template);
writer.setReplyChannel(replies());
return writer;
}
@Bean
public Step managerStep(JobRepository repo, PlatformTransactionManager tx,
ItemReader<Person> reader,
ChunkMessageChannelItemWriter<Person> writer) {
return new StepBuilder("managerStep", repo)
.<Person, Person>chunk(200, tx)
.reader(reader)
.writer(writer) // ★ ChunkMessageChannelItemWriter
.build();
}
Manager 의 Step 은 read 와 ChunkMessageChannelItemWriter 두 조각뿐이다. write 자리가 messaging 으로 바뀌고 process·write 본체는 빠진다.
Worker — ChunkProcessorChunkHandler
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory cf) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(cf).destination("requests"))
.channel(requests())
.get();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory cf) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(cf).destination("replies"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Person> chunkHandler(
ItemProcessor<Person, Person> processor,
ItemWriter<Person> writer) {
ChunkProcessor<Person> chunkProcessor = new SimpleChunkProcessor<>(processor, writer);
ChunkProcessorChunkHandler<Person> handler = new ChunkProcessorChunkHandler<>();
handler.setChunkProcessor(chunkProcessor);
return handler;
}
Worker 쪽에는 Step 이 없다. 메시지가 도착하면 처리해서 응답을 돌려주는 service activator(메시지 도착 시 작동하는 핸들러) 한 덩어리뿐이다. ChunkProcessorChunkHandler 가 requests 를 받아 processor 와 writer 를 실행하고 replies 로 결과를 돌려보낸다.
@EnableBatchIntegration — 단순화 (4.1+)
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfig {
@Configuration
static class ManagerConfig {
@Autowired
RemoteChunkingManagerStepBuilderFactory factory;
@Bean
public TaskletStep managerStep() {
return factory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests to workers
.inputChannel(replies()) // replies from workers
.build();
}
}
@Configuration
static class WorkerConfig {
@Autowired
RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests())
.outputChannel(replies())
.build();
}
}
}
RemoteChunkingManagerStepBuilderFactory 와 RemoteChunkingWorkerBuilder 가 보일러플레이트를 걷어내준다. Manager 도 Worker 도 4~5 줄이면 끝나서 Spring Batch Integration 4.1+ 에서 권장하는 방식이다.
Remote Partitioning — Step 단위 분산
37편의 Partitioning 을 messaging 으로 풀어낸 형태다.
차이점 (Chunking vs Partitioning)
| 항목 | Remote Chunking | Remote Partitioning |
|---|---|---|
| 분산 단위 | chunk (manager 가 read) | Step (worker 가 자체 read) |
| Worker | service activator | Step instance |
| 사용 case | Process · Write heavy | I/O · 전체 처리 heavy |
| 입력 분할 | manager 가 자연스럽게 | Partitioner 가 명시 |
Remote Partitioning is useful when it is not the processing of items but rather the associated I/O that causes the bottleneck. — 공식 reference
MessageChannelPartitionHandler
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("step1");
handler.setGridSize(3);
handler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100_000);
handler.setMessagingOperations(template);
return handler;
}
37편의 PartitionHandler 를 messaging 으로 구현한 클래스다. JMS·AMQP(메시지 큐 프로토콜)·Kafka 같은 채널을 추상화해서 받는다.
Worker — StepExecutionRequestHandler
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler(
JobExplorer explorer, StepLocator locator) {
StepExecutionRequestHandler handler = new StepExecutionRequestHandler();
handler.setJobExplorer(explorer);
handler.setStepLocator(locator);
return handler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator(
StepExecutionRequestHandler handler) {
return handler;
}
Worker 가 Step 실행 요청을 받으면 로컬에서 해당 Step (37편의 worker step) 을 돌리고 reply 를 돌려보낸다.
@EnableBatchIntegration 단순화
@EnableBatchIntegration
@EnableBatchProcessing
public class RemotePartitioningJobConfig {
@Configuration
static class ManagerConfig {
@Autowired
RemotePartitioningManagerStepBuilderFactory factory;
@Bean
public Step managerStep() {
return factory.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
}
@Configuration
static class WorkerConfig {
@Autowired
RemotePartitioningWorkerStepBuilderFactory factory;
@Bean
public Step workerStep() {
return factory.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
}
}
RemotePartitioningManagerStepBuilderFactory 와 RemotePartitioningWorkerStepBuilderFactory 가 표준 짝을 이룬다. 보일러플레이트가 사라져서 Spring Batch Integration 4.1+ 의 권장 패턴이 됐다.
Remote Step (Spring Batch 6 신규)
37편에서 봤던 RemoteStep 이다.
@Bean
public Step step(MessagingTemplate messagingTemplate, JobRepository repo) {
return new RemoteStep("step", "workerStep", repo, messagingTemplate);
}
Remote Chunking·Partitioning 이 Step 내부 처리를 원격으로 보내는 거라면, Remote Step 은 Step 자체를 통째로 원격에 맡긴다. Spring Batch 6 부터 1급 시민이라 Job 의 step 자리에 그대로 박아 쓰면 된다.
어떤 전략을 언제
process 가 무거운데 단일 JVM 으로 충분?
→ AsyncItemProcessor + AsyncItemWriter
process·write 가 무거운데 multi-JVM 필요?
→ Remote Chunking
전체 처리 (read+process+write) 가 무거운데 분할 가능?
→ Remote Partitioning
Step 자체를 원격에서?
→ RemoteStep (v6)
운영에서 마주치는 빈도는 AsyncItemProcessor 가 압도적이다. Remote 계열은 진짜로 대규모 분산이 필요한 batch 환경에서나 등장한다.
자주 만나는 사고
사고 1: Async pair 누락
원인은 AsyncItemProcessor 만 쓰고 Writer 를 일반 Writer 로 둔 경우다. 해결은 AsyncItemWriter 까지 함께 wrap 해서 항상 pair 로 묶는 것.
사고 2: Async 의 throughput 향상 안 됨
process 가 1ms 처럼 너무 짧으면 thread 분기 overhead 가 이득을 잡아먹는다. process 가 100ms 이상일 때만 의미가 있고, 도입 전에 한 번이라도 측정해보고 결정한다.
사고 3: Remote Chunking 의 메시지 손실
DirectChannel 은 in-memory 라서 process 가 죽으면 메시지가 같이 사라진다. JMS persistent 나 Kafka 같은 durable queue 로 바꾸고 ack(메시지 수신 확인 신호) 를 보장한다.
사고 4: Remote Chunking 의 reply 시간 초과
Worker 처리가 MessagingTemplate.receiveTimeout 을 넘기면 manager 가 먼저 끊는다. setReceiveTimeout() 을 늘리거나 비동기 ack 로 풀어준다.
사고 5: Remote Partitioning 의 skew
Partitioner 가 입력을 고르지 않게 자르면 worker 하나만 끝까지 일한다. 균등 분할을 먼저 챙기고 필요하면 dynamic load balancing 을 얹는다. 여기서 skew(쏠림) 가 핵심 키워드다.
사고 6: Worker JobRepository 미공유
Manager 와 Worker 가 각자 다른 JobRepository 를 보면 metadata 가 둘로 갈라진다. 공유 DB 로 묶인 JobRepository 를 쓴다.
사고 7: @EnableBatchIntegration 누락
RemoteChunking*BuilderFactory 가 autowire 되지 않는다. @EnableBatchIntegration 을 빠뜨리지 않는다.
사고 8: Worker 측 Step bean 없음
Remote Partitioning 의 worker 가 workerStep 이름의 Step bean 을 찾지 못한다. Worker process 에도 Step bean 을 정의하고 StepLocator 를 설정해줘야 한다.
운영 권장 패턴
Pattern 1: AsyncItemProcessor 표준
@Bean
public ItemProcessor<Customer, EnrichedCustomer> coreProcessor(ApiClient client) {
return customer -> new EnrichedCustomer(customer, client.fetch(customer.getId()));
}
@Bean
public AsyncItemProcessor<Customer, EnrichedCustomer> asyncProcessor(
ItemProcessor<Customer, EnrichedCustomer> delegate,
TaskExecutor executor) {
AsyncItemProcessor<Customer, EnrichedCustomer> async = new AsyncItemProcessor<>();
async.setDelegate(delegate);
async.setTaskExecutor(executor);
return async;
}
@Bean
public AsyncItemWriter<EnrichedCustomer> asyncWriter(ItemWriter<EnrichedCustomer> delegate) {
AsyncItemWriter<EnrichedCustomer> async = new AsyncItemWriter<>();
async.setDelegate(delegate);
return async;
}
@Bean
public TaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-batch-");
executor.initialize();
return executor;
}
ThreadPool 에 Async pair 를 묶은 형태가 표준이다.
Pattern 2: Remote Chunking with @EnableBatchIntegration
@Configuration
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingConfig {
@Configuration
static class Manager {
@Autowired RemoteChunkingManagerStepBuilderFactory factory;
@Bean
public TaskletStep managerStep(ItemReader<Order> reader,
MessageChannel requests,
QueueChannel replies) {
return factory.get("managerStep")
.chunk(500)
.reader(reader)
.outputChannel(requests)
.inputChannel(replies)
.build();
}
}
@Configuration
static class Worker {
@Autowired RemoteChunkingWorkerBuilder builder;
@Bean
public IntegrationFlow workerFlow(ItemProcessor<Order, Order> processor,
ItemWriter<Order> writer,
MessageChannel requests,
MessageChannel replies) {
return builder
.itemProcessor(processor)
.itemWriter(writer)
.inputChannel(requests)
.outputChannel(replies)
.build();
}
}
}
보일러플레이트가 사라진 권장 형태다.
Pattern 3: Remote Partitioning with @EnableBatchIntegration
@Configuration
@EnableBatchIntegration
@EnableBatchProcessing
public class RemotePartitioningConfig {
@Configuration
static class Manager {
@Autowired RemotePartitioningManagerStepBuilderFactory factory;
@Bean
public Step managerStep(Partitioner partitioner,
MessageChannel outgoing,
MessageChannel incoming) {
return factory.get("managerStep")
.partitioner("workerStep", partitioner)
.gridSize(8)
.outputChannel(outgoing)
.inputChannel(incoming)
.build();
}
}
@Configuration
static class Worker {
@Autowired RemotePartitioningWorkerStepBuilderFactory factory;
@Bean
public Step workerStep(ItemReader<Order> reader,
ItemProcessor<Order, Order> processor,
ItemWriter<Order> writer,
MessageChannel requests,
MessageChannel replies) {
return factory.get("workerStep")
.inputChannel(requests)
.outputChannel(replies)
.chunk(100)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}
}
진짜 분산 batch 의 모습이다.
Pattern 4: Local + Remote 결합
// AsyncItemProcessor 로 process 비동기 (local)
// + Remote Chunking 으로 write 분산
process 는 thread pool 로, write 는 worker process 로 — 두 layer 를 겹친 형태다. 대부분 환경에서는 복잡도만 오르고 이득은 미미해서 측정해보고 결정한다.
시험 직전 한 번 더 — Async · Remote 함정 압축 노트
- 3 전략 = AsyncItemProcessor + AsyncItemWriter (local) · Remote Chunking · Remote Partitioning
- AsyncItemProcessor = item 단위 fork (background thread)
process()→Future<Y>반환- AsyncItemWriter = chunk 의 모든 Future resolve 후 delegate write (join)
- Async pair 필수 — Processor + Writer 둘 다 wrap
- chunk type =
<I, Future<O>> - process 100ms+ 일 때 효과
- vs Multi-threaded Step — item 단위 vs chunk 단위, process 만 vs 전체 thread 분배
- skip · retry = Future.get() 시점에 예외 surface → Writer 단계 detection
- Remote Chunking = manager(read) + workers(process+write), JMS·AMQP 메시지
ChunkMessageChannelItemWriter= Manager 의 writer 자리 (messaging)ChunkProcessorChunkHandler= Worker 의 service activator- Manager Step = read + ChunkMessageChannelItemWriter (process·write 없음)
- Worker Step 없음 — service activator
- Remote Partitioning = Step 자체 분산, worker 가 자체 read·process·write
MessageChannelPartitionHandler= PartitionHandler 의 messaging 구현StepExecutionRequestHandler= Worker 측 service activator- Worker Step = 일반 Step (각자 read·process·write)
- Chunking vs Partitioning — process·write 무거움 vs I/O 무거움
@EnableBatchIntegration+ 4 BuilderFactory:RemoteChunkingManagerStepBuilderFactoryRemoteChunkingWorkerBuilderRemotePartitioningManagerStepBuilderFactoryRemotePartitioningWorkerStepBuilderFactory- 4.1+ 권장 — 보일러플레이트 제거
RemoteStep(Spring Batch 6) = Step 자체를 원격에서 실행 (37편)- 결정 트리 — process 무거움 → Async / multi-JVM 필요 → Remote Chunking / 전체 분산 → Remote Partitioning / Step 분산 → RemoteStep
- 함정 — Async pair 누락 → Future cast 실패
- 함정 — Async 효과 X (process 짧음) → 측정 후 결정
- 함정 — Remote Chunking 메시지 손실 → durable queue
- 함정 — reply timeout 초과 → setReceiveTimeout 늘림
- 함정 — Partitioning skew → 균등 분할
- 함정 — Worker JobRepository 미공유 → 공유 DB
- 함정 — @EnableBatchIntegration 누락
- 함정 — Worker Step bean 없음 → StepLocator
- 패턴 — Async ThreadPool 표준
- 패턴 — Remote Chunking @EnableBatchIntegration
- 패턴 — Remote Partitioning @EnableBatchIntegration
- 패턴 — Local + Remote 결합 (대부분 복잡도 X)
- 운영 빈도 — Async 가 가장 흔함, Remote 는 진정 분산 환경
공식 문서: Asynchronous Processors · Externalizing Batch Process Execution 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 39편 — Retry · Spring Framework 7 Core Retry (v6 변경)
- 40편 — Testing · @SpringBatchTest · End-to-End
- 41편 — Common Patterns · 흔한 운영 패턴 카탈로그
- 42편 — Spring Batch Integration · 두 프레임워크 경계
- 43편 — Launching via Messages · JobLaunchingGateway · Informational
다음 글: