Spring Batch 입문 26편 — Custom Reader · Writer 직접 구현

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 26편. 표준 구현체로 안 될 때, 직접 ItemReader · ItemWriter 작성하는 길. Stateless / Stateful 분기, ItemStream 결합으로 restart 안전성 확보, ExecutionContext key 이름 unique 보장, TransactionAwareProxyFactory 패턴까지 정리한 학습 노트. Part 5 마무리.

📚 Spring Batch 입문에서 운영까지 · 26편 — Custom Reader · Writer 직접 구현

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 26편이에요. 25편 의 표준 구현체 카탈로그까지 봤다면, 이번 26편은 그래도 안 될 때직접 구현. Part 5 (ItemReader · ItemWriter 기본) 의 마지막.

Custom 구현이 필요한 case

표준 구현체가 지원 안 하는 데이터 소스 (예: 사내 메시지 큐, 비표준 protocol) 를 다룰 때, 복잡한 변환 로직이 Reader 단계부터 필요할 때, 표준 Reader 의 옵션으로 표현 안 되는 동작이 있을 때, 외부 API 의 페이지네이션·rate-limit 같은 도메인 특화 동작이 필요할 때, 메트릭·로깅·트레이싱 wrapping 을 프로젝트 표준으로 묶고 싶을 때 — 이런 자리에서 custom 이 등장해요.

대부분 케이스는 표준 구현체 + Late Binding (21편) 으로 해결. 진짜 custom 이 필요한 자리만 직접 작성.

기본 원칙 — Stateless 우선

A stateless reader does not need to worry about restartability, but a stateful one has to try to reconstitute its last known state on restart. For this reason, we recommend that you keep custom readers stateless if possible. — 공식 reference

Stateless = 내부에 위치 상태 없음. 매 read 가 외부 transactional source 의 next item.

→ 22편의 Transactional Source (Kafka·JMS·queue) 가 자연스럽게 stateless.

Stateful = current position·count 같은 내부 상태 보유.

→ 파일·DB cursor·collection iteration 처럼 위치 추적 필요.

Stateful = ItemStream 도 구현 필수.

가장 단순한 Custom Reader — List

public class CustomItemReader<T> implements ItemReader<T> {

    private final List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() {
        if (!items.isEmpty()) {
            return items.remove(0);     // 상태 변경 — 첫 element 제거
        }
        return null;
    }
}

테스트:

List<String> items = new ArrayList<>(List.of("1", "2", "3"));
ItemReader<String> reader = new CustomItemReader<>(items);

assertEquals("1", reader.read());
assertEquals("2", reader.read());
assertEquals("3", reader.read());
assertNull(reader.read());            // 종료 신호

문제remove(0)원본 list 를 mutate = 재시작 시 list 가 비어 있음 = 처음부터 다시 X.

Restart 안전성 추가 — ItemStream 결합

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    private final List<T> items;
    private int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() {
        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }
        return null;
    }

    @Override
    public void open(ExecutionContext context) {
        if (context.containsKey(CURRENT_INDEX)) {
            currentIndex = (int) context.getLong(CURRENT_INDEX);
        } else {
            currentIndex = 0;
        }
    }

    @Override
    public void update(ExecutionContext context) {
        context.putLong(CURRENT_INDEX, currentIndex);
    }

    @Override
    public void close() {
        // 자원 없음 — 빈 구현
    }
}

핵심 변화는 셋이에요. 먼저 remove(0)get(currentIndex++) 로 바꿔서 원본을 더 이상 mutate 하지 않게 만들었고, open() 에서 이전 currentIndex 를 복구해두고, update() 에서 현재 currentIndex 를 ExecutionContext 에 저장합니다.

테스트 — 재시작 시뮬레이션:

ExecutionContext context = new ExecutionContext();
((ItemStream) reader).open(context);
assertEquals("1", reader.read());
((ItemStream) reader).update(context);
// 여기서 시스템 죽음 시뮬레이션

// 새 Reader 인스턴스로 재시작
List<String> items = new ArrayList<>(List.of("1", "2", "3"));
reader = new CustomItemReader<>(items);

((ItemStream) reader).open(context);
assertEquals("2", reader.read());     // ★ "1" 이 아닌 "2" — 이어서

ExecutionContext key 이름 — Unique 보장

private static final String CURRENT_INDEX = "current.index";

여기 시험 함정이 하나 있어요.

같은 Step 안에 두 개의 같은 type Reader 가 있으면? (예: 두 파일을 따로 read)

@Bean public CustomItemReader<X> reader1() { ... }
@Bean public CustomItemReader<X> reader2() { ... }

둘 다 ExecutionContext 의 같은 key ("current.index") 사용 → 충돌.

해결 1: ItemStreamSupport 상속

24편의 패턴.

public class CustomItemReader<T> extends ItemStreamSupport
        implements ItemReader<T> {

    private final List<T> items;
    private int currentIndex = 0;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() {
        return currentIndex < items.size() ? items.get(currentIndex++) : null;
    }

    @Override
    public void open(ExecutionContext context) {
        super.open(context);
        String key = getExecutionContextKey("current.index");
        if (context.containsKey(key)) {
            currentIndex = (int) context.getLong(key);
        }
    }

    @Override
    public void update(ExecutionContext context) {
        super.update(context);
        context.putLong(getExecutionContextKey("current.index"), currentIndex);
    }
}
@Bean
public CustomItemReader<X> reader1() {
    CustomItemReader<X> reader = new CustomItemReader<>(items1);
    reader.setName("reader1");
    return reader;
}

@Bean
public CustomItemReader<X> reader2() {
    CustomItemReader<X> reader = new CustomItemReader<>(items2);
    reader.setName("reader2");
    return reader;
}

ExecutionContext key 가 "reader1.current.index"·"reader2.current.index" 로 분리.

해결 2: Class name prefix 수동

private static final String CURRENT_INDEX =
    CustomItemReader.class.getSimpleName() + ".current.index";

같은 class 하나만 쓸 때 OK. 인스턴스 두 개면 해결 1 필수.

외부 API Reader — 실전 예제

public class ExternalApiReader extends ItemStreamSupport
        implements ItemReader<Customer> {

    private final ApiClient client;
    private String nextPageToken;
    private Queue<Customer> buffer = new LinkedList<>();
    private static final String PAGE_TOKEN_KEY = "pageToken";

    public ExternalApiReader(ApiClient client) {
        this.client = client;
        setName("externalApiReader");
    }

    @Override
    public Customer read() throws Exception {
        if (buffer.isEmpty()) {
            fetchNextPage();
        }
        return buffer.poll();
    }

    private void fetchNextPage() throws Exception {
        if (nextPageToken == null && !buffer.isEmpty()) {
            return;
        }
        PageResponse<Customer> page = client.fetchCustomers(nextPageToken);
        buffer.addAll(page.items());
        nextPageToken = page.nextPageToken();
    }

    @Override
    public void open(ExecutionContext context) {
        super.open(context);
        String key = getExecutionContextKey(PAGE_TOKEN_KEY);
        if (context.containsKey(key)) {
            nextPageToken = context.getString(key);
        }
    }

    @Override
    public void update(ExecutionContext context) {
        super.update(context);
        if (nextPageToken != null) {
            context.putString(getExecutionContextKey(PAGE_TOKEN_KEY), nextPageToken);
        }
    }
}

포인트는 셋. page 단위로 fetch 해서 버퍼링하고 read 는 1건씩 흘려보내요. 재시작 안전성은 nextPageToken 을 ExecutionContext 에 저장해서 챙기고, buffer 가 비고 token 도 null 이면 그게 read 의 끝입니다.

대부분 표준 ItemReader (예: JdbcCursorItemReader) 가 훨씬 정교한 restart 로직. 가능한 한 표준 reader 사용 + 정말 필요할 때만 custom.

Custom Writer — 단순 예제

public class CustomItemWriter<T> implements ItemWriter<T> {

    private List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    @Override
    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

핵심TransactionAwareProxyFactory.createTransactionalList().

TransactionAwareProxyFactory 의 의미

일반 ListaddAllSpring Batch transaction rollback 영향 받지 않음. 즉:

chunk 시작
addAll(items)        ← 메모리 list 에 들어감
write FAIL
rollback             ← list 는 그대로 — 일관성 깨짐

TransactionAwareProxyFactory 의 list = transaction 안에서만 가시화. commit 후에 실제 변경, rollback 시 변경 사라짐 — DB 와 동기.

→ 테스트·in-memory writer 에서 DB-like 일관성 흉내.

Custom Writer 의 Restart

In many realistic cases, custom ItemWriters also delegate to another writer that itself is restartable, or else it writes to a transactional resource and so does not need to be restartable, because it is stateless.

대부분 Custom Writer 는 restart 안전성 신경 안 써도 됨:

Writer 종류 restart 안전성
DB writer (transactional) DB transaction 자체가 보장
Queue writer (Kafka·JMS, transactional) source 가 보장
표준 writer delegate delegate 가 보장
Stateful (count·footer 필요) ItemStream 필수

Stateful Writer 예제 — count + footer

public class CountingFooterWriter<T> extends ItemStreamSupport
        implements ItemWriter<T> {

    private final ItemWriter<T> delegate;
    private long count = 0;
    private static final String COUNT_KEY = "totalCount";

    public CountingFooterWriter(ItemWriter<T> delegate) {
        this.delegate = delegate;
        setName("countingFooterWriter");
    }

    @Override
    public void write(Chunk<? extends T> items) throws Exception {
        delegate.write(items);
        count += items.size();
    }

    @Override
    public void open(ExecutionContext context) {
        super.open(context);
        if (delegate instanceof ItemStream s) s.open(context);

        String key = getExecutionContextKey(COUNT_KEY);
        count = context.getLong(key, 0);       // default 0
    }

    @Override
    public void update(ExecutionContext context) {
        super.update(context);
        if (delegate instanceof ItemStream s) s.update(context);
        context.putLong(getExecutionContextKey(COUNT_KEY), count);
    }

    @Override
    public void close() {
        if (delegate instanceof ItemStream s) s.close();
    }
}

포인트는 셋. count 가 stateful 상태 (총 처리 count) 이고, 이를 ExecutionContext 에 저장해서 재시작 시 복구하며, delegate 가 가진 ItemStream 도 함께 위임해줘요 (22편 Delegate Pattern).

Stateless vs Stateful 의 분기 결정

시그널 Stateless Stateful
데이터 소스 Kafka·JMS·DB cursor File · in-memory list · 외부 API page
위치 정보 외부 시스템이 보유 reader 내부에 보유
같은 input 으로 두 번 read 가능? ✓ (transactional rollback 시) X
ItemStream 필요? ✗ (대체로)
재시작 안전성 자동 수동 구현

가능한 한 stateless 설계. 도메인이 stateful 강제하면 ItemStream 구현 + name prefix 철저히.

단위 테스트 패턴

Reader 테스트

class CustomItemReaderTest {

    @Test
    void readsAllItemsThenNull() throws Exception {
        ItemReader<String> reader = new CustomItemReader<>(
            new ArrayList<>(List.of("a", "b")));

        assertEquals("a", reader.read());
        assertEquals("b", reader.read());
        assertNull(reader.read());
    }

    @Test
    void restartsFromSavedPosition() throws Exception {
        CustomItemReader<String> reader = new CustomItemReader<>(
            new ArrayList<>(List.of("a", "b", "c")));
        reader.setName("test");

        ExecutionContext context = new ExecutionContext();
        reader.open(context);
        reader.read();                       // "a"
        reader.update(context);

        // 새 인스턴스로 재시작
        CustomItemReader<String> restarted = new CustomItemReader<>(
            new ArrayList<>(List.of("a", "b", "c")));
        restarted.setName("test");
        restarted.open(context);

        assertEquals("b", restarted.read()); // ★ 이어서
    }
}

Writer 테스트

class CustomItemWriterTest {

    @Test
    void writesItemsToOutput() throws Exception {
        CustomItemWriter<String> writer = new CustomItemWriter<>();
        writer.write(new Chunk<>(List.of("a", "b")));

        assertEquals(List.of("a", "b"), writer.getOutput());
    }

    @Test
    void emptyChunkHandledGracefully() throws Exception {
        CustomItemWriter<String> writer = new CustomItemWriter<>();
        writer.write(new Chunk<>(List.of()));

        assertTrue(writer.getOutput().isEmpty());     // 예외 X
    }
}

통합 테스트 — Step 안 동작

@SpringBatchTest
@SpringJUnitConfig(classes = MyBatchConfig.class)
class CustomReaderIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    void completesNormally() throws Exception {
        JobParameters params = new JobParametersBuilder()
            .addLong("run.id", System.currentTimeMillis())
            .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        assertEquals(BatchStatus.COMPLETED, execution.getStatus());
    }
}

→ 40편 Testing 에서 자세히.

자주 만나는 사고

사고 1: 재시작 시 처음부터 다시

원인 1 — ItemStream 미구현. 원인 2 — Step 에 .stream() 등록 누락 (delegate 경우). 원인 3update() 호출 누락 또는 빈 구현.

해결 — ItemStream 구현 + name prefix + .stream() 명시.

사고 2: ExecutionContext key 충돌

원인 — 같은 type 두 인스턴스가 같은 key.

해결ItemStreamSupport.setName() 또는 class+instance prefix.

사고 3: List mutation 으로 재시작 시 list 비어 있음

원인remove(0) 같은 mutating 작업.

해결get(index++) 패턴 + ExecutionContext 에 index 저장.

사고 4: 외부 API rate-limit 안 지킴

원인 — Custom reader 가 읽을 때마다 API 호출.

해결page 버퍼링 + rate-limit + retry/backoff.

사고 5: Writer 안 외부 호출 실패 시 부분 commit

원인 — 외부 시스템 (이메일·API) = transaction rollback 안 됨.

해결 — idempotency key + 23편 idempotent design.

사고 6: Empty chunk 시 예외

원인 — Writer 안에서 items.get(0) 같은 직접 access.

해결if (items.isEmpty()) return; 가드 (23편).

사고 7: TransactionAwareProxyFactory 잘못 이해

원인 — production 에서 그냥 ArrayList 사용 후 rollback 일관성 문제.

해결production = DB·queue 등 실제 transactional resource. TransactionAwareProxyFactory테스트·in-memory 한정.

운영 권장 패턴

Pattern 1: 표준 reader + 메트릭 wrapping

public class MeteredReader<T> implements ItemReader<T>, ItemStream {
    private final ItemReader<T> delegate;
    private final MeterRegistry registry;

    @Override
    public T read() throws Exception {
        Timer.Sample sample = Timer.start(registry);
        try {
            T item = delegate.read();
            registry.counter("batch.read.count").increment();
            return item;
        } catch (Exception e) {
            registry.counter("batch.read.failure").increment();
            throw e;
        } finally {
            sample.stop(registry.timer("batch.read.duration"));
        }
    }

    @Override public void open(ExecutionContext c) {
        if (delegate instanceof ItemStream s) s.open(c);
    }
    @Override public void update(ExecutionContext c) {
        if (delegate instanceof ItemStream s) s.update(c);
    }
    @Override public void close() {
        if (delegate instanceof ItemStream s) s.close();
    }
}

표준 reader 를 delegate 로 wrapping + 메트릭만 추가. 45편 Observability 와 결합.

Pattern 2: 도메인 특화 reader

외부 시스템·protocol 의 도메인 특화 추상화 — 작성·테스트 코스트 정당화 필요.

Pattern 3: Composite reader (Spring Batch 6)

@Bean
public CompositeItemReader<Customer> compositeReader(
        FlatFileItemReader<Customer> file1Reader,
        FlatFileItemReader<Customer> file2Reader) {
    return new CompositeItemReader<>(List.of(file1Reader, file2Reader));
}

여러 reader 를 순차적으로 read. 33편 multi-file input 의 한 패턴.

시험 직전 한 번 더 — Custom Reader/Writer 함정 압축 노트

  • 기본 원칙 = stateless 우선, stateful 만 ItemStream 결합
  • Stateless = 외부 transactional source 가 위치 보유 (Kafka·JMS)
  • Stateful = reader 가 위치 보유 (file·list·외부 API page)
  • Stateful = 반드시 ItemStream 구현
  • read() 의 contract — item 또는 null (종료)
  • List mutation (remove(0)) = 재시작 시 list 비어 있음 → get(index++) 패턴
  • ExecutionContext key unique 보장 필수
  • 방법 1 — ItemStreamSupport 상속 + setName() + getExecutionContextKey()
  • 방법 2 — class+instance prefix 수동
  • 같은 class 두 인스턴스 = setName 다르게
  • 외부 API Reader = page 버퍼링 + nextPageToken ExecutionContext 저장
  • TransactionAwareProxyFactory.createTransactionalList() = transaction 안 가시화, rollback 시 변경 사라짐
  • production = DB·queue, 테스트·in-memory = TransactionAwareProxyFactory
  • Custom Writer = 대부분 restart 안전성 불필요 (delegate 또는 transactional resource 가 보장)
  • Stateful Writer (count·footer) = ItemStream 구현
  • Empty chunk graceful 처리 — if (items.isEmpty()) return;
  • Delegate ItemStream 위임 — if (delegate instanceof ItemStream s) s.update(c);
  • 함정 — ExecutionContext key 충돌 → name prefix
  • 함정 — List mutation 으로 재시작 list 비어 있음
  • 함정 — 외부 API rate-limit (page buffering + backoff)
  • 함정 — 외부 호출 부분 commit (idempotency key)
  • 함정 — TransactionAwareProxyFactory production 잘못 사용
  • 테스트 — Reader 의 restart 시뮬레이션 (open → read → update → 새 인스턴스 → open → 이어서)
  • 테스트 — Writer 의 empty chunk graceful
  • 통합 테스트 — @SpringBatchTest + JobLauncherTestUtils (40편)
  • 운영 패턴 — 메트릭 wrapping · 도메인 특화 · CompositeItemReader

공식 문서: Creating Custom ItemReaders and ItemWriters 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!