Spring Batch 입문 44편 — Async Processors · Remote Chunking · Partitioning

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 44편. 비동기·분산 처리 deep dive — AsyncItemProcessor + AsyncItemWriter (fork-join), ChunkMessageChannelItemWriter (Remote Chunking 의 manager 측), ChunkProcessorChunkHandler (worker 측), MessageChannelPartitionHandler (Remote Partitioning), @EnableBatchIntegration + RemoteChunkingManagerStepBuilderFactory · RemoteChunkingWorkerBuilder · RemotePartitioningManagerStepBuilderFactory · RemotePartitioningWorkerStepBuilderFactory 까지 정리한 학습 노트.

📚 Spring Batch 입문에서 운영까지 · 44편 — Async Processors · Remote Chunking · Partitioning

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 44편이에요. 43편외부 → batch 결합 다음 — batch 내부의 비동기·분산 처리 deep dive. 42편 결합점 4·5·6 의 구체화.

두 갈래 — Local Async vs Remote Distributed

전략 분류 scope
AsyncItemProcessor + AsyncItemWriter local (1 JVM) item 단위 비동기
Remote Chunking multi-process chunk 단위 분산
Remote Partitioning multi-process Step 단위 분산

JVM(자바 실행 환경) 하나 안에서 끝낼지, 여러 process 로 분산할지 — 그리고 어디가 병목이고 어디까지 분산할지 에 따라 갈린다.

AsyncItemProcessor — Item 단위 fork

@Bean
public AsyncItemProcessor<Customer, EnrichedCustomer> asyncProcessor(
        ItemProcessor<Customer, EnrichedCustomer> delegate,
        TaskExecutor taskExecutor) {
    AsyncItemProcessor<Customer, EnrichedCustomer> async = new AsyncItemProcessor<>();
    async.setDelegate(delegate);
    async.setTaskExecutor(taskExecutor);
    return async;
}

작동 흐름은 이렇다.

Reader.read() → Customer (item)
  ↓
AsyncItemProcessor.process(Customer)
  ↓
TaskExecutor.execute(() -> delegate.process(item))  ← background thread
  ↓ (즉시 반환)
Future<EnrichedCustomer>

process 는 background thread 로 빠지고 read 는 멈추지 않는다. 작업을 나눠 던지고 결과를 모으는 fork-join(분기·합류) 패턴이다.

효과 — 외부 API 호출 등 expensive process

  • direct — 1 chunk (100 items) × API 100ms = 10초
  • async — 100 thread 동시 호출 = 약 100ms (API 만 worst case)

process bound 구간에서 throughput 이 수십 배까지 뛴다.

AsyncItemWriter — Join

AsyncItemProcessorFuture 를 반환하므로 일반 Writer 로는 받을 수 없다. 그래서 AsyncItemWriter 가 쌍으로 따라붙어야 한다.

@Bean
public AsyncItemWriter<EnrichedCustomer> asyncWriter(
        ItemWriter<EnrichedCustomer> delegate) {
    AsyncItemWriter<EnrichedCustomer> async = new AsyncItemWriter<>();
    async.setDelegate(delegate);
    return async;
}

chunk 안의 Future 가 전부 끝날 때까지 기다렸다가 resolve 한 뒤 delegate.write 를 호출한다. fork-join 의 join(합류) 단계가 여기다.

Step 구성

@Bean
public Step asyncStep(JobRepository repo, PlatformTransactionManager tx,
                      ItemReader<Customer> reader,
                      AsyncItemProcessor<Customer, EnrichedCustomer> processor,
                      AsyncItemWriter<EnrichedCustomer> writer) {
    return new StepBuilder("asyncStep", repo)
        .<Customer, Future<EnrichedCustomer>>chunk(100, tx)     // ★ chunk type
        .reader(reader)
        .processor((ItemProcessor) processor)
        .writer((ItemWriter) writer)
        .build();
}

핵심은 chunk type 이 <Customer, Future<EnrichedCustomer>> 라는 점이다. processor 가 Future 를 반환하고 writer 가 Future 를 받는 구조다.

함정 — Async Pair 필수

// ❌ 잘못된 조합
.processor(asyncProcessor)
.writer(regularWriter)        // ItemWriter<EnrichedCustomer>

regular writer 는 Future 를 EnrichedCustomer 로 캐스팅하지 못한다. 반드시 AsyncItemWriter 가 와야 한다.

// ❌ 또 다른 잘못된 조합
.processor(regularProcessor)   // 일반 Processor
.writer(asyncWriter)

이 조합에서는 AsyncItemWriter 가 Future 가 아닌 plain item 을 받는다. 동작은 하지만 비동기 효과는 사라져서 의미가 없다.

규칙은 단순하다 — AsyncItemProcessorAsyncItemWriter 는 항상 pair.

Skip · Retry 와의 결합

AsyncItemProcessor 안에서 터진 예외는 Future 안에 캡슐화된다. AsyncItemWriterFuture.get() 을 호출할 때 비로소 예외가 드러난다.

그래서 Step 의 skip·retry 로직이 그 시점에 정상으로 작동한다. 다만 예외 검출이 약간 늦어지는 셈이고, Processor 단계가 아니라 Writer 단계에서 발견된다.

Multi-threaded Step vs AsyncItemProcessor 비교

항목 Multi-threaded Step (37편) AsyncItemProcessor
병렬 단위 chunk item
read·process·write 모두 thread 분배 read·write serial, process 만 비동기
Reader thread-safety 필수 X
사용 case 전체 throughput process 단계만 무거움

외부 API 호출이나 무거운 변환처럼 process 가 진짜 병목이라면 AsyncItemProcessor 쪽이 맞다.

Remote Chunking — 다른 머신에서 process · write

Spring Batch can also use Spring Integration internally. By using this approach, you can delegate the processing of items or even chunks to outside processes. — 공식 reference

37편의 Remote Chunking 을 Spring Batch Integration 으로 실제 구현한 모습이다.

전체 구조

[Manager Process]
  Reader → items 수집
  ↓
  chunk 100 items
  ↓
  ChunkMessageChannelItemWriter
  ↓ (JMS message)
═════════════════════════════════════════
[Worker Process 1]   [Worker Process 2]
  JMS message 수신    JMS message 수신
  ↓                   ↓
  ChunkProcessor      ChunkProcessor
   (process+write)     (process+write)
  ↓                   ↓
  reply              reply
═════════════════════════════════════════
[Manager Process]
  reply 수집

여기서 JMS(자바 메시징 표준) 가 manager 와 worker 사이의 전송 채널이다.

Manager — ChunkMessageChannelItemWriter

@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory cf) {
    return IntegrationFlow
        .from(requests())
        .handle(Jms.outboundAdapter(cf).destination("requests"))
        .get();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory cf) {
    return IntegrationFlow
        .from(Jms.messageDrivenChannelAdapter(cf).destination("replies"))
        .channel(replies())
        .get();
}

@Bean
public ChunkMessageChannelItemWriter<Person> itemWriter() {
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(requests());
    template.setReceiveTimeout(2000);

    ChunkMessageChannelItemWriter<Person> writer = new ChunkMessageChannelItemWriter<>();
    writer.setMessagingOperations(template);
    writer.setReplyChannel(replies());
    return writer;
}

@Bean
public Step managerStep(JobRepository repo, PlatformTransactionManager tx,
                        ItemReader<Person> reader,
                        ChunkMessageChannelItemWriter<Person> writer) {
    return new StepBuilder("managerStep", repo)
        .<Person, Person>chunk(200, tx)
        .reader(reader)
        .writer(writer)                  // ★ ChunkMessageChannelItemWriter
        .build();
}

Manager 의 Step 은 read 와 ChunkMessageChannelItemWriter 두 조각뿐이다. write 자리가 messaging 으로 바뀌고 process·write 본체는 빠진다.

Worker — ChunkProcessorChunkHandler

@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory cf) {
    return IntegrationFlow
        .from(Jms.messageDrivenChannelAdapter(cf).destination("requests"))
        .channel(requests())
        .get();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory cf) {
    return IntegrationFlow
        .from(replies())
        .handle(Jms.outboundAdapter(cf).destination("replies"))
        .get();
}

@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Person> chunkHandler(
        ItemProcessor<Person, Person> processor,
        ItemWriter<Person> writer) {
    ChunkProcessor<Person> chunkProcessor = new SimpleChunkProcessor<>(processor, writer);

    ChunkProcessorChunkHandler<Person> handler = new ChunkProcessorChunkHandler<>();
    handler.setChunkProcessor(chunkProcessor);
    return handler;
}

Worker 쪽에는 Step 이 없다. 메시지가 도착하면 처리해서 응답을 돌려주는 service activator(메시지 도착 시 작동하는 핸들러) 한 덩어리뿐이다. ChunkProcessorChunkHandler 가 requests 를 받아 processor 와 writer 를 실행하고 replies 로 결과를 돌려보낸다.

@EnableBatchIntegration — 단순화 (4.1+)

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfig {

    @Configuration
    static class ManagerConfig {

        @Autowired
        RemoteChunkingManagerStepBuilderFactory factory;

        @Bean
        public TaskletStep managerStep() {
            return factory.get("managerStep")
                .chunk(100)
                .reader(itemReader())
                .outputChannel(requests())   // requests to workers
                .inputChannel(replies())     // replies from workers
                .build();
        }
    }

    @Configuration
    static class WorkerConfig {

        @Autowired
        RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return workerBuilder
                .itemProcessor(itemProcessor())
                .itemWriter(itemWriter())
                .inputChannel(requests())
                .outputChannel(replies())
                .build();
        }
    }
}

RemoteChunkingManagerStepBuilderFactoryRemoteChunkingWorkerBuilder 가 보일러플레이트를 걷어내준다. Manager 도 Worker 도 4~5 줄이면 끝나서 Spring Batch Integration 4.1+ 에서 권장하는 방식이다.

Remote Partitioning — Step 단위 분산

37편의 Partitioning 을 messaging 으로 풀어낸 형태다.

차이점 (Chunking vs Partitioning)

항목 Remote Chunking Remote Partitioning
분산 단위 chunk (manager 가 read) Step (worker 가 자체 read)
Worker service activator Step instance
사용 case Process · Write heavy I/O · 전체 처리 heavy
입력 분할 manager 가 자연스럽게 Partitioner 가 명시

Remote Partitioning is useful when it is not the processing of items but rather the associated I/O that causes the bottleneck. — 공식 reference

MessageChannelPartitionHandler

@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
    handler.setStepName("step1");
    handler.setGridSize(3);
    handler.setReplyChannel(outboundReplies());

    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100_000);
    handler.setMessagingOperations(template);

    return handler;
}

37편의 PartitionHandler 를 messaging 으로 구현한 클래스다. JMS·AMQP(메시지 큐 프로토콜)·Kafka 같은 채널을 추상화해서 받는다.

Worker — StepExecutionRequestHandler

@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler(
        JobExplorer explorer, StepLocator locator) {
    StepExecutionRequestHandler handler = new StepExecutionRequestHandler();
    handler.setJobExplorer(explorer);
    handler.setStepLocator(locator);
    return handler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator(
        StepExecutionRequestHandler handler) {
    return handler;
}

Worker 가 Step 실행 요청을 받으면 로컬에서 해당 Step (37편의 worker step) 을 돌리고 reply 를 돌려보낸다.

@EnableBatchIntegration 단순화

@EnableBatchIntegration
@EnableBatchProcessing
public class RemotePartitioningJobConfig {

    @Configuration
    static class ManagerConfig {

        @Autowired
        RemotePartitioningManagerStepBuilderFactory factory;

        @Bean
        public Step managerStep() {
            return factory.get("managerStep")
                .partitioner("workerStep", partitioner())
                .gridSize(10)
                .outputChannel(outgoingRequestsToWorkers())
                .inputChannel(incomingRepliesFromWorkers())
                .build();
        }
    }

    @Configuration
    static class WorkerConfig {

        @Autowired
        RemotePartitioningWorkerStepBuilderFactory factory;

        @Bean
        public Step workerStep() {
            return factory.get("workerStep")
                .inputChannel(incomingRequestsFromManager())
                .outputChannel(outgoingRepliesToManager())
                .chunk(100)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
        }
    }
}

RemotePartitioningManagerStepBuilderFactoryRemotePartitioningWorkerStepBuilderFactory 가 표준 짝을 이룬다. 보일러플레이트가 사라져서 Spring Batch Integration 4.1+ 의 권장 패턴이 됐다.

Remote Step (Spring Batch 6 신규)

37편에서 봤던 RemoteStep 이다.

@Bean
public Step step(MessagingTemplate messagingTemplate, JobRepository repo) {
    return new RemoteStep("step", "workerStep", repo, messagingTemplate);
}

Remote Chunking·Partitioning 이 Step 내부 처리를 원격으로 보내는 거라면, Remote Step 은 Step 자체를 통째로 원격에 맡긴다. Spring Batch 6 부터 1급 시민이라 Job 의 step 자리에 그대로 박아 쓰면 된다.

어떤 전략을 언제

process 가 무거운데 단일 JVM 으로 충분?
  → AsyncItemProcessor + AsyncItemWriter

process·write 가 무거운데 multi-JVM 필요?
  → Remote Chunking

전체 처리 (read+process+write) 가 무거운데 분할 가능?
  → Remote Partitioning

Step 자체를 원격에서?
  → RemoteStep (v6)

운영에서 마주치는 빈도는 AsyncItemProcessor 가 압도적이다. Remote 계열은 진짜로 대규모 분산이 필요한 batch 환경에서나 등장한다.

자주 만나는 사고

사고 1: Async pair 누락

원인은 AsyncItemProcessor 만 쓰고 Writer 를 일반 Writer 로 둔 경우다. 해결은 AsyncItemWriter 까지 함께 wrap 해서 항상 pair 로 묶는 것.

사고 2: Async 의 throughput 향상 안 됨

process 가 1ms 처럼 너무 짧으면 thread 분기 overhead 가 이득을 잡아먹는다. process 가 100ms 이상일 때만 의미가 있고, 도입 전에 한 번이라도 측정해보고 결정한다.

사고 3: Remote Chunking 의 메시지 손실

DirectChannel 은 in-memory 라서 process 가 죽으면 메시지가 같이 사라진다. JMS persistent 나 Kafka 같은 durable queue 로 바꾸고 ack(메시지 수신 확인 신호) 를 보장한다.

사고 4: Remote Chunking 의 reply 시간 초과

Worker 처리가 MessagingTemplate.receiveTimeout 을 넘기면 manager 가 먼저 끊는다. setReceiveTimeout() 을 늘리거나 비동기 ack 로 풀어준다.

사고 5: Remote Partitioning 의 skew

Partitioner 가 입력을 고르지 않게 자르면 worker 하나만 끝까지 일한다. 균등 분할을 먼저 챙기고 필요하면 dynamic load balancing 을 얹는다. 여기서 skew(쏠림) 가 핵심 키워드다.

사고 6: Worker JobRepository 미공유

Manager 와 Worker 가 각자 다른 JobRepository 를 보면 metadata 가 둘로 갈라진다. 공유 DB 로 묶인 JobRepository 를 쓴다.

사고 7: @EnableBatchIntegration 누락

RemoteChunking*BuilderFactory 가 autowire 되지 않는다. @EnableBatchIntegration 을 빠뜨리지 않는다.

사고 8: Worker 측 Step bean 없음

Remote Partitioning 의 worker 가 workerStep 이름의 Step bean 을 찾지 못한다. Worker process 에도 Step bean 을 정의하고 StepLocator 를 설정해줘야 한다.

운영 권장 패턴

Pattern 1: AsyncItemProcessor 표준

@Bean
public ItemProcessor<Customer, EnrichedCustomer> coreProcessor(ApiClient client) {
    return customer -> new EnrichedCustomer(customer, client.fetch(customer.getId()));
}

@Bean
public AsyncItemProcessor<Customer, EnrichedCustomer> asyncProcessor(
        ItemProcessor<Customer, EnrichedCustomer> delegate,
        TaskExecutor executor) {
    AsyncItemProcessor<Customer, EnrichedCustomer> async = new AsyncItemProcessor<>();
    async.setDelegate(delegate);
    async.setTaskExecutor(executor);
    return async;
}

@Bean
public AsyncItemWriter<EnrichedCustomer> asyncWriter(ItemWriter<EnrichedCustomer> delegate) {
    AsyncItemWriter<EnrichedCustomer> async = new AsyncItemWriter<>();
    async.setDelegate(delegate);
    return async;
}

@Bean
public TaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);
    executor.setMaxPoolSize(50);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("async-batch-");
    executor.initialize();
    return executor;
}

ThreadPool 에 Async pair 를 묶은 형태가 표준이다.

Pattern 2: Remote Chunking with @EnableBatchIntegration

@Configuration
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingConfig {

    @Configuration
    static class Manager {
        @Autowired RemoteChunkingManagerStepBuilderFactory factory;

        @Bean
        public TaskletStep managerStep(ItemReader<Order> reader,
                                        MessageChannel requests,
                                        QueueChannel replies) {
            return factory.get("managerStep")
                .chunk(500)
                .reader(reader)
                .outputChannel(requests)
                .inputChannel(replies)
                .build();
        }
    }

    @Configuration
    static class Worker {
        @Autowired RemoteChunkingWorkerBuilder builder;

        @Bean
        public IntegrationFlow workerFlow(ItemProcessor<Order, Order> processor,
                                           ItemWriter<Order> writer,
                                           MessageChannel requests,
                                           MessageChannel replies) {
            return builder
                .itemProcessor(processor)
                .itemWriter(writer)
                .inputChannel(requests)
                .outputChannel(replies)
                .build();
        }
    }
}

보일러플레이트가 사라진 권장 형태다.

Pattern 3: Remote Partitioning with @EnableBatchIntegration

@Configuration
@EnableBatchIntegration
@EnableBatchProcessing
public class RemotePartitioningConfig {

    @Configuration
    static class Manager {
        @Autowired RemotePartitioningManagerStepBuilderFactory factory;

        @Bean
        public Step managerStep(Partitioner partitioner,
                                 MessageChannel outgoing,
                                 MessageChannel incoming) {
            return factory.get("managerStep")
                .partitioner("workerStep", partitioner)
                .gridSize(8)
                .outputChannel(outgoing)
                .inputChannel(incoming)
                .build();
        }
    }

    @Configuration
    static class Worker {
        @Autowired RemotePartitioningWorkerStepBuilderFactory factory;

        @Bean
        public Step workerStep(ItemReader<Order> reader,
                                ItemProcessor<Order, Order> processor,
                                ItemWriter<Order> writer,
                                MessageChannel requests,
                                MessageChannel replies) {
            return factory.get("workerStep")
                .inputChannel(requests)
                .outputChannel(replies)
                .chunk(100)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
        }
    }
}

진짜 분산 batch 의 모습이다.

Pattern 4: Local + Remote 결합

// AsyncItemProcessor 로 process 비동기 (local)
// + Remote Chunking 으로 write 분산

process 는 thread pool 로, write 는 worker process 로 — 두 layer 를 겹친 형태다. 대부분 환경에서는 복잡도만 오르고 이득은 미미해서 측정해보고 결정한다.

시험 직전 한 번 더 — Async · Remote 함정 압축 노트

  • 3 전략 = AsyncItemProcessor + AsyncItemWriter (local) · Remote Chunking · Remote Partitioning
  • AsyncItemProcessor = item 단위 fork (background thread)
  • process()Future<Y> 반환
  • AsyncItemWriter = chunk 의 모든 Future resolve 후 delegate write (join)
  • Async pair 필수 — Processor + Writer 둘 다 wrap
  • chunk type = <I, Future<O>>
  • process 100ms+ 일 때 효과
  • vs Multi-threaded Step — item 단위 vs chunk 단위, process 만 vs 전체 thread 분배
  • skip · retry = Future.get() 시점에 예외 surface → Writer 단계 detection
  • Remote Chunking = manager(read) + workers(process+write), JMS·AMQP 메시지
  • ChunkMessageChannelItemWriter = Manager 의 writer 자리 (messaging)
  • ChunkProcessorChunkHandler = Worker 의 service activator
  • Manager Step = read + ChunkMessageChannelItemWriter (process·write 없음)
  • Worker Step 없음 — service activator
  • Remote Partitioning = Step 자체 분산, worker 가 자체 read·process·write
  • MessageChannelPartitionHandler = PartitionHandler 의 messaging 구현
  • StepExecutionRequestHandler = Worker 측 service activator
  • Worker Step = 일반 Step (각자 read·process·write)
  • Chunking vs Partitioning — process·write 무거움 vs I/O 무거움
  • @EnableBatchIntegration + 4 BuilderFactory:
  • RemoteChunkingManagerStepBuilderFactory
  • RemoteChunkingWorkerBuilder
  • RemotePartitioningManagerStepBuilderFactory
  • RemotePartitioningWorkerStepBuilderFactory
  • 4.1+ 권장 — 보일러플레이트 제거
  • RemoteStep (Spring Batch 6) = Step 자체를 원격에서 실행 (37편)
  • 결정 트리 — process 무거움 → Async / multi-JVM 필요 → Remote Chunking / 전체 분산 → Remote Partitioning / Step 분산 → RemoteStep
  • 함정 — Async pair 누락 → Future cast 실패
  • 함정 — Async 효과 X (process 짧음) → 측정 후 결정
  • 함정 — Remote Chunking 메시지 손실 → durable queue
  • 함정 — reply timeout 초과 → setReceiveTimeout 늘림
  • 함정 — Partitioning skew → 균등 분할
  • 함정 — Worker JobRepository 미공유 → 공유 DB
  • 함정 — @EnableBatchIntegration 누락
  • 함정 — Worker Step bean 없음 → StepLocator
  • 패턴 — Async ThreadPool 표준
  • 패턴 — Remote Chunking @EnableBatchIntegration
  • 패턴 — Remote Partitioning @EnableBatchIntegration
  • 패턴 — Local + Remote 결합 (대부분 복잡도 X)
  • 운영 빈도 — Async 가 가장 흔함, Remote 는 진정 분산 환경

공식 문서: Asynchronous Processors · Externalizing Batch Process Execution 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!