Spring Batch 입문 42편. Spring Batch 와 Spring Integration 의 경계 — 결합점 6가지 (Launching via Messages · Job-Launching Gateway · Informational Messages · Async Processors · Externalizing Execution · Remote Chunking/Partitioning). 언제 통합이 필요한가 (granularity · 공통 패턴 적용), batch-integration XML namespace (v7 제거 예정), Java config 권장까지 정리한 학습 노트.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 42편이에요. 41편 의 운영 패턴 카탈로그 다음 — Spring Batch 단독으로 부족할 때 손잡는 짝꿍 — Spring Integration.
왜 둘이 만나는가
Spring Batch 사용자는 Spring Batch 범위 밖 요구사항을 만남. Spring Integration 사용자는 Spring Batch 요구사항 을 만남. — 공식 reference
각각의 강점을 한 줄로 비교해두면 결합점을 따라갈 때 머리가 덜 복잡해요.
| 프레임워크 | 강점 |
|---|---|
| Spring Batch | 대량 데이터 처리 · chunk-oriented (덩어리 단위 처리) · transaction · restart |
| Spring Integration | 메시지·이벤트 · channel · gateway (외부 진입 endpoint) · EIP (Enterprise Integration Patterns, 통합 설계 패턴 모음) |
두 프레임워크는 영역이 겹치지 않고 서로 보완해서, 운영 환경에서는 함께 쓰는 경우가 흔합니다.
6가지 결합점
| # | 결합점 | 무엇 |
|---|---|---|
| 1 | Launching Jobs through Messages | 메시지 도착 = Job 실행 trigger |
| 2 | Job-Launching Gateway attributes | gateway 옵션 (parameter conversion 등) |
| 3 | Informational Messages | Job 진행 상태를 messaging channel 로 알림 |
| 4 | Asynchronous Processors | ItemProcessor 를 비동기 thread pool 로 |
| 5 | Externalizing Execution | Step 실행을 원격 worker 로 분리 |
| 6 | Remote Chunking / Partitioning | 37편의 messaging 구현 |
각 결합점은 43·44·45편에서 따로 깊게 파고들어요. 42편은 전체 그림과 경계를 먼저 잡는 자리입니다.
Granularity — 어디까지 batch, 어디부터 integration
Two pieces of advice can help: Thinking about granularity and applying common patterns. — 공식 reference
Granularity 판단 기준
| 자리 | Spring Batch | Spring Integration |
|---|---|---|
| 대상 | 대량 데이터·record | 단일 메시지·event |
| 단위 | chunk (수백~수만) | 1 message |
| transaction | chunk transaction | 메시지 ack |
| 재시작 | JobRepository | message broker re-delivery |
| 상태 | ExecutionContext | 대체로 stateless |
| 모델 | step-oriented (sequential) | flow-oriented (channels) |
같은 패턴이라도 어느 자리에서 처리하느냐가 달라요. 변환은 ItemProcessor 와 Integration Transformer 가, 필터는 null 반환과 Filter 가, 분기는 Decision 과 Router 가, 분배는 Partitioning 과 Splitter / Aggregator 가 같은 역할을 합니다.
결국 Spring Integration 의 EIP 패턴 일부가 Spring Batch 안에서도 등장하는 셈이고, 어느 layer 에서 처리할지는 granularity 로 결정합니다.
결정 가이드
batch run 안에서 처리하는 일은 Spring Batch 가, batch run 의 시작과 종료를 알리는 trigger·notify 는 Spring Integration 이 맡습니다. batch 바깥에서 흐르는 message 처리도 Spring Integration 쪽이고, batch run 안에서 외부 시스템과 통신하는 경우만 둘 다 — Spring Integration channel 을 빌려 씁니다.
결합점 1: Launching Jobs through Messages
시나리오 — 메시지가 도착 (예: SFTP (Secure File Transfer Protocol) 파일 업로드 알림) = Job 자동 실행.
[SFTP Server] → File 업로드
↓
Spring Integration File Inbound Channel
↓
JobLaunchRequest 변환
↓
JobLaunchingMessageHandler → JobLauncher.run(job, params)
↓
[Spring Batch] Job 실행
핵심은 Spring Batch 의 JobLauncher 를 Spring Integration channel 의 endpoint 로 감싸 쓰는 구조라는 점이에요. 자세한 내용은 43편 (Launching Batch Jobs through Messages) 에서 다룹니다.
결합점 2: Job-Launching Gateway
JobLaunchingGateway (Job 실행 진입 endpoint) = Spring Integration 의 gateway endpoint.
옵션은 세 가지를 기억해두면 충분해요. JobLaunchRequest (Job 실행 요청 객체) 안에서 Job · JobParameters mapping 을 어떻게 풀지, jobLauncher 를 어떻게 주입할지, 그리고 reply channel 로 JobExecution 을 어떤 형태로 반환할지. 셋 다 43편의 일부로 다룹니다.
결합점 3: Informational Messages
Job 진행 상태를 messaging channel 로 publish.
활용은 세 갈래입니다. Job 의 시작·종료 notification 을 Slack·email·모니터링 시스템으로 보내거나, Step 진행 통계를 실시간 전송하거나, 실패 이벤트만 골라 alert 하는 식이에요. Spring Integration 의 event-driven channel adapter 가 그 통로가 됩니다. 43편 의 일부.
결합점 4: Asynchronous Processors
시나리오 — ItemProcessor 가 무거운 외부 호출 (예: 외부 API 100ms / call) → chunk 100 = 10초.
해결 — AsyncItemProcessor (Processor 를 비동기 thread 로 실행) + AsyncItemWriter (Future 를 받아 resolve 후 위임) — Spring Batch Integration 제공.
@Bean
public AsyncItemProcessor<X, Y> asyncProcessor(
ItemProcessor<X, Y> delegate, TaskExecutor executor) {
AsyncItemProcessor<X, Y> async = new AsyncItemProcessor<>();
async.setDelegate(delegate);
async.setTaskExecutor(executor);
return async;
}
@Bean
public AsyncItemWriter<Y> asyncWriter(ItemWriter<Y> delegate) {
AsyncItemWriter<Y> async = new AsyncItemWriter<>();
async.setDelegate(delegate);
return async;
}
작동은 두 단계로 짜여요. AsyncItemProcessor 가 item 별로 Future<Y> 를 반환하고 — 실제 처리는 background thread 가 — 뒤이어 AsyncItemWriter 가 그 Future<Y> 를 받아 resolve 한 다음 delegate writer 를 호출합니다.
중요 — AsyncItemProcessor 를 쓰면 AsyncItemWriter 도 필수 결합. Future 의 정상 처리.
자세한 내용 = 44편 (Asynchronous Processors · Externalizing Execution).
Multi-threaded Step 과의 차이
| 항목 | Multi-threaded Step | AsyncItemProcessor |
|---|---|---|
| 병렬 단위 | chunk | item |
| 단위 | TaskExecutor thread | Future |
| read·write | serial | serial (단 process 만 비동기) |
| 사용 case | chunk-level 병렬 | item-level expensive process |
주요 병목이 process 한 단계에 몰려 있으면 AsyncItemProcessor 가, 전체 chunk 처리량을 늘리고 싶으면 Multi-threaded 가 답입니다.
결합점 5: Externalizing Execution
시나리오 — Step 의 실행 을 다른 process / machine 으로 분리.
[Manager Process]
Job 실행 → Step 1 도달
↓
Spring Integration channel 로 *Step 실행 요청 메시지*
↓
[Worker Process] (다른 머신)
메시지 수신 → Step 실행 → 결과 reply
↓
[Manager] reply 받고 다음 Step
37편에서 다룬 Remote Step 의 정확한 구현 기반이고, 44편 (Externalizing Batch Process Execution) 에서 더 자세히 풀어둡니다.
결합점 6: Remote Chunking / Partitioning
37편의 Remote Chunking · Partitioning 도 결국 Spring Integration messaging 위에서 돌아갑니다. Remote Chunking 은 manager 가 chunk 를 message queue 로 publish 하면 worker 들이 consume 하는 형태고, Remote Partitioning 은 MessageChannelPartitionHandler (메시지 채널 기반 PartitionHandler 구현체) 가 그 자리를 맡아요. 두 전략 모두 Spring Integration 의 channel·gateway 가 통신 기반이 됩니다.
XML Namespace — Deprecated
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
xsi:schemaLocation="...">
<!-- ... -->
</beans>
batch-integration XML namespace = Spring Batch Integration 1.3+ 도입. 단:
The batch-integration XML namespace is deprecated as of Spring Batch 6.0 and will be removed in version 7.0. — 공식 reference
Spring Batch 6.0 = batch-integration namespace deprecated.
Spring Batch 7.0 = 제거 예정.
→ Java config 권장. XML 환경은 마이그레이션 계획 필요.
결합 시 의존성
spring-batch-integration 모듈 추가:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
spring-integration-core 도 transitive 포함.
사용 결정 — 어떤 결합점이 진짜 필요한가
운영 환경 실제 사용 빈도:
| 결합점 | 빈도 | 이유 |
|---|---|---|
| Launching via Messages | 높음 | 파일 도착 trigger · 외부 시스템 연동 |
| Informational Messages | 중 | Job notification · 모니터링 |
| AsyncItemProcessor | 높음 | 외부 API enrichment |
| Remote Step / Chunking | 낮음 | 진정한 분산 batch 시스템 |
| Remote Partitioning | 중 | 큰 batch 의 worker 분산 |
실제로는 Launching via Messages 와 AsyncItemProcessor 두 개부터 시작하는 환경이 대부분이고, 그 이상은 진짜 필요할 때 하나씩 붙입니다.
결합점 결정 — Decision Tree
Batch 요구사항?
├─ Job trigger 가 외부 event?
│ ├─ YES → Launching via Messages (43편)
│ └─ NO → 일반 JobLauncher
│
├─ Job 진행 상태를 외부 시스템에 publish?
│ ├─ YES → Informational Messages (43편)
│ └─ NO → 일반 logging
│
├─ ItemProcessor 가 expensive?
│ ├─ YES → AsyncItemProcessor + AsyncItemWriter (44편)
│ └─ NO → 일반 Processor
│
├─ Step 을 원격 worker 로?
│ ├─ YES → Externalizing Execution (44편)
│ └─ NO → 일반 Step
│
└─ 대규모 분산 batch?
├─ YES → Remote Chunking / Partitioning (37편 + 44편)
└─ NO → 일반 Multi-threaded · Partitioning
Java Config 예제 — Launching via Messages 미리보기
@Bean
public IntegrationFlow jobLaunchingFlow(JobLauncher launcher, Job dailyJob) {
return IntegrationFlow
.from(Files.inboundAdapter(new File("/data/incoming"))
.patternFilter("orders-*.csv"))
.transform(File.class, file -> {
JobParameters params = new JobParametersBuilder()
.addString("input.file", file.getAbsolutePath())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
return new JobLaunchRequest(dailyJob, params);
})
.handle(new JobLaunchingMessageHandler(launcher))
.get();
}
Files.inboundAdapter 가 디렉토리를 폴링하다 새 파일이 들어오면 JobLaunchRequest 로 변환하고, JobLaunchingMessageHandler (메시지 받아 Job 을 launch 하는 핸들러) 가 실제 launch 를 맡습니다. 결국 파일 도착이 곧 Job 자동 실행이 되는 셈이고, 이게 43편의 핵심 패턴이에요.
자주 만나는 사고
사고 1: dependency 누락
원인 — spring-batch-integration 안 추가.
해결 — Maven/Gradle 의존성 명시.
사고 2: AsyncItemProcessor 만 사용 (AsyncItemWriter 누락)
원인 — Processor 만 비동기 + 일반 Writer.
해결 — 반드시 Async pair — Writer 도 AsyncItemWriter 로 wrap.
사고 3: batch-integration XML namespace 미래 위험
원인 — v6 deprecated, v7 제거.
해결 — Java config 마이그레이션. 또는 v7 upgrade 시 동시 작업.
사고 4: Spring Integration channel 메시지 손실
원인 — direct channel = in-memory.
해결 — durable channel (JMS · Kafka · file based) + 메시지 broker.
사고 5: 결합점 오용 — over-engineering
원인 — 단순 batch 에 Spring Integration 전체 도입.
해결 — 진짜 필요한 결합점 1~2개만. EIP 패턴이 batch 안에서도 가능 한지 검토.
운영 권장 패턴
Pattern 1: 파일 도착 기반 Job trigger
@Bean
public IntegrationFlow fileLaunchFlow(JobLauncher launcher, Job dailyJob) {
return IntegrationFlow
.from(Files.inboundAdapter(new File("/data/incoming"))
.patternFilter("*.csv")
.preventDuplicates(true),
poller -> poller.poller(Pollers.fixedRate(Duration.ofSeconds(30))))
.<File, JobLaunchRequest>transform(file -> {
JobParameters params = new JobParametersBuilder()
.addString("input.file", file.getAbsolutePath())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
return new JobLaunchRequest(dailyJob, params);
})
.handle(new JobLaunchingMessageHandler(launcher))
.get();
}
가장 흔한 패턴. Scheduler 별도 운영 X + 파일 도착 즉시 처리.
Pattern 2: AsyncItemProcessor + AsyncItemWriter
@Bean
public ItemProcessor<Customer, EnrichedCustomer> coreProcessor(ApiClient client) {
return customer -> new EnrichedCustomer(customer, client.fetch(customer.getId()));
}
@Bean
public AsyncItemProcessor<Customer, EnrichedCustomer> asyncProcessor(
ItemProcessor<Customer, EnrichedCustomer> delegate, TaskExecutor executor) {
AsyncItemProcessor<Customer, EnrichedCustomer> async = new AsyncItemProcessor<>();
async.setDelegate(delegate);
async.setTaskExecutor(executor);
return async;
}
@Bean
public ItemWriter<EnrichedCustomer> coreWriter(DataSource ds) {
return new JdbcBatchItemWriterBuilder<EnrichedCustomer>()
.dataSource(ds)
.sql("INSERT INTO ...")
.beanMapped()
.build();
}
@Bean
public AsyncItemWriter<EnrichedCustomer> asyncWriter(ItemWriter<EnrichedCustomer> delegate) {
AsyncItemWriter<EnrichedCustomer> async = new AsyncItemWriter<>();
async.setDelegate(delegate);
return async;
}
@Bean
public Step asyncStep(JobRepository repo, PlatformTransactionManager tx,
ItemReader<Customer> reader,
AsyncItemProcessor<Customer, EnrichedCustomer> processor,
AsyncItemWriter<EnrichedCustomer> writer) {
return new StepBuilder("asyncStep", repo)
.<Customer, Future<EnrichedCustomer>>chunk(100, tx)
.reader(reader)
.processor((ItemProcessor) processor)
.writer((ItemWriter) writer)
.build();
}
chunk type = <Customer, Future<EnrichedCustomer>>. Processor 가 Future 반환.
Pattern 3: Job 종료 notification
@Bean
public JobExecutionListener slackNotifier(SlackClient slack) {
return new JobExecutionListener() {
@Override
public void afterJob(JobExecution execution) {
String msg = String.format(
"Job %s finished: %s, read=%d, write=%d, skip=%d",
execution.getJobInstance().getJobName(),
execution.getExitStatus().getExitCode(),
execution.getStepExecutions().stream().mapToLong(StepExecution::getReadCount).sum(),
execution.getStepExecutions().stream().mapToLong(StepExecution::getWriteCount).sum(),
execution.getStepExecutions().stream().mapToLong(StepExecution::getSkipCount).sum()
);
slack.send(msg);
}
};
}
Spring Integration 까지 안 가도 간단한 notification 은 JobExecutionListener 로 OK.
Pattern 4: 결합 최소화
// Spring Integration 없이 동작
@Bean
public Job simpleDailyJob(JobRepository repo, ...) {
return new JobBuilder("simpleDailyJob", repo)
// ...
.build();
}
@Scheduled(cron = "0 0 2 * * *")
public void launchDailyJob() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(simpleDailyJob, params);
}
Cron 기반 trigger = @Scheduled 면 충분. Spring Integration 의존성 도입 회피.
→ over-engineering 회피 — 진짜 messaging 이 필요 한 자리만 통합.
시험 직전 한 번 더 — Spring Batch Integration 함정 압축 노트
- 두 프레임워크 관계 = Batch (대량 처리) + Integration (메시지·이벤트)
- 6 결합점 = Launching via Messages · Job-Launching Gateway · Informational Messages · Async Processors · Externalizing Execution · Remote Chunking/Partitioning
- Granularity 판단 — batch (record·chunk·transaction) vs integration (1 message·stateless·flow)
- 결정 가이드 — batch 안 처리 vs trigger·notify·외부 message
- 결합점 1: Launching via Messages — 메시지 = Job trigger (43편)
- JobLaunchingMessageHandler +
JobLaunchRequest(Job + JobParameters) - 흔한 case — 파일 도착 = Job 실행
- 결합점 4: AsyncItemProcessor + AsyncItemWriter (반드시 pair)
- Processor 가
Future<Y>반환, Writer 가 resolve - chunk type =
<I, Future<O>> - vs Multi-threaded Step — chunk-level vs item-level expensive process
- 결합점 5: Externalizing Execution — Step 을 원격 worker 로 (44편)
- 결합점 6: Remote Chunking · Partitioning — Spring Integration messaging 기반 (37편)
MessageChannelPartitionHandler= remote partition handler- batch-integration XML namespace = v6 deprecated · v7 제거 예정
- Java config 권장
- 의존성 =
spring-batch-integration모듈 - 실제 빈도 — Launching via Messages · AsyncItemProcessor 가 가장 흔함
- Remote Step · Chunking 은 진정 분산 환경
- 함정 — dependency 누락
- 함정 — AsyncItemProcessor 만 사용 (AsyncItemWriter 누락) → Future 미해결
- 함정 — XML namespace 미래 위험 (Java config 마이그레이션)
- 함정 — direct channel 메시지 손실 (durable channel)
- 함정 — over-engineering (단순 batch 에 전체 도입)
- 패턴 — 파일 도착 trigger (
Files.inboundAdapter+JobLaunchingMessageHandler) - 패턴 — AsyncItemProcessor + AsyncItemWriter pair
- 패턴 — JobExecutionListener 로 간단 notification
- 패턴 — Cron
@Scheduled면 Integration 불필요 - 결정 트리 — Job trigger / notification / async / remote / 분산 단계별 결합점 선택
공식 문서: Spring Batch Integration · Namespace Support 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 37편 — Scaling · Parallel 6가지 전략 종합
- 38편 — Repeat · RepeatTemplate · CompletionPolicy
- 39편 — Retry · Spring Framework 7 Core Retry (v6 변경)
- 40편 — Testing · @SpringBatchTest · End-to-End
- 41편 — Common Patterns · 흔한 운영 패턴 카탈로그
다음 글: