Spring Batch 입문 43편. Spring Integration 으로 Job 자동 실행 — JobLaunchRequest · JobLaunchingMessageHandler · JobLaunchingGateway 의 attributes (request/reply channel · job-launcher · reply-timeout), 파일 도착 trigger 의 IntegrationFlow 표준 패턴, sync vs async TaskExecutor 의 JobExecution 반환 동작, Step·Chunk·Job listener 를 messaging 으로 publish 하는 Informational Messages 패턴까지 정리한 학습 노트.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 43편이에요. 42편 에서 잡았던 Overview 다음 — deep dive 첫 번째 = 메시지 기반 Job 실행으로, 42편이 다룬 3개 결합점을 한 흐름에서 통합해봐요.
Job 실행의 4가지 방법
| 방법 | 사용 case |
|---|---|
| CommandLineJobOperator | shell script |
| JobOperator.start() | 웹 애플리케이션 내부 |
| @Scheduled + JobLauncher | cron 기반 |
| JobLaunchingMessageHandler ★ | 메시지·이벤트 기반 |
앞 3개는 시간 또는 명령이 기준이에요. 이벤트 기반으로 가야 할 때 등장하는 게 Spring Integration(스프링 모듈 간 메시징 프레임워크)이에요.
왜 messaging 기반이 필요한가
S(FTP) 서버 폴링 · 파일 도착 · 외부 시스템 알림 · 다수 데이터 소스 동시 처리. Spring Integration 의 adapter 들 + Job 실행을 decoupled, event-driven 으로. — 공식 reference
운영에서 자주 보는 시나리오는 이래요:
- SFTP(Secure File Transfer Protocol, 보안 파일 전송) 디렉토리 폴링 → 파일 도착 = Job 실행
- Kafka 메시지 = Job 실행
- Webhook = Job 실행
- 여러 source 동시 처리 — File + FTP + 메시지
이런 경우 별도 scheduler 를 두는 것보다 메시지 이벤트로 묶는 쪽이 훨씬 깔끔해요.
JobLaunchRequest — 메시지 payload
public class JobLaunchRequest {
private Job job;
private JobParameters jobParameters;
// getter / constructor
}
Job 과 JobParameters 를 감싼 wrapper(감싸는 객체) 예요. Spring Integration 메시지가 실어 나르는 payload(메시지 본문) 타입이 이거죠.
JobLaunchingMessageHandler — 핵심 endpoint
Message<JobLaunchRequest> 가 들어오면 JobLauncher.run(job, params) 를 호출해요. Spring Integration channel 의 endpoint(메시지 종착점) 역할이에요.
내부에서 일어나는 일은 이렇게 흘러요:
Spring Integration Message<JobLaunchRequest>
↓
JobLaunchingMessageHandler.handleMessage(message)
↓
jobLauncher.run(request.getJob(), request.getJobParameters())
↓
JobExecution
↓
reply channel 로 송신
파일 → JobLaunchRequest 변환
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setJob(Job job) {
this.job = job;
}
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder builder = new JobParametersBuilder();
builder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, builder.toJobParameters());
}
}
메서드 위의 @Transformer 가 Spring Integration 의 transformer endpoint(메시지 변환 노드) 라는 표시예요.
File payload 가 들어오면 JobLaunchRequest payload 로 바꿔서 다음 단계로 흘려보내요.
권장 — JobParametersBuilder 사용
new JobParametersBuilder()
.addString("input.file.name", file.getAbsolutePath())
.addLong("run.id", System.currentTimeMillis()) // unique 보장
.toJobParameters();
run.id 를 timestamp 로 박는 이유는 같은 파일을 다시 돌릴 때도 unique parameter 가 보장되도록 하기 위해서예요.
JobLaunchingGateway
JobLaunchingMessageHandler 의 Spring Integration Gateway(외부와 메시지를 주고받는 진입점) 버전이라고 보면 돼요. 손볼 수 있는 옵션이 더 많아요.
@Bean
public JobLaunchingGateway jobLaunchingGateway(JobRepository repo) {
TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
launcher.setJobRepository(repo);
launcher.setTaskExecutor(new SyncTaskExecutor());
return new JobLaunchingGateway(launcher);
}
Gateway 의 attributes
| Attribute | 의미 |
|---|---|
| id | bean 이름 (EventDrivenConsumer 또는 PollingConsumer) |
| auto-startup | 시작 시 자동 활성 (default = true) |
| request-channel | 입력 메시지 channel |
| reply-channel | JobExecution 출력 channel |
| reply-timeout | reply 전송 timeout (ms, default = -1 = 무한 wait) |
| job-launcher | 명시 JobLauncher (없으면 default 사용) |
| order | SubscribableChannel 의 호출 순서 |
PollableChannel 사용 시 Poller
@Bean
@ServiceActivator(inputChannel = "queueChannel",
poller = @Poller(fixedRate = "1000"))
public JobLaunchingGateway gatewayWithPoller(JobLauncher launcher) {
JobLaunchingGateway gateway = new JobLaunchingGateway(launcher);
gateway.setOutputChannel(replyChannel());
return gateway;
}
PollableChannel(꺼내가는 방식의 채널) 에서 1초마다 한 번씩 polling 해서 JobLaunchingGateway 가 처리하는 구조예요.
IntegrationFlow — 표준 패턴
파일 도착부터 Job 실행까지 전체를 한 흐름에 담은 형태예요:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest(Job personJob) {
FileMessageToJobRequest transformer = new FileMessageToJobRequest();
transformer.setFileParameterName("input.file.name");
transformer.setJob(personJob);
return transformer;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway(JobRepository repo) {
TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
launcher.setJobRepository(repo);
launcher.setTaskExecutor(new SyncTaskExecutor());
return new JobLaunchingGateway(launcher);
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway gateway,
FileMessageToJobRequest transformer) {
return IntegrationFlow
.from(Files.inboundAdapter(new File("/tmp/myfiles"))
.filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
.transform(transformer)
.handle(gateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.get();
}
이 코드가 그리는 흐름은 이래요:
1. Files.inboundAdapter — /tmp/myfiles 디렉토리 polling (1초 간격)
2. SimplePatternFileListFilter — *.csv 만 통과
3. transform — File → JobLaunchRequest 변환
4. handle(gateway) — JobLaunchingGateway 가 Job 실행
5. log — 결과 JobExecution log
결국 CSV(쉼표로 구분된 텍스트 데이터) 파일 도착 = Job 자동 실행 이라는 그림이에요.
Step-Scope Reader 와 결합
JobParameters 의 input.file.name 을 Reader 에서 받는 방식은 이래요:
@Bean
@StepScope
public FlatFileItemReader<Person> sampleReader(
@Value("#{jobParameters['input.file.name']}") String resource) {
return new FlatFileItemReaderBuilder<Person>()
.name("sampleReader")
.resource(new FileSystemResource(resource))
// ...
.build();
}
21편에서 다룬 Late Binding(실행 시점에 값 주입) 이 그대로 들어맞는 자리예요. 파일 경로를 JobParameters 로 동적으로 꽂아주는 거죠.
TaskExecutor 의 영향 — Sync vs Async
When using synchronous TaskExecutor, JobExecution response is returned only after the job completes. When using asynchronous TaskExecutor, the JobExecution instance is returned immediately. — 공식 reference
SyncTaskExecutor (default)
launcher.setTaskExecutor(new SyncTaskExecutor());
이 경우 작동은 이래요:
JobLaunchingGateway.handle(request)
↓
jobLauncher.run() → Job 전체 실행 (수 분~수 시간)
↓
JobExecution 반환 (Job 끝난 후)
↓
reply channel
Job 이 끝날 때까지 thread 가 block 돼요. 짧은 Job 이거나 결과가 즉시 필요한 상황에 맞아요.
Async TaskExecutor
launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
여기서는 흐름이 이렇게 바뀌어요:
JobLaunchingGateway.handle(request)
↓
jobLauncher.run() → Job 시작만 (background thread)
↓
JobExecution 즉시 반환 (Job 진행 중)
↓
reply channel
Job 시작 직후에 바로 반환해요. 오래 걸리는 Job 이나 큐로 밀려 들어오는 처리에 권장돼요.
Async 시 status 추적
// reply channel 에서 받은 JobExecution
JobExecution execution = ...;
long instanceId = execution.getJobInstance().getInstanceId();
// JobExplorer 로 최신 상태 조회 (10편 advanced metadata)
JobInstance instance = jobExplorer.getJobInstance(instanceId);
List<JobExecution> recent = jobExplorer.getJobExecutions(instance);
비동기 Job 의 진행 상태는 JobExplorer 로 들여다봐요.
Informational Messages — 진행 상태 publish
42편의 결합점 3번이 정확히 여기 응용돼요. Job·Step·Chunk listener 를 Spring Integration channel 의 gateway 로 감싸는 방식이에요.
흐름
Step 진행 (chunk·step·job 단위 event)
↓
StepExecutionListener (Spring Batch)
↓ (MessagingGateway 가 변환)
Spring Integration Channel
↓
Router · Filter · 외부 시스템 endpoint
MessagingGateway 정의
@MessagingGateway(name = "notificationExecutionsListener",
defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {
}
@MessagingGateway 가 붙은 interface 가 StepExecutionListener 를 구현하면, Spring Integration 이 runtime 에 proxy(대리 객체) 를 자동으로 만들어줘요. listener 메서드를 호출하는 게 곧 메시지 송신이 되는 구조예요.
@IntegrationComponentScan annotation 도 컨피그에 같이 박아야 해요.
Channel + Adapter
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("STEP_NOTIFICATION");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
stepExecutionsChannel 로 들어온 메시지는 log 로 흘러나가요.
Job 에 listener 등록
@Bean
public Job importPaymentsJob(JobRepository repo,
PlatformTransactionManager tx,
NotificationExecutionListener listener) {
return new JobBuilder("importPayments", repo)
.start(new StepBuilder("step1", repo)
.chunk(200, tx)
.listener(listener) // ★ Step listener 등록
.reader(...)
.writer(...)
.build())
.build();
}
Job 이 돌면 각 step 의 beforeStep · afterStep 이 messaging channel 로 publish 돼요.
Router 로 분기
@Bean
public IntegrationFlow notificationFlow() {
return IntegrationFlow
.from("stepExecutionsChannel")
.<StepExecution, ExitStatus>route(StepExecution::getExitStatus, m -> m
.channelMapping(ExitStatus.FAILED, "failedChannel")
.channelMapping(ExitStatus.COMPLETED, "completedChannel"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "failedChannel")
public MessageHandler emailFailureNotifier(MailSender sender) {
return message -> {
StepExecution exec = (StepExecution) message.getPayload();
sender.send(buildFailureEmail(exec));
};
}
ExitStatus.FAILED 면 이메일을 쏘고, COMPLETED 면 log 만 남겨요. 조건에 따라 길이 갈리는 분기죠.
운영 시나리오 종합 — 파일 도착부터 알림까지
[SFTP] 파일 업로드
↓
Spring Integration FileInboundAdapter
↓ (1초 polling)
↓ FileMessageToJobRequest @Transformer
JobLaunchRequest
↓
JobLaunchingGateway (async TaskExecutor)
↓
[Spring Batch] Job 실행
↓ (각 Step 실행 시)
StepExecutionListener (proxy)
↓
stepExecutionsChannel
↓
Router (ExitStatus 검사)
├─ FAILED → emailFailureChannel → 이메일 발송
└─ COMPLETED → loggingChannel → log
이렇게 event-driven batch 자동화 가 한 줄로 완성돼요.
자주 만나는 사고
사고 1: JobParametersBuilder unique 누락
원인 — 같은 파일을 다시 처리할 때 JobInstance already exists 가 떠요.
해결 — addLong("run.id", System.currentTimeMillis()) 또는 addDate("scheduledAt", new Date()) 로 unique 값을 박아요.
사고 2: SyncTaskExecutor 의 polling blocking
원인 — sync 모드에서 Job 이 도는 동안 polling thread 가 같이 묶여버려요.
해결 — async TaskExecutor 로 갈아끼우고, Job 결과는 JobExplorer 로 따로 봐요.
사고 3: Async 결과 무시
원인 — Job 이 실패해도 JobExecution 은 즉시 돌아와서, 호출자가 완료됐다고 착각 해요.
해결 — 별도 JobExecutionListener 로 실패 알림을 쏘거나 JobExplorer 로 폴링해요.
사고 4: Reader 가 jobParameters 못 받음
원인 — @StepScope 가 빠졌거나 #{jobParameters['key']} 의 quote 가 빠졌어요.
해결 — Reader 에 @StepScope 를 붙이고 quote 도 정확히 맞춰요 (21편).
사고 5: 같은 파일 두 번 처리
원인 — IntegrationFlow 가 파일을 안 지워서 다음 polling 에 또 잡혀요.
해결 — .filter(new AcceptOnceFileListFilter<>()) 를 끼우거나, 처리 후 archive 디렉토리로 옮겨요 (TaskletStep).
사고 6: MessagingGateway interface 가 Spring bean 으로 인식 X
원인 — @IntegrationComponentScan 이 빠져있어요.
해결 — 컨피그 클래스에 annotation 을 추가해요.
사고 7: reply-channel 없으면 메시지 끊김
원인 — JobLaunchingGateway 의 outputChannel 이 안 잡혀있고 downstream 도 없는 경우예요.
해결 — setOutputChannel() 을 쓰거나, IntegrationFlow 에서 .handle(gateway).log() 처럼 후속 endpoint 를 붙여요.
운영 권장 패턴
Pattern 1: 표준 파일 도착 trigger
@Bean
public IntegrationFlow fileLaunchFlow(JobLauncher launcher, Job dailyJob) {
return IntegrationFlow
.from(Files.inboundAdapter(new File("/data/incoming"))
.filter(new ChainFileListFilter<>(List.of(
new SimplePatternFileListFilter("*.csv"),
new AcceptOnceFileListFilter<>()
))),
c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(30))))
.<File, JobLaunchRequest>transform(file -> new JobLaunchRequest(
dailyJob,
new JobParametersBuilder()
.addString("input.file", file.getAbsolutePath())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters()
))
.handle(new JobLaunchingMessageHandler(launcher))
.get();
}
AcceptOnceFileListFilter 가 같은 파일이 두 번 잡히는 걸 막아줘요. 여기서는 Gateway 를 빼고 JobLaunchingMessageHandler 를 직접 써서 단순화했어요.
Pattern 2: Async + Status Tracking
@Bean
public JobLauncher asyncJobLauncher(JobRepository repo) {
TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
launcher.setJobRepository(repo);
launcher.setTaskExecutor(new SimpleAsyncTaskExecutor("batch-"));
return launcher;
}
@Bean
public JobExecutionListener completionTracker(SlackClient slack) {
return new JobExecutionListener() {
@Override
public void afterJob(JobExecution execution) {
slack.send("Job " + execution.getJobInstance().getJobName() +
" finished: " + execution.getExitStatus().getExitCode());
}
};
}
Async 로 띄우고 listener 로 완료 알림을 받는 패턴이에요.
Pattern 3: SFTP + 처리 후 archive
@Bean
public IntegrationFlow sftpFlow(JobLauncher launcher, Job processJob) {
return IntegrationFlow
.from(Sftp.inboundAdapter(sftpSessionFactory)
.remoteDirectory("/incoming")
.localDirectory(new File("/data/working"))
.deleteRemoteFiles(true) // 다운로드 후 SFTP 에서 삭제
.preserveTimestamp(true))
.transform(File.class, file -> new JobLaunchRequest(processJob,
new JobParametersBuilder()
.addString("input.file", file.getAbsolutePath())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters()))
.handle(new JobLaunchingMessageHandler(launcher))
.get();
}
SFTP 디렉토리를 polling 하면서 local 로 가져오고, Job 을 돌린 뒤 remote 에서 지우는 구성이에요.
Pattern 4: Informational + Conditional Notification
@MessagingGateway(name = "stepListener",
defaultRequestChannel = "stepEventsChannel")
public interface StepEventGateway extends StepExecutionListener {}
@Bean
public IntegrationFlow stepEventFlow() {
return IntegrationFlow.from("stepEventsChannel")
.<StepExecution>filter(exec ->
exec.getStatus() == BatchStatus.FAILED ||
exec.getSkipCount() > 100)
.handle(message -> {
StepExecution exec = (StepExecution) message.getPayload();
log.warn("Significant event: {} - skip={}",
exec.getStepName(), exec.getSkipCount());
slack.alert(exec);
})
.get();
}
FAILED 거나 skip 이 100 을 넘는 경우에만 Slack 알림을 보내요. 잡음을 줄여주는 방식이에요.
Pattern 5: 다중 source 통합
@Bean
public IntegrationFlow multiSourceFlow(JobLauncher launcher, Job job) {
return IntegrationFlow
.from(Files.inboundAdapter(new File("/data/local")))
.channel("requestChannel")
.get();
}
@Bean
public IntegrationFlow sftpSource(SessionFactory<?> sftp) {
return IntegrationFlow
.from(Sftp.inboundAdapter(sftp).remoteDirectory("/incoming"))
.channel("requestChannel")
.get();
}
@Bean
public IntegrationFlow kafkaSource(ConsumerFactory<String, String> consumer) {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(consumer, "batch-trigger-topic"))
.channel("requestChannel")
.get();
}
@Bean
public IntegrationFlow launchFlow(JobLauncher launcher, Job job) {
return IntegrationFlow.from("requestChannel")
.transform(/* 모든 source 를 JobLaunchRequest 로 통합 */)
.handle(new JobLaunchingMessageHandler(launcher))
.get();
}
여러 source (file · SFTP · Kafka) 를 단일 launchFlow 로 모아서 Job 실행을 한 군데서 처리해요.
시험 직전 한 번 더 — Launching via Messages 함정 압축 노트
- Job 실행 4가지 = CommandLine · JobOperator · @Scheduled · JobLaunchingMessageHandler (event-driven)
- 메시지 기반 = SFTP·파일·Kafka·webhook trigger
- JobLaunchRequest = Job + JobParameters wrapper, 메시지 payload
- JobLaunchingMessageHandler = Message
→ JobLauncher.run() - JobLaunchingGateway = Spring Integration Gateway 버전, configurable
- Gateway attributes = id · auto-startup · request-channel · reply-channel · reply-timeout · job-launcher · order
- reply-timeout default = -1 (무한 wait)
- PollableChannel 사용 =
@Poller또는 global default poller @Transformer= File → JobLaunchRequest 변환 메서드- JobParametersBuilder +
run.id(timestamp) = unique parameter (재실행 보장) - TaskExecutor 영향:
- SyncTaskExecutor = Job 완료 후 JobExecution 반환 (blocking)
- Async = 즉시 반환 (background thread)
- Async + JobExplorer = 상태 추적
- IntegrationFlow 표준 =
Files.inboundAdapter→ filter → transform →handle(gateway)→ log AcceptOnceFileListFilter= 중복 처리 방지- Reader 측 =
@StepScope+@Value("#{jobParameters['input.file.name']}") - Informational Messages = StepListener · ChunkListener · JobExecutionListener 를 messaging 으로
@MessagingGatewayinterface +extends StepExecutionListener= runtime proxy@IntegrationComponentScan필요- channel adapter (LoggingHandler · MailSender · slack) 가 endpoint
- Router 로 ExitStatus 분기 (FAILED → email, COMPLETED → log)
- 함정 — JobParameters unique 누락 → JobInstance already exists
- 함정 — Sync 모드 polling blocking
- 함정 — Async 결과 무시 → JobExplorer 폴링 또는 listener
- 함정 — Reader @StepScope 누락
- 함정 — 같은 파일 중복 처리 → AcceptOnceFileListFilter
- 함정 —
@IntegrationComponentScan누락 → MessagingGateway 미작동 - 함정 — reply-channel 없으면 메시지 끊김
- 패턴 — 표준 파일 도착 trigger (AcceptOnce + JobLaunchingMessageHandler)
- 패턴 — Async + JobExecutionListener notification
- 패턴 — SFTP + 처리 후 archive
- 패턴 — Conditional notification (FAILED 또는 skip > N)
- 패턴 — 다중 source 통합 (File + SFTP + Kafka → 단일 channel)
- event-driven batch 자동화 = SFTP → Job 실행 → 진행 알림 → 완료 처리 한 flow
공식 문서: Launching Batch Jobs through Messages · Job-Launching Gateway Attributes · Informational Messages 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 38편 — Repeat · RepeatTemplate · CompletionPolicy
- 39편 — Retry · Spring Framework 7 Core Retry (v6 변경)
- 40편 — Testing · @SpringBatchTest · End-to-End
- 41편 — Common Patterns · 흔한 운영 패턴 카탈로그
- 42편 — Spring Batch Integration · 두 프레임워크 경계
다음 글: