Spring Batch 입문 26편. 표준 구현체로 안 될 때, 직접 ItemReader · ItemWriter 작성하는 길. Stateless / Stateful 분기, ItemStream 결합으로 restart 안전성 확보, ExecutionContext key 이름 unique 보장, TransactionAwareProxyFactory 패턴까지 정리한 학습 노트. Part 5 마무리.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 26편이에요. 25편 의 표준 구현체 카탈로그까지 봤다면, 이번 26편은 그래도 안 될 때 — 직접 구현. Part 5 (ItemReader · ItemWriter 기본) 의 마지막.
Custom 구현이 필요한 case
표준 구현체가 지원 안 하는 데이터 소스 (예: 사내 메시지 큐, 비표준 protocol) 를 다룰 때, 복잡한 변환 로직이 Reader 단계부터 필요할 때, 표준 Reader 의 옵션으로 표현 안 되는 동작이 있을 때, 외부 API 의 페이지네이션·rate-limit 같은 도메인 특화 동작이 필요할 때, 메트릭·로깅·트레이싱 wrapping 을 프로젝트 표준으로 묶고 싶을 때 — 이런 자리에서 custom 이 등장해요.
대부분 케이스는 표준 구현체 + Late Binding (21편) 으로 해결. 진짜 custom 이 필요한 자리만 직접 작성.
기본 원칙 — Stateless 우선
A stateless reader does not need to worry about restartability, but a stateful one has to try to reconstitute its last known state on restart. For this reason, we recommend that you keep custom readers stateless if possible. — 공식 reference
Stateless = 내부에 위치 상태 없음. 매 read 가 외부 transactional source 의 next item.
→ 22편의 Transactional Source (Kafka·JMS·queue) 가 자연스럽게 stateless.
Stateful = current position·count 같은 내부 상태 보유.
→ 파일·DB cursor·collection iteration 처럼 위치 추적 필요.
→ Stateful = ItemStream 도 구현 필수.
가장 단순한 Custom Reader — List
public class CustomItemReader<T> implements ItemReader<T> {
private final List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
@Override
public T read() {
if (!items.isEmpty()) {
return items.remove(0); // 상태 변경 — 첫 element 제거
}
return null;
}
}
테스트:
List<String> items = new ArrayList<>(List.of("1", "2", "3"));
ItemReader<String> reader = new CustomItemReader<>(items);
assertEquals("1", reader.read());
assertEquals("2", reader.read());
assertEquals("3", reader.read());
assertNull(reader.read()); // 종료 신호
문제 — remove(0) 이 원본 list 를 mutate = 재시작 시 list 가 비어 있음 = 처음부터 다시 X.
Restart 안전성 추가 — ItemStream 결합
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
private final List<T> items;
private int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
@Override
public T read() {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
@Override
public void open(ExecutionContext context) {
if (context.containsKey(CURRENT_INDEX)) {
currentIndex = (int) context.getLong(CURRENT_INDEX);
} else {
currentIndex = 0;
}
}
@Override
public void update(ExecutionContext context) {
context.putLong(CURRENT_INDEX, currentIndex);
}
@Override
public void close() {
// 자원 없음 — 빈 구현
}
}
핵심 변화는 셋이에요. 먼저 remove(0) 을 get(currentIndex++) 로 바꿔서 원본을 더 이상 mutate 하지 않게 만들었고, open() 에서 이전 currentIndex 를 복구해두고, update() 에서 현재 currentIndex 를 ExecutionContext 에 저장합니다.
테스트 — 재시작 시뮬레이션:
ExecutionContext context = new ExecutionContext();
((ItemStream) reader).open(context);
assertEquals("1", reader.read());
((ItemStream) reader).update(context);
// 여기서 시스템 죽음 시뮬레이션
// 새 Reader 인스턴스로 재시작
List<String> items = new ArrayList<>(List.of("1", "2", "3"));
reader = new CustomItemReader<>(items);
((ItemStream) reader).open(context);
assertEquals("2", reader.read()); // ★ "1" 이 아닌 "2" — 이어서
ExecutionContext key 이름 — Unique 보장
private static final String CURRENT_INDEX = "current.index";
여기 시험 함정이 하나 있어요.
같은 Step 안에 두 개의 같은 type Reader 가 있으면? (예: 두 파일을 따로 read)
@Bean public CustomItemReader<X> reader1() { ... }
@Bean public CustomItemReader<X> reader2() { ... }
둘 다 ExecutionContext 의 같은 key ("current.index") 사용 → 충돌.
해결 1: ItemStreamSupport 상속
24편의 패턴.
public class CustomItemReader<T> extends ItemStreamSupport
implements ItemReader<T> {
private final List<T> items;
private int currentIndex = 0;
public CustomItemReader(List<T> items) {
this.items = items;
}
@Override
public T read() {
return currentIndex < items.size() ? items.get(currentIndex++) : null;
}
@Override
public void open(ExecutionContext context) {
super.open(context);
String key = getExecutionContextKey("current.index");
if (context.containsKey(key)) {
currentIndex = (int) context.getLong(key);
}
}
@Override
public void update(ExecutionContext context) {
super.update(context);
context.putLong(getExecutionContextKey("current.index"), currentIndex);
}
}
@Bean
public CustomItemReader<X> reader1() {
CustomItemReader<X> reader = new CustomItemReader<>(items1);
reader.setName("reader1");
return reader;
}
@Bean
public CustomItemReader<X> reader2() {
CustomItemReader<X> reader = new CustomItemReader<>(items2);
reader.setName("reader2");
return reader;
}
ExecutionContext key 가 "reader1.current.index"·"reader2.current.index" 로 분리.
해결 2: Class name prefix 수동
private static final String CURRENT_INDEX =
CustomItemReader.class.getSimpleName() + ".current.index";
같은 class 하나만 쓸 때 OK. 인스턴스 두 개면 해결 1 필수.
외부 API Reader — 실전 예제
public class ExternalApiReader extends ItemStreamSupport
implements ItemReader<Customer> {
private final ApiClient client;
private String nextPageToken;
private Queue<Customer> buffer = new LinkedList<>();
private static final String PAGE_TOKEN_KEY = "pageToken";
public ExternalApiReader(ApiClient client) {
this.client = client;
setName("externalApiReader");
}
@Override
public Customer read() throws Exception {
if (buffer.isEmpty()) {
fetchNextPage();
}
return buffer.poll();
}
private void fetchNextPage() throws Exception {
if (nextPageToken == null && !buffer.isEmpty()) {
return;
}
PageResponse<Customer> page = client.fetchCustomers(nextPageToken);
buffer.addAll(page.items());
nextPageToken = page.nextPageToken();
}
@Override
public void open(ExecutionContext context) {
super.open(context);
String key = getExecutionContextKey(PAGE_TOKEN_KEY);
if (context.containsKey(key)) {
nextPageToken = context.getString(key);
}
}
@Override
public void update(ExecutionContext context) {
super.update(context);
if (nextPageToken != null) {
context.putString(getExecutionContextKey(PAGE_TOKEN_KEY), nextPageToken);
}
}
}
포인트는 셋. page 단위로 fetch 해서 버퍼링하고 read 는 1건씩 흘려보내요. 재시작 안전성은 nextPageToken 을 ExecutionContext 에 저장해서 챙기고, buffer 가 비고 token 도 null 이면 그게 read 의 끝입니다.
대부분 표준 ItemReader (예: JdbcCursorItemReader) 가 훨씬 정교한 restart 로직. 가능한 한 표준 reader 사용 + 정말 필요할 때만 custom.
Custom Writer — 단순 예제
public class CustomItemWriter<T> implements ItemWriter<T> {
private List<T> output = TransactionAwareProxyFactory.createTransactionalList();
@Override
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
핵심 — TransactionAwareProxyFactory.createTransactionalList().
TransactionAwareProxyFactory 의 의미
일반 List 에 addAll 시 Spring Batch transaction rollback 영향 받지 않음. 즉:
chunk 시작
addAll(items) ← 메모리 list 에 들어감
write FAIL
rollback ← list 는 그대로 — 일관성 깨짐
TransactionAwareProxyFactory 의 list = transaction 안에서만 가시화. commit 후에 실제 변경, rollback 시 변경 사라짐 — DB 와 동기.
→ 테스트·in-memory writer 에서 DB-like 일관성 흉내.
Custom Writer 의 Restart
In many realistic cases, custom ItemWriters also delegate to another writer that itself is restartable, or else it writes to a transactional resource and so does not need to be restartable, because it is stateless.
대부분 Custom Writer 는 restart 안전성 신경 안 써도 됨:
| Writer 종류 | restart 안전성 |
|---|---|
| DB writer (transactional) | DB transaction 자체가 보장 |
| Queue writer (Kafka·JMS, transactional) | source 가 보장 |
| 표준 writer delegate | delegate 가 보장 |
| Stateful (count·footer 필요) | ItemStream 필수 |
Stateful Writer 예제 — count + footer
public class CountingFooterWriter<T> extends ItemStreamSupport
implements ItemWriter<T> {
private final ItemWriter<T> delegate;
private long count = 0;
private static final String COUNT_KEY = "totalCount";
public CountingFooterWriter(ItemWriter<T> delegate) {
this.delegate = delegate;
setName("countingFooterWriter");
}
@Override
public void write(Chunk<? extends T> items) throws Exception {
delegate.write(items);
count += items.size();
}
@Override
public void open(ExecutionContext context) {
super.open(context);
if (delegate instanceof ItemStream s) s.open(context);
String key = getExecutionContextKey(COUNT_KEY);
count = context.getLong(key, 0); // default 0
}
@Override
public void update(ExecutionContext context) {
super.update(context);
if (delegate instanceof ItemStream s) s.update(context);
context.putLong(getExecutionContextKey(COUNT_KEY), count);
}
@Override
public void close() {
if (delegate instanceof ItemStream s) s.close();
}
}
포인트는 셋. count 가 stateful 상태 (총 처리 count) 이고, 이를 ExecutionContext 에 저장해서 재시작 시 복구하며, delegate 가 가진 ItemStream 도 함께 위임해줘요 (22편 Delegate Pattern).
Stateless vs Stateful 의 분기 결정
| 시그널 | Stateless | Stateful |
|---|---|---|
| 데이터 소스 | Kafka·JMS·DB cursor | File · in-memory list · 외부 API page |
| 위치 정보 | 외부 시스템이 보유 | reader 내부에 보유 |
| 같은 input 으로 두 번 read 가능? | ✓ (transactional rollback 시) | X |
| ItemStream 필요? | ✗ (대체로) | ✓ |
| 재시작 안전성 | 자동 | 수동 구현 |
→ 가능한 한 stateless 설계. 도메인이 stateful 강제하면 ItemStream 구현 + name prefix 철저히.
단위 테스트 패턴
Reader 테스트
class CustomItemReaderTest {
@Test
void readsAllItemsThenNull() throws Exception {
ItemReader<String> reader = new CustomItemReader<>(
new ArrayList<>(List.of("a", "b")));
assertEquals("a", reader.read());
assertEquals("b", reader.read());
assertNull(reader.read());
}
@Test
void restartsFromSavedPosition() throws Exception {
CustomItemReader<String> reader = new CustomItemReader<>(
new ArrayList<>(List.of("a", "b", "c")));
reader.setName("test");
ExecutionContext context = new ExecutionContext();
reader.open(context);
reader.read(); // "a"
reader.update(context);
// 새 인스턴스로 재시작
CustomItemReader<String> restarted = new CustomItemReader<>(
new ArrayList<>(List.of("a", "b", "c")));
restarted.setName("test");
restarted.open(context);
assertEquals("b", restarted.read()); // ★ 이어서
}
}
Writer 테스트
class CustomItemWriterTest {
@Test
void writesItemsToOutput() throws Exception {
CustomItemWriter<String> writer = new CustomItemWriter<>();
writer.write(new Chunk<>(List.of("a", "b")));
assertEquals(List.of("a", "b"), writer.getOutput());
}
@Test
void emptyChunkHandledGracefully() throws Exception {
CustomItemWriter<String> writer = new CustomItemWriter<>();
writer.write(new Chunk<>(List.of()));
assertTrue(writer.getOutput().isEmpty()); // 예외 X
}
}
통합 테스트 — Step 안 동작
@SpringBatchTest
@SpringJUnitConfig(classes = MyBatchConfig.class)
class CustomReaderIntegrationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
void completesNormally() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncherTestUtils.launchJob(params);
assertEquals(BatchStatus.COMPLETED, execution.getStatus());
}
}
→ 40편 Testing 에서 자세히.
자주 만나는 사고
사고 1: 재시작 시 처음부터 다시
원인 1 — ItemStream 미구현.
원인 2 — Step 에 .stream() 등록 누락 (delegate 경우).
원인 3 — update() 호출 누락 또는 빈 구현.
해결 — ItemStream 구현 + name prefix + .stream() 명시.
사고 2: ExecutionContext key 충돌
원인 — 같은 type 두 인스턴스가 같은 key.
해결 — ItemStreamSupport.setName() 또는 class+instance prefix.
사고 3: List mutation 으로 재시작 시 list 비어 있음
원인 — remove(0) 같은 mutating 작업.
해결 — get(index++) 패턴 + ExecutionContext 에 index 저장.
사고 4: 외부 API rate-limit 안 지킴
원인 — Custom reader 가 읽을 때마다 API 호출.
해결 — page 버퍼링 + rate-limit + retry/backoff.
사고 5: Writer 안 외부 호출 실패 시 부분 commit
원인 — 외부 시스템 (이메일·API) = transaction rollback 안 됨.
해결 — idempotency key + 23편 idempotent design.
사고 6: Empty chunk 시 예외
원인 — Writer 안에서 items.get(0) 같은 직접 access.
해결 — if (items.isEmpty()) return; 가드 (23편).
사고 7: TransactionAwareProxyFactory 잘못 이해
원인 — production 에서 그냥 ArrayList 사용 후 rollback 일관성 문제.
해결 — production = DB·queue 등 실제 transactional resource. TransactionAwareProxyFactory 는 테스트·in-memory 한정.
운영 권장 패턴
Pattern 1: 표준 reader + 메트릭 wrapping
public class MeteredReader<T> implements ItemReader<T>, ItemStream {
private final ItemReader<T> delegate;
private final MeterRegistry registry;
@Override
public T read() throws Exception {
Timer.Sample sample = Timer.start(registry);
try {
T item = delegate.read();
registry.counter("batch.read.count").increment();
return item;
} catch (Exception e) {
registry.counter("batch.read.failure").increment();
throw e;
} finally {
sample.stop(registry.timer("batch.read.duration"));
}
}
@Override public void open(ExecutionContext c) {
if (delegate instanceof ItemStream s) s.open(c);
}
@Override public void update(ExecutionContext c) {
if (delegate instanceof ItemStream s) s.update(c);
}
@Override public void close() {
if (delegate instanceof ItemStream s) s.close();
}
}
표준 reader 를 delegate 로 wrapping + 메트릭만 추가. 45편 Observability 와 결합.
Pattern 2: 도메인 특화 reader
외부 시스템·protocol 의 도메인 특화 추상화 — 작성·테스트 코스트 정당화 필요.
Pattern 3: Composite reader (Spring Batch 6)
@Bean
public CompositeItemReader<Customer> compositeReader(
FlatFileItemReader<Customer> file1Reader,
FlatFileItemReader<Customer> file2Reader) {
return new CompositeItemReader<>(List.of(file1Reader, file2Reader));
}
여러 reader 를 순차적으로 read. 33편 multi-file input 의 한 패턴.
시험 직전 한 번 더 — Custom Reader/Writer 함정 압축 노트
- 기본 원칙 = stateless 우선, stateful 만 ItemStream 결합
- Stateless = 외부 transactional source 가 위치 보유 (Kafka·JMS)
- Stateful = reader 가 위치 보유 (file·list·외부 API page)
- Stateful = 반드시 ItemStream 구현
- read() 의 contract — item 또는 null (종료)
- List mutation (
remove(0)) = 재시작 시 list 비어 있음 →get(index++)패턴 - ExecutionContext key unique 보장 필수
- 방법 1 —
ItemStreamSupport상속 +setName()+getExecutionContextKey() - 방법 2 — class+instance prefix 수동
- 같은 class 두 인스턴스 = setName 다르게
- 외부 API Reader = page 버퍼링 + nextPageToken ExecutionContext 저장
TransactionAwareProxyFactory.createTransactionalList()= transaction 안 가시화, rollback 시 변경 사라짐- production = DB·queue, 테스트·in-memory = TransactionAwareProxyFactory
- Custom Writer = 대부분 restart 안전성 불필요 (delegate 또는 transactional resource 가 보장)
- Stateful Writer (count·footer) = ItemStream 구현
- Empty chunk graceful 처리 —
if (items.isEmpty()) return; - Delegate ItemStream 위임 —
if (delegate instanceof ItemStream s) s.update(c); - 함정 — ExecutionContext key 충돌 → name prefix
- 함정 — List mutation 으로 재시작 list 비어 있음
- 함정 — 외부 API rate-limit (page buffering + backoff)
- 함정 — 외부 호출 부분 commit (idempotency key)
- 함정 — TransactionAwareProxyFactory production 잘못 사용
- 테스트 — Reader 의 restart 시뮬레이션 (open → read → update → 새 인스턴스 → open → 이어서)
- 테스트 — Writer 의 empty chunk graceful
- 통합 테스트 —
@SpringBatchTest+JobLauncherTestUtils(40편) - 운영 패턴 — 메트릭 wrapping · 도메인 특화 · CompositeItemReader
공식 문서: Creating Custom ItemReaders and ItemWriters 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 21편 — Late Binding · @StepScope · @JobScope · SpEL
- 22편 — ItemReader 인터페이스 종합 · Delegate Pattern
- 23편 — ItemWriter 인터페이스 종합
- 24편 — ItemStream 인터페이스 본격 풀이
- 25편 — Reader · Writer 구현체 카탈로그
다음 글: