Spring Batch 입문 17편 — ItemStream 등록 (재시작 안전성의 핵심)

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 17편. ItemStream 인터페이스 등록 — open·update·close 의 자리, ItemReader/Writer 자동 등록 vs 수동 등록, Delegate·Wrapping 패턴에서의 함정, 재시작 안전성 보장까지 풀어쓴 학습 노트.

📚 Spring Batch 입문에서 운영까지 · 17편 — ItemStream 등록 (재시작 안전성의 핵심)

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 17편이에요. 16편 까지 transaction 제어를 잡았다면, 이번 17편은 재시작 안전성의 핵심ItemStream 등록.

ItemStream 의 의미

public interface ItemStream {
    void open(ExecutionContext context) throws ItemStreamException;
    void update(ExecutionContext context) throws ItemStreamException;
    void close() throws ItemStreamException;
}

세 메서드가 한 묶음으로 동작해요. open() 은 Step 시작 시 1회 호출돼서 이전 ExecutionContext(잡 실행 상태 저장소)를 복구하고, update() 는 매 chunk(한 트랜잭션 묶음 단위) 종료마다 현재 상태를 ExecutionContext 에 저장하고, close() 는 Step 종료 시 1회 호출돼서 리소스를 정리해요.

13편 Step Restart 에서 본 current position 추적 의 정체가 바로 이거예요. 이 인터페이스가 재시작 안전성 을 구현하는 진짜 골격.

자동 등록 — 가장 흔한 경우

@Bean
public Step myStep(JobRepository repo, PlatformTransactionManager tx) {
    return new StepBuilder("myStep", repo)
        .<X, Y>chunk(100, tx)
        .reader(flatFileItemReader())     // FlatFileItemReader implements ItemStream
        .writer(flatFileItemWriter())     // FlatFileItemWriter implements ItemStream
        .build();
}

Reader·Writer 가 ItemStream 을 구현하고, StepBuilderreader()·writer() 인자로 직접 들어가면 자동 등록이 끝나요. JobRepository(잡 메타데이터 저장소) 와 PlatformTransactionManager(스프링 트랜잭션 추상화) 는 Step 마다 받는 표준 인자.

대부분 공식 ItemReader/Writer:

  • FlatFileItemReader/Writer
  • JdbcCursorItemReader·JdbcBatchItemWriter
  • JpaItemWriter·JpaCursorItemReader
  • KafkaItemReader/Writer
  • MongoItemReader/Writer

대부분 환경 = 별도 등록 코드 없이 재시작 안전.

수동 등록 — Delegate · Wrapping 패턴

시험에서 자주 발을 거는 자리가 여기예요. Reader·Writer 가 다른 Reader 를 내부 wrap 하면 자동 등록이 안 돼요.

Wrapping 예제

public class FilteringReader<T> implements ItemReader<T> {
    private final ItemReader<T> delegate;
    private final Predicate<T> filter;

    public FilteringReader(ItemReader<T> delegate, Predicate<T> filter) {
        this.delegate = delegate;
        this.filter = filter;
    }

    @Override
    public T read() throws Exception {
        T item;
        while ((item = delegate.read()) != null) {
            if (filter.test(item)) return item;
        }
        return null;
    }
}

FilteringReaderFlatFileItemReader 를 wrap. delegateItemStream자동 등록 X.

해결 — 수동 등록

@Bean
public Step wrappingStep(JobRepository repo, PlatformTransactionManager tx,
                         FlatFileItemReader<X> delegate) {
    FilteringReader<X> filtering = new FilteringReader<>(delegate, filter);

    return new StepBuilder("wrappingStep", repo)
        .<X, Y>chunk(100, tx)
        .reader(filtering)
        .stream(delegate)                     // ★ 수동 등록
        .writer(...)
        .build();
}

.stream(itemStream) 이 delegate 의 ItemStream 을 명시적으로 등록해 줘요. 이러면 open·update·close 가 모두 호출돼요.

Wrapping Reader 가 직접 ItemStream 구현

public class FilteringReader<T> implements ItemReader<T>, ItemStream {
    private final ItemReader<T> delegate;

    @Override
    public T read() { ... }

    @Override
    public void open(ExecutionContext context) {
        if (delegate instanceof ItemStream s) {
            s.open(context);
        }
    }

    @Override
    public void update(ExecutionContext context) {
        if (delegate instanceof ItemStream s) {
            s.update(context);
        }
    }

    @Override
    public void close() {
        if (delegate instanceof ItemStream s) {
            s.close();
        }
    }
}

Wrapping Reader 가 delegate 의 ItemStream 메서드 위임. .stream() 호출 불필요.

CompositeItemStream — 여러 stream

여러 Reader·Writer 를 조합 하는 Composite 패턴:

@Bean
public CompositeItemWriter<X> compositeWriter(
        JdbcBatchItemWriter<X> jdbcWriter,
        FlatFileItemWriter<X> fileWriter) {
    CompositeItemWriter<X> composite = new CompositeItemWriter<>();
    composite.setDelegates(List.of(jdbcWriter, fileWriter));
    return composite;
}

CompositeItemWriter자체로 ItemStream 구현. 내부적으로 모든 delegate 의 open·update·close 호출.

Composite 만 등록 하면 자동.

패턴별 등록 가이드

패턴 ItemStream 등록
표준 ItemReader/Writer 단독 자동
Composite (공식) 자동
Custom Wrapping (ItemStream 미구현) 수동 .stream()
Custom Wrapping (ItemStream 위임 구현) 자동
Tasklet 내부에서 ItemReader 사용 수동 .stream()

자주 헷갈리는 자리

.reader() 의 자동 등록 조건

조건 1: 객체가 ItemStream 구현
조건 2: .reader() 또는 .writer() 의 직접 인자

둘 다 만족 = 자동 등록.

Delegate 가 ItemStream 미구현

ItemReader<X> reader = new SomeCustomReader();    // ItemStream X
WrapperReader wrapper = new WrapperReader(reader);

new StepBuilder(...)
    .reader(wrapper)         // wrapper 만 등록 (reader 가 ItemStream 이라도)
    // ...

delegate (= reader) 가 ItemStream 이라도 wrapper 를 통하면 자동 X.

Tasklet 안 ItemReader

Tasklet(단발 작업 단위 Step) 안에서 ItemReader 를 직접 꺼내 쓰는 경우.

@Component
public class MyTasklet implements Tasklet {
    @Autowired
    private FlatFileItemReader<X> reader;

    @Override
    public RepeatStatus execute(...) {
        // reader 사용
    }
}

Tasklet 안에서 직접 ItemReader 사용 = .reader() 호출 X = ItemStream 자동 등록 X.

해결:

new StepBuilder("myStep", repo)
    .tasklet(myTasklet, tx)
    .stream(reader)             // ★ 수동
    .build();

운영 권장 패턴

Pattern 1: 단순 ItemReader/Writer

new StepBuilder(...)
    .reader(reader)             // 자동
    .writer(writer)
    .build();

대부분 환경. 별도 작업 없음.

Pattern 2: Composite 사용

new StepBuilder(...)
    .reader(reader)
    .writer(compositeWriter)    // 자동 (CompositeItemWriter 가 자체 ItemStream)
    .build();

Pattern 3: Wrapping — 위임 구현 권장

public class CountingReader<T> implements ItemReader<T>, ItemStream {
    private final ItemReader<T> delegate;

    @Override
    public T read() {
        T item = delegate.read();
        if (item != null) count++;
        return item;
    }

    // ItemStream 메서드 — delegate 위임
    @Override
    public void open(ExecutionContext context) {
        if (delegate instanceof ItemStream s) s.open(context);
        count = context.getInt("count", 0);
    }

    @Override
    public void update(ExecutionContext context) {
        if (delegate instanceof ItemStream s) s.update(context);
        context.putInt("count", count);
    }

    @Override
    public void close() {
        if (delegate instanceof ItemStream s) s.close();
    }
}

위임 + 자체 state 추가. 자동 등록.

Pattern 4: Tasklet 안 Stream 사용

new StepBuilder("myStep", repo)
    .tasklet(myTasklet, tx)
    .stream(reader)
    .stream(writer)
    .build();

여러 stream 가능. 모두 .stream() 으로 명시.

ItemStream 의 ExecutionContext 활용

FlatFileItemReader 예제 (공식 구현체 내부)

public class FlatFileItemReader<T> implements ItemReader<T>, ItemStream {
    private long currentItemCount = 0;
    private long maxItemCount = Long.MAX_VALUE;

    @Override
    public void open(ExecutionContext context) {
        // 이전 실행의 위치 복구
        currentItemCount = context.getInt("currentItemCount", 0);
        skipToLine(currentItemCount);
    }

    @Override
    public void update(ExecutionContext context) {
        context.putInt("currentItemCount", currentItemCount);
    }

    @Override
    public T read() {
        T item = doRead();
        currentItemCount++;
        return item;
    }
}

open() 에서 복구하고, update() 에서 저장하고, read() 에서 카운트가 올라가요. 이 세 박자가 도는 사이클이 재시작 안전성 의 실체.

Custom Reader 의 ExecutionContext key

private static final String CURRENT_KEY = "myReader.currentPosition";

@Override
public void open(ExecutionContext context) {
    if (context.containsKey(CURRENT_KEY)) {
        this.position = context.getLong(CURRENT_KEY);
    }
}

@Override
public void update(ExecutionContext context) {
    context.putLong(CURRENT_KEY, this.position);
}

Key 이름이 unique 해야 해요. 여러 ItemStream 이 같은 key 를 쓰면 충돌이 나니까, class prefix 를 붙이는 쪽을 권장.

ItemStreamSupport — 추상 base class

public class MyReader extends ItemStreamSupport implements ItemReader<X> {

    @Override
    public X read() { ... }

    @Override
    public void update(ExecutionContext context) {
        super.update(context);              // executionContextName 자동 prefix
        context.put("position", position);
    }

    @Override
    public void open(ExecutionContext context) {
        super.open(context);
        position = (Long) context.get("position");
    }
}

@Bean
public MyReader myReader() {
    MyReader reader = new MyReader();
    reader.setName("customers");           // executionContextName
    return reader;
}

ItemStreamSupportexecutionContextName prefix 자동. 같은 class 의 여러 instance 라도 다른 prefix 로 분리돼요.

ExecutionContext key 가 자동으로 "customers.position" 같이 붙어요.

자주 만나는 사고

사고 1: 재시작 시 처음부터 다시

원인 — ItemStream 미구현 또는 등록 누락.

해결.stream() 명시 + ItemStream 구현 확인.

사고 2: ExecutionContext key 충돌

원인 — 두 Reader 가 같은 key (예: "position") 사용.

해결ItemStreamSupport.setName() 또는 class-prefix key.

사고 3: Delegate 의 update 누락

원인 — Wrapping Reader 가 update() 위임 안 함.

해결delegate.update(context) 호출 또는 .stream(delegate).

사고 4: Composite 의 stream 누락

원인CompositeItemWriterdelegate stream 자동 위임 하는데 delegate 가 ItemStream 미구현.

해결 — delegate 도 ItemStream 구현 또는 수동 .stream().

한계·실무 함정

1. ExecutionContext 의 직렬화

putLong·putString·putInt 는 안전한데, put(key, obj) 로 복잡 객체를 넣으면 직렬화에서 깨질 위험이 있어요. 단순 type 으로 가는 쪽을 권장.

2. ExecutionContext 크기

너무 큰 metadata 는 DB BLOB 부담으로 돌아와요. 핵심 position 만 담는다는 감각.

3. close 호출 보장

close() 는 Step 종료 시 자동으로 불려요. 다만 예외가 중간에 터지면 호출이 누락될 수도 있어서, 리소스는 try-with-resourcesfinally 로 받쳐주는 게 안전해요.

4. open 의 비용

매 Step 시작마다 open 이 호출되니까, 큰 파일 open·DB connection 같은 비싼 작업이 여기 들어가면 시작 시간이 그만큼 늘어요. 측정 후 최적화.

5. Custom ItemStream 구현 시 thread-safety

37편 Multi-threaded Step 환경에서는 ItemStream 의 update·read 가 동시에 호출돼요. thread-safe 보장 필요.

6. ExecutionContext 의 transaction

update(context) 가 chunk transaction 안에서 호출되니까, chunk rollback 이 일어나면 ExecutionContext 변경도 같이 rollback. 자연스러운 동작.

시험 직전 한 번 더 — ItemStream 등록 함정 압축 노트

  • ItemStream 3 메서드 = open (시작) · update (매 chunk) · close (종료)
  • 자동 등록 조건 = (1) 객체가 ItemStream 구현 (2) .reader()·.writer() 직접 인자
  • 대부분 공식 ItemReader/Writer = ItemStream 자동 구현
  • 수동 등록 = .stream(itemStream) Builder 메서드
  • 자주 자리 — Custom Wrapping (delegate 가 ItemStream 인데 wrapper 가 ItemStream X) · Tasklet 안 Reader 사용
  • CompositeItemWriter/Reader = 자체 ItemStream 구현, delegate 도 ItemStream 이어야 안전
  • Wrapping 권장 패턴 = ItemStream 위임 구현 (if (delegate instanceof ItemStream s) s.update(...))
  • ItemStreamSupport = base class, setName() 으로 executionContextName prefix 자동
  • ExecutionContext key 충돌 방지
  • ExecutionContext 활용 — open() 복구 · update() 저장 · read() 카운트
  • 단순 type (putLong·putString·putInt) 권장, 복잡 객체 = 직렬화 위험
  • 사고 자주 — 재시작 처음부터 (ItemStream 누락) · key 충돌 · delegate update 누락 · composite stream 누락
  • 함정 — ExecutionContext 크기 (BLOB 부담)
  • 함정 — close 호출 보장 (try-with-resources)
  • 함정 — open 비용 (큰 파일·DB conn)
  • 함정 — Multi-threaded 의 thread-safety
  • 함정 — chunk rollback = ExecutionContext rollback (자연스러운)

공식 문서: Registering ItemStream with a Step 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!