Spring Batch 입문 24편 — ItemStream 인터페이스 본격 풀이

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 24편. ItemStream 인터페이스 3 메서드 (open · update · close) 의 정확한 호출 시점, ExecutionContext 활용, ItemStreamSupport 의 name prefix 메커니즘, 재시작 안전성 직접 구현 패턴, transactional commit 시점과의 결합까지 풀어쓴 학습 노트.

📚 Spring Batch 입문에서 운영까지 · 24편 — ItemStream 인터페이스 본격 풀이

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 24편이에요. 22편 ItemReader · 23편 ItemWriter짝꿍 인터페이스ItemStream(자원·상태·재시작을 표준화한 인터페이스). 17편에서 Step 등록 시점에 잠깐 봤다면, 이번 글은 인터페이스 자체의 의미를 깊게 본다.

왜 ItemStream 이 필요한가

ItemReader 와 ItemWriter 는 공통 고민을 안고 있다. 파일 open·DB connection·queue 같은 자원을 열고 닫아야 하고, 어디까지 read·write 했는지 상태를 영속화해야 하고, 재시작 시 이전 실행의 위치를 이어받아야 한다.

ItemReaderread() 만으로도, ItemWriterwrite() 만으로도 풀리지 않는다. ItemStream 인터페이스가 이 세 고민의 표준 답이다.

ItemStream — 단 3개의 메서드

public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}

각 메서드 책임이 깔끔하게 나뉜다.

메서드 호출 시점 책임
open Step 시작 시 1회 이전 ExecutionContext 복구 + 자원 열기
update 매 chunk commit 직전 현재 상태 저장 (다음 chunk 시작 위치)
close Step 종료 시 1회 자원 닫기

open() — 진입의 의미

@Override
public void open(ExecutionContext context) throws ItemStreamException {
    // 1. 이전 실행의 ExecutionContext 복구
    if (context.containsKey("currentPosition")) {
        this.position = context.getLong("currentPosition");
    } else {
        this.position = 0;
    }

    // 2. 자원 열기 (이전 위치까지 skip)
    this.file = new BufferedReader(new FileReader(filename));
    for (long i = 0; i < position; i++) {
        file.readLine();
    }
}

호출 보증은 세 가지다. Step 시작 직후 첫 chunk(한 묶음 단위) 시작 전에 1회 호출되고, 재시작이면 이전 JobExecution(직전 실행 단위) 이 남긴 ExecutionContext(실행 상태를 담는 저장소) 가 그대로 인자로 전달되며, 처음 실행이면 빈 ExecutionContext 가 들어온다.

덕분에 같은 코드 한 벌이 처음 실행과 재시작을 똑같이 다룬다. containsKey 한 줄로 분기가 끝난다.

함정 — open 안 작업의 비용

매 Step 시작마다 open 이 호출된다. 큰 파일을 reopen 하거나 DB connection 을 새로 만들면 비용이 빠르게 누적된다. 재사용 가능한 connection pool, memory-mapped file, lazy open 같은 최적화를 권장한다.

update() — 상태 영속화

@Override
public void update(ExecutionContext context) throws ItemStreamException {
    context.putLong("currentPosition", this.position);
}

호출 보증은 한 줄로 정리된다 — 매 chunk commit 직전.

read · process · write → write 끝남
  → ItemStream.update(context)        ← 여기
  → TransactionManager.commit()
  → DB 의 ExecutionContext 도 함께 commit

chunk 가 성공적으로 commit 되면 ExecutionContext 도 함께 영속화되고, rollback 시에는 ExecutionContext 변경도 함께 rollback 된다. 이 2-phase 협력이 재시작 안전성의 핵심이다.

왜 update 가 commit 전인가

만약 commit 후 update 라면:

commit → ExecutionContext update 직전 JVM 죽음
재시작 시 → DB 에는 commit 된 상태, ExecutionContext 에는 이전 위치
→ 같은 chunk 다시 처리 → 중복

commit 전 update 를 같은 transaction 안에 묶어야 DB row 와 ExecutionContext 가 원자적 commit 으로 일관성을 보장한다.

update 의 ExecutionContext 변경 = transaction 안

그래서 update() 안에서 put 한 값은 commit 이 실패하면 자연스럽게 함께 rollback 된다.

close() — 자원 정리

@Override
public void close() throws ItemStreamException {
    if (file != null) {
        try {
            file.close();
        } catch (IOException e) {
            throw new ItemStreamException(e);
        }
    }
}

호출 보증은 단순하다 — Step 종료 시 1회.

함정 — 예외 발생 시 close 호출 보장 X

Step 이 비정상 종료되면 (예: JVM 강제 종료) close() 가 호출되지 않을 수 있다. try-with-resources 또는 finalizer 패턴으로 안전 장치를 더해 둔다.

public class MyReader implements ItemReader<X>, ItemStream {
    private BufferedReader file;

    @Override
    public void close() {
        // close 정상 경로
        closeInternal();
    }

    @Override
    protected void finalize() {
        // 안전 그물 — JVM 죽음 대비
        closeInternal();
    }

    private void closeInternal() {
        if (file != null) {
            try { file.close(); } catch (IOException ignored) {}
            file = null;
        }
    }
}

(finalize 는 deprecated — java.lang.ref.Cleaner 권장.)

ExecutionContext — Key·Value 저장소

ExecutionContextMap 와 비슷한 저장소다. JobExecution 단위와 StepExecution(Step 단위 실행 정보) 단위로 두 개가 존재한다.

안전한 type

context.putString("key", "value");
context.putLong("key", 100L);
context.putInt("key", 42);
context.putDouble("key", 3.14);

기본 type 은 DB 직렬화가 안전하다.

위험한 type

context.put("key", new ComplexObject());      // 직렬화 필요

복잡 객체는 직렬화 실패 위험이 따라온다 (Jackson serializer 또는 Java serializer). 권장하지 않는다.

크기 함정

ExecutionContext 는 DB 의 BLOB(대용량 이진 데이터) column 에 저장된다. 너무 크면 DB 부담이 늘고, 직렬화·역직렬화 시간이 길어지고, chunk 마다 update 가 호출되면서 write amplification 이 발생한다.

그래서 꼭 필요한 position 정보 만 담아야 한다. 큰 데이터는 별도 테이블로 뺀다.

Step 간 데이터 전달 — JobExecutionContext

// Step 1 에서 저장
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
    stepExecution.getJobExecution()
        .getExecutionContext()
        .put("processedDate", LocalDate.now().toString());
    return ExitStatus.COMPLETED;
}

// Step 2 에서 read (21편 Late Binding 활용)
@Bean
@StepScope
public Tasklet step2Tasklet(
        @Value("#{jobExecutionContext['processedDate']}") String date) {
    return (contribution, chunkContext) -> {
        // date 사용
        return RepeatStatus.FINISHED;
    };
}

JobExecution 의 ExecutionContext 는 Step 간 공유되고, StepExecution 의 ExecutionContext 는 해당 Step 안에서만 살아 있다.

ItemStreamSupport — 추상 base class

public abstract class ItemStreamSupport implements ItemStream {

    private String name = ClassUtils.getShortName(getClass());
    private boolean saveState = true;

    public void setName(String name) {
        this.name = name;
    }

    protected String getExecutionContextKey(String key) {
        return name + "." + key;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // 빈 기본 구현
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // 빈 기본 구현
    }

    @Override
    public void close() {
        // 빈 기본 구현
    }
}

ItemStreamSupport 의 핵심 가치는 하나다 — name prefix 자동.

사용 예제

public class CountingReader<T> extends ItemStreamSupport implements ItemReader<T> {

    private long count = 0;
    private final ItemReader<T> delegate;

    public CountingReader(ItemReader<T> delegate) {
        this.delegate = delegate;
        setName("counter");          // ★ name 지정
    }

    @Override
    public void open(ExecutionContext context) {
        super.open(context);
        if (context.containsKey(getExecutionContextKey("count"))) {
            count = context.getLong(getExecutionContextKey("count"));
        }
    }

    @Override
    public void update(ExecutionContext context) {
        super.update(context);
        context.putLong(getExecutionContextKey("count"), count);
    }

    @Override
    public T read() throws Exception {
        T item = delegate.read();
        if (item != null) count++;
        return item;
    }
}

ExecutionContext key 는 "counter.count" 가 된다. 다른 reader 가 같은 prefix 에 다른 name 만 붙여도 key 충돌이 사라진다.

name 의 충돌 방지

@Bean
public CountingReader<Customer> customerCounter(ItemReader<Customer> delegate) {
    CountingReader<Customer> reader = new CountingReader<>(delegate);
    reader.setName("customerCounter");
    return reader;
}

@Bean
public CountingReader<Order> orderCounter(ItemReader<Order> delegate) {
    CountingReader<Order> reader = new CountingReader<>(delegate);
    reader.setName("orderCounter");
    return reader;
}

같은 class 의 두 인스턴스가 서로 다른 name 으로 분리된다. ExecutionContext key 도 "customerCounter.count""orderCounter.count" 로 깔끔하게 갈린다.

saveState 옵션

reader.setSaveState(false);

saveState = false 를 주면 update() 가 ExecutionContext 에 저장하지 않는다. 재시작 안전성을 포기하는 대신 성능이 올라간다.

언제 쓰는가 — short-lived Step, 절대 재시작이 일어나지 않는 Step, multi-threaded 환경에서 각 thread 가 독립 상태를 가져야 할 때.

17편의 등록과 차이

17편은 Step 에 등록하는 방법 — 자동 등록과 수동 .stream() 의 차이를 다뤘다. 24편은 인터페이스 자체 — open·update·close 의 세 메서드 contract 를 본다.

17편 24편
StepBuilder 에 등록 인터페이스 구현
자동 등록 조건 메서드 호출 시점·책임
Delegate 함정 ExecutionContext 활용

두 편을 함께 보면 그림이 완성된다.

자주 만나는 사고

사고 1: 재시작 시 처음부터 처리

원인open() 안에서 ExecutionContext 의 위치 복구가 빠졌다.

해결containsKey 로 검사하고 위치를 복원한다.

사고 2: ExecutionContext key 충돌

원인 — 두 ItemStream 이 같은 key (예: "position") 를 쓴다.

해결ItemStreamSupport.setName()getExecutionContextKey() 로 prefix 를 붙인다. 또는 class-prefix key 를 쓴다.

사고 3: update() 누락

원인 — Custom ItemStream 의 update() 가 빈 구현으로 남아 있다.

해결update() 안에서 현재 위치를 확실히 저장한다.

사고 4: open() 비용 폭발

원인 — 매 Step 시작마다 큰 파일을 read 한 뒤 위치까지 skip 한다.

해결 — seek 가능한 stream (RandomAccessFile) 이나 Resource skip 최적화를 쓴다.

사고 5: close() 미호출 → 자원 누수

원인 — Step 이 비정상 종료됐다.

해결 — try-with-resources, finalizer, Cleaner 같은 안전 그물을 둔다.

사고 6: 복잡 객체 직렬화 실패

원인context.put("key", new Domain()) 의 직렬화가 실패한다.

해결 — String·Long 같은 단순 type 만 담고, 복잡 객체는 별도 테이블로 뺀다.

사고 7: Multi-threaded 환경 thread-safety

원인 — open·update·close 가 multi-threaded Step 에서 동시에 호출된다.

해결 — thread-safe 하게 구현하거나, partitioning 으로 각 thread 에 독립 ItemStream 을 준다.

운영 권장 패턴

Pattern 1: ItemStreamSupport 상속

public class CustomReader<T> extends ItemStreamSupport implements ItemReader<T> {
    // open·update·close 구현
}

@Bean
@StepScope
public CustomReader<Foo> customReader() {
    CustomReader<Foo> reader = new CustomReader<>();
    reader.setName("fooReader");
    return reader;
}

가장 권장하는 형태다. name prefix 가 자동으로 붙는다.

Pattern 2: 위임형 wrapping

public class FilteringReader<T> implements ItemReader<T>, ItemStream {
    private final ItemReader<T> delegate;

    @Override
    public void open(ExecutionContext context) {
        if (delegate instanceof ItemStream s) s.open(context);
    }

    @Override
    public void update(ExecutionContext context) {
        if (delegate instanceof ItemStream s) s.update(context);
    }

    @Override
    public void close() {
        if (delegate instanceof ItemStream s) s.close();
    }
}

22편 Delegate Pattern 의 위임 구현 패턴이다. wrapper 가 delegate ItemStream 으로 호출을 흘려준다.

Pattern 3: Multi-resource 추적

public class MultiFileReader<T> extends ItemStreamSupport implements ItemReader<T> {

    private final List<Resource> resources;
    private int currentResourceIndex = 0;
    private long currentPosition = 0;

    @Override
    public void open(ExecutionContext context) {
        super.open(context);
        currentResourceIndex = context.getInt(
            getExecutionContextKey("resourceIndex"), 0);
        currentPosition = context.getLong(
            getExecutionContextKey("position"), 0);
        // 해당 resource 의 해당 위치 열기
    }

    @Override
    public void update(ExecutionContext context) {
        super.update(context);
        context.putInt(getExecutionContextKey("resourceIndex"), currentResourceIndex);
        context.putLong(getExecutionContextKey("position"), currentPosition);
    }
}

33편 multi-file input 의 핵심 메커니즘이다.

Pattern 4: Tasklet 안 ItemReader 사용

@Bean
public Step myStep(JobRepository repo, PlatformTransactionManager tx,
                   FlatFileItemReader<X> reader, Tasklet myTasklet) {
    return new StepBuilder("myStep", repo)
        .tasklet(myTasklet, tx)
        .stream(reader)             // ★ 수동 등록 필수
        .build();
}

Tasklet 안에서 ItemReader 를 쓰면 .stream() 으로 명시 등록이 필수다 (17편 등록 함정).

시험 직전 한 번 더 — ItemStream 함정 압축 노트

  • ItemStream 인터페이스 3 메서드 = open · update · close
  • open(context) = Step 시작 1회, 이전 ExecutionContext 복구 + 자원 열기
  • update(context) = 매 chunk commit 직전, 현재 상태 저장
  • close() = Step 종료 1회, 자원 정리
  • commit 직전 update + 같은 transaction = ExecutionContext + DB 원자적 commit
  • rollback 시 ExecutionContext 변경도 rollback (자연스러운)
  • ExecutionContext = Map 유사
  • 안전 type = putString·putLong·putInt·putDouble
  • 복잡 객체 = 직렬화 실패 위험
  • DB BLOB column 저장 — 큰 데이터 X, 핵심 position 만
  • JobExecutionContext = Step 간 공유 (JobExecution 단위)
  • StepExecutionContext = 해당 Step 만 (StepExecution 단위)
  • Step 간 데이터 전달 = afterStep 에서 JobExecutionContext 저장 + 21편 Late Binding
  • ItemStreamSupport = 추상 base class
  • setName() + getExecutionContextKey() = ExecutionContext key prefix 자동
  • 같은 class 의 두 인스턴스 = 다른 name 으로 분리
  • saveState = false = update() 가 ExecutionContext 저장 안 함 (재시작 안전성 포기 + 성능)
  • 함정 — open 비용 (큰 파일 reopen) → seek 가능한 stream
  • 함정 — close 호출 보장 X (비정상 종료) → try-with-resources·Cleaner
  • 함정 — key 충돌 → name prefix
  • 함정 — update 누락 → 재시작 시 처음부터
  • 함정 — multi-threaded 동시 호출 → thread-safe 구현 또는 partitioning
  • 2-phase 협력 — write commit + ExecutionContext commit = 재시작 안전성 핵심
  • 17편 (등록) + 24편 (인터페이스) = 그림 완성

공식 문서: ItemStream 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!