Spring Batch 입문 22편. ItemReader 인터페이스 단 1개 메서드 read() 의 contract — forward-only · null 종료 · transactional rollback 의미, Flat File · XML · DB 소스별 구현, Delegate Pattern 의 Step 수동 등록 함정까지 정리한 학습 노트.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 22편이에요. 21편 까지 Part 4 (Tasklet · Flow Control) 를 끝냈다면, 이번 22편부터 Part 5 — ItemReader · ItemWriter 기본으로 들어갑니다. 첫 주제는 ItemReader 의 인터페이스 contract 와 Delegate Pattern.
ItemReader — 단 1개의 메서드
public interface ItemReader<T> {
T read() throws Exception;
}
진짜 이게 끝이에요. read() 한 번 호출이 곧 Item 1개 반환, 더 이상 없으면 null 반환.
이 단순함이 Spring Batch 의 가장 강력한 추상화입니다. Flat File·XML·DB·JMS (Java Message Service, 자바 메시지 큐 표준)·Kafka·MongoDB 모두 같은 인터페이스 하나로 추상화돼요.
read() 의 5가지 Contract
Contract 1: null = 종료 신호
Item item;
while ((item = reader.read()) != null) {
// 처리
}
null 반환은 Chunk 처리 종료 신호고, Spring Batch 는 null 을 받자마자 Step 을 종료합니다. 그래서 컬렉션이 비어도 예외가 나지 않아요. 처음 read 가 null 이면 자연스러운 0건 처리 종료가 됩니다.
Contract 2: Forward-Only
ItemReader 구현체는 앞으로만 진행해요. 같은 위치로 되돌아가지 않습니다.
Item item1 = reader.read(); // 1번 row
Item item2 = reader.read(); // 2번 row (1번으로 안 돌아감)
Item item3 = reader.read(); // 3번 row
Reader 는 iterator (반복자) 와 비슷한 동작이고, 위치는 내부 상태로 들고 있어요.
Contract 3: Transactional Rollback 시 동일 item 재read 가능
예외 케이스가 있는데, transactional 소스 (JMS queue·Kafka 등) 의 경우 rollback 시 같은 item 이 다시 read 될 수 있어요.
read(item1) → process → write FAIL → rollback
↓
read 다시 호출 → item1 다시 반환 (queue 가 rollback 됐으니)
이게 재시도 가능성의 핵심이고, Spring Batch 의 retry·skip 로직이 이 contract 에 의존합니다. 반면 비-transactional 소스 (File·DB cursor) 는 rollback 후 같은 item 이 다시 온다는 보장이 없어서, ItemStream 과 ExecutionContext (재시작 위치 저장소) 같은 다른 메커니즘으로 위치를 복구해요.
Contract 4: 매핑은 권장이지만 강제 X
T read() throws Exception;
T 는 자유예요. domain object (Trade·Customer·Order) 가 권장이긴 하지만, raw String 이나 Map·JSONObject 도 그대로 받을 수 있어요. 대체로 ItemProcessor 가 변환을 맡아서, Reader 는 원본 형태로 읽기만 하고 Processor 가 변환·필터·검증을 담당합니다.
Contract 5: 예외 = 처리 실패
read() 가 예외를 throw 하면 해당 read 실패로 간주해요. 14편 skip 로직이나 38편 retry 가 이걸 처리하고, 예외 throw 가 곧 Step 종료를 직접 의미하지는 않습니다.
ItemReader 의 세 부류
부류 1: Flat File Reader
CSV·TSV·고정 길이 파일을 line 단위로 읽고, FieldSet (한 줄을 컬럼 배열로 파싱한 객체) 으로 컬럼을 분해해요.
@Bean
public FlatFileItemReader<Customer> customerReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerReader")
.resource(new FileSystemResource("customers.csv"))
.delimited()
.names("id", "name", "email")
.targetType(Customer.class)
.build();
}
대표는 FlatFileItemReader 고 자세한 내용은 28편·29편에서 다룹니다.
부류 2: XML Reader
XML 문서를 element 단위로 읽고, JAXB (Java XML 바인딩 표준) 나 Jackson XML 로 객체에 매핑해요.
@Bean
public StaxEventItemReader<Customer> xmlReader() {
return new StaxEventItemReaderBuilder<Customer>()
.name("xmlReader")
.resource(new FileSystemResource("customers.xml"))
.addFragmentRootElements("customer")
.unmarshaller(jaxbMarshaller())
.build();
}
대표는 StaxEventItemReader 고 자세한 내용은 31편에서.
부류 3: Database Reader
JDBC·JPA·MongoDB 같은 DB 에서 row 단위로 읽고, RowMapper (row 한 줄을 자바 객체로 변환하는 콜백) 로 매핑해요.
@Bean
public JdbcCursorItemReader<Order> orderReader(DataSource ds) {
return new JdbcCursorItemReaderBuilder<Order>()
.name("orderReader")
.dataSource(ds)
.sql("SELECT id, customer_id, amount FROM orders")
.rowMapper(new BeanPropertyRowMapper<>(Order.class))
.build();
}
대표는 JdbcCursorItemReader 고 자세한 내용은 34편. 전체 구현체 카탈로그는 25편에 정리돼 있어요.
Cursor vs Paging — DB Reader 의 두 전략
| 항목 | Cursor-based | Paging-based |
|---|---|---|
| 메커니즘 | DB cursor 한 번 열어 stream | LIMIT/OFFSET 으로 page 단위 query |
| Connection | 1회 유지 (긴 connection) | Page 마다 close |
| 메모리 | row 1개씩 (저점유) | page 크기만큼 (중간) |
| 재시작 안전성 | 위치 추적 가능 | page 번호로 복구 |
| 대용량 | ✓ | ✓ |
| Long-running 안전 | connection timeout 위험 | ✓ |
JdbcCursorItemReader (cursor) 와 JdbcPagingItemReader (paging) 가 각각의 대표 구현체고, connection lifetime 과 DB 부하를 보고 선택해요.
Forward-Only 의 함정 — Restart 시 위치 복구
ItemReader 가 forward-only 라면 재시작 시 처음부터 다시 도는 게 아닐까요? 그렇지 않아요. ItemStream 인터페이스 (24편) 와 결합하면 ExecutionContext 에 위치를 저장해서, 재시작 시 마지막 위치부터 재개합니다. 표준 ItemReader 구현체 대부분이 ItemStream 을 같이 구현하고 있어서, 17편의 ItemStream 등록 패턴이 자동으로 적용돼요.
Delegate Pattern — 합성 reader/writer
Spring Batch 에서 자주 등장하는 패턴이고, CompositeItemWriter 가 대표예요.
public class CompositeItemWriter<T> implements ItemWriter<T> {
private List<ItemWriter<? super T>> delegates;
@Override
public void write(Chunk<? extends T> items) throws Exception {
for (ItemWriter<? super T> writer : delegates) {
writer.write(items);
}
}
}
여러 Writer 를 묶어 한 번에 호출하는 구조라, 같은 데이터를 DB 와 파일에 동시에 저장할 때 이런 합성을 씁니다.
Delegate Pattern 의 Step 등록 함정
여기서 시험 함정이 하나 있어요.
@Bean
public Step step1(JobRepository repo, PlatformTransactionManager tx) {
return new StepBuilder("step1", repo)
.<String, String>chunk(2, tx)
.reader(fooReader())
.writer(compositeItemWriter()) // wrapper
// .stream(barWriter()) 누락 시...
.build();
}
@Bean
public CompositeItemWriter compositeItemWriter() {
CompositeItemWriter writer = new CompositeItemWriter();
writer.setDelegate(barWriter()); // 내부 delegate
return writer;
}
@Bean
public BarWriter barWriter() {
return new BarWriter(); // ItemStream·StepListener 구현
}
문제는 Step 이 직접 인식하는 게 compositeItemWriter 뿐이라는 점이에요. 내부 delegate 인 barWriter 가 ItemStream 이나 StepListener (Step 라이프사이클 콜백 인터페이스) 를 구현했어도 Step 이 그 사실을 모르고, 그래서 open·update·close 가 호출되지 않아 재시작 안전성이 깨집니다.
해결 — .stream() 으로 명시 등록
return new StepBuilder("step1", repo)
.<String, String>chunk(2, tx)
.reader(fooReader())
.writer(compositeItemWriter())
.stream(barWriter()) // ★ delegate 명시 등록
.build();
.stream(delegate) 를 호출하면 Step 이 delegate 의 ItemStream 메서드를 호출해 줍니다. 17편 ItemStream 등록의 핵심이 바로 이거예요.
일반 규칙 — Step 자동 등록 조건
A reader, writer, or processor that is directly wired into the Step gets registered automatically if it implements ItemStream or a StepListener interface. However, because the delegates are not known to the Step, they need to be injected as listeners or streams.
자동 등록 조건은 두 가지예요. .reader()·.writer()·.processor() 의 직접 인자여야 하고, ItemStream 이나 StepListener 를 구현해야 합니다. Delegate (nested) 는 수동 등록이 필수입니다.
Reader 직접 구현 예제
단순 list-based reader
public class ListItemReader<T> implements ItemReader<T> {
private final List<T> items;
private int index = 0;
public ListItemReader(List<T> items) {
this.items = items;
}
@Override
public T read() {
if (index < items.size()) {
return items.get(index++);
}
return null;
}
}
테스트나 prototype 용으로 적당해요. ItemStream 을 구현하지 않아서 재시작 안전성은 없습니다.
재시작 안전 reader
public class ResumableListReader<T> extends ItemStreamSupport
implements ItemReader<T> {
private final List<T> items;
private int index = 0;
private static final String INDEX_KEY = "list.reader.index";
public ResumableListReader(List<T> items) {
this.items = items;
setName("listReader");
}
@Override
public void open(ExecutionContext context) {
super.open(context);
if (context.containsKey(INDEX_KEY)) {
index = context.getInt(INDEX_KEY);
}
}
@Override
public void update(ExecutionContext context) {
super.update(context);
context.putInt(INDEX_KEY, index);
}
@Override
public T read() {
if (index < items.size()) {
return items.get(index++);
}
return null;
}
}
ItemStreamSupport 를 상속하고 setName() 을 부르면 ExecutionContext key prefix 가 자동으로 붙고 재시작도 안전해져요.
자주 만나는 사고
사고 1: 0건 처리 시 예외
원인은 read() 가 처음부터 null 인데 Reader 가 예외를 throw 하는 경우. 0건은 자연스러운 종료라, SQL 결과 0건이나 빈 파일은 null 을 반환하는 게 맞아요.
사고 2: Delegate 의 open·update·close 안 불림
원인은 .stream() 등록 누락이고 (Delegate Pattern 함정), 해결은 .stream(delegate) 명시 등록.
사고 3: 재시작 시 처음부터 다시
Reader 가 ItemStream 을 구현하지 않았거나 등록이 빠진 경우예요. ItemStreamSupport 상속 + .reader() 직접 등록 (자동) 또는 .stream() 명시로 해결합니다.
사고 4: Cursor connection timeout
JdbcCursorItemReader 의 connection 이 long-running 인데 DB 의 connection idle timeout 을 넘어버리면 발생해요. JdbcPagingItemReader 로 전환하거나 connection timeout 을 늘리는 두 갈래 해결책이 있습니다.
사고 5: 같은 item 두 번 처리
원인이 두 가지예요. 첫째, Transactional source 의 rollback 후 재read. 둘째, Step 재시작 시 ExecutionContext key 오류로 위치 복구가 실패한 경우. 해결은 idempotent processing (같은 입력으로 여러 번 실행해도 결과가 같은 처리) 으로 설계하는 거예요. process 와 write 가 같은 item 을 여러 번 처리해도 안전하도록.
사고 6: forward-only 위반 시도
read() 안에서 내부 list index 를 감소시키거나 이전 row 로 점프하는 케이스. Reader 의 forward-only contract 를 지키고, 재처리가 필요하면 별도 Step 으로 분리하세요.
ItemReader 의 thread-safety
대부분의 표준 ItemReader 는 thread-safe 하지 않아요.
FlatFileItemReader— line 위치가 mutable stateJdbcCursorItemReader— cursor 가 mutable stateStaxEventItemReader— XML parser state
37편 Multi-threaded Step 환경에서는 ItemReader 가 thread-safe 인지 반드시 확인해야 해요. 대안은 다음과 같아요.
SynchronizedItemReaderwrapper (Spring Batch 제공)JdbcPagingItemReader— page 단위 fetch 라 thread-safe- Partitioning (데이터를 분할해 thread 마다 별도 Reader 를 띄우는 방식) 으로 각 thread 마다 독립 Reader
운영 권장 패턴
Pattern 1: 표준 StepScope Reader
@Bean
@StepScope
public FlatFileItemReader<Customer> customerReader(
@Value("#{jobParameters['input.file']}") Resource resource) {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerReader")
.resource(resource)
.delimited()
.names("id", "name", "email")
.targetType(Customer.class)
.build();
}
21편 Late Binding 과 표준 reader 의 조합.
Pattern 2: SynchronizedItemReader (multi-threaded)
@Bean
@StepScope
public SynchronizedItemReader<Customer> syncReader(
FlatFileItemReader<Customer> delegate) {
return new SynchronizedItemReader<>(delegate);
}
multi-threaded Step 에서 thread-safe 하지 않은 Reader 를 synchronized 로 감싸 쓰는 패턴.
Pattern 3: CompositeItemReader 패턴
여러 reader 를 순차로 read 하는 경우, Spring Batch 6 에서 CompositeItemReader 가 추가됐어요.
@Bean
public CompositeItemReader<Customer> compositeReader(
FlatFileItemReader<Customer> file1Reader,
FlatFileItemReader<Customer> file2Reader) {
return new CompositeItemReader<>(List.of(file1Reader, file2Reader));
}
여러 파일을 하나의 reader 처럼 다루는 33편 multi-file input 의 한 갈래.
시험 직전 한 번 더 — ItemReader 함정 압축 노트
- ItemReader 인터페이스 =
T read()메서드 1개 - read 반환 = item 또는
null(= 종료) null= 종료 신호 — 컬렉션이 비어도 예외 X- Forward-only contract — 같은 위치로 안 돌아감
- 예외 = 처음 read 가 null 자연스러운 0건 종료
- Transactional source (JMS·Kafka) = rollback 시 같은 item 재 read 가능
- 비-transactional source (File·DB cursor) = rollback ≠ 재 read 보장 X
- T 타입 자유 (도메인 객체 권장이지만 String·Map·JSON 도 OK)
- 세 부류 = Flat File · XML · Database
- 대표 —
FlatFileItemReader·StaxEventItemReader·JdbcCursorItemReader·JdbcPagingItemReader - Cursor vs Paging — connection 1회 vs page 단위, 메모리 vs timeout 안전성
- Delegate Pattern —
CompositeItemWriter대표 - 함정 — Delegate 가 ItemStream·StepListener 구현해도 Step 자동 등록 X
- 자동 등록 조건 = (1)
.reader()·.writer()직접 인자 (2) ItemStream·StepListener 구현 - 해결 —
.stream(delegate)또는.listener(delegate)명시 등록 - 함정 — 0건 결과 시 예외 throw 금지 —
null반환 - 함정 — forward-only 위반 (index 감소·이전 row 점프)
- 함정 — Cursor connection idle timeout
- 대부분 표준 ItemReader = NOT thread-safe
- multi-threaded →
SynchronizedItemReaderwrapper 또는JdbcPagingItemReader(thread-safe) 또는 partitioning - 함정 — 재시작 시 처음부터 다시 (ItemStream 누락 또는 등록 누락)
- 패턴 — 표준 StepScope reader + Late Binding
- 패턴 —
CompositeItemReader(Spring Batch 6 신규) 로 여러 source 결합
공식 문서: ItemReader · Delegate Pattern and Registering with the Step 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 17편 — ItemStream 등록 (재시작 안전성의 핵심)
- 18편 — Step 라이프사이클 Listener 종합
- 19편 — TaskletStep (단발 작업의 정석)
- 20편 — Flow Control · Decision · Split · 조건 분기
- 21편 — Late Binding · @StepScope · @JobScope · SpEL
다음 글: