Spring Batch 입문 17편. ItemStream 인터페이스 등록 — open·update·close 의 자리, ItemReader/Writer 자동 등록 vs 수동 등록, Delegate·Wrapping 패턴에서의 함정, 재시작 안전성 보장까지 풀어쓴 학습 노트.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 17편이에요. 16편 까지 transaction 제어를 잡았다면, 이번 17편은 재시작 안전성의 핵심 — ItemStream 등록.
ItemStream 의 의미
public interface ItemStream {
void open(ExecutionContext context) throws ItemStreamException;
void update(ExecutionContext context) throws ItemStreamException;
void close() throws ItemStreamException;
}
세 메서드가 한 묶음으로 동작해요. open() 은 Step 시작 시 1회 호출돼서 이전 ExecutionContext(잡 실행 상태 저장소)를 복구하고, update() 는 매 chunk(한 트랜잭션 묶음 단위) 종료마다 현재 상태를 ExecutionContext 에 저장하고, close() 는 Step 종료 시 1회 호출돼서 리소스를 정리해요.
13편 Step Restart 에서 본 current position 추적 의 정체가 바로 이거예요. 이 인터페이스가 재시작 안전성 을 구현하는 진짜 골격.
자동 등록 — 가장 흔한 경우
@Bean
public Step myStep(JobRepository repo, PlatformTransactionManager tx) {
return new StepBuilder("myStep", repo)
.<X, Y>chunk(100, tx)
.reader(flatFileItemReader()) // FlatFileItemReader implements ItemStream
.writer(flatFileItemWriter()) // FlatFileItemWriter implements ItemStream
.build();
}
Reader·Writer 가 ItemStream 을 구현하고, StepBuilder 의 reader()·writer() 인자로 직접 들어가면 자동 등록이 끝나요. JobRepository(잡 메타데이터 저장소) 와 PlatformTransactionManager(스프링 트랜잭션 추상화) 는 Step 마다 받는 표준 인자.
대부분 공식 ItemReader/Writer:
FlatFileItemReader/Writer✓JdbcCursorItemReader·JdbcBatchItemWriter✓JpaItemWriter·JpaCursorItemReader✓KafkaItemReader/Writer✓MongoItemReader/Writer✓
→ 대부분 환경 = 별도 등록 코드 없이 재시작 안전.
수동 등록 — Delegate · Wrapping 패턴
시험에서 자주 발을 거는 자리가 여기예요. Reader·Writer 가 다른 Reader 를 내부 wrap 하면 자동 등록이 안 돼요.
Wrapping 예제
public class FilteringReader<T> implements ItemReader<T> {
private final ItemReader<T> delegate;
private final Predicate<T> filter;
public FilteringReader(ItemReader<T> delegate, Predicate<T> filter) {
this.delegate = delegate;
this.filter = filter;
}
@Override
public T read() throws Exception {
T item;
while ((item = delegate.read()) != null) {
if (filter.test(item)) return item;
}
return null;
}
}
FilteringReader 가 FlatFileItemReader 를 wrap. delegate 의 ItemStream 이 자동 등록 X.
해결 — 수동 등록
@Bean
public Step wrappingStep(JobRepository repo, PlatformTransactionManager tx,
FlatFileItemReader<X> delegate) {
FilteringReader<X> filtering = new FilteringReader<>(delegate, filter);
return new StepBuilder("wrappingStep", repo)
.<X, Y>chunk(100, tx)
.reader(filtering)
.stream(delegate) // ★ 수동 등록
.writer(...)
.build();
}
.stream(itemStream) 이 delegate 의 ItemStream 을 명시적으로 등록해 줘요. 이러면 open·update·close 가 모두 호출돼요.
Wrapping Reader 가 직접 ItemStream 구현
public class FilteringReader<T> implements ItemReader<T>, ItemStream {
private final ItemReader<T> delegate;
@Override
public T read() { ... }
@Override
public void open(ExecutionContext context) {
if (delegate instanceof ItemStream s) {
s.open(context);
}
}
@Override
public void update(ExecutionContext context) {
if (delegate instanceof ItemStream s) {
s.update(context);
}
}
@Override
public void close() {
if (delegate instanceof ItemStream s) {
s.close();
}
}
}
Wrapping Reader 가 delegate 의 ItemStream 메서드 위임. .stream() 호출 불필요.
CompositeItemStream — 여러 stream
여러 Reader·Writer 를 조합 하는 Composite 패턴:
@Bean
public CompositeItemWriter<X> compositeWriter(
JdbcBatchItemWriter<X> jdbcWriter,
FlatFileItemWriter<X> fileWriter) {
CompositeItemWriter<X> composite = new CompositeItemWriter<>();
composite.setDelegates(List.of(jdbcWriter, fileWriter));
return composite;
}
CompositeItemWriter 가 자체로 ItemStream 구현. 내부적으로 모든 delegate 의 open·update·close 호출.
→ Composite 만 등록 하면 자동.
패턴별 등록 가이드
| 패턴 | ItemStream 등록 |
|---|---|
| 표준 ItemReader/Writer 단독 | 자동 |
| Composite (공식) | 자동 |
| Custom Wrapping (ItemStream 미구현) | 수동 .stream() |
| Custom Wrapping (ItemStream 위임 구현) | 자동 |
| Tasklet 내부에서 ItemReader 사용 | 수동 .stream() |
자주 헷갈리는 자리
.reader() 의 자동 등록 조건
조건 1: 객체가 ItemStream 구현
조건 2: .reader() 또는 .writer() 의 직접 인자
둘 다 만족 = 자동 등록.
Delegate 가 ItemStream 미구현
ItemReader<X> reader = new SomeCustomReader(); // ItemStream X
WrapperReader wrapper = new WrapperReader(reader);
new StepBuilder(...)
.reader(wrapper) // wrapper 만 등록 (reader 가 ItemStream 이라도)
// ...
delegate (= reader) 가 ItemStream 이라도 wrapper 를 통하면 자동 X.
Tasklet 안 ItemReader
Tasklet(단발 작업 단위 Step) 안에서 ItemReader 를 직접 꺼내 쓰는 경우.
@Component
public class MyTasklet implements Tasklet {
@Autowired
private FlatFileItemReader<X> reader;
@Override
public RepeatStatus execute(...) {
// reader 사용
}
}
Tasklet 안에서 직접 ItemReader 사용 = .reader() 호출 X = ItemStream 자동 등록 X.
해결:
new StepBuilder("myStep", repo)
.tasklet(myTasklet, tx)
.stream(reader) // ★ 수동
.build();
운영 권장 패턴
Pattern 1: 단순 ItemReader/Writer
new StepBuilder(...)
.reader(reader) // 자동
.writer(writer)
.build();
대부분 환경. 별도 작업 없음.
Pattern 2: Composite 사용
new StepBuilder(...)
.reader(reader)
.writer(compositeWriter) // 자동 (CompositeItemWriter 가 자체 ItemStream)
.build();
Pattern 3: Wrapping — 위임 구현 권장
public class CountingReader<T> implements ItemReader<T>, ItemStream {
private final ItemReader<T> delegate;
@Override
public T read() {
T item = delegate.read();
if (item != null) count++;
return item;
}
// ItemStream 메서드 — delegate 위임
@Override
public void open(ExecutionContext context) {
if (delegate instanceof ItemStream s) s.open(context);
count = context.getInt("count", 0);
}
@Override
public void update(ExecutionContext context) {
if (delegate instanceof ItemStream s) s.update(context);
context.putInt("count", count);
}
@Override
public void close() {
if (delegate instanceof ItemStream s) s.close();
}
}
위임 + 자체 state 추가. 자동 등록.
Pattern 4: Tasklet 안 Stream 사용
new StepBuilder("myStep", repo)
.tasklet(myTasklet, tx)
.stream(reader)
.stream(writer)
.build();
여러 stream 가능. 모두 .stream() 으로 명시.
ItemStream 의 ExecutionContext 활용
FlatFileItemReader 예제 (공식 구현체 내부)
public class FlatFileItemReader<T> implements ItemReader<T>, ItemStream {
private long currentItemCount = 0;
private long maxItemCount = Long.MAX_VALUE;
@Override
public void open(ExecutionContext context) {
// 이전 실행의 위치 복구
currentItemCount = context.getInt("currentItemCount", 0);
skipToLine(currentItemCount);
}
@Override
public void update(ExecutionContext context) {
context.putInt("currentItemCount", currentItemCount);
}
@Override
public T read() {
T item = doRead();
currentItemCount++;
return item;
}
}
open() 에서 복구하고, update() 에서 저장하고, read() 에서 카운트가 올라가요. 이 세 박자가 도는 사이클이 재시작 안전성 의 실체.
Custom Reader 의 ExecutionContext key
private static final String CURRENT_KEY = "myReader.currentPosition";
@Override
public void open(ExecutionContext context) {
if (context.containsKey(CURRENT_KEY)) {
this.position = context.getLong(CURRENT_KEY);
}
}
@Override
public void update(ExecutionContext context) {
context.putLong(CURRENT_KEY, this.position);
}
Key 이름이 unique 해야 해요. 여러 ItemStream 이 같은 key 를 쓰면 충돌이 나니까, class prefix 를 붙이는 쪽을 권장.
ItemStreamSupport — 추상 base class
public class MyReader extends ItemStreamSupport implements ItemReader<X> {
@Override
public X read() { ... }
@Override
public void update(ExecutionContext context) {
super.update(context); // executionContextName 자동 prefix
context.put("position", position);
}
@Override
public void open(ExecutionContext context) {
super.open(context);
position = (Long) context.get("position");
}
}
@Bean
public MyReader myReader() {
MyReader reader = new MyReader();
reader.setName("customers"); // executionContextName
return reader;
}
ItemStreamSupport 는 executionContextName prefix 자동. 같은 class 의 여러 instance 라도 다른 prefix 로 분리돼요.
ExecutionContext key 가 자동으로 "customers.position" 같이 붙어요.
자주 만나는 사고
사고 1: 재시작 시 처음부터 다시
원인 — ItemStream 미구현 또는 등록 누락.
해결 — .stream() 명시 + ItemStream 구현 확인.
사고 2: ExecutionContext key 충돌
원인 — 두 Reader 가 같은 key (예: "position") 사용.
해결 — ItemStreamSupport.setName() 또는 class-prefix key.
사고 3: Delegate 의 update 누락
원인 — Wrapping Reader 가 update() 위임 안 함.
해결 — delegate.update(context) 호출 또는 .stream(delegate).
사고 4: Composite 의 stream 누락
원인 — CompositeItemWriter 가 delegate stream 자동 위임 하는데 delegate 가 ItemStream 미구현.
해결 — delegate 도 ItemStream 구현 또는 수동 .stream().
한계·실무 함정
1. ExecutionContext 의 직렬화
putLong·putString·putInt 는 안전한데, put(key, obj) 로 복잡 객체를 넣으면 직렬화에서 깨질 위험이 있어요. 단순 type 으로 가는 쪽을 권장.
2. ExecutionContext 크기
너무 큰 metadata 는 DB BLOB 부담으로 돌아와요. 핵심 position 만 담는다는 감각.
3. close 호출 보장
close() 는 Step 종료 시 자동으로 불려요. 다만 예외가 중간에 터지면 호출이 누락될 수도 있어서, 리소스는 try-with-resources 나 finally 로 받쳐주는 게 안전해요.
4. open 의 비용
매 Step 시작마다 open 이 호출되니까, 큰 파일 open·DB connection 같은 비싼 작업이 여기 들어가면 시작 시간이 그만큼 늘어요. 측정 후 최적화.
5. Custom ItemStream 구현 시 thread-safety
37편 Multi-threaded Step 환경에서는 ItemStream 의 update·read 가 동시에 호출돼요. thread-safe 보장 필요.
6. ExecutionContext 의 transaction
update(context) 가 chunk transaction 안에서 호출되니까, chunk rollback 이 일어나면 ExecutionContext 변경도 같이 rollback. 자연스러운 동작.
시험 직전 한 번 더 — ItemStream 등록 함정 압축 노트
- ItemStream 3 메서드 =
open(시작) ·update(매 chunk) ·close(종료) - 자동 등록 조건 = (1) 객체가 ItemStream 구현 (2)
.reader()·.writer()직접 인자 - 대부분 공식 ItemReader/Writer = ItemStream 자동 구현
- 수동 등록 =
.stream(itemStream)Builder 메서드 - 자주 자리 — Custom Wrapping (delegate 가 ItemStream 인데 wrapper 가 ItemStream X) · Tasklet 안 Reader 사용
- CompositeItemWriter/Reader = 자체 ItemStream 구현, delegate 도 ItemStream 이어야 안전
- Wrapping 권장 패턴 = ItemStream 위임 구현 (
if (delegate instanceof ItemStream s) s.update(...)) ItemStreamSupport= base class,setName()으로 executionContextName prefix 자동- ExecutionContext key 충돌 방지
- ExecutionContext 활용 —
open()복구 ·update()저장 ·read()카운트 - 단순 type (
putLong·putString·putInt) 권장, 복잡 객체 = 직렬화 위험 - 사고 자주 — 재시작 처음부터 (ItemStream 누락) · key 충돌 · delegate update 누락 · composite stream 누락
- 함정 — ExecutionContext 크기 (BLOB 부담)
- 함정 — close 호출 보장 (try-with-resources)
- 함정 — open 비용 (큰 파일·DB conn)
- 함정 — Multi-threaded 의 thread-safety
- 함정 — chunk rollback = ExecutionContext rollback (자연스러운)
공식 문서: Registering ItemStream with a Step 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 12편 — Chunk Step 설정 + Commit Interval 튜닝
- 13편 — Step Restart + 부모 Step 상속
- 14편 — Skip Logic (부분 실패 무시 · SkipPolicy · SkipListener)
- 15편 — Retry Logic (일시적 실패 자동 재시도)
- 16편 — Transaction Attributes (Isolation · Propagation · Timeout)
다음 글: