Spring Batch 입문 42편 — Spring Batch Integration · 두 프레임워크 경계

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 42편. Spring Batch 와 Spring Integration 의 경계 — 결합점 6가지 (Launching via Messages · Job-Launching Gateway · Informational Messages · Async Processors · Externalizing Execution · Remote Chunking/Partitioning). 언제 통합이 필요한가 (granularity · 공통 패턴 적용), batch-integration XML namespace (v7 제거 예정), Java config 권장까지 정리한 학습 노트.

📚 Spring Batch 입문에서 운영까지 · 42편 — Spring Batch Integration · 두 프레임워크 경계

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 42편이에요. 41편 의 운영 패턴 카탈로그 다음 — Spring Batch 단독으로 부족할 때 손잡는 짝꿍 — Spring Integration.

왜 둘이 만나는가

Spring Batch 사용자는 Spring Batch 범위 밖 요구사항을 만남. Spring Integration 사용자는 Spring Batch 요구사항 을 만남. — 공식 reference

각각의 강점을 한 줄로 비교해두면 결합점을 따라갈 때 머리가 덜 복잡해요.

프레임워크 강점
Spring Batch 대량 데이터 처리 · chunk-oriented (덩어리 단위 처리) · transaction · restart
Spring Integration 메시지·이벤트 · channel · gateway (외부 진입 endpoint) · EIP (Enterprise Integration Patterns, 통합 설계 패턴 모음)

두 프레임워크는 영역이 겹치지 않고 서로 보완해서, 운영 환경에서는 함께 쓰는 경우가 흔합니다.

6가지 결합점

# 결합점 무엇
1 Launching Jobs through Messages 메시지 도착 = Job 실행 trigger
2 Job-Launching Gateway attributes gateway 옵션 (parameter conversion 등)
3 Informational Messages Job 진행 상태를 messaging channel 로 알림
4 Asynchronous Processors ItemProcessor 를 비동기 thread pool
5 Externalizing Execution Step 실행을 원격 worker 로 분리
6 Remote Chunking / Partitioning 37편의 messaging 구현

각 결합점은 43·44·45편에서 따로 깊게 파고들어요. 42편은 전체 그림과 경계를 먼저 잡는 자리입니다.

Granularity — 어디까지 batch, 어디부터 integration

Two pieces of advice can help: Thinking about granularity and applying common patterns. — 공식 reference

Granularity 판단 기준

자리 Spring Batch Spring Integration
대상 대량 데이터·record 단일 메시지·event
단위 chunk (수백~수만) 1 message
transaction chunk transaction 메시지 ack
재시작 JobRepository message broker re-delivery
상태 ExecutionContext 대체로 stateless
모델 step-oriented (sequential) flow-oriented (channels)

같은 패턴이라도 어느 자리에서 처리하느냐가 달라요. 변환은 ItemProcessor 와 Integration Transformer 가, 필터는 null 반환과 Filter 가, 분기는 Decision 과 Router 가, 분배는 Partitioning 과 Splitter / Aggregator 가 같은 역할을 합니다.

결국 Spring Integration 의 EIP 패턴 일부가 Spring Batch 안에서도 등장하는 셈이고, 어느 layer 에서 처리할지는 granularity 로 결정합니다.

결정 가이드

batch run 안에서 처리하는 일은 Spring Batch 가, batch run 의 시작과 종료를 알리는 trigger·notify 는 Spring Integration 이 맡습니다. batch 바깥에서 흐르는 message 처리도 Spring Integration 쪽이고, batch run 안에서 외부 시스템과 통신하는 경우만 둘 다 — Spring Integration channel 을 빌려 씁니다.

결합점 1: Launching Jobs through Messages

시나리오메시지가 도착 (예: SFTP (Secure File Transfer Protocol) 파일 업로드 알림) = Job 자동 실행.

[SFTP Server] → File 업로드
  ↓
Spring Integration File Inbound Channel
  ↓
JobLaunchRequest 변환
  ↓
JobLaunchingMessageHandler → JobLauncher.run(job, params)
  ↓
[Spring Batch] Job 실행

핵심은 Spring Batch 의 JobLauncher 를 Spring Integration channel 의 endpoint 로 감싸 쓰는 구조라는 점이에요. 자세한 내용은 43편 (Launching Batch Jobs through Messages) 에서 다룹니다.

결합점 2: Job-Launching Gateway

JobLaunchingGateway (Job 실행 진입 endpoint) = Spring Integration 의 gateway endpoint.

옵션은 세 가지를 기억해두면 충분해요. JobLaunchRequest (Job 실행 요청 객체) 안에서 Job · JobParameters mapping 을 어떻게 풀지, jobLauncher 를 어떻게 주입할지, 그리고 reply channel 로 JobExecution 을 어떤 형태로 반환할지. 셋 다 43편의 일부로 다룹니다.

결합점 3: Informational Messages

Job 진행 상태를 messaging channel 로 publish.

활용은 세 갈래입니다. Job 의 시작·종료 notification 을 Slack·email·모니터링 시스템으로 보내거나, Step 진행 통계를 실시간 전송하거나, 실패 이벤트만 골라 alert 하는 식이에요. Spring Integration 의 event-driven channel adapter 가 그 통로가 됩니다. 43편 의 일부.

결합점 4: Asynchronous Processors

시나리오ItemProcessor무거운 외부 호출 (예: 외부 API 100ms / call) → chunk 100 = 10초.

해결AsyncItemProcessor (Processor 를 비동기 thread 로 실행) + AsyncItemWriter (Future 를 받아 resolve 후 위임) — Spring Batch Integration 제공.

@Bean
public AsyncItemProcessor<X, Y> asyncProcessor(
        ItemProcessor<X, Y> delegate, TaskExecutor executor) {
    AsyncItemProcessor<X, Y> async = new AsyncItemProcessor<>();
    async.setDelegate(delegate);
    async.setTaskExecutor(executor);
    return async;
}

@Bean
public AsyncItemWriter<Y> asyncWriter(ItemWriter<Y> delegate) {
    AsyncItemWriter<Y> async = new AsyncItemWriter<>();
    async.setDelegate(delegate);
    return async;
}

작동은 두 단계로 짜여요. AsyncItemProcessor 가 item 별로 Future<Y> 를 반환하고 — 실제 처리는 background thread 가 — 뒤이어 AsyncItemWriter 가 그 Future<Y> 를 받아 resolve 한 다음 delegate writer 를 호출합니다.

중요AsyncItemProcessor 를 쓰면 AsyncItemWriter필수 결합. Future 의 정상 처리.

자세한 내용 = 44편 (Asynchronous Processors · Externalizing Execution).

Multi-threaded Step 과의 차이

항목 Multi-threaded Step AsyncItemProcessor
병렬 단위 chunk item
단위 TaskExecutor thread Future
read·write serial serial (단 process 만 비동기)
사용 case chunk-level 병렬 item-level expensive process

주요 병목이 process 한 단계에 몰려 있으면 AsyncItemProcessor 가, 전체 chunk 처리량을 늘리고 싶으면 Multi-threaded 가 답입니다.

결합점 5: Externalizing Execution

시나리오Step 의 실행다른 process / machine 으로 분리.

[Manager Process]
  Job 실행 → Step 1 도달
  ↓
  Spring Integration channel 로 *Step 실행 요청 메시지*
  ↓
[Worker Process] (다른 머신)
  메시지 수신 → Step 실행 → 결과 reply
  ↓
[Manager] reply 받고 다음 Step

37편에서 다룬 Remote Step 의 정확한 구현 기반이고, 44편 (Externalizing Batch Process Execution) 에서 더 자세히 풀어둡니다.

결합점 6: Remote Chunking / Partitioning

37편의 Remote Chunking · Partitioning 도 결국 Spring Integration messaging 위에서 돌아갑니다. Remote Chunking 은 manager 가 chunk 를 message queue 로 publish 하면 worker 들이 consume 하는 형태고, Remote Partitioning 은 MessageChannelPartitionHandler (메시지 채널 기반 PartitionHandler 구현체) 가 그 자리를 맡아요. 두 전략 모두 Spring Integration 의 channel·gateway 가 통신 기반이 됩니다.

XML Namespace — Deprecated

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
       xsi:schemaLocation="...">
    <!-- ... -->
</beans>

batch-integration XML namespace = Spring Batch Integration 1.3+ 도입. 단:

The batch-integration XML namespace is deprecated as of Spring Batch 6.0 and will be removed in version 7.0. — 공식 reference

Spring Batch 6.0 = batch-integration namespace deprecated. Spring Batch 7.0 = 제거 예정.

Java config 권장. XML 환경은 마이그레이션 계획 필요.

결합 시 의존성

spring-batch-integration 모듈 추가:

<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
</dependency>

spring-integration-core 도 transitive 포함.

사용 결정 — 어떤 결합점이 진짜 필요한가

운영 환경 실제 사용 빈도:

결합점 빈도 이유
Launching via Messages 높음 파일 도착 trigger · 외부 시스템 연동
Informational Messages Job notification · 모니터링
AsyncItemProcessor 높음 외부 API enrichment
Remote Step / Chunking 낮음 진정한 분산 batch 시스템
Remote Partitioning 큰 batch 의 worker 분산

실제로는 Launching via MessagesAsyncItemProcessor 두 개부터 시작하는 환경이 대부분이고, 그 이상은 진짜 필요할 때 하나씩 붙입니다.

결합점 결정 — Decision Tree

Batch 요구사항?
├─ Job trigger 가 외부 event?
│  ├─ YES → Launching via Messages (43편)
│  └─ NO  → 일반 JobLauncher
│
├─ Job 진행 상태를 외부 시스템에 publish?
│  ├─ YES → Informational Messages (43편)
│  └─ NO  → 일반 logging
│
├─ ItemProcessor 가 expensive?
│  ├─ YES → AsyncItemProcessor + AsyncItemWriter (44편)
│  └─ NO  → 일반 Processor
│
├─ Step 을 원격 worker 로?
│  ├─ YES → Externalizing Execution (44편)
│  └─ NO  → 일반 Step
│
└─ 대규모 분산 batch?
   ├─ YES → Remote Chunking / Partitioning (37편 + 44편)
   └─ NO  → 일반 Multi-threaded · Partitioning

Java Config 예제 — Launching via Messages 미리보기

@Bean
public IntegrationFlow jobLaunchingFlow(JobLauncher launcher, Job dailyJob) {
    return IntegrationFlow
        .from(Files.inboundAdapter(new File("/data/incoming"))
            .patternFilter("orders-*.csv"))
        .transform(File.class, file -> {
            JobParameters params = new JobParametersBuilder()
                .addString("input.file", file.getAbsolutePath())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();
            return new JobLaunchRequest(dailyJob, params);
        })
        .handle(new JobLaunchingMessageHandler(launcher))
        .get();
}

Files.inboundAdapter 가 디렉토리를 폴링하다 새 파일이 들어오면 JobLaunchRequest 로 변환하고, JobLaunchingMessageHandler (메시지 받아 Job 을 launch 하는 핸들러) 가 실제 launch 를 맡습니다. 결국 파일 도착이 곧 Job 자동 실행이 되는 셈이고, 이게 43편의 핵심 패턴이에요.

자주 만나는 사고

사고 1: dependency 누락

원인spring-batch-integration 안 추가.

해결 — Maven/Gradle 의존성 명시.

사고 2: AsyncItemProcessor 만 사용 (AsyncItemWriter 누락)

원인 — Processor 만 비동기 + 일반 Writer.

해결반드시 Async pair — Writer 도 AsyncItemWriter 로 wrap.

사고 3: batch-integration XML namespace 미래 위험

원인 — v6 deprecated, v7 제거.

해결 — Java config 마이그레이션. 또는 v7 upgrade 시 동시 작업.

사고 4: Spring Integration channel 메시지 손실

원인 — direct channel = in-memory.

해결durable channel (JMS · Kafka · file based) + 메시지 broker.

사고 5: 결합점 오용 — over-engineering

원인 — 단순 batch 에 Spring Integration 전체 도입.

해결진짜 필요한 결합점 1~2개만. EIP 패턴이 batch 안에서도 가능 한지 검토.

운영 권장 패턴

Pattern 1: 파일 도착 기반 Job trigger

@Bean
public IntegrationFlow fileLaunchFlow(JobLauncher launcher, Job dailyJob) {
    return IntegrationFlow
        .from(Files.inboundAdapter(new File("/data/incoming"))
            .patternFilter("*.csv")
            .preventDuplicates(true),
            poller -> poller.poller(Pollers.fixedRate(Duration.ofSeconds(30))))
        .<File, JobLaunchRequest>transform(file -> {
            JobParameters params = new JobParametersBuilder()
                .addString("input.file", file.getAbsolutePath())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();
            return new JobLaunchRequest(dailyJob, params);
        })
        .handle(new JobLaunchingMessageHandler(launcher))
        .get();
}

가장 흔한 패턴. Scheduler 별도 운영 X + 파일 도착 즉시 처리.

Pattern 2: AsyncItemProcessor + AsyncItemWriter

@Bean
public ItemProcessor<Customer, EnrichedCustomer> coreProcessor(ApiClient client) {
    return customer -> new EnrichedCustomer(customer, client.fetch(customer.getId()));
}

@Bean
public AsyncItemProcessor<Customer, EnrichedCustomer> asyncProcessor(
        ItemProcessor<Customer, EnrichedCustomer> delegate, TaskExecutor executor) {
    AsyncItemProcessor<Customer, EnrichedCustomer> async = new AsyncItemProcessor<>();
    async.setDelegate(delegate);
    async.setTaskExecutor(executor);
    return async;
}

@Bean
public ItemWriter<EnrichedCustomer> coreWriter(DataSource ds) {
    return new JdbcBatchItemWriterBuilder<EnrichedCustomer>()
        .dataSource(ds)
        .sql("INSERT INTO ...")
        .beanMapped()
        .build();
}

@Bean
public AsyncItemWriter<EnrichedCustomer> asyncWriter(ItemWriter<EnrichedCustomer> delegate) {
    AsyncItemWriter<EnrichedCustomer> async = new AsyncItemWriter<>();
    async.setDelegate(delegate);
    return async;
}

@Bean
public Step asyncStep(JobRepository repo, PlatformTransactionManager tx,
                      ItemReader<Customer> reader,
                      AsyncItemProcessor<Customer, EnrichedCustomer> processor,
                      AsyncItemWriter<EnrichedCustomer> writer) {
    return new StepBuilder("asyncStep", repo)
        .<Customer, Future<EnrichedCustomer>>chunk(100, tx)
        .reader(reader)
        .processor((ItemProcessor) processor)
        .writer((ItemWriter) writer)
        .build();
}

chunk type = <Customer, Future<EnrichedCustomer>>. Processor 가 Future 반환.

Pattern 3: Job 종료 notification

@Bean
public JobExecutionListener slackNotifier(SlackClient slack) {
    return new JobExecutionListener() {
        @Override
        public void afterJob(JobExecution execution) {
            String msg = String.format(
                "Job %s finished: %s, read=%d, write=%d, skip=%d",
                execution.getJobInstance().getJobName(),
                execution.getExitStatus().getExitCode(),
                execution.getStepExecutions().stream().mapToLong(StepExecution::getReadCount).sum(),
                execution.getStepExecutions().stream().mapToLong(StepExecution::getWriteCount).sum(),
                execution.getStepExecutions().stream().mapToLong(StepExecution::getSkipCount).sum()
            );
            slack.send(msg);
        }
    };
}

Spring Integration 까지 안 가도 간단한 notification 은 JobExecutionListener 로 OK.

Pattern 4: 결합 최소화

// Spring Integration 없이 동작
@Bean
public Job simpleDailyJob(JobRepository repo, ...) {
    return new JobBuilder("simpleDailyJob", repo)
        // ...
        .build();
}

@Scheduled(cron = "0 0 2 * * *")
public void launchDailyJob() throws Exception {
    JobParameters params = new JobParametersBuilder()
        .addLong("run.id", System.currentTimeMillis())
        .toJobParameters();
    jobLauncher.run(simpleDailyJob, params);
}

Cron 기반 trigger = @Scheduled 면 충분. Spring Integration 의존성 도입 회피.

→ over-engineering 회피 — 진짜 messaging 이 필요 한 자리만 통합.

시험 직전 한 번 더 — Spring Batch Integration 함정 압축 노트

  • 두 프레임워크 관계 = Batch (대량 처리) + Integration (메시지·이벤트)
  • 6 결합점 = Launching via Messages · Job-Launching Gateway · Informational Messages · Async Processors · Externalizing Execution · Remote Chunking/Partitioning
  • Granularity 판단 — batch (record·chunk·transaction) vs integration (1 message·stateless·flow)
  • 결정 가이드 — batch 안 처리 vs trigger·notify·외부 message
  • 결합점 1: Launching via Messages — 메시지 = Job trigger (43편)
  • JobLaunchingMessageHandler + JobLaunchRequest (Job + JobParameters)
  • 흔한 case — 파일 도착 = Job 실행
  • 결합점 4: AsyncItemProcessor + AsyncItemWriter (반드시 pair)
  • Processor 가 Future<Y> 반환, Writer 가 resolve
  • chunk type = <I, Future<O>>
  • vs Multi-threaded Step — chunk-level vs item-level expensive process
  • 결합점 5: Externalizing Execution — Step 을 원격 worker 로 (44편)
  • 결합점 6: Remote Chunking · Partitioning — Spring Integration messaging 기반 (37편)
  • MessageChannelPartitionHandler = remote partition handler
  • batch-integration XML namespace = v6 deprecated · v7 제거 예정
  • Java config 권장
  • 의존성 = spring-batch-integration 모듈
  • 실제 빈도 — Launching via Messages · AsyncItemProcessor 가 가장 흔함
  • Remote Step · Chunking 은 진정 분산 환경
  • 함정 — dependency 누락
  • 함정 — AsyncItemProcessor 만 사용 (AsyncItemWriter 누락) → Future 미해결
  • 함정 — XML namespace 미래 위험 (Java config 마이그레이션)
  • 함정 — direct channel 메시지 손실 (durable channel)
  • 함정 — over-engineering (단순 batch 에 전체 도입)
  • 패턴 — 파일 도착 trigger (Files.inboundAdapter + JobLaunchingMessageHandler)
  • 패턴 — AsyncItemProcessor + AsyncItemWriter pair
  • 패턴 — JobExecutionListener 로 간단 notification
  • 패턴 — Cron @Scheduled 면 Integration 불필요
  • 결정 트리 — Job trigger / notification / async / remote / 분산 단계별 결합점 선택

공식 문서: Spring Batch Integration · Namespace Support 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!