Spring Batch 입문 18편 — Step 라이프사이클 Listener 종합

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 18편. Step Execution 단계마다 끼어드는 Listener 6종 — StepExecutionListener·ChunkListener·ItemReadListener·ItemProcessListener·ItemWriteListener·SkipListener — 호출 시점·트랜잭션 위치·annotation 변환·자동 등록 함정까지 풀어쓴 학습 노트.

📚 Spring Batch 입문에서 운영까지 · 18편 — Step 라이프사이클 Listener 종합

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 18편이에요. 17편 까지 ItemStream(재시작용 상태 저장 컴포넌트) 의 상태 저장 메커니즘을 봤다면, 이번 18편은 그 사이사이에 끼어드는 — Step Listener 6종 종합. Part 3 (Step·Chunk Processing) 의 마지막 편이에요.

왜 Listener 가 필요한가

Spring Batch 의 Step(배치 작업 한 단위) 실행은 복잡한 다단계 사이클이에요.

Step 시작 → Chunk 시작 → read → process → write → Chunk 종료 → ... → Step 종료

이 사이클의 각 시점에 코드를 끼워 넣고 싶은 케이스가 많아요.

  • Step 시작·종료 로그
  • Chunk(한 번에 묶어 처리하는 단위) 시작·종료 메트릭 측정
  • Read error 발생 시 별도 알림
  • Skip 발생 시 DB 로깅
  • Footer/Trailer 파일에 쓰기
  • ExitStatus(Step 종료 상태 코드) 동적 변경

Spring Batch 가 이걸 위해 6종의 StepListener 인터페이스를 제공해요.

Listener 6종 전체 지도

# Listener 호출 시점
1 StepExecutionListener Step 시작·종료
2 ChunkListener Chunk 시작·종료·예외
3 ItemReadListener read 전·후·error
4 ItemProcessListener process 전·후·error
5 ItemWriteListener write 전·후·error
6 SkipListener skip 발생 (read·process·write)

전부 공통 부모가 StepListener 인터페이스예요. 비어 있고 marker(타입 표시용 빈 인터페이스) 역할만 해요.

호출 순서 시각화

═══════ Step 시작 ═══════
  beforeStep                          ← StepExecutionListener

  ─── chunk 1 ───
    beforeChunk                       ← ChunkListener
      ─── transaction 시작 ───
      반복:
        beforeRead                    ← ItemReadListener
        ItemReader.read()
        afterRead(item)               ← ItemReadListener (success)

        beforeProcess(item)           ← ItemProcessListener
        ItemProcessor.process(item)
        afterProcess(item, result)    ← ItemProcessListener (success)

      beforeWrite(items)              ← ItemWriteListener
      ItemWriter.write(items)
      afterWrite(items)               ← ItemWriteListener (success)

      [SkipListener.onSkipInXxx — transaction commit 직전 한 번]
      ─── transaction commit ───
    afterChunk                        ← ChunkListener

  ─── chunk 2 ───
    ... 반복

═══════ Step 종료 ═══════
  afterStep → ExitStatus 반환         ← StepExecutionListener

ChunkListener.afterChunkcommit 후, SkipListener.onSkipInXxxcommit 직전. 이 차이가 시험·실무 함정 자리예요.

1. StepExecutionListener — Step 단위 가장 일반적

public interface StepExecutionListener extends StepListener {
    void beforeStep(StepExecution stepExecution);
    ExitStatus afterStep(StepExecution stepExecution);
}

기본 호출:

  • beforeStep = Step 시작 직후 1회 (transaction 시작 전)
  • afterStep = Step 종료 직후 1회, ExitStatus 반환 가능

사용 예제

public class CustomStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        log.info("Step 시작: {} (jobName={})",
            stepExecution.getStepName(),
            stepExecution.getJobExecution().getJobInstance().getJobName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        long readCount = stepExecution.getReadCount();
        long writeCount = stepExecution.getWriteCount();
        long skipCount = stepExecution.getSkipCount();

        log.info("Step 종료: read={}, write={}, skip={}", readCount, writeCount, skipCount);

        // 동적 ExitStatus 변경
        if (skipCount > 1000) {
            return new ExitStatus("WARN_HIGH_SKIP");
        }
        return stepExecution.getExitStatus();
    }
}

afterStep 의 return 값이 다음 Flow Control (20편) 에서 조건부 transition 의 핵심이 돼요.

Annotation 변환

@Component
public class CustomStepListener {

    @BeforeStep
    public void onBeforeStep(StepExecution stepExecution) { ... }

    @AfterStep
    public ExitStatus onAfterStep(StepExecution stepExecution) { ... }
}

@BeforeStep·@AfterStep 을 쓰면 인터페이스 구현 없이 POJO(평범한 자바 클래스) 로 listener 를 작성할 수 있어요.

2. ChunkListener — Chunk 경계 후크

public interface ChunkListener<I, O> extends StepListener {
    void beforeChunk(Chunk<I> chunk);
    void afterChunk(Chunk<O> chunk);
    void afterChunkError(Exception exception, Chunk<O> chunk);
}

핵심 시점:

  • beforeChunk = transaction 시작 후, item 처리 전
  • afterChunk = write 끝난 후, transaction commit 또는 rollback 후
  • afterChunkError = chunk 처리 중 예외 발생 시

사용 예제

public class ChunkMetricsListener implements ChunkListener<Item, Item> {

    @Override
    public void beforeChunk(Chunk<Item> chunk) {
        log.debug("Chunk 시작 — size={}", chunk.size());
    }

    @Override
    public void afterChunk(Chunk<Item> chunk) {
        meterRegistry.counter("batch.chunk.completed").increment();
    }

    @Override
    public void afterChunkError(Exception ex, Chunk<Item> chunk) {
        meterRegistry.counter("batch.chunk.error").increment();
        log.error("Chunk error", ex);
    }
}

함정 — Concurrent Step 에서 안 불림

The ChunkListener listener interface is not called in concurrent steps — 공식 문서

37편 Multi-threaded Step(여러 스레드로 병렬 실행하는 Step) 환경에서 ChunkListener 가 호출되지 않아요. 일부러 안 부르는 게 아니라 호환성을 보장하지 않는다는 뜻이에요. 멀티스레드 환경에서는 ChunkListener 에 의존하면 안 돼요.

함정 — Checked Exception 금지

ChunkListener 구현체 안에서 checked exception 을 throw 하면 안 돼요. 던지면 Step 이 종료돼요. 안에서 try-catch 로 막아야 해요.

Annotation 변환

@BeforeChunk
@AfterChunk
@AfterChunkError

3. ItemReadListener — Read 단위

public interface ItemReadListener<T> extends StepListener {
    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);
}

호출 시점:

  • beforeRead = ItemReader.read() 호출 전
  • afterRead(item) = read 성공 후, 읽힌 item 전달
  • onReadError(ex) = read 중 예외 발생 시

사용 예제 — Read 에러만 별도 채널 알림

@Component
public class ReadErrorAlerter {
    @OnReadError
    public void onReadError(Exception ex) {
        slack.send("Batch read error: " + ex.getMessage());
    }
}

Skip 적용된 read error 라도 onReadError 는 항상 호출돼요. skip 과 read 성공은 다른 개념이라 구분해야 해요.

4. ItemProcessListener — Process 단위

public interface ItemProcessListener<T, S> extends StepListener {
    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);
}

호출 시점:

  • beforeProcess(item) = ItemProcessor.process() 호출 전, 입력 item 전달
  • afterProcess(item, result) = process 성공 후, 입력과 결과 모두 전달
  • onProcessError(item, ex) = process 중 예외 발생 시

afterProcess 의 result 가 null 이면 ItemProcessor 가 filter 처리한 경우예요 (process 에서 null 리턴).

사용 예제 — Process 시간 측정

public class ProcessTimerListener implements ItemProcessListener<Item, Item> {
    private long start;

    @Override
    public void beforeProcess(Item item) {
        start = System.nanoTime();
    }

    @Override
    public void afterProcess(Item item, Item result) {
        meterRegistry.timer("batch.process.duration")
            .record(Duration.ofNanos(System.nanoTime() - start));
    }

    @Override
    public void onProcessError(Item item, Exception e) {
        log.warn("Process error for {}", item, e);
    }
}

5. ItemWriteListener — Write 단위

public interface ItemWriteListener<S> extends StepListener {
    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);
}

호출 시점:

  • beforeWrite(items) = ItemWriter.write() 호출 전, chunk 전체 items
  • afterWrite(items) = write 성공 후, commit 전
  • onWriteError(ex, items) = write 중 예외 발생 시

함정 — afterWrite 는 commit 전

afterWrite 가 호출됐다고 해서 transaction 이 commit 된 건 아니에요. commit 은 그 다음. afterWrite 안에서 외부 시스템을 호출할 때 transaction rollback 가능성을 염두에 둬야 해요.

사용 예제

public class WriteErrorLogger implements ItemWriteListener<Item> {
    @Override
    public void onWriteError(Exception ex, List<? extends Item> items) {
        log.error("Write error for {} items", items.size(), ex);
        // 외부 DB·queue 에 실패 기록 (rollback 위험 주의)
    }
}

6. SkipListener — Skip 추적의 유일한 통로

public interface SkipListener<T, S> extends StepListener {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

핵심 차별점 — 읽고 처리하고 쓰는 사이의 에러 알림은 ItemReadListener·ItemProcessListener·ItemWriteListener 도 제공해요.

근데 onReadError·onProcessError·onWriteError 는 retry 성공 시에도 호출돼요. 실제로 skip 됐는지는 알 수 없어요.

SkipListener실제로 skip 처리된 item 만 추적해요.

호출 시점

  • onSkipInRead(ex) = read 단계 skip
  • onSkipInProcess(item, ex) = process 단계 skip (이미 read 성공한 item 제공)
  • onSkipInWrite(item, ex) = write 단계 skip

SkipListener · Transaction 보증 2가지

  1. The appropriate skip method (depending on when the error happened) is called only once per item.
  2. The SkipListener is always called just before the transaction is committed.

보증 1 — 동일 item 이 여러 번 rollback 되는 상황에서도 skip 메서드는 item 당 1회만 호출돼요. 14편의 fault-tolerant(부분 실패 허용) scan 메커니즘과 연결돼요.

보증 2 — SkipListener 호출은 transaction commit 직전. Listener 안에서 DB·queue 에 skip 로그를 기록해도 rollback 되지 않아요. 안전해요.

사용 예제 — Skip 기록 DB 저장

@Component
public class SkipLogger implements SkipListener<Item, Item> {

    @Autowired
    private SkipLogRepository repo;

    @Override
    public void onSkipInRead(Throwable t) {
        repo.save(new SkipLog("READ", null, t.getMessage()));
    }

    @Override
    public void onSkipInProcess(Item item, Throwable t) {
        repo.save(new SkipLog("PROCESS", item.toString(), t.getMessage()));
    }

    @Override
    public void onSkipInWrite(Item item, Throwable t) {
        repo.save(new SkipLog("WRITE", item.toString(), t.getMessage()));
    }
}

이 DB save 는 현재 chunk transaction 안에서 발생해요. 다만 commit 직전 호출이라 rollback 위험이 낮아요. 추가 외부 시스템 호출(예: Slack alert)은 비동기로 처리하는 게 안전해요.

Listener 등록 방법 4가지

방법 1: StepBuilder .listener()

@Bean
public Step myStep(JobRepository repo, PlatformTransactionManager tx,
                   CustomStepListener stepListener,
                   ChunkMetricsListener chunkListener) {
    return new StepBuilder("myStep", repo)
        .<X, Y>chunk(100, tx)
        .reader(reader)
        .writer(writer)
        .listener(stepListener)              // StepExecutionListener
        .listener(chunkListener)             // ChunkListener
        .build();
}

가장 흔한 패턴이에요. .listener() 가 모든 종류의 StepListener 를 인식해요.

방법 2: Annotation 기반 POJO

@Component
public class MyListener {
    @BeforeStep
    public void start(StepExecution exec) { ... }

    @AfterStep
    public ExitStatus end(StepExecution exec) { ... }
}

// StepBuilder 등록
.listener(myListener)

StepBuilder 가 annotation 메서드를 자동으로 변환해줘요.

방법 3: ItemReader/Writer/Processor 가 직접 구현

public class CountingReader implements ItemReader<X>, ItemReadListener<X> {

    @Override
    public X read() { ... }

    @Override
    public void afterRead(X item) {
        // ItemReader 자체가 listener
    }
}

.reader(countingReader) 만 호출하면 listener 자동 등록. 단 직접 주입되는 경우만 해당해요 (delegate(위임 대상 컴포넌트)·nested(감싸진 내부 컴포넌트) 는 안 돼요).

방법 4: Step 안 listeners 그룹 등록 (XML)

XML 환경에서는 <listeners> 안에 <listener ref="..."> 로 묶어요. Java config 환경은 .listener() 를 반복하면 돼요.

함정 — Nested Listener 자동 등록 X

ItemReader/Writer 자동 listener 등록은 직접 주입 경우만 적용돼요. 17편 wrapping 패턴에서 본 것처럼 delegate 안에 listener 가 있으면 수동 등록이 필수예요.

ItemReader<X> delegate = new FlatFileItemReader();    // ItemReadListener 미구현
WrappingReader wrapper = new WrappingReader(delegate); // ItemReadListener 구현

new StepBuilder(...)
    .reader(wrapper)         // wrapper 의 listener 자동 등록 OK
    // delegate 의 listener 가 있다면 .listener(delegate) 명시

Annotation vs 인터페이스 — 선택 기준

Annotation 권장 케이스

  • 단일 hook 만 필요 (예: @AfterStep 하나만)
  • 기존 클래스에 부가 기능 추가 (예: ItemReader 안에 @AfterRead)
  • POJO 유지 — 인터페이스 의존 회피

인터페이스 권장 케이스

  • 여러 hook 동시 필요 (Step 전체 lifecycle 추적)
  • 명시적 타입 강제 — 컴파일 시 메서드 시그니처 보장
  • IDE 자동 완성 활용

대체로 annotation 이 자유도가 높고 결합도가 낮아요. 모던 코드에서 권장돼요.

자주 만나는 사고

사고 1: ChunkListener 가 안 불림

원인 1 — Multi-threaded Step 환경. 호환성 미보장.

원인 2tasklet() Step (chunk 가 없음). ChunkListener 부적합.

사고 2: SkipListener.onSkipInXxx 가 1번 이상 호출

원인 — fault-tolerant Step 의 scan 메커니즘. Spring Batch 는 item 당 1회 호출을 보증해요. 2번 이상이면 버그라 자체 코드를 점검해야 해요.

사고 3: afterWrite 안 외부 호출이 rollback

원인afterWrite 는 transaction commit 전. 외부 호출이 현재 transaction 안에 묶여요.

해결afterChunk 사용 (commit 후) 또는 SkipListener 의 commit 직전 보증 활용.

사고 4: Annotation 메서드 인식 안 됨

원인@BeforeStep 등이 Spring Batch annotation 인데 Spring Framework @BeforeStep (없음) 으로 혼동.

해결org.springframework.batch.core.annotation.* import 확인.

사고 5: afterStep 의 ExitStatus 무시됨

원인afterStep return 을 void 로 잘못 선언. ExitStatus 를 return 해야 동적 변경이 가능해요.

운영 권장 — Listener 구조 패턴

Pattern 1: 메트릭 수집 전용

@Component
public class MetricsListener implements StepExecutionListener, ChunkListener<X, Y> {

    @Override
    public void beforeStep(StepExecution exec) {
        timer = meterRegistry.timer("batch.step.duration");
        sample = Timer.start();
    }

    @Override
    public ExitStatus afterStep(StepExecution exec) {
        sample.stop(timer);
        return exec.getExitStatus();
    }

    @Override
    public void afterChunk(Chunk<Y> chunk) {
        meterRegistry.counter("batch.chunk.completed").increment();
    }
}

Pattern 2: Error 채널 분리

@Component
public class ErrorAlerter {
    @OnReadError public void onRead(Exception e) { slack.alert("read", e); }
    @OnProcessError public void onProcess(Object item, Exception e) { slack.alert("process", e); }
    @OnWriteError public void onWrite(Exception e, List<?> items) { slack.alert("write", e); }
    @OnSkipInRead public void onSkipRead(Throwable t) { db.logSkip("read", t); }
    @OnSkipInWrite public void onSkipWrite(Object item, Throwable t) { db.logSkip("write", t); }
}

annotation POJO 1개로 6가지 후크를 통합해요.

Pattern 3: ExitStatus 동적 제어

@Component
public class ConditionalStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution exec) {}

    @Override
    public ExitStatus afterStep(StepExecution exec) {
        long failed = exec.getFailureExceptions().size();
        long skipped = exec.getSkipCount();

        if (failed > 0) return new ExitStatus("FAILED_CRITICAL");
        if (skipped > 1000) return new ExitStatus("WARN_HIGH_SKIP");
        return ExitStatus.COMPLETED;
    }
}

20편 Flow Control 의 조건부 transition 과 결합돼요.

시험 직전 한 번 더 — Step Listener 함정 압축 노트

  • 6종 Listener = StepExecutionListener · ChunkListener · ItemReadListener · ItemProcessListener · ItemWriteListener · SkipListener
  • 전부 부모 = StepListener (marker 인터페이스, 비어 있음, 직접 구현 X)
  • StepExecutionListenerbeforeStep·afterStep (ExitStatus 반환 가능)
  • ChunkListenerbeforeChunk·afterChunk (commit 후afterChunkError
  • ItemReadListenerbeforeRead·afterRead(item)·onReadError
  • ItemProcessListenerbeforeProcess(item)·afterProcess(item, result)·onProcessError
  • ItemWriteListenerbeforeWrite(items)·afterWrite(items) (commit 전onWriteError
  • SkipListeneronSkipInRead·onSkipInProcess·onSkipInWrite (실제 skip 추적 유일한 통로)
  • 함정 — onReadError·onWriteErrorretry 성공 시도 호출됨 → 실제 skip 은 SkipListener
  • SkipListener 의 두 보증 — (1) item 당 1회 호출 (2) commit 직전 호출
  • annotation 변환@BeforeStep·@AfterStep·@BeforeChunk·@AfterChunk·@AfterChunkError·@BeforeRead·@AfterRead·@OnReadError·@BeforeProcess·@AfterProcess·@OnProcessError·@BeforeWrite·@AfterWrite·@OnWriteError·@OnSkipInRead·@OnSkipInProcess·@OnSkipInWrite
  • 등록 방법 = .listener() Builder · annotation POJO · ItemReader/Writer 가 listener 동시 구현 (직접 주입만 자동) · XML <listeners>
  • 자동 등록 = 직접 주입 경우만 — nested·delegate 는 수동 .listener()
  • 함정 — ChunkListener = Multi-threaded Step 에서 호출 안 됨
  • 함정 — ChunkListener 안 checked exception throw 금지 (Step 종료)
  • 함정 — afterWrite = commit → 외부 호출 rollback 위험
  • 함정 — afterChunk = commit → 외부 호출 안전
  • 함정 — afterStep return 이 void 면 ExitStatus 동적 변경 불가
  • 함정 — SkipListener 가 같은 item 2번 호출 = 버그
  • 사용 — annotation 권장 (자유도 ↑·결합도 ↓)
  • afterStep ExitStatus + Flow Control 결합 = 동적 transition (20편)

공식 문서: Intercepting Step Execution 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!