Spring Batch 입문 18편. Step Execution 단계마다 끼어드는 Listener 6종 — StepExecutionListener·ChunkListener·ItemReadListener·ItemProcessListener·ItemWriteListener·SkipListener — 호출 시점·트랜잭션 위치·annotation 변환·자동 등록 함정까지 풀어쓴 학습 노트.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 18편이에요. 17편 까지 ItemStream(재시작용 상태 저장 컴포넌트) 의 상태 저장 메커니즘을 봤다면, 이번 18편은 그 사이사이에 끼어드는 — Step Listener 6종 종합. Part 3 (Step·Chunk Processing) 의 마지막 편이에요.
왜 Listener 가 필요한가
Spring Batch 의 Step(배치 작업 한 단위) 실행은 복잡한 다단계 사이클이에요.
Step 시작 → Chunk 시작 → read → process → write → Chunk 종료 → ... → Step 종료
이 사이클의 각 시점에 코드를 끼워 넣고 싶은 케이스가 많아요.
- Step 시작·종료 로그
- Chunk(한 번에 묶어 처리하는 단위) 시작·종료 메트릭 측정
- Read error 발생 시 별도 알림
- Skip 발생 시 DB 로깅
- Footer/Trailer 파일에 쓰기
- ExitStatus(Step 종료 상태 코드) 동적 변경
Spring Batch 가 이걸 위해 6종의 StepListener 인터페이스를 제공해요.
Listener 6종 전체 지도
| # | Listener | 호출 시점 |
|---|---|---|
| 1 | StepExecutionListener |
Step 시작·종료 |
| 2 | ChunkListener |
Chunk 시작·종료·예외 |
| 3 | ItemReadListener |
read 전·후·error |
| 4 | ItemProcessListener |
process 전·후·error |
| 5 | ItemWriteListener |
write 전·후·error |
| 6 | SkipListener |
skip 발생 (read·process·write) |
전부 공통 부모가 StepListener 인터페이스예요. 비어 있고 marker(타입 표시용 빈 인터페이스) 역할만 해요.
호출 순서 시각화
═══════ Step 시작 ═══════
beforeStep ← StepExecutionListener
─── chunk 1 ───
beforeChunk ← ChunkListener
─── transaction 시작 ───
반복:
beforeRead ← ItemReadListener
ItemReader.read()
afterRead(item) ← ItemReadListener (success)
beforeProcess(item) ← ItemProcessListener
ItemProcessor.process(item)
afterProcess(item, result) ← ItemProcessListener (success)
beforeWrite(items) ← ItemWriteListener
ItemWriter.write(items)
afterWrite(items) ← ItemWriteListener (success)
[SkipListener.onSkipInXxx — transaction commit 직전 한 번]
─── transaction commit ───
afterChunk ← ChunkListener
─── chunk 2 ───
... 반복
═══════ Step 종료 ═══════
afterStep → ExitStatus 반환 ← StepExecutionListener
ChunkListener.afterChunk 는 commit 후, SkipListener.onSkipInXxx 는 commit 직전. 이 차이가 시험·실무 함정 자리예요.
1. StepExecutionListener — Step 단위 가장 일반적
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
기본 호출:
beforeStep= Step 시작 직후 1회 (transaction 시작 전)afterStep= Step 종료 직후 1회, ExitStatus 반환 가능
사용 예제
public class CustomStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
log.info("Step 시작: {} (jobName={})",
stepExecution.getStepName(),
stepExecution.getJobExecution().getJobInstance().getJobName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
long readCount = stepExecution.getReadCount();
long writeCount = stepExecution.getWriteCount();
long skipCount = stepExecution.getSkipCount();
log.info("Step 종료: read={}, write={}, skip={}", readCount, writeCount, skipCount);
// 동적 ExitStatus 변경
if (skipCount > 1000) {
return new ExitStatus("WARN_HIGH_SKIP");
}
return stepExecution.getExitStatus();
}
}
afterStep 의 return 값이 다음 Flow Control (20편) 에서 조건부 transition 의 핵심이 돼요.
Annotation 변환
@Component
public class CustomStepListener {
@BeforeStep
public void onBeforeStep(StepExecution stepExecution) { ... }
@AfterStep
public ExitStatus onAfterStep(StepExecution stepExecution) { ... }
}
@BeforeStep·@AfterStep 을 쓰면 인터페이스 구현 없이 POJO(평범한 자바 클래스) 로 listener 를 작성할 수 있어요.
2. ChunkListener — Chunk 경계 후크
public interface ChunkListener<I, O> extends StepListener {
void beforeChunk(Chunk<I> chunk);
void afterChunk(Chunk<O> chunk);
void afterChunkError(Exception exception, Chunk<O> chunk);
}
핵심 시점:
beforeChunk= transaction 시작 후, item 처리 전afterChunk= write 끝난 후, transaction commit 또는 rollback 후afterChunkError= chunk 처리 중 예외 발생 시
사용 예제
public class ChunkMetricsListener implements ChunkListener<Item, Item> {
@Override
public void beforeChunk(Chunk<Item> chunk) {
log.debug("Chunk 시작 — size={}", chunk.size());
}
@Override
public void afterChunk(Chunk<Item> chunk) {
meterRegistry.counter("batch.chunk.completed").increment();
}
@Override
public void afterChunkError(Exception ex, Chunk<Item> chunk) {
meterRegistry.counter("batch.chunk.error").increment();
log.error("Chunk error", ex);
}
}
함정 — Concurrent Step 에서 안 불림
The ChunkListener listener interface is not called in concurrent steps — 공식 문서
37편 Multi-threaded Step(여러 스레드로 병렬 실행하는 Step) 환경에서 ChunkListener 가 호출되지 않아요. 일부러 안 부르는 게 아니라 호환성을 보장하지 않는다는 뜻이에요. 멀티스레드 환경에서는 ChunkListener 에 의존하면 안 돼요.
함정 — Checked Exception 금지
ChunkListener 구현체 안에서 checked exception 을 throw 하면 안 돼요. 던지면 Step 이 종료돼요. 안에서 try-catch 로 막아야 해요.
Annotation 변환
@BeforeChunk
@AfterChunk
@AfterChunkError
3. ItemReadListener — Read 단위
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
호출 시점:
beforeRead=ItemReader.read()호출 전afterRead(item)= read 성공 후, 읽힌 item 전달onReadError(ex)= read 중 예외 발생 시
사용 예제 — Read 에러만 별도 채널 알림
@Component
public class ReadErrorAlerter {
@OnReadError
public void onReadError(Exception ex) {
slack.send("Batch read error: " + ex.getMessage());
}
}
Skip 적용된 read error 라도 onReadError 는 항상 호출돼요. skip 과 read 성공은 다른 개념이라 구분해야 해요.
4. ItemProcessListener — Process 단위
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
호출 시점:
beforeProcess(item)=ItemProcessor.process()호출 전, 입력 item 전달afterProcess(item, result)= process 성공 후, 입력과 결과 모두 전달onProcessError(item, ex)= process 중 예외 발생 시
afterProcess 의 result 가 null 이면 ItemProcessor 가 filter 처리한 경우예요 (process 에서 null 리턴).
사용 예제 — Process 시간 측정
public class ProcessTimerListener implements ItemProcessListener<Item, Item> {
private long start;
@Override
public void beforeProcess(Item item) {
start = System.nanoTime();
}
@Override
public void afterProcess(Item item, Item result) {
meterRegistry.timer("batch.process.duration")
.record(Duration.ofNanos(System.nanoTime() - start));
}
@Override
public void onProcessError(Item item, Exception e) {
log.warn("Process error for {}", item, e);
}
}
5. ItemWriteListener — Write 단위
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
호출 시점:
beforeWrite(items)=ItemWriter.write()호출 전, chunk 전체 itemsafterWrite(items)= write 성공 후, commit 전onWriteError(ex, items)= write 중 예외 발생 시
함정 — afterWrite 는 commit 전
afterWrite 가 호출됐다고 해서 transaction 이 commit 된 건 아니에요. commit 은 그 다음. afterWrite 안에서 외부 시스템을 호출할 때 transaction rollback 가능성을 염두에 둬야 해요.
사용 예제
public class WriteErrorLogger implements ItemWriteListener<Item> {
@Override
public void onWriteError(Exception ex, List<? extends Item> items) {
log.error("Write error for {} items", items.size(), ex);
// 외부 DB·queue 에 실패 기록 (rollback 위험 주의)
}
}
6. SkipListener — Skip 추적의 유일한 통로
public interface SkipListener<T, S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
핵심 차별점 — 읽고 처리하고 쓰는 사이의 에러 알림은 ItemReadListener·ItemProcessListener·ItemWriteListener 도 제공해요.
근데 onReadError·onProcessError·onWriteError 는 retry 성공 시에도 호출돼요. 실제로 skip 됐는지는 알 수 없어요.
SkipListener 는 실제로 skip 처리된 item 만 추적해요.
호출 시점
onSkipInRead(ex)= read 단계 skiponSkipInProcess(item, ex)= process 단계 skip (이미 read 성공한 item 제공)onSkipInWrite(item, ex)= write 단계 skip
SkipListener · Transaction 보증 2가지
- The appropriate skip method (depending on when the error happened) is called only once per item.
- The SkipListener is always called just before the transaction is committed.
보증 1 — 동일 item 이 여러 번 rollback 되는 상황에서도 skip 메서드는 item 당 1회만 호출돼요. 14편의 fault-tolerant(부분 실패 허용) scan 메커니즘과 연결돼요.
보증 2 — SkipListener 호출은 transaction commit 직전. Listener 안에서 DB·queue 에 skip 로그를 기록해도 rollback 되지 않아요. 안전해요.
사용 예제 — Skip 기록 DB 저장
@Component
public class SkipLogger implements SkipListener<Item, Item> {
@Autowired
private SkipLogRepository repo;
@Override
public void onSkipInRead(Throwable t) {
repo.save(new SkipLog("READ", null, t.getMessage()));
}
@Override
public void onSkipInProcess(Item item, Throwable t) {
repo.save(new SkipLog("PROCESS", item.toString(), t.getMessage()));
}
@Override
public void onSkipInWrite(Item item, Throwable t) {
repo.save(new SkipLog("WRITE", item.toString(), t.getMessage()));
}
}
이 DB save 는 현재 chunk transaction 안에서 발생해요. 다만 commit 직전 호출이라 rollback 위험이 낮아요. 단 추가 외부 시스템 호출(예: Slack alert)은 비동기로 처리하는 게 안전해요.
Listener 등록 방법 4가지
방법 1: StepBuilder .listener()
@Bean
public Step myStep(JobRepository repo, PlatformTransactionManager tx,
CustomStepListener stepListener,
ChunkMetricsListener chunkListener) {
return new StepBuilder("myStep", repo)
.<X, Y>chunk(100, tx)
.reader(reader)
.writer(writer)
.listener(stepListener) // StepExecutionListener
.listener(chunkListener) // ChunkListener
.build();
}
가장 흔한 패턴이에요. .listener() 가 모든 종류의 StepListener 를 인식해요.
방법 2: Annotation 기반 POJO
@Component
public class MyListener {
@BeforeStep
public void start(StepExecution exec) { ... }
@AfterStep
public ExitStatus end(StepExecution exec) { ... }
}
// StepBuilder 등록
.listener(myListener)
StepBuilder 가 annotation 메서드를 자동으로 변환해줘요.
방법 3: ItemReader/Writer/Processor 가 직접 구현
public class CountingReader implements ItemReader<X>, ItemReadListener<X> {
@Override
public X read() { ... }
@Override
public void afterRead(X item) {
// ItemReader 자체가 listener
}
}
.reader(countingReader) 만 호출하면 listener 자동 등록. 단 직접 주입되는 경우만 해당해요 (delegate(위임 대상 컴포넌트)·nested(감싸진 내부 컴포넌트) 는 안 돼요).
방법 4: Step 안 listeners 그룹 등록 (XML)
XML 환경에서는 <listeners> 안에 <listener ref="..."> 로 묶어요. Java config 환경은 .listener() 를 반복하면 돼요.
함정 — Nested Listener 자동 등록 X
ItemReader/Writer 자동 listener 등록은 직접 주입 경우만 적용돼요. 17편 wrapping 패턴에서 본 것처럼 delegate 안에 listener 가 있으면 수동 등록이 필수예요.
ItemReader<X> delegate = new FlatFileItemReader(); // ItemReadListener 미구현
WrappingReader wrapper = new WrappingReader(delegate); // ItemReadListener 구현
new StepBuilder(...)
.reader(wrapper) // wrapper 의 listener 자동 등록 OK
// delegate 의 listener 가 있다면 .listener(delegate) 명시
Annotation vs 인터페이스 — 선택 기준
Annotation 권장 케이스
- 단일 hook 만 필요 (예:
@AfterStep하나만) - 기존 클래스에 부가 기능 추가 (예:
ItemReader안에@AfterRead) - POJO 유지 — 인터페이스 의존 회피
인터페이스 권장 케이스
- 여러 hook 동시 필요 (Step 전체 lifecycle 추적)
- 명시적 타입 강제 — 컴파일 시 메서드 시그니처 보장
- IDE 자동 완성 활용
대체로 annotation 이 자유도가 높고 결합도가 낮아요. 모던 코드에서 권장돼요.
자주 만나는 사고
사고 1: ChunkListener 가 안 불림
원인 1 — Multi-threaded Step 환경. 호환성 미보장.
원인 2 — tasklet() Step (chunk 가 없음). ChunkListener 부적합.
사고 2: SkipListener.onSkipInXxx 가 1번 이상 호출
원인 — fault-tolerant Step 의 scan 메커니즘. Spring Batch 는 item 당 1회 호출을 보증해요. 2번 이상이면 버그라 자체 코드를 점검해야 해요.
사고 3: afterWrite 안 외부 호출이 rollback
원인 — afterWrite 는 transaction commit 전. 외부 호출이 현재 transaction 안에 묶여요.
해결 — afterChunk 사용 (commit 후) 또는 SkipListener 의 commit 직전 보증 활용.
사고 4: Annotation 메서드 인식 안 됨
원인 — @BeforeStep 등이 Spring Batch annotation 인데 Spring Framework @BeforeStep (없음) 으로 혼동.
해결 — org.springframework.batch.core.annotation.* import 확인.
사고 5: afterStep 의 ExitStatus 무시됨
원인 — afterStep return 을 void 로 잘못 선언. ExitStatus 를 return 해야 동적 변경이 가능해요.
운영 권장 — Listener 구조 패턴
Pattern 1: 메트릭 수집 전용
@Component
public class MetricsListener implements StepExecutionListener, ChunkListener<X, Y> {
@Override
public void beforeStep(StepExecution exec) {
timer = meterRegistry.timer("batch.step.duration");
sample = Timer.start();
}
@Override
public ExitStatus afterStep(StepExecution exec) {
sample.stop(timer);
return exec.getExitStatus();
}
@Override
public void afterChunk(Chunk<Y> chunk) {
meterRegistry.counter("batch.chunk.completed").increment();
}
}
Pattern 2: Error 채널 분리
@Component
public class ErrorAlerter {
@OnReadError public void onRead(Exception e) { slack.alert("read", e); }
@OnProcessError public void onProcess(Object item, Exception e) { slack.alert("process", e); }
@OnWriteError public void onWrite(Exception e, List<?> items) { slack.alert("write", e); }
@OnSkipInRead public void onSkipRead(Throwable t) { db.logSkip("read", t); }
@OnSkipInWrite public void onSkipWrite(Object item, Throwable t) { db.logSkip("write", t); }
}
annotation POJO 1개로 6가지 후크를 통합해요.
Pattern 3: ExitStatus 동적 제어
@Component
public class ConditionalStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution exec) {}
@Override
public ExitStatus afterStep(StepExecution exec) {
long failed = exec.getFailureExceptions().size();
long skipped = exec.getSkipCount();
if (failed > 0) return new ExitStatus("FAILED_CRITICAL");
if (skipped > 1000) return new ExitStatus("WARN_HIGH_SKIP");
return ExitStatus.COMPLETED;
}
}
20편 Flow Control 의 조건부 transition 과 결합돼요.
시험 직전 한 번 더 — Step Listener 함정 압축 노트
- 6종 Listener = StepExecutionListener · ChunkListener · ItemReadListener · ItemProcessListener · ItemWriteListener · SkipListener
- 전부 부모 = StepListener (marker 인터페이스, 비어 있음, 직접 구현 X)
- StepExecutionListener —
beforeStep·afterStep(ExitStatus 반환 가능) - ChunkListener —
beforeChunk·afterChunk(commit 후)·afterChunkError - ItemReadListener —
beforeRead·afterRead(item)·onReadError - ItemProcessListener —
beforeProcess(item)·afterProcess(item, result)·onProcessError - ItemWriteListener —
beforeWrite(items)·afterWrite(items)(commit 전)·onWriteError - SkipListener —
onSkipInRead·onSkipInProcess·onSkipInWrite(실제 skip 추적 유일한 통로) - 함정 —
onReadError·onWriteError는 retry 성공 시도 호출됨 → 실제 skip 은 SkipListener - SkipListener 의 두 보증 — (1) item 당 1회 호출 (2) commit 직전 호출
- annotation 변환 —
@BeforeStep·@AfterStep·@BeforeChunk·@AfterChunk·@AfterChunkError·@BeforeRead·@AfterRead·@OnReadError·@BeforeProcess·@AfterProcess·@OnProcessError·@BeforeWrite·@AfterWrite·@OnWriteError·@OnSkipInRead·@OnSkipInProcess·@OnSkipInWrite - 등록 방법 =
.listener()Builder · annotation POJO · ItemReader/Writer 가 listener 동시 구현 (직접 주입만 자동) · XML<listeners> - 자동 등록 = 직접 주입 경우만 — nested·delegate 는 수동
.listener() - 함정 — ChunkListener = Multi-threaded Step 에서 호출 안 됨
- 함정 — ChunkListener 안 checked exception throw 금지 (Step 종료)
- 함정 —
afterWrite= commit 전 → 외부 호출 rollback 위험 - 함정 —
afterChunk= commit 후 → 외부 호출 안전 - 함정 —
afterStepreturn 이 void 면 ExitStatus 동적 변경 불가 - 함정 — SkipListener 가 같은 item 2번 호출 = 버그
- 사용 — annotation 권장 (자유도 ↑·결합도 ↓)
- afterStep ExitStatus + Flow Control 결합 = 동적 transition (20편)
공식 문서: Intercepting Step Execution 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 13편 — Step Restart + 부모 Step 상속
- 14편 — Skip Logic (부분 실패 무시 · SkipPolicy · SkipListener)
- 15편 — Retry Logic (일시적 실패 자동 재시도)
- 16편 — Transaction Attributes (Isolation · Propagation · Timeout)
- 17편 — ItemStream 등록 (재시작 안전성의 핵심)
다음 글: