Spring Batch 마스터 노트 시리즈 6편. 순차·조건부·동적 흐름의 5가지 흐름 제어, ABANDONED vs FAILED, FlowBuilder로 재사용 Flow, Nested Job·Parallel Steps, 6가지 리스너(Job/Step/Chunk/Read/Process/Write), ExecutionContext의 Job vs Step 스코프, ExecutionContextPromotionListener까지 한 흐름으로.
이 글은 Spring Batch 마스터 노트 시리즈의 여섯 번째 편입니다. 5편(Writer)까지 데이터 처리 패턴을 잡았다면, 이번엔 Job 차원의 흐름 제어와 모니터링.
순차·조건부·병렬 흐름을 어떻게 짤지, 6가지 리스너로 어디서 무엇을 할지, ExecutionContext로 Step 간 데이터를 어떻게 공유할지 — 운영 수준의 배치를 만드는 데 필수인 도구들입니다.
처음 Job Flow가 어렵게 느껴지는 이유
이유는 두 가지예요.
첫째, 흐름 제어 방법이 7가지(순차·조건부·커스텀 ExitStatus·Decider·FlowBuilder·Nested·Parallel)예요. 각자 적합한 자리가 다른데 처음엔 어느 자리에 어떤 도구를 써야 하는지 안 보입니다.
둘째, 리스너가 6종류예요. JobExecutionListener·StepExecutionListener·ChunkListener·ItemReadListener·ItemProcessListener·ItemWriteListener. 모두 비슷한 패턴(beforeXxx·afterXxx·onXxxError)이지만 어디에 어떤 리스너를 박을지가 첫 단계 막힘.
해결법은 한 가지예요. 흐름은 그래프로 그리고, 리스너는 "어떤 단위에서 무슨 정보가 필요한가" 한 질문으로 선택하세요. Job 단위 = JobExecutionListener. Step 단위 = StepExecutionListener. 청크 단위 = ChunkListener. 단순.
흐름 제어 7가지 비교
| 방법 | 자리 | 핵심 |
|---|---|---|
| 순차 | 항상 같은 순서 | start/next |
| 조건부 | Exit Status 기반 분기 | on/to/from |
| 커스텀 ExitStatus | Step 로직으로 흐름 결정 | StepExecutionListener.afterStep() |
| JobExecutionDecider | 런타임 데이터 기반 동적 | JobExecutionDecider |
| FlowBuilder | 재사용 가능한 Flow | FlowBuilder<SimpleFlow> |
| Nested Job | 독립 Job 조합 | StepBuilder.job() |
| Parallel Step | 동시 병렬 실행 | split() + SimpleAsyncTaskExecutor |
2편에서 첫 4가지를 다뤘으니, 이번엔 FlowBuilder·Nested·Parallel과 리스너·ExecutionContext에 집중.
순차 흐름 (다시)
@Bean
public Job sequentialJob() {
return jobBuilderFactory.get("sequentialJob")
.start(step1())
.next(step2())
.next(step3())
.next(step4())
.build();
}
이전 Step COMPLETED → 다음 실행. 어느 Step이라도 FAILED → Job 즉시 실패.
조건부 흐름 — on/to/from
@Bean
public Job conditionalJob() {
return jobBuilderFactory.get("conditionalJob")
.start(dataValidationStep())
.on("COMPLETED").to(dataProcessingStep())
.on("FAILED").to(errorNotificationStep())
.from(dataProcessingStep())
.on("COMPLETED").to(reportStep())
.on("FAILED").fail()
.from(errorNotificationStep())
.on("*").end()
.from(reportStep())
.on("*").end()
.end()
.build();
}
패턴 매칭:
"COMPLETED"정확"*"모든 (와일드카드)"CUSTOM_*"접두사"?OMPLETED"단일 문자
ABANDONED vs FAILED — 재시작 동작
| 상태 | 재시작 시 |
|---|---|
| FAILED | 해당 Step 다시 실행 |
| ABANDONED | 해당 Step 건너뛰고 다음 Step부터 |
FAILED 상태의 Step B:
실행: A → B(FAILED) → 재시작: A → B → C (B 다시)
ABANDONED 상태의 Step B:
실행: A → B(ABANDONED) → 재시작: A → B 건너뜀 → C
커스텀 ExitStatus
@Component
public class ProductCountStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("Step 시작: " + stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
long writeCount = stepExecution.getWriteCount();
if (writeCount == 0) {
return new ExitStatus("NO_DATA");
} else if (writeCount < 10) {
return new ExitStatus("FEW_DATA");
} else {
return ExitStatus.COMPLETED;
}
}
}
Job Flow에서 활용:
.start(dataStep())
.on("NO_DATA").end()
.on("FEW_DATA").to(alertStep())
.on("COMPLETED").to(reportStep())
JobExecutionDecider — 동적 흐름
@Component
public class DataSourceDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String dataSource = jobExecution.getExecutionContext()
.getString("dataSource", "UNKNOWN");
switch (dataSource) {
case "CSV":
return new FlowExecutionStatus("CSV_SOURCE");
case "DB":
return new FlowExecutionStatus("DB_SOURCE");
case "API":
return new FlowExecutionStatus("API_SOURCE");
default:
return new FlowExecutionStatus("UNKNOWN_SOURCE");
}
}
}
@Bean
public Job decisionJob() {
return jobBuilderFactory.get("decisionJob")
.start(initStep())
.next(dataSourceDecider())
.on("CSV_SOURCE").to(csvProcessStep())
.on("DB_SOURCE").to(dbProcessStep())
.on("API_SOURCE").to(apiProcessStep())
.on("UNKNOWN_SOURCE").fail()
.from(csvProcessStep())
.on("*").end()
.from(dbProcessStep())
.on("*").end()
.from(apiProcessStep())
.on("*").end()
.end()
.build();
}
ExecutionContext의 값을 읽어 동적 분기. StepExecutionListener보다 더 유연.
FlowBuilder — 재사용 가능한 Flow
@Bean
public Flow validationFlow() {
return new FlowBuilder<SimpleFlow>("validationFlow")
.start(schemaValidationStep())
.next(businessValidationStep())
.next(duplicateCheckStep())
.end();
}
@Bean
public Job jobA() {
return jobBuilderFactory.get("jobA")
.start(validationFlow())
.next(processStepA())
.end()
.build();
}
@Bean
public Job jobB() {
return jobBuilderFactory.get("jobB")
.start(validationFlow()) // 동일 Flow 재사용
.next(processStepB())
.end()
.build();
}
같은 검증 단계를 여러 Job에서 재사용 — DRY 원칙.
Nested Job — 큰 작업을 작은 Job으로 분리
@Bean
public Job childJobA() {
return jobBuilderFactory.get("childJobA")
.start(childStepA1())
.next(childStepA2())
.build();
}
@Bean
public Job childJobB() {
return jobBuilderFactory.get("childJobB")
.start(childStepB1())
.build();
}
@Bean
public Step jobStepA() {
return stepBuilderFactory.get("jobStepA")
.job(childJobA())
.parametersExtractor(new DefaultJobParametersExtractor())
.build();
}
@Bean
public Step jobStepB() {
return stepBuilderFactory.get("jobStepB")
.job(childJobB())
.parametersExtractor(new DefaultJobParametersExtractor())
.build();
}
@Bean
public Job parentJob() {
return jobBuilderFactory.get("parentJob")
.start(jobStepA())
.next(jobStepB())
.build();
}
각 자식 Job은 독립 JobInstance/JobExecution. 자체 재시작 가능.
Parallel Steps — 병렬 실행
@Bean
public Job parallelJob() {
return jobBuilderFactory.get("parallelJob")
.start(parallelFlow())
.end()
.build();
}
@Bean
public Flow parallelFlow() {
return new FlowBuilder<SimpleFlow>("parallelFlow")
.split(new SimpleAsyncTaskExecutor())
.add(
customerProcessFlow(),
orderProcessFlow(),
inventoryProcessFlow()
)
.build();
}
@Bean
public Flow customerProcessFlow() {
return new FlowBuilder<SimpleFlow>("customerProcessFlow")
.start(customerStep())
.build();
}
// orderProcessFlow, inventoryProcessFlow도 동일 패턴
여기서 정말 중요한 시험 함정 — 병렬 Step에서 JdbcCursorItemReader 절대 사용 금지입니다. thread-safe 아니라서 데이터 누락·중복 발생. 반드시 JdbcPagingItemReader 사용.
6가지 리스너 — 실행 시점
| 리스너 | 시점 |
|---|---|
JobExecutionListener |
Job 시작 전/후 |
StepExecutionListener |
Step 시작 전/후 |
ChunkListener |
청크 전/후, 오류 시 |
ItemReadListener |
아이템 읽기 전/후, 오류 |
ItemProcessListener |
처리 전/후, 오류 |
ItemWriteListener |
쓰기 전/후, 오류 |
JobExecutionListener — Job 단위
@Component
public class ProductJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println("=== Job 시작 ===");
System.out.println("Job ID: " + jobExecution.getJobId());
System.out.println("Job 이름: " + jobExecution.getJobInstance().getJobName());
System.out.println("시작 시간: " + jobExecution.getCreateTime());
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("=== Job 종료 ===");
System.out.println("종료 상태: " + jobExecution.getStatus());
System.out.println("종료 시간: " + jobExecution.getEndTime());
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
System.out.println("Job 성공 완료");
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
System.out.println("Job 실패");
jobExecution.getFailureExceptions().forEach(System.out::println);
}
}
}
활용 — DB 연결 확인, 알림 발송, 실행 이력 기록.
StepExecutionListener — Step 단위
@Component
public class ProductStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("Step 시작: " + stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("Step 완료: " + stepExecution.getStepName());
System.out.println("읽기: " + stepExecution.getReadCount());
System.out.println("처리: " + stepExecution.getWriteCount());
System.out.println("스킵: " + stepExecution.getSkipCount());
System.out.println("필터링: " + stepExecution.getFilterCount());
return stepExecution.getExitStatus();
}
}
커스텀 ExitStatus 반환 자리이기도 함.
ChunkListener — 청크 단위
@Component
public class ProductChunkListener implements ChunkListener {
private int chunkCount = 0;
@Override
public void beforeChunk(ChunkContext context) {
chunkCount++;
System.out.println("청크 " + chunkCount + " 시작");
}
@Override
public void afterChunk(ChunkContext context) {
System.out.println("청크 " + chunkCount + " 완료");
}
@Override
public void afterChunkError(ChunkContext context) {
System.out.println("청크 " + chunkCount + " 오류");
}
}
성능 측정·실시간 모니터링에 유용.
ItemReadListener / ProcessListener / WriteListener
@Component
public class ProductReadListener implements ItemReadListener<Product> {
@Override
public void beforeRead() {}
@Override
public void afterRead(Product item) {
System.out.println("읽기 완료: " + item.getProductName());
}
@Override
public void onReadError(Exception ex) {
System.err.println("읽기 오류: " + ex.getMessage());
}
}
@Component
public class ProductProcessListener implements ItemProcessListener<Product, Product> {
@Override
public void beforeProcess(Product item) {
System.out.println("처리 시작: " + item.getProductName());
}
@Override
public void afterProcess(Product item, Product result) {
if (result == null) {
System.out.println("필터링됨: " + item.getProductName());
} else {
System.out.println("처리 완료: " + result.getProductName());
}
}
@Override
public void onProcessError(Product item, Exception e) {
System.err.println("처리 오류 — 상품: " + item.getProductName() + ", 오류: " + e.getMessage());
}
}
@Component
public class ProductWriteListener implements ItemWriteListener<Product> {
@Override
public void beforeWrite(List<? extends Product> items) {
System.out.println("쓰기 시작: " + items.size() + "개");
}
@Override
public void afterWrite(List<? extends Product> items) {
System.out.println("쓰기 완료: " + items.size() + "개");
}
@Override
public void onWriteError(Exception exception, List<? extends Product> items) {
System.err.println("쓰기 오류: " + exception.getMessage());
items.forEach(item -> System.err.println("실패: " + item));
}
}
리스너 등록
@Bean
public Step productStep() {
return stepBuilderFactory.get("productStep")
.<Product, Product>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(productStepListener())
.listener(productChunkListener())
.listener(productReadListener())
.listener(productProcessListener())
.listener(productWriteListener())
.build();
}
@Bean
public Job productJob() {
return jobBuilderFactory.get("productJob")
.listener(productJobListener())
.start(productStep())
.build();
}
여기서 시험 함정이 하나 있어요. 동일 타입 리스너 여러 개 등록 시 실행 순서 = 등록 순서. beforeXxx()는 등록 순, afterXxx()는 역순 (AOP 인터셉터 스택 방식).
ExecutionContext — 데이터 공유
두 가지 스코프
| 스코프 | 범위 | 지속성 |
|---|---|---|
| Job ExecutionContext | Job 전체 | 모든 Step 접근 |
| Step ExecutionContext | 해당 Step 내 | Step 종료 시 사라짐 |
DB에 직렬화되어 저장 — 재시작 시 데이터 유지.
Job ExecutionContext
// Step 1 — 저장
.tasklet((contribution, chunkContext) -> {
ExecutionContext jobContext = chunkContext.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext();
jobContext.put("processDate", "2026-01-01");
jobContext.put("totalCount", 1000);
jobContext.putLong("startTimestamp", System.currentTimeMillis());
return RepeatStatus.FINISHED;
})
// Step 2 — 읽기
.tasklet((contribution, chunkContext) -> {
ExecutionContext jobContext = chunkContext.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext();
String processDate = jobContext.getString("processDate");
int totalCount = jobContext.getInt("totalCount");
System.out.println("처리 날짜: " + processDate);
System.out.println("총 건수: " + totalCount);
return RepeatStatus.FINISHED;
})
Step ExecutionContext
.tasklet((contribution, chunkContext) -> {
ExecutionContext stepContext = chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext();
stepContext.put("stepKey1", "Step 로컬 데이터");
stepContext.putInt("processedCount", 0);
return RepeatStatus.FINISHED;
})
ExecutionContextPromotionListener — Step → Job 승격
Step ExecutionContext의 데이터를 Job ExecutionContext로 복사. Step 내 생성 데이터를 다음 Step에서 사용할 때.
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
// 승격할 키 지정
listener.setKeys(new String[]{"processDate", "recordCount", "sourceSystem"});
// true: 키 없으면 예외 / false: 무시
listener.setStrict(true);
return listener;
}
// Step 1 — Step Context 저장 + 승격 리스너
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
ExecutionContext stepCtx = chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext();
stepCtx.put("processDate", "2026-01-01");
stepCtx.put("recordCount", "500");
stepCtx.put("sourceSystem", "CRM");
return RepeatStatus.FINISHED;
})
.listener(promotionListener())
.build();
}
// Step 2 — Job Context에서 읽기
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
ExecutionContext jobCtx = chunkContext.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext();
String processDate = jobCtx.getString("processDate");
String recordCount = jobCtx.getString("recordCount");
System.out.println("처리 날짜: " + processDate);
System.out.println("레코드 수: " + recordCount);
return RepeatStatus.FINISHED;
})
.build();
}
여기서 정말 중요한 시험 함정 — PromotionListener는 Step 완료 직후 실행입니다. Step 실패하면 리스너 안 돌아요. setStatuses()로 어떤 종료 상태에서 승격할지 지정 가능. 기본적으로 COMPLETED일 때만 승격되니, 실패해도 데이터를 넘기고 싶으면 명시 설정 필요.
일반적 실수와 주의사항
.end() 호출 위치
조건부 흐름 끝의 from/on/to 체인이 아닌 build() 직전에 .end(). 빠뜨리면 런타임 예외.
병렬 처리에서 JdbcCursor 금지
thread-safe X. JdbcPaging 사용.
ExecutionContext 직렬화 가능 타입
Serializable 구현 필요. Java 기본 타입(String/Integer/Long)은 자동. 커스텀 객체는 명시.
PromotionListener 타이밍
Step 실패 시 리스너 실행 X. setStatuses()로 조정.
리스너 등록 순서
beforeXxx 등록 순, afterXxx 역순.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 6편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- 흐름 제어 7가지 — 순차/조건부/커스텀 ExitStatus/Decider/FlowBuilder/Nested/Parallel
- 패턴 매칭 —
*와일드카드,CUSTOM_*접두사 .end()위치 — build() 직전 (빠뜨리면 런타임 예외)- ABANDONED = 재시작 시 건너뜀 (FAILED는 재실행)
- 커스텀 ExitStatus =
StepExecutionListener.afterStep()반환 - JobExecutionDecider = 런타임 데이터 기반 동적 흐름
- ExecutionContext 값 읽어 분기
- FlowBuilder = 재사용 가능 Flow (DRY)
- Nested Job = JobStep으로 자식 Job 실행 (독립 인스턴스)
- Parallel Steps =
split()+SimpleAsyncTaskExecutor - 병렬 처리 시 JdbcCursor 금지 — JdbcPaging 사용
- 6가지 리스너 — Job/Step/Chunk/Read/Process/Write
- 등록 순서 =
beforeXxx등록 순,afterXxx역순 - ChunkListener = 청크 모니터링·성능 측정
- ProcessListener — null 반환 = 필터링됨 확인 가능
- WriteListener —
onWriteError에서 실패 아이템 로깅 - ExecutionContext 두 스코프 — Job (전체) vs Step (해당)
- 직렬화로 DB 저장 — 재시작 시 데이터 유지
Serializable구현 필수- PromotionListener = Step ExecutionContext → Job 승격
setKeys(String[])승격 키 /setStrict(true)키 없으면 예외- 기본 COMPLETED 시만 승격 — 실패 시 X
- 같은 데이터를 여러 Step에서 쓰려면 PromotionListener
시리즈 다른 편
- 1편 — Spring Batch 입문 (Job·Step·Chunk 모델)
- 2편 — Spring Batch Job 설정 (Tasklet과 Chunk Step)
- 3편 — 청크 처리 (Reader·Processor·Writer 패턴)
- 4편 — ItemReader 마스터 (CSV·JdbcCursor·Paging)
- 5편 — ItemWriter 마스터 (FlatFile·JdbcBatch·Composite)
- 6편 — Job Flow와 리스너 (현재 글)
- 7편 — 오류 처리 (Skip·Retry·SkipPolicy)
- 8편 — Spring Batch 5 마이그레이션
공식 문서: Spring Batch Domain Language에서 ExecutionContext 동작을 더 자세히 볼 수 있어요.
다음 글(7편)에서는 Skip·Retry·Restart 세 오류 처리 메커니즘 — faultTolerant() 시작점, SkipPolicy 커스텀, FlatFileParseException에서 원본 데이터 추출, Retry 청크 재처리 동작까지 풀어 갑니다.