Spring Batch Job Flow — 조건부·병렬·리스너

2026-05-03확률과 통계 마스터 노트

Spring Batch 마스터 노트 시리즈 6편. 순차·조건부·동적 흐름의 5가지 흐름 제어, ABANDONED vs FAILED, FlowBuilder로 재사용 Flow, Nested Job·Parallel Steps, 6가지 리스너(Job/Step/Chunk/Read/Process/Write), ExecutionContext의 Job vs Step 스코프, ExecutionContextPromotionListener까지 한 흐름으로.

이 글은 Spring Batch 마스터 노트 시리즈의 여섯 번째 편입니다. 5편(Writer)까지 데이터 처리 패턴을 잡았다면, 이번엔 Job 차원의 흐름 제어와 모니터링.

순차·조건부·병렬 흐름을 어떻게 짤지, 6가지 리스너로 어디서 무엇을 할지, ExecutionContext로 Step 간 데이터를 어떻게 공유할지 — 운영 수준의 배치를 만드는 데 필수인 도구들입니다.

처음 Job Flow가 어렵게 느껴지는 이유

이유는 두 가지예요.

첫째, 흐름 제어 방법이 7가지(순차·조건부·커스텀 ExitStatus·Decider·FlowBuilder·Nested·Parallel)예요. 각자 적합한 자리가 다른데 처음엔 어느 자리에 어떤 도구를 써야 하는지 안 보입니다.

둘째, 리스너가 6종류예요. JobExecutionListener·StepExecutionListener·ChunkListener·ItemReadListener·ItemProcessListener·ItemWriteListener. 모두 비슷한 패턴(beforeXxx·afterXxx·onXxxError)이지만 어디에 어떤 리스너를 박을지가 첫 단계 막힘.

해결법은 한 가지예요. 흐름은 그래프로 그리고, 리스너는 "어떤 단위에서 무슨 정보가 필요한가" 한 질문으로 선택하세요. Job 단위 = JobExecutionListener. Step 단위 = StepExecutionListener. 청크 단위 = ChunkListener. 단순.

흐름 제어 7가지 비교

방법 자리 핵심
순차 항상 같은 순서 start/next
조건부 Exit Status 기반 분기 on/to/from
커스텀 ExitStatus Step 로직으로 흐름 결정 StepExecutionListener.afterStep()
JobExecutionDecider 런타임 데이터 기반 동적 JobExecutionDecider
FlowBuilder 재사용 가능한 Flow FlowBuilder<SimpleFlow>
Nested Job 독립 Job 조합 StepBuilder.job()
Parallel Step 동시 병렬 실행 split() + SimpleAsyncTaskExecutor

2편에서 첫 4가지를 다뤘으니, 이번엔 FlowBuilder·Nested·Parallel과 리스너·ExecutionContext에 집중.

순차 흐름 (다시)

@Bean
public Job sequentialJob() {
    return jobBuilderFactory.get("sequentialJob")
        .start(step1())
        .next(step2())
        .next(step3())
        .next(step4())
        .build();
}

이전 Step COMPLETED → 다음 실행. 어느 Step이라도 FAILED → Job 즉시 실패.

조건부 흐름 — on/to/from

@Bean
public Job conditionalJob() {
    return jobBuilderFactory.get("conditionalJob")
        .start(dataValidationStep())
            .on("COMPLETED").to(dataProcessingStep())
            .on("FAILED").to(errorNotificationStep())
        .from(dataProcessingStep())
            .on("COMPLETED").to(reportStep())
            .on("FAILED").fail()
        .from(errorNotificationStep())
            .on("*").end()
        .from(reportStep())
            .on("*").end()
        .end()
        .build();
}

패턴 매칭:

  • "COMPLETED" 정확
  • "*" 모든 (와일드카드)
  • "CUSTOM_*" 접두사
  • "?OMPLETED" 단일 문자

ABANDONED vs FAILED — 재시작 동작

상태 재시작 시
FAILED 해당 Step 다시 실행
ABANDONED 해당 Step 건너뛰고 다음 Step부터
FAILED 상태의 Step B:
실행: A → B(FAILED) → 재시작: A → B → C (B 다시)

ABANDONED 상태의 Step B:
실행: A → B(ABANDONED) → 재시작: A → B 건너뜀 → C

커스텀 ExitStatus

@Component
public class ProductCountStepListener implements StepExecutionListener {
    
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("Step 시작: " + stepExecution.getStepName());
    }
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        long writeCount = stepExecution.getWriteCount();
        
        if (writeCount == 0) {
            return new ExitStatus("NO_DATA");
        } else if (writeCount < 10) {
            return new ExitStatus("FEW_DATA");
        } else {
            return ExitStatus.COMPLETED;
        }
    }
}

Job Flow에서 활용:

.start(dataStep())
    .on("NO_DATA").end()
    .on("FEW_DATA").to(alertStep())
    .on("COMPLETED").to(reportStep())

JobExecutionDecider — 동적 흐름

@Component
public class DataSourceDecider implements JobExecutionDecider {
    
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String dataSource = jobExecution.getExecutionContext()
                                        .getString("dataSource", "UNKNOWN");
        
        switch (dataSource) {
            case "CSV":
                return new FlowExecutionStatus("CSV_SOURCE");
            case "DB":
                return new FlowExecutionStatus("DB_SOURCE");
            case "API":
                return new FlowExecutionStatus("API_SOURCE");
            default:
                return new FlowExecutionStatus("UNKNOWN_SOURCE");
        }
    }
}
@Bean
public Job decisionJob() {
    return jobBuilderFactory.get("decisionJob")
        .start(initStep())
        .next(dataSourceDecider())
            .on("CSV_SOURCE").to(csvProcessStep())
            .on("DB_SOURCE").to(dbProcessStep())
            .on("API_SOURCE").to(apiProcessStep())
            .on("UNKNOWN_SOURCE").fail()
        .from(csvProcessStep())
            .on("*").end()
        .from(dbProcessStep())
            .on("*").end()
        .from(apiProcessStep())
            .on("*").end()
        .end()
        .build();
}

ExecutionContext의 값을 읽어 동적 분기. StepExecutionListener보다 더 유연.

FlowBuilder — 재사용 가능한 Flow

@Bean
public Flow validationFlow() {
    return new FlowBuilder<SimpleFlow>("validationFlow")
        .start(schemaValidationStep())
        .next(businessValidationStep())
        .next(duplicateCheckStep())
        .end();
}

@Bean
public Job jobA() {
    return jobBuilderFactory.get("jobA")
        .start(validationFlow())
        .next(processStepA())
        .end()
        .build();
}

@Bean
public Job jobB() {
    return jobBuilderFactory.get("jobB")
        .start(validationFlow())  // 동일 Flow 재사용
        .next(processStepB())
        .end()
        .build();
}

같은 검증 단계를 여러 Job에서 재사용 — DRY 원칙.

Nested Job — 큰 작업을 작은 Job으로 분리

@Bean
public Job childJobA() {
    return jobBuilderFactory.get("childJobA")
        .start(childStepA1())
        .next(childStepA2())
        .build();
}

@Bean
public Job childJobB() {
    return jobBuilderFactory.get("childJobB")
        .start(childStepB1())
        .build();
}

@Bean
public Step jobStepA() {
    return stepBuilderFactory.get("jobStepA")
        .job(childJobA())
        .parametersExtractor(new DefaultJobParametersExtractor())
        .build();
}

@Bean
public Step jobStepB() {
    return stepBuilderFactory.get("jobStepB")
        .job(childJobB())
        .parametersExtractor(new DefaultJobParametersExtractor())
        .build();
}

@Bean
public Job parentJob() {
    return jobBuilderFactory.get("parentJob")
        .start(jobStepA())
        .next(jobStepB())
        .build();
}

각 자식 Job은 독립 JobInstance/JobExecution. 자체 재시작 가능.

Parallel Steps — 병렬 실행

@Bean
public Job parallelJob() {
    return jobBuilderFactory.get("parallelJob")
        .start(parallelFlow())
        .end()
        .build();
}

@Bean
public Flow parallelFlow() {
    return new FlowBuilder<SimpleFlow>("parallelFlow")
        .split(new SimpleAsyncTaskExecutor())
        .add(
            customerProcessFlow(),
            orderProcessFlow(),
            inventoryProcessFlow()
        )
        .build();
}

@Bean
public Flow customerProcessFlow() {
    return new FlowBuilder<SimpleFlow>("customerProcessFlow")
        .start(customerStep())
        .build();
}

// orderProcessFlow, inventoryProcessFlow도 동일 패턴

여기서 정말 중요한 시험 함정 — 병렬 Step에서 JdbcCursorItemReader 절대 사용 금지입니다. thread-safe 아니라서 데이터 누락·중복 발생. 반드시 JdbcPagingItemReader 사용.

6가지 리스너 — 실행 시점

리스너 시점
JobExecutionListener Job 시작 전/후
StepExecutionListener Step 시작 전/후
ChunkListener 청크 전/후, 오류 시
ItemReadListener 아이템 읽기 전/후, 오류
ItemProcessListener 처리 전/후, 오류
ItemWriteListener 쓰기 전/후, 오류

JobExecutionListener — Job 단위

@Component
public class ProductJobListener implements JobExecutionListener {
    
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("=== Job 시작 ===");
        System.out.println("Job ID: " + jobExecution.getJobId());
        System.out.println("Job 이름: " + jobExecution.getJobInstance().getJobName());
        System.out.println("시작 시간: " + jobExecution.getCreateTime());
    }
    
    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("=== Job 종료 ===");
        System.out.println("종료 상태: " + jobExecution.getStatus());
        System.out.println("종료 시간: " + jobExecution.getEndTime());
        
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("Job 성공 완료");
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            System.out.println("Job 실패");
            jobExecution.getFailureExceptions().forEach(System.out::println);
        }
    }
}

활용 — DB 연결 확인, 알림 발송, 실행 이력 기록.

StepExecutionListener — Step 단위

@Component
public class ProductStepListener implements StepExecutionListener {
    
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("Step 시작: " + stepExecution.getStepName());
    }
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("Step 완료: " + stepExecution.getStepName());
        System.out.println("읽기: " + stepExecution.getReadCount());
        System.out.println("처리: " + stepExecution.getWriteCount());
        System.out.println("스킵: " + stepExecution.getSkipCount());
        System.out.println("필터링: " + stepExecution.getFilterCount());
        return stepExecution.getExitStatus();
    }
}

커스텀 ExitStatus 반환 자리이기도 함.

ChunkListener — 청크 단위

@Component
public class ProductChunkListener implements ChunkListener {
    
    private int chunkCount = 0;
    
    @Override
    public void beforeChunk(ChunkContext context) {
        chunkCount++;
        System.out.println("청크 " + chunkCount + " 시작");
    }
    
    @Override
    public void afterChunk(ChunkContext context) {
        System.out.println("청크 " + chunkCount + " 완료");
    }
    
    @Override
    public void afterChunkError(ChunkContext context) {
        System.out.println("청크 " + chunkCount + " 오류");
    }
}

성능 측정·실시간 모니터링에 유용.

ItemReadListener / ProcessListener / WriteListener

@Component
public class ProductReadListener implements ItemReadListener<Product> {
    
    @Override
    public void beforeRead() {}
    
    @Override
    public void afterRead(Product item) {
        System.out.println("읽기 완료: " + item.getProductName());
    }
    
    @Override
    public void onReadError(Exception ex) {
        System.err.println("읽기 오류: " + ex.getMessage());
    }
}

@Component
public class ProductProcessListener implements ItemProcessListener<Product, Product> {
    
    @Override
    public void beforeProcess(Product item) {
        System.out.println("처리 시작: " + item.getProductName());
    }
    
    @Override
    public void afterProcess(Product item, Product result) {
        if (result == null) {
            System.out.println("필터링됨: " + item.getProductName());
        } else {
            System.out.println("처리 완료: " + result.getProductName());
        }
    }
    
    @Override
    public void onProcessError(Product item, Exception e) {
        System.err.println("처리 오류 — 상품: " + item.getProductName() + ", 오류: " + e.getMessage());
    }
}

@Component
public class ProductWriteListener implements ItemWriteListener<Product> {
    
    @Override
    public void beforeWrite(List<? extends Product> items) {
        System.out.println("쓰기 시작: " + items.size() + "개");
    }
    
    @Override
    public void afterWrite(List<? extends Product> items) {
        System.out.println("쓰기 완료: " + items.size() + "개");
    }
    
    @Override
    public void onWriteError(Exception exception, List<? extends Product> items) {
        System.err.println("쓰기 오류: " + exception.getMessage());
        items.forEach(item -> System.err.println("실패: " + item));
    }
}

리스너 등록

@Bean
public Step productStep() {
    return stepBuilderFactory.get("productStep")
        .<Product, Product>chunk(10)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .listener(productStepListener())
        .listener(productChunkListener())
        .listener(productReadListener())
        .listener(productProcessListener())
        .listener(productWriteListener())
        .build();
}

@Bean
public Job productJob() {
    return jobBuilderFactory.get("productJob")
        .listener(productJobListener())
        .start(productStep())
        .build();
}

여기서 시험 함정이 하나 있어요. 동일 타입 리스너 여러 개 등록 시 실행 순서 = 등록 순서. beforeXxx()는 등록 순, afterXxx()는 역순 (AOP 인터셉터 스택 방식).

ExecutionContext — 데이터 공유

두 가지 스코프

스코프 범위 지속성
Job ExecutionContext Job 전체 모든 Step 접근
Step ExecutionContext 해당 Step 내 Step 종료 시 사라짐

DB에 직렬화되어 저장 — 재시작 시 데이터 유지.

Job ExecutionContext

// Step 1 — 저장
.tasklet((contribution, chunkContext) -> {
    ExecutionContext jobContext = chunkContext.getStepContext()
                                             .getStepExecution()
                                             .getJobExecution()
                                             .getExecutionContext();
    
    jobContext.put("processDate", "2026-01-01");
    jobContext.put("totalCount", 1000);
    jobContext.putLong("startTimestamp", System.currentTimeMillis());
    
    return RepeatStatus.FINISHED;
})

// Step 2 — 읽기
.tasklet((contribution, chunkContext) -> {
    ExecutionContext jobContext = chunkContext.getStepContext()
                                             .getStepExecution()
                                             .getJobExecution()
                                             .getExecutionContext();
    
    String processDate = jobContext.getString("processDate");
    int totalCount = jobContext.getInt("totalCount");
    
    System.out.println("처리 날짜: " + processDate);
    System.out.println("총 건수: " + totalCount);
    
    return RepeatStatus.FINISHED;
})

Step ExecutionContext

.tasklet((contribution, chunkContext) -> {
    ExecutionContext stepContext = chunkContext.getStepContext()
                                              .getStepExecution()
                                              .getExecutionContext();
    
    stepContext.put("stepKey1", "Step 로컬 데이터");
    stepContext.putInt("processedCount", 0);
    
    return RepeatStatus.FINISHED;
})

ExecutionContextPromotionListener — Step → Job 승격

Step ExecutionContext의 데이터를 Job ExecutionContext로 복사. Step 내 생성 데이터를 다음 Step에서 사용할 때.

@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    
    // 승격할 키 지정
    listener.setKeys(new String[]{"processDate", "recordCount", "sourceSystem"});
    
    // true: 키 없으면 예외 / false: 무시
    listener.setStrict(true);
    
    return listener;
}

// Step 1 — Step Context 저장 + 승격 리스너
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
        .tasklet((contribution, chunkContext) -> {
            ExecutionContext stepCtx = chunkContext.getStepContext()
                                                  .getStepExecution()
                                                  .getExecutionContext();
            stepCtx.put("processDate", "2026-01-01");
            stepCtx.put("recordCount", "500");
            stepCtx.put("sourceSystem", "CRM");
            return RepeatStatus.FINISHED;
        })
        .listener(promotionListener())
        .build();
}

// Step 2 — Job Context에서 읽기
@Bean
public Step step2() {
    return stepBuilderFactory.get("step2")
        .tasklet((contribution, chunkContext) -> {
            ExecutionContext jobCtx = chunkContext.getStepContext()
                                                 .getStepExecution()
                                                 .getJobExecution()
                                                 .getExecutionContext();
            
            String processDate = jobCtx.getString("processDate");
            String recordCount = jobCtx.getString("recordCount");
            
            System.out.println("처리 날짜: " + processDate);
            System.out.println("레코드 수: " + recordCount);
            
            return RepeatStatus.FINISHED;
        })
        .build();
}

여기서 정말 중요한 시험 함정 — PromotionListener는 Step 완료 직후 실행입니다. Step 실패하면 리스너 안 돌아요. setStatuses()로 어떤 종료 상태에서 승격할지 지정 가능. 기본적으로 COMPLETED일 때만 승격되니, 실패해도 데이터를 넘기고 싶으면 명시 설정 필요.

일반적 실수와 주의사항

.end() 호출 위치

조건부 흐름 끝의 from/on/to 체인이 아닌 build() 직전.end(). 빠뜨리면 런타임 예외.

병렬 처리에서 JdbcCursor 금지

thread-safe X. JdbcPaging 사용.

ExecutionContext 직렬화 가능 타입

Serializable 구현 필요. Java 기본 타입(String/Integer/Long)은 자동. 커스텀 객체는 명시.

PromotionListener 타이밍

Step 실패 시 리스너 실행 X. setStatuses()로 조정.

리스너 등록 순서

beforeXxx 등록 순, afterXxx 역순.

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 6편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • 흐름 제어 7가지 — 순차/조건부/커스텀 ExitStatus/Decider/FlowBuilder/Nested/Parallel
  • 패턴 매칭 — * 와일드카드, CUSTOM_* 접두사
  • .end() 위치 — build() 직전 (빠뜨리면 런타임 예외)
  • ABANDONED = 재시작 시 건너뜀 (FAILED는 재실행)
  • 커스텀 ExitStatus = StepExecutionListener.afterStep() 반환
  • JobExecutionDecider = 런타임 데이터 기반 동적 흐름
  • ExecutionContext 값 읽어 분기
  • FlowBuilder = 재사용 가능 Flow (DRY)
  • Nested Job = JobStep으로 자식 Job 실행 (독립 인스턴스)
  • Parallel Steps = split() + SimpleAsyncTaskExecutor
  • 병렬 처리 시 JdbcCursor 금지 — JdbcPaging 사용
  • 6가지 리스너 — Job/Step/Chunk/Read/Process/Write
  • 등록 순서 = beforeXxx 등록 순, afterXxx 역순
  • ChunkListener = 청크 모니터링·성능 측정
  • ProcessListener — null 반환 = 필터링됨 확인 가능
  • WriteListener — onWriteError에서 실패 아이템 로깅
  • ExecutionContext 두 스코프 — Job (전체) vs Step (해당)
  • 직렬화로 DB 저장 — 재시작 시 데이터 유지
  • Serializable 구현 필수
  • PromotionListener = Step ExecutionContext → Job 승격
  • setKeys(String[]) 승격 키 / setStrict(true) 키 없으면 예외
  • 기본 COMPLETED 시만 승격 — 실패 시 X
  • 같은 데이터를 여러 Step에서 쓰려면 PromotionListener

시리즈 다른 편

공식 문서: Spring Batch Domain Language에서 ExecutionContext 동작을 더 자세히 볼 수 있어요.

다음 글(7편)에서는 Skip·Retry·Restart 세 오류 처리 메커니즘 — faultTolerant() 시작점, SkipPolicy 커스텀, FlatFileParseException에서 원본 데이터 추출, Retry 청크 재처리 동작까지 풀어 갑니다.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!