Spring Batch 입문 24편. ItemStream 인터페이스 3 메서드 (open · update · close) 의 정확한 호출 시점, ExecutionContext 활용, ItemStreamSupport 의 name prefix 메커니즘, 재시작 안전성 직접 구현 패턴, transactional commit 시점과의 결합까지 풀어쓴 학습 노트.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 24편이에요. 22편 ItemReader · 23편 ItemWriter 의 짝꿍 인터페이스 — ItemStream(자원·상태·재시작을 표준화한 인터페이스). 17편에서 Step 등록 시점에 잠깐 봤다면, 이번 글은 인터페이스 자체의 의미를 깊게 본다.
왜 ItemStream 이 필요한가
ItemReader 와 ItemWriter 는 공통 고민을 안고 있다. 파일 open·DB connection·queue 같은 자원을 열고 닫아야 하고, 어디까지 read·write 했는지 상태를 영속화해야 하고, 재시작 시 이전 실행의 위치를 이어받아야 한다.
ItemReader 의 read() 만으로도, ItemWriter 의 write() 만으로도 풀리지 않는다. ItemStream 인터페이스가 이 세 고민의 표준 답이다.
ItemStream — 단 3개의 메서드
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
각 메서드 책임이 깔끔하게 나뉜다.
| 메서드 | 호출 시점 | 책임 |
|---|---|---|
open |
Step 시작 시 1회 | 이전 ExecutionContext 복구 + 자원 열기 |
update |
매 chunk commit 직전 | 현재 상태 저장 (다음 chunk 시작 위치) |
close |
Step 종료 시 1회 | 자원 닫기 |
open() — 진입의 의미
@Override
public void open(ExecutionContext context) throws ItemStreamException {
// 1. 이전 실행의 ExecutionContext 복구
if (context.containsKey("currentPosition")) {
this.position = context.getLong("currentPosition");
} else {
this.position = 0;
}
// 2. 자원 열기 (이전 위치까지 skip)
this.file = new BufferedReader(new FileReader(filename));
for (long i = 0; i < position; i++) {
file.readLine();
}
}
호출 보증은 세 가지다. Step 시작 직후 첫 chunk(한 묶음 단위) 시작 전에 1회 호출되고, 재시작이면 이전 JobExecution(직전 실행 단위) 이 남긴 ExecutionContext(실행 상태를 담는 저장소) 가 그대로 인자로 전달되며, 처음 실행이면 빈 ExecutionContext 가 들어온다.
덕분에 같은 코드 한 벌이 처음 실행과 재시작을 똑같이 다룬다. containsKey 한 줄로 분기가 끝난다.
함정 — open 안 작업의 비용
매 Step 시작마다 open 이 호출된다. 큰 파일을 reopen 하거나 DB connection 을 새로 만들면 비용이 빠르게 누적된다. 재사용 가능한 connection pool, memory-mapped file, lazy open 같은 최적화를 권장한다.
update() — 상태 영속화
@Override
public void update(ExecutionContext context) throws ItemStreamException {
context.putLong("currentPosition", this.position);
}
호출 보증은 한 줄로 정리된다 — 매 chunk commit 직전.
read · process · write → write 끝남
→ ItemStream.update(context) ← 여기
→ TransactionManager.commit()
→ DB 의 ExecutionContext 도 함께 commit
chunk 가 성공적으로 commit 되면 ExecutionContext 도 함께 영속화되고, rollback 시에는 ExecutionContext 변경도 함께 rollback 된다. 이 2-phase 협력이 재시작 안전성의 핵심이다.
왜 update 가 commit 전인가
만약 commit 후 update 라면:
commit → ExecutionContext update 직전 JVM 죽음
재시작 시 → DB 에는 commit 된 상태, ExecutionContext 에는 이전 위치
→ 같은 chunk 다시 처리 → 중복
commit 전 update 를 같은 transaction 안에 묶어야 DB row 와 ExecutionContext 가 원자적 commit 으로 일관성을 보장한다.
update 의 ExecutionContext 변경 = transaction 안
그래서 update() 안에서 put 한 값은 commit 이 실패하면 자연스럽게 함께 rollback 된다.
close() — 자원 정리
@Override
public void close() throws ItemStreamException {
if (file != null) {
try {
file.close();
} catch (IOException e) {
throw new ItemStreamException(e);
}
}
}
호출 보증은 단순하다 — Step 종료 시 1회.
함정 — 예외 발생 시 close 호출 보장 X
Step 이 비정상 종료되면 (예: JVM 강제 종료) close() 가 호출되지 않을 수 있다. try-with-resources 또는 finalizer 패턴으로 안전 장치를 더해 둔다.
public class MyReader implements ItemReader<X>, ItemStream {
private BufferedReader file;
@Override
public void close() {
// close 정상 경로
closeInternal();
}
@Override
protected void finalize() {
// 안전 그물 — JVM 죽음 대비
closeInternal();
}
private void closeInternal() {
if (file != null) {
try { file.close(); } catch (IOException ignored) {}
file = null;
}
}
}
(finalize 는 deprecated — java.lang.ref.Cleaner 권장.)
ExecutionContext — Key·Value 저장소
ExecutionContext 는 Map
안전한 type
context.putString("key", "value");
context.putLong("key", 100L);
context.putInt("key", 42);
context.putDouble("key", 3.14);
기본 type 은 DB 직렬화가 안전하다.
위험한 type
context.put("key", new ComplexObject()); // 직렬화 필요
복잡 객체는 직렬화 실패 위험이 따라온다 (Jackson serializer 또는 Java serializer). 권장하지 않는다.
크기 함정
ExecutionContext 는 DB 의 BLOB(대용량 이진 데이터) column 에 저장된다. 너무 크면 DB 부담이 늘고, 직렬화·역직렬화 시간이 길어지고, chunk 마다 update 가 호출되면서 write amplification 이 발생한다.
그래서 꼭 필요한 position 정보 만 담아야 한다. 큰 데이터는 별도 테이블로 뺀다.
Step 간 데이터 전달 — JobExecutionContext
// Step 1 에서 저장
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
stepExecution.getJobExecution()
.getExecutionContext()
.put("processedDate", LocalDate.now().toString());
return ExitStatus.COMPLETED;
}
// Step 2 에서 read (21편 Late Binding 활용)
@Bean
@StepScope
public Tasklet step2Tasklet(
@Value("#{jobExecutionContext['processedDate']}") String date) {
return (contribution, chunkContext) -> {
// date 사용
return RepeatStatus.FINISHED;
};
}
JobExecution 의 ExecutionContext 는 Step 간 공유되고, StepExecution 의 ExecutionContext 는 해당 Step 안에서만 살아 있다.
ItemStreamSupport — 추상 base class
public abstract class ItemStreamSupport implements ItemStream {
private String name = ClassUtils.getShortName(getClass());
private boolean saveState = true;
public void setName(String name) {
this.name = name;
}
protected String getExecutionContextKey(String key) {
return name + "." + key;
}
@Override
public void open(ExecutionContext executionContext) {
// 빈 기본 구현
}
@Override
public void update(ExecutionContext executionContext) {
// 빈 기본 구현
}
@Override
public void close() {
// 빈 기본 구현
}
}
ItemStreamSupport 의 핵심 가치는 하나다 — name prefix 자동.
사용 예제
public class CountingReader<T> extends ItemStreamSupport implements ItemReader<T> {
private long count = 0;
private final ItemReader<T> delegate;
public CountingReader(ItemReader<T> delegate) {
this.delegate = delegate;
setName("counter"); // ★ name 지정
}
@Override
public void open(ExecutionContext context) {
super.open(context);
if (context.containsKey(getExecutionContextKey("count"))) {
count = context.getLong(getExecutionContextKey("count"));
}
}
@Override
public void update(ExecutionContext context) {
super.update(context);
context.putLong(getExecutionContextKey("count"), count);
}
@Override
public T read() throws Exception {
T item = delegate.read();
if (item != null) count++;
return item;
}
}
ExecutionContext key 는 "counter.count" 가 된다. 다른 reader 가 같은 prefix 에 다른 name 만 붙여도 key 충돌이 사라진다.
name 의 충돌 방지
@Bean
public CountingReader<Customer> customerCounter(ItemReader<Customer> delegate) {
CountingReader<Customer> reader = new CountingReader<>(delegate);
reader.setName("customerCounter");
return reader;
}
@Bean
public CountingReader<Order> orderCounter(ItemReader<Order> delegate) {
CountingReader<Order> reader = new CountingReader<>(delegate);
reader.setName("orderCounter");
return reader;
}
같은 class 의 두 인스턴스가 서로 다른 name 으로 분리된다. ExecutionContext key 도 "customerCounter.count" 와 "orderCounter.count" 로 깔끔하게 갈린다.
saveState 옵션
reader.setSaveState(false);
saveState = false 를 주면 update() 가 ExecutionContext 에 저장하지 않는다. 재시작 안전성을 포기하는 대신 성능이 올라간다.
언제 쓰는가 — short-lived Step, 절대 재시작이 일어나지 않는 Step, multi-threaded 환경에서 각 thread 가 독립 상태를 가져야 할 때.
17편의 등록과 차이
17편은 Step 에 등록하는 방법 — 자동 등록과 수동 .stream() 의 차이를 다뤘다. 24편은 인터페이스 자체 — open·update·close 의 세 메서드 contract 를 본다.
| 17편 | 24편 |
|---|---|
| StepBuilder 에 등록 | 인터페이스 구현 |
| 자동 등록 조건 | 메서드 호출 시점·책임 |
| Delegate 함정 | ExecutionContext 활용 |
두 편을 함께 보면 그림이 완성된다.
자주 만나는 사고
사고 1: 재시작 시 처음부터 처리
원인 — open() 안에서 ExecutionContext 의 위치 복구가 빠졌다.
해결 — containsKey 로 검사하고 위치를 복원한다.
사고 2: ExecutionContext key 충돌
원인 — 두 ItemStream 이 같은 key (예: "position") 를 쓴다.
해결 — ItemStreamSupport.setName() 과 getExecutionContextKey() 로 prefix 를 붙인다. 또는 class-prefix key 를 쓴다.
사고 3: update() 누락
원인 — Custom ItemStream 의 update() 가 빈 구현으로 남아 있다.
해결 — update() 안에서 현재 위치를 확실히 저장한다.
사고 4: open() 비용 폭발
원인 — 매 Step 시작마다 큰 파일을 read 한 뒤 위치까지 skip 한다.
해결 — seek 가능한 stream (RandomAccessFile) 이나 Resource skip 최적화를 쓴다.
사고 5: close() 미호출 → 자원 누수
원인 — Step 이 비정상 종료됐다.
해결 — try-with-resources, finalizer, Cleaner 같은 안전 그물을 둔다.
사고 6: 복잡 객체 직렬화 실패
원인 — context.put("key", new Domain()) 의 직렬화가 실패한다.
해결 — String·Long 같은 단순 type 만 담고, 복잡 객체는 별도 테이블로 뺀다.
사고 7: Multi-threaded 환경 thread-safety
원인 — open·update·close 가 multi-threaded Step 에서 동시에 호출된다.
해결 — thread-safe 하게 구현하거나, partitioning 으로 각 thread 에 독립 ItemStream 을 준다.
운영 권장 패턴
Pattern 1: ItemStreamSupport 상속
public class CustomReader<T> extends ItemStreamSupport implements ItemReader<T> {
// open·update·close 구현
}
@Bean
@StepScope
public CustomReader<Foo> customReader() {
CustomReader<Foo> reader = new CustomReader<>();
reader.setName("fooReader");
return reader;
}
가장 권장하는 형태다. name prefix 가 자동으로 붙는다.
Pattern 2: 위임형 wrapping
public class FilteringReader<T> implements ItemReader<T>, ItemStream {
private final ItemReader<T> delegate;
@Override
public void open(ExecutionContext context) {
if (delegate instanceof ItemStream s) s.open(context);
}
@Override
public void update(ExecutionContext context) {
if (delegate instanceof ItemStream s) s.update(context);
}
@Override
public void close() {
if (delegate instanceof ItemStream s) s.close();
}
}
22편 Delegate Pattern 의 위임 구현 패턴이다. wrapper 가 delegate ItemStream 으로 호출을 흘려준다.
Pattern 3: Multi-resource 추적
public class MultiFileReader<T> extends ItemStreamSupport implements ItemReader<T> {
private final List<Resource> resources;
private int currentResourceIndex = 0;
private long currentPosition = 0;
@Override
public void open(ExecutionContext context) {
super.open(context);
currentResourceIndex = context.getInt(
getExecutionContextKey("resourceIndex"), 0);
currentPosition = context.getLong(
getExecutionContextKey("position"), 0);
// 해당 resource 의 해당 위치 열기
}
@Override
public void update(ExecutionContext context) {
super.update(context);
context.putInt(getExecutionContextKey("resourceIndex"), currentResourceIndex);
context.putLong(getExecutionContextKey("position"), currentPosition);
}
}
33편 multi-file input 의 핵심 메커니즘이다.
Pattern 4: Tasklet 안 ItemReader 사용
@Bean
public Step myStep(JobRepository repo, PlatformTransactionManager tx,
FlatFileItemReader<X> reader, Tasklet myTasklet) {
return new StepBuilder("myStep", repo)
.tasklet(myTasklet, tx)
.stream(reader) // ★ 수동 등록 필수
.build();
}
Tasklet 안에서 ItemReader 를 쓰면 .stream() 으로 명시 등록이 필수다 (17편 등록 함정).
시험 직전 한 번 더 — ItemStream 함정 압축 노트
- ItemStream 인터페이스 3 메서드 =
open·update·close open(context)= Step 시작 1회, 이전 ExecutionContext 복구 + 자원 열기update(context)= 매 chunk commit 직전, 현재 상태 저장close()= Step 종료 1회, 자원 정리- commit 직전 update + 같은 transaction = ExecutionContext + DB 원자적 commit
- rollback 시 ExecutionContext 변경도 rollback (자연스러운)
- ExecutionContext = Map
유사 - 안전 type =
putString·putLong·putInt·putDouble - 복잡 객체 = 직렬화 실패 위험
- DB BLOB column 저장 — 큰 데이터 X, 핵심 position 만
- JobExecutionContext = Step 간 공유 (JobExecution 단위)
- StepExecutionContext = 해당 Step 만 (StepExecution 단위)
- Step 간 데이터 전달 = afterStep 에서 JobExecutionContext 저장 + 21편 Late Binding
ItemStreamSupport= 추상 base classsetName()+getExecutionContextKey()= ExecutionContext key prefix 자동- 같은 class 의 두 인스턴스 = 다른 name 으로 분리
saveState = false= update() 가 ExecutionContext 저장 안 함 (재시작 안전성 포기 + 성능)- 함정 —
open비용 (큰 파일 reopen) → seek 가능한 stream - 함정 —
close호출 보장 X (비정상 종료) → try-with-resources·Cleaner - 함정 — key 충돌 → name prefix
- 함정 — update 누락 → 재시작 시 처음부터
- 함정 — multi-threaded 동시 호출 → thread-safe 구현 또는 partitioning
- 2-phase 협력 — write commit + ExecutionContext commit = 재시작 안전성 핵심
- 17편 (등록) + 24편 (인터페이스) = 그림 완성
공식 문서: ItemStream 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 19편 — TaskletStep (단발 작업의 정석)
- 20편 — Flow Control · Decision · Split · 조건 분기
- 21편 — Late Binding · @StepScope · @JobScope · SpEL
- 22편 — ItemReader 인터페이스 종합 · Delegate Pattern
- 23편 — ItemWriter 인터페이스 종합
다음 글: