Spring Batch 입문 43편 — Launching via Messages · JobLaunchingGateway · Informational

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 43편. Spring Integration 으로 Job 자동 실행 — JobLaunchRequest · JobLaunchingMessageHandler · JobLaunchingGateway 의 attributes (request/reply channel · job-launcher · reply-timeout), 파일 도착 trigger 의 IntegrationFlow 표준 패턴, sync vs async TaskExecutor 의 JobExecution 반환 동작, Step·Chunk·Job listener 를 messaging 으로 publish 하는 Informational Messages 패턴까지 정리한 학습 노트.

📚 Spring Batch 입문에서 운영까지 · 43편 — Launching via Messages · JobLaunchingGateway · Informational

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 43편이에요. 42편 에서 잡았던 Overview 다음 — deep dive 첫 번째 = 메시지 기반 Job 실행으로, 42편이 다룬 3개 결합점을 한 흐름에서 통합해봐요.

Job 실행의 4가지 방법

방법 사용 case
CommandLineJobOperator shell script
JobOperator.start() 웹 애플리케이션 내부
@Scheduled + JobLauncher cron 기반
JobLaunchingMessageHandler 메시지·이벤트 기반

앞 3개는 시간 또는 명령이 기준이에요. 이벤트 기반으로 가야 할 때 등장하는 게 Spring Integration(스프링 모듈 간 메시징 프레임워크)이에요.

왜 messaging 기반이 필요한가

S(FTP) 서버 폴링 · 파일 도착 · 외부 시스템 알림 · 다수 데이터 소스 동시 처리. Spring Integration 의 adapter 들 + Job 실행을 decoupled, event-driven 으로. — 공식 reference

운영에서 자주 보는 시나리오는 이래요:

  • SFTP(Secure File Transfer Protocol, 보안 파일 전송) 디렉토리 폴링 → 파일 도착 = Job 실행
  • Kafka 메시지 = Job 실행
  • Webhook = Job 실행
  • 여러 source 동시 처리 — File + FTP + 메시지

이런 경우 별도 scheduler 를 두는 것보다 메시지 이벤트로 묶는 쪽이 훨씬 깔끔해요.

JobLaunchRequest — 메시지 payload

public class JobLaunchRequest {
    private Job job;
    private JobParameters jobParameters;
    // getter / constructor
}

JobJobParameters 를 감싼 wrapper(감싸는 객체) 예요. Spring Integration 메시지가 실어 나르는 payload(메시지 본문) 타입이 이거죠.

JobLaunchingMessageHandler — 핵심 endpoint

Message<JobLaunchRequest> 가 들어오면 JobLauncher.run(job, params) 를 호출해요. Spring Integration channel 의 endpoint(메시지 종착점) 역할이에요.

내부에서 일어나는 일은 이렇게 흘러요:

Spring Integration Message<JobLaunchRequest>
  ↓
JobLaunchingMessageHandler.handleMessage(message)
  ↓
jobLauncher.run(request.getJob(), request.getJobParameters())
  ↓
JobExecution
  ↓
reply channel 로 송신

파일 → JobLaunchRequest 변환

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setJob(Job job) {
        this.job = job;
    }

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());
        return new JobLaunchRequest(job, builder.toJobParameters());
    }
}

메서드 위의 @Transformer 가 Spring Integration 의 transformer endpoint(메시지 변환 노드) 라는 표시예요.

File payload 가 들어오면 JobLaunchRequest payload 로 바꿔서 다음 단계로 흘려보내요.

권장 — JobParametersBuilder 사용

new JobParametersBuilder()
    .addString("input.file.name", file.getAbsolutePath())
    .addLong("run.id", System.currentTimeMillis())     // unique 보장
    .toJobParameters();

run.id 를 timestamp 로 박는 이유는 같은 파일을 다시 돌릴 때도 unique parameter 가 보장되도록 하기 위해서예요.

JobLaunchingGateway

JobLaunchingMessageHandler 의 Spring Integration Gateway(외부와 메시지를 주고받는 진입점) 버전이라고 보면 돼요. 손볼 수 있는 옵션이 더 많아요.

@Bean
public JobLaunchingGateway jobLaunchingGateway(JobRepository repo) {
    TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
    launcher.setJobRepository(repo);
    launcher.setTaskExecutor(new SyncTaskExecutor());

    return new JobLaunchingGateway(launcher);
}

Gateway 의 attributes

Attribute 의미
id bean 이름 (EventDrivenConsumer 또는 PollingConsumer)
auto-startup 시작 시 자동 활성 (default = true)
request-channel 입력 메시지 channel
reply-channel JobExecution 출력 channel
reply-timeout reply 전송 timeout (ms, default = -1 = 무한 wait)
job-launcher 명시 JobLauncher (없으면 default 사용)
order SubscribableChannel 의 호출 순서

PollableChannel 사용 시 Poller

@Bean
@ServiceActivator(inputChannel = "queueChannel",
                  poller = @Poller(fixedRate = "1000"))
public JobLaunchingGateway gatewayWithPoller(JobLauncher launcher) {
    JobLaunchingGateway gateway = new JobLaunchingGateway(launcher);
    gateway.setOutputChannel(replyChannel());
    return gateway;
}

PollableChannel(꺼내가는 방식의 채널) 에서 1초마다 한 번씩 polling 해서 JobLaunchingGateway 가 처리하는 구조예요.

IntegrationFlow — 표준 패턴

파일 도착부터 Job 실행까지 전체를 한 흐름에 담은 형태예요:

@Bean
public FileMessageToJobRequest fileMessageToJobRequest(Job personJob) {
    FileMessageToJobRequest transformer = new FileMessageToJobRequest();
    transformer.setFileParameterName("input.file.name");
    transformer.setJob(personJob);
    return transformer;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway(JobRepository repo) {
    TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
    launcher.setJobRepository(repo);
    launcher.setTaskExecutor(new SyncTaskExecutor());
    return new JobLaunchingGateway(launcher);
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway gateway,
                                        FileMessageToJobRequest transformer) {
    return IntegrationFlow
        .from(Files.inboundAdapter(new File("/tmp/myfiles"))
                .filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
        .transform(transformer)
        .handle(gateway)
        .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
        .get();
}

이 코드가 그리는 흐름은 이래요:

1. Files.inboundAdapter — /tmp/myfiles 디렉토리 polling (1초 간격)
2. SimplePatternFileListFilter — *.csv 만 통과
3. transform — File → JobLaunchRequest 변환
4. handle(gateway) — JobLaunchingGateway 가 Job 실행
5. log — 결과 JobExecution log

결국 CSV(쉼표로 구분된 텍스트 데이터) 파일 도착 = Job 자동 실행 이라는 그림이에요.

Step-Scope Reader 와 결합

JobParameters 의 input.file.name 을 Reader 에서 받는 방식은 이래요:

@Bean
@StepScope
public FlatFileItemReader<Person> sampleReader(
        @Value("#{jobParameters['input.file.name']}") String resource) {
    return new FlatFileItemReaderBuilder<Person>()
        .name("sampleReader")
        .resource(new FileSystemResource(resource))
        // ...
        .build();
}

21편에서 다룬 Late Binding(실행 시점에 값 주입) 이 그대로 들어맞는 자리예요. 파일 경로를 JobParameters 로 동적으로 꽂아주는 거죠.

TaskExecutor 의 영향 — Sync vs Async

When using synchronous TaskExecutor, JobExecution response is returned only after the job completes. When using asynchronous TaskExecutor, the JobExecution instance is returned immediately. — 공식 reference

SyncTaskExecutor (default)

launcher.setTaskExecutor(new SyncTaskExecutor());

이 경우 작동은 이래요:

JobLaunchingGateway.handle(request)
  ↓
jobLauncher.run() → Job 전체 실행 (수 분~수 시간)
  ↓
JobExecution 반환 (Job 끝난 후)
  ↓
reply channel

Job 이 끝날 때까지 thread 가 block 돼요. 짧은 Job 이거나 결과가 즉시 필요한 상황에 맞아요.

Async TaskExecutor

launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());

여기서는 흐름이 이렇게 바뀌어요:

JobLaunchingGateway.handle(request)
  ↓
jobLauncher.run() → Job 시작만 (background thread)
  ↓
JobExecution 즉시 반환 (Job 진행 중)
  ↓
reply channel

Job 시작 직후에 바로 반환해요. 오래 걸리는 Job 이나 큐로 밀려 들어오는 처리에 권장돼요.

Async 시 status 추적

// reply channel 에서 받은 JobExecution
JobExecution execution = ...;
long instanceId = execution.getJobInstance().getInstanceId();

// JobExplorer 로 최신 상태 조회 (10편 advanced metadata)
JobInstance instance = jobExplorer.getJobInstance(instanceId);
List<JobExecution> recent = jobExplorer.getJobExecutions(instance);

비동기 Job 의 진행 상태는 JobExplorer 로 들여다봐요.

Informational Messages — 진행 상태 publish

42편의 결합점 3번이 정확히 여기 응용돼요. Job·Step·Chunk listener 를 Spring Integration channel 의 gateway 로 감싸는 방식이에요.

흐름

Step 진행 (chunk·step·job 단위 event)
  ↓
StepExecutionListener (Spring Batch)
  ↓ (MessagingGateway 가 변환)
Spring Integration Channel
  ↓
Router · Filter · 외부 시스템 endpoint

MessagingGateway 정의

@MessagingGateway(name = "notificationExecutionsListener",
                  defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {
}

@MessagingGateway 가 붙은 interface 가 StepExecutionListener 를 구현하면, Spring Integration 이 runtime 에 proxy(대리 객체) 를 자동으로 만들어줘요. listener 메서드를 호출하는 게 곧 메시지 송신이 되는 구조예요.

@IntegrationComponentScan annotation 도 컨피그에 같이 박아야 해요.

Channel + Adapter

@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("STEP_NOTIFICATION");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

stepExecutionsChannel 로 들어온 메시지는 log 로 흘러나가요.

Job 에 listener 등록

@Bean
public Job importPaymentsJob(JobRepository repo,
                              PlatformTransactionManager tx,
                              NotificationExecutionListener listener) {
    return new JobBuilder("importPayments", repo)
        .start(new StepBuilder("step1", repo)
            .chunk(200, tx)
            .listener(listener)             // ★ Step listener 등록
            .reader(...)
            .writer(...)
            .build())
        .build();
}

Job 이 돌면 각 step 의 beforeStep · afterStep 이 messaging channel 로 publish 돼요.

Router 로 분기

@Bean
public IntegrationFlow notificationFlow() {
    return IntegrationFlow
        .from("stepExecutionsChannel")
        .<StepExecution, ExitStatus>route(StepExecution::getExitStatus, m -> m
            .channelMapping(ExitStatus.FAILED, "failedChannel")
            .channelMapping(ExitStatus.COMPLETED, "completedChannel"))
        .get();
}

@Bean
@ServiceActivator(inputChannel = "failedChannel")
public MessageHandler emailFailureNotifier(MailSender sender) {
    return message -> {
        StepExecution exec = (StepExecution) message.getPayload();
        sender.send(buildFailureEmail(exec));
    };
}

ExitStatus.FAILED 면 이메일을 쏘고, COMPLETED 면 log 만 남겨요. 조건에 따라 길이 갈리는 분기죠.

운영 시나리오 종합 — 파일 도착부터 알림까지

[SFTP] 파일 업로드
  ↓
Spring Integration FileInboundAdapter
  ↓ (1초 polling)
  ↓ FileMessageToJobRequest @Transformer
JobLaunchRequest
  ↓
JobLaunchingGateway (async TaskExecutor)
  ↓
[Spring Batch] Job 실행
  ↓ (각 Step 실행 시)
StepExecutionListener (proxy)
  ↓
stepExecutionsChannel
  ↓
Router (ExitStatus 검사)
  ├─ FAILED → emailFailureChannel → 이메일 발송
  └─ COMPLETED → loggingChannel → log

이렇게 event-driven batch 자동화 가 한 줄로 완성돼요.

자주 만나는 사고

사고 1: JobParametersBuilder unique 누락

원인 — 같은 파일을 다시 처리할 때 JobInstance already exists 가 떠요.

해결addLong("run.id", System.currentTimeMillis()) 또는 addDate("scheduledAt", new Date()) 로 unique 값을 박아요.

사고 2: SyncTaskExecutor 의 polling blocking

원인 — sync 모드에서 Job 이 도는 동안 polling thread 가 같이 묶여버려요.

해결 — async TaskExecutor 로 갈아끼우고, Job 결과는 JobExplorer 로 따로 봐요.

사고 3: Async 결과 무시

원인 — Job 이 실패해도 JobExecution 은 즉시 돌아와서, 호출자가 완료됐다고 착각 해요.

해결 — 별도 JobExecutionListener 로 실패 알림을 쏘거나 JobExplorer 로 폴링해요.

사고 4: Reader 가 jobParameters 못 받음

원인@StepScope 가 빠졌거나 #{jobParameters['key']} 의 quote 가 빠졌어요.

해결 — Reader 에 @StepScope 를 붙이고 quote 도 정확히 맞춰요 (21편).

사고 5: 같은 파일 두 번 처리

원인 — IntegrationFlow 가 파일을 안 지워서 다음 polling 에 또 잡혀요.

해결.filter(new AcceptOnceFileListFilter<>()) 를 끼우거나, 처리 후 archive 디렉토리로 옮겨요 (TaskletStep).

사고 6: MessagingGateway interface 가 Spring bean 으로 인식 X

원인@IntegrationComponentScan 이 빠져있어요.

해결 — 컨피그 클래스에 annotation 을 추가해요.

사고 7: reply-channel 없으면 메시지 끊김

원인 — JobLaunchingGateway 의 outputChannel 이 안 잡혀있고 downstream 도 없는 경우예요.

해결setOutputChannel() 을 쓰거나, IntegrationFlow 에서 .handle(gateway).log() 처럼 후속 endpoint 를 붙여요.

운영 권장 패턴

Pattern 1: 표준 파일 도착 trigger

@Bean
public IntegrationFlow fileLaunchFlow(JobLauncher launcher, Job dailyJob) {
    return IntegrationFlow
        .from(Files.inboundAdapter(new File("/data/incoming"))
                .filter(new ChainFileListFilter<>(List.of(
                    new SimplePatternFileListFilter("*.csv"),
                    new AcceptOnceFileListFilter<>()
                ))),
            c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(30))))
        .<File, JobLaunchRequest>transform(file -> new JobLaunchRequest(
            dailyJob,
            new JobParametersBuilder()
                .addString("input.file", file.getAbsolutePath())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters()
        ))
        .handle(new JobLaunchingMessageHandler(launcher))
        .get();
}

AcceptOnceFileListFilter 가 같은 파일이 두 번 잡히는 걸 막아줘요. 여기서는 Gateway 를 빼고 JobLaunchingMessageHandler 를 직접 써서 단순화했어요.

Pattern 2: Async + Status Tracking

@Bean
public JobLauncher asyncJobLauncher(JobRepository repo) {
    TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
    launcher.setJobRepository(repo);
    launcher.setTaskExecutor(new SimpleAsyncTaskExecutor("batch-"));
    return launcher;
}

@Bean
public JobExecutionListener completionTracker(SlackClient slack) {
    return new JobExecutionListener() {
        @Override
        public void afterJob(JobExecution execution) {
            slack.send("Job " + execution.getJobInstance().getJobName() +
                " finished: " + execution.getExitStatus().getExitCode());
        }
    };
}

Async 로 띄우고 listener 로 완료 알림을 받는 패턴이에요.

Pattern 3: SFTP + 처리 후 archive

@Bean
public IntegrationFlow sftpFlow(JobLauncher launcher, Job processJob) {
    return IntegrationFlow
        .from(Sftp.inboundAdapter(sftpSessionFactory)
            .remoteDirectory("/incoming")
            .localDirectory(new File("/data/working"))
            .deleteRemoteFiles(true)             // 다운로드 후 SFTP 에서 삭제
            .preserveTimestamp(true))
        .transform(File.class, file -> new JobLaunchRequest(processJob,
            new JobParametersBuilder()
                .addString("input.file", file.getAbsolutePath())
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters()))
        .handle(new JobLaunchingMessageHandler(launcher))
        .get();
}

SFTP 디렉토리를 polling 하면서 local 로 가져오고, Job 을 돌린 뒤 remote 에서 지우는 구성이에요.

Pattern 4: Informational + Conditional Notification

@MessagingGateway(name = "stepListener",
                  defaultRequestChannel = "stepEventsChannel")
public interface StepEventGateway extends StepExecutionListener {}

@Bean
public IntegrationFlow stepEventFlow() {
    return IntegrationFlow.from("stepEventsChannel")
        .<StepExecution>filter(exec ->
            exec.getStatus() == BatchStatus.FAILED ||
            exec.getSkipCount() > 100)
        .handle(message -> {
            StepExecution exec = (StepExecution) message.getPayload();
            log.warn("Significant event: {} - skip={}",
                exec.getStepName(), exec.getSkipCount());
            slack.alert(exec);
        })
        .get();
}

FAILED 거나 skip 이 100 을 넘는 경우에만 Slack 알림을 보내요. 잡음을 줄여주는 방식이에요.

Pattern 5: 다중 source 통합

@Bean
public IntegrationFlow multiSourceFlow(JobLauncher launcher, Job job) {
    return IntegrationFlow
        .from(Files.inboundAdapter(new File("/data/local")))
        .channel("requestChannel")
        .get();
}

@Bean
public IntegrationFlow sftpSource(SessionFactory<?> sftp) {
    return IntegrationFlow
        .from(Sftp.inboundAdapter(sftp).remoteDirectory("/incoming"))
        .channel("requestChannel")
        .get();
}

@Bean
public IntegrationFlow kafkaSource(ConsumerFactory<String, String> consumer) {
    return IntegrationFlow
        .from(Kafka.messageDrivenChannelAdapter(consumer, "batch-trigger-topic"))
        .channel("requestChannel")
        .get();
}

@Bean
public IntegrationFlow launchFlow(JobLauncher launcher, Job job) {
    return IntegrationFlow.from("requestChannel")
        .transform(/* 모든 source 를 JobLaunchRequest 로 통합 */)
        .handle(new JobLaunchingMessageHandler(launcher))
        .get();
}

여러 source (file · SFTP · Kafka) 를 단일 launchFlow 로 모아서 Job 실행을 한 군데서 처리해요.

시험 직전 한 번 더 — Launching via Messages 함정 압축 노트

  • Job 실행 4가지 = CommandLine · JobOperator · @Scheduled · JobLaunchingMessageHandler (event-driven)
  • 메시지 기반 = SFTP·파일·Kafka·webhook trigger
  • JobLaunchRequest = Job + JobParameters wrapper, 메시지 payload
  • JobLaunchingMessageHandler = Message → JobLauncher.run()
  • JobLaunchingGateway = Spring Integration Gateway 버전, configurable
  • Gateway attributes = id · auto-startup · request-channel · reply-channel · reply-timeout · job-launcher · order
  • reply-timeout default = -1 (무한 wait)
  • PollableChannel 사용 = @Poller 또는 global default poller
  • @Transformer = File → JobLaunchRequest 변환 메서드
  • JobParametersBuilder + run.id (timestamp) = unique parameter (재실행 보장)
  • TaskExecutor 영향:
  • SyncTaskExecutor = Job 완료 후 JobExecution 반환 (blocking)
  • Async = 즉시 반환 (background thread)
  • Async + JobExplorer = 상태 추적
  • IntegrationFlow 표준 = Files.inboundAdapter → filter → transform → handle(gateway) → log
  • AcceptOnceFileListFilter = 중복 처리 방지
  • Reader 측 = @StepScope + @Value("#{jobParameters['input.file.name']}")
  • Informational Messages = StepListener · ChunkListener · JobExecutionListener 를 messaging 으로
  • @MessagingGateway interface + extends StepExecutionListener = runtime proxy
  • @IntegrationComponentScan 필요
  • channel adapter (LoggingHandler · MailSender · slack) 가 endpoint
  • Router 로 ExitStatus 분기 (FAILED → email, COMPLETED → log)
  • 함정 — JobParameters unique 누락 → JobInstance already exists
  • 함정 — Sync 모드 polling blocking
  • 함정 — Async 결과 무시 → JobExplorer 폴링 또는 listener
  • 함정 — Reader @StepScope 누락
  • 함정 — 같은 파일 중복 처리 → AcceptOnceFileListFilter
  • 함정 — @IntegrationComponentScan 누락 → MessagingGateway 미작동
  • 함정 — reply-channel 없으면 메시지 끊김
  • 패턴 — 표준 파일 도착 trigger (AcceptOnce + JobLaunchingMessageHandler)
  • 패턴 — Async + JobExecutionListener notification
  • 패턴 — SFTP + 처리 후 archive
  • 패턴 — Conditional notification (FAILED 또는 skip > N)
  • 패턴 — 다중 source 통합 (File + SFTP + Kafka → 단일 channel)
  • event-driven batch 자동화 = SFTP → Job 실행 → 진행 알림 → 완료 처리 한 flow

공식 문서: Launching Batch Jobs through Messages · Job-Launching Gateway Attributes · Informational Messages 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!