Spring Batch 입문 36편 — Reusing Services · ItemReaderAdapter · Process Indicator

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 36편. 기존 Service · DAO 메서드를 batch 인터페이스로 wrapping 하는 ItemReaderAdapter · ItemWriterAdapter · PropertyExtractingDelegatingItemWriter — `targetObject` / `targetMethod` 의 contract, null 반환 = 종료 규칙. Process Indicator 패턴 — DB flag column 으로 재실행 안전성, `saveState = false` 와의 결합, ExecutionContext 누적 회피까지 정리한 학습 노트. Part 7 마무리.

📚 Spring Batch 입문에서 운영까지 · 36편 — Reusing Services · ItemReaderAdapter · Process Indicator

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 36편이에요. 35편 의 ItemProcessor 까지 read–process–write 3축을 모두 봤다면, 이번 36편에서는 그 위에서 자주 등장하는 두 패턴, 곧 기존 Service 재사용과 Process Indicator(처리 표시 컬럼 패턴) 를 다뤄요. Part 7 (ItemProcessor · 재사용) 의 마무리예요.

왜 기존 Service 재사용이 흔한가

배치 시스템은 online 시스템 또는 다른 application 스타일함께 동작하는 경우가 흔함. 이미 DAO·Service 가 있는데, batch 안에서도 똑같은 로직 사용. — 공식 reference

운영 환경에서 흔히 마주치는 상황은 이래요. 이미 online API 가 사용하는 CustomerService.findActive() 가 있고, batch 도 active customer 만 처리하면 돼요. 이럴 때 새 Reader 를 만들지 않고 기존 Service 를 그대로 재사용하는 길이 가장 깔끔해요. 그래서 Service 메서드를 ItemReader 로 wrapping 하게 돼요.

ItemReaderAdapter — 기존 메서드 → ItemReader

ItemReaderAdapter 는 임의의 객체 메서드를 ItemReader 처럼 보이게 감싸주는 어댑터예요.

@Bean
public ItemReaderAdapter<Foo> fooReader(FooService fooService) {
    ItemReaderAdapter<Foo> reader = new ItemReaderAdapter<>();
    reader.setTargetObject(fooService);
    reader.setTargetMethod("generateFoo");
    return reader;
}

@Bean
public FooService fooService() {
    return new FooService();
}

작동은 단순해요. fooService.generateFoo() 가 매 read() 호출마다 한 번씩 실행돼요.

Target Method 의 contract — null = 종료

public class FooService {
    public Foo generateFoo() {
        // ...
        if (allDone()) return null;        // ★ 종료 신호
        return next;
    }
}

The contract of the targetMethod must be the same as the contract for read: When exhausted, it returns null. — 공식 reference

여기서 중요한 건 null 반환이 ItemReader 종료 신호라는 점이에요. 빈 Optional 이나 -1 같은 다른 신호로는 Spring Batch 가 종료를 인식하지 못해서 무한 루프에 빠지거나 잘못된 시점에 끝나버려요. 그래서 기존 Service 가 null 을 반환하지 않으면 직접 wrap 해주거나, 어댑터 자체가 부적합해요.

ItemWriterAdapter — 기존 메서드 → ItemWriter

ItemWriterAdapter 는 ItemReaderAdapter 의 쓰기 버전이에요.

@Bean
public ItemWriterAdapter<Foo> fooWriter(FooService fooService) {
    ItemWriterAdapter<Foo> writer = new ItemWriterAdapter<>();
    writer.setTargetObject(fooService);
    writer.setTargetMethod("processFoo");
    return writer;
}

작동은 chunk 의 각 item 마다 fooService.processFoo(item) 가 한 번씩 호출되는 식이에요.

Target Method signature

public class FooService {
    public void processFoo(Foo foo) {
        // 처리 로직
    }
}

void 든 return 값이 있든 결과는 무시돼요. 다만 item 을 하나씩 개별 호출한다는 게 핵심이에요. chunk 단위로 묶어서 한 번에 처리하는 게 아니라 N 번을 따로따로 부르거든요.

여기서 성능 함정이 생겨요. chunk 100 이면 processFoo() 가 100번, DB 호출도 100회 일어나서 batch 의 성능 이점이 사라져요. 그래서 대량 처리에는 ItemWriterAdapter 가 부적합해요. bulk 메서드가 필요하면 PropertyExtractingDelegatingItemWriter 를 쓰거나 Custom Writer 로 가야 해요.

PropertyExtractingDelegatingItemWriter

PropertyExtractingDelegatingItemWriter 는 item 의 property 를 자동 추출해서 delegate 메서드의 인자로 넘겨주는 writer 예요.

public class FooService {
    public void saveFoo(Long id, String name) {       // 인자 2개
        // ...
    }
}

@Bean
public PropertyExtractingDelegatingItemWriter<Foo> writer(FooService fooService) {
    PropertyExtractingDelegatingItemWriter<Foo> writer =
        new PropertyExtractingDelegatingItemWriter<>();
    writer.setTargetObject(fooService);
    writer.setTargetMethod("saveFoo");
    writer.setFieldsUsedAsTargetMethodArguments(new String[]{"id", "name"});
    return writer;
}

Foo.idFoo.name 을 자동 추출해서 saveFoo(id, name) 으로 넘겨요. getter 가 자동으로 매칭돼요.

Adapter vs Custom Wrapper 선택

시나리오 권장
기존 메서드가 Spring Batch contract 와 거의 동일 Adapter
추가 로직 필요 (로깅·검증·변환) Custom Wrapper
대량 batch 처리 (chunk 단위) Custom Writer (Adapter 는 1건씩)
메서드 signature 가 복잡 PropertyExtractingDelegatingItemWriter

대부분은 작은 batch 에 기존 Service 가 정확히 맞아 떨어지면 Adapter 로 충분해요. 그 외에는 Custom Reader/Writer (26편) 로 가는 게 안전해요.

Process Indicator 패턴 — 재실행 안전성

문제 상황은 이래요. DB Reader 가 대량으로 처리 중인데 일부가 실패하면, 재실행했을 때 어디까지 처리했는지 추적해야 해요.

ExecutionContext (Job·Step 의 상태 저장 맵) 와 saveState (24편) 가 표준 답이긴 한데, 한계가 있어요. 상태가 Spring Batch 의 내부 DB 테이블에 들어가서 운영자가 수동 SQL 로 확인하기가 어려워요. ExecutionContext 자체가 복잡하니 디버깅도 까다롭고요. 무엇보다 입력 row 자체에는 처리 흔적이 남지 않아요.

이럴 때 쓰는 게 Process Indicator 예요. 입력 row 에 직접 flag 컬럼을 추가하는 패턴이에요.

Process Indicator 의 동작

Schema

CREATE TABLE player_summary (
    player_id BIGINT,
    year_no INT,
    -- ... (집계 컬럼들)
    PROCESSED_IND BOOLEAN DEFAULT FALSE     -- ★ 처리 표시
);

Reader SQL

SELECT * FROM player_summary
WHERE PROCESSED_IND = FALSE
ORDER BY player_id;

이미 처리된 row 는 query 단계에서 자동으로 빠져요. 그래서 재실행하면 남은 것만 자연스럽게 잡혀요.

Writer 또는 후속 Step

UPDATE player_summary SET PROCESSED_IND = TRUE WHERE player_id = :playerId;

처리가 끝난 row 의 flag 를 바꿔주는 부분이에요. chunk transaction 안에서 함께 묶어요.

Process Indicator 의 핵심 장점

운영자 가시성이 가장 커요. DB 만 봐도 어디까지 처리됐는지 명확히 보여요. ExecutionContext 없이도 완벽한 재실행 안전성이 보장되고, 특정 row 만 다시 처리하고 싶으면 UPDATE PROCESSED_IND = FALSE WHERE ... 로 수동 개입도 쉬워요. 여러 batch 가 동시에 돌더라도 flag 가 false 인 것만 골라 읽으면 돼서 서로 간섭하지 않아요.

Process Indicator + saveState = false

@Bean
public JdbcCursorItemReader<PlayerSummary> reader(DataSource ds) {
    return new JdbcCursorItemReaderBuilder<PlayerSummary>()
        .name("summaryReader")
        .dataSource(ds)
        .rowMapper(new PlayerSummaryMapper())
        .saveState(false)              // ★ ExecutionContext 저장 X
        .sql("""
            SELECT player_id, year_no, completes, attempts
            FROM player_summary
            WHERE PROCESSED_IND = FALSE
            ORDER BY player_id
            """)
        .build();
}

saveState = falseItemStream (재시작 가능한 reader/writer 의 상태 인터페이스) 의 update() 에서 ExecutionContext 저장을 끄는 옵션이에요. 24편 ItemStream 의 핵심 옵션이기도 해요.

왜 끄냐면, Process Indicator 가 위치 정보를 이미 DB row 에 박아두고 있어서 ExecutionContext 가 굳이 필요 없거든요. 오히려 양쪽에 위치 정보가 있으면 혼란만 커져요.

In this scenario, it is preferable to not store any state, such as the current row number, since it is irrelevant upon restart. For this reason, all readers and writers include the 'saveState' property. — 공식 reference

End-to-end 예제

도메인

public class PlayerSummary {
    private long playerId;
    private int yearNo;
    private long completes;
    private long attempts;
    private boolean processedInd;       // optional - 코드에서는 안 씀
    // getter/setter
}

Reader

JdbcCursorItemReader (JDBC 커서 기반 reader) 로 미처리 row 만 가져와요.

@Bean
public JdbcCursorItemReader<PlayerSummary> summaryReader(DataSource ds) {
    return new JdbcCursorItemReaderBuilder<PlayerSummary>()
        .name("summaryReader")
        .dataSource(ds)
        .saveState(false)
        .sql("""
            SELECT player_id, year_no, completes, attempts
            FROM player_summary
            WHERE PROCESSED_IND = FALSE
            ORDER BY player_id
            LIMIT 10000
            """)
        .rowMapper(BeanPropertyRowMapper.newInstance(PlayerSummary.class))
        .build();
}

LIMIT 10000 은 한 번에 가져오는 양의 안전 한도예요. BeanPropertyRowMapper (컬럼명을 bean property 에 자동 매핑) 가 결과 row 를 도메인 객체로 옮겨줘요.

Processor — 집계 로직

@Bean
public ItemProcessor<PlayerSummary, PlayerSummary> aggregator() {
    return summary -> {
        // 집계·변환 로직
        return summary;
    };
}

Writer — 처리 + flag 변경

CompositeItemWriter (여러 writer 를 순차 호출하는 wrapper) 로 집계 writer 와 flag update writer 를 묶어요.

@Bean
public CompositeItemWriter<PlayerSummary> compositeWriter(
        ItemWriter<PlayerSummary> aggregateWriter,
        JdbcBatchItemWriter<PlayerSummary> markProcessed) {
    CompositeItemWriter<PlayerSummary> composite = new CompositeItemWriter<>();
    composite.setDelegates(List.of(aggregateWriter, markProcessed));
    return composite;
}

@Bean
public JdbcBatchItemWriter<PlayerSummary> markProcessed(DataSource ds) {
    return new JdbcBatchItemWriterBuilder<PlayerSummary>()
        .dataSource(ds)
        .sql("""
            UPDATE player_summary
            SET PROCESSED_IND = TRUE
            WHERE player_id = :playerId AND year_no = :yearNo
            """)
        .beanMapped()
        .build();
}

CompositeItemWriter (25편) 가 aggregate writer 와 flag update writer 를 같은 chunk transaction 안에서 함께 호출해 줘요. 트랜잭션을 공유하니 일관성이 깨질 일이 없어요.

Step

@Bean
public Step summaryStep(JobRepository repo, PlatformTransactionManager tx,
                        JdbcCursorItemReader<PlayerSummary> reader,
                        ItemProcessor<PlayerSummary, PlayerSummary> processor,
                        CompositeItemWriter<PlayerSummary> writer) {
    return new StepBuilder("summaryStep", repo)
        .<PlayerSummary, PlayerSummary>chunk(500, tx)
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .build();
}

Process Indicator 의 변형 패턴

Variant 1: 별도 staging 테이블

처리가 끝난 row 를 별도 archive 테이블로 옮겨요.

-- chunk transaction 안
INSERT INTO player_summary_archive SELECT * FROM player_summary WHERE player_id = ?;
DELETE FROM player_summary WHERE player_id = ?;

원본 테이블에는 항상 미처리 row 만 남으니 query 가 빨라져요.

Variant 2: Timestamp 기반

SELECT * FROM order_request
WHERE created_at > :lastProcessedAt
ORDER BY created_at;

lastProcessedAtJobParameters (Job 실행 단위로 전달되는 파라미터) 또는 별도 metadata 테이블에 저장해요. Process Indicator 의 시간 버전이에요.

Variant 3: Status 컬럼

SELECT * FROM order WHERE status IN ('NEW', 'RETRY');
UPDATE order SET status = 'PROCESSING' WHERE id = ?;     -- 처리 중 표시
-- ... 처리 ...
UPDATE order SET status = 'DONE' WHERE id = ?;

NEW / PROCESSING / DONE / FAILED 같은 상태 머신 기반이에요.

saveState = false 의 다른 활용

Process Indicator 말고도 saveState = false 가 유용한 경우가 몇 가지 있어요.

시나리오 이유
Short-lived Step 재시작 가능성 없음
Multi-threaded Step 각 thread 독립 상태
입력이 매번 동일 (static) 위치 추적 불필요
외부 시스템 pull 기반 (Kafka) offset 이 외부

기본은 true 가 안전해요. 명시적으로 false 를 주는 건 운영 의도가 확실할 때만이에요.

자주 만나는 사고

사고 1: Adapter target method 가 null 반환 안 함

원인 — 기존 Service 가 Optional.empty 또는 Collection 끝으로 종료를 표현해요.

해결 — null 반환 wrapper 를 따로 두거나 Custom Reader (26편) 로 가요.

사고 2: ItemWriterAdapter 의 batch 성능

원인 — chunk 100 이면 개별 호출이 100회 발생해요.

해결 — bulk 메서드를 작성해서 Custom Writer 로 묶거나 JdbcBatchItemWriter 를 써요.

사고 3: PropertyExtractingDelegating 의 인자 순서

원인setFieldsUsedAsTargetMethodArguments(new String[]{"name", "id"}) 인데 method signature 는 (Long id, String name) 으로 잡혀 있어요.

해결 — getter 이름 순서와 method parameter 순서를 일치시켜요.

사고 4: Process Indicator 의 flag 변경 실패

원인 — UPDATE 자체가 DB 부하나 deadlock 으로 실패해요.

해결 — chunk transaction 안에서 처리하면 자동 rollback + retry 가 걸려요. 아니면 idempotent (같은 연산을 반복해도 결과가 동일한 성질) UPDATE 가 되도록 보장해요.

사고 5: saveState=false 인데 재시작 시도

원인 — Step 이 재시작을 안 한다는 가정인데 운영자가 재시작을 시도해요.

해결 — Process Indicator 나 idempotent 설계로 처리해서 어차피 재실행해도 안전하게 만들어요.

사고 6: Process Indicator + multi-threaded race

원인 — 두 thread 가 같은 row 를 read 한 다음 동시에 flag 를 바꿔요.

해결 — SELECT 시 FOR UPDATE SKIP LOCKED (PostgreSQL·MySQL 8+) 를 걸거나 partitioning 으로 row 범위를 분리해요.

사고 7: Adapter 로 wrap 한 Service 가 thread-safe X

원인 — 기존 Service 가 online 환경을 가정해서 stateful 하게 짜여 있어요.

해결 — thread-safe wrapper 를 두거나 partitioning 으로 partition 마다 독립 Service 를 둬요.

운영 권장 패턴

Pattern 1: 표준 ItemReaderAdapter

@Bean
public ItemReaderAdapter<Customer> customerReader(CustomerService service) {
    ItemReaderAdapter<Customer> reader = new ItemReaderAdapter<>();
    reader.setTargetObject(service);
    reader.setTargetMethod("findNextActive");
    return reader;
}

기존 paging service 가 null 을 반환하면 깔끔하게 wrap 돼요.

Pattern 2: Process Indicator + SKIP LOCKED

SELECT * FROM order_request
WHERE PROCESSED_IND = FALSE
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1000;

PostgreSQL 이나 MySQL 8+ 에서 동시 처리에 안전해요. multi-threaded 환경이나 여러 batch 인스턴스를 띄워도 안전하게 돌아가요.

Pattern 3: Timestamp + Idempotent

@Bean
@StepScope
public JdbcCursorItemReader<Order> incrementalReader(
        DataSource ds,
        @Value("#{jobParameters['lastProcessedAt']}") String lastProcessedAt) {
    return new JdbcCursorItemReaderBuilder<Order>()
        .name("incrementalReader")
        .dataSource(ds)
        .saveState(false)
        .sql("SELECT * FROM orders WHERE updated_at > ? ORDER BY updated_at")
        .preparedStatementSetter(ps -> ps.setString(1, lastProcessedAt))
        .rowMapper(BeanPropertyRowMapper.newInstance(Order.class))
        .build();
}

JobParameter 로 마지막 처리 시각을 받아요. 21편 Late Binding (런타임에 값을 주입하는 방식) 과 이어져요.

Pattern 4: Archive 이동 + Custom Tasklet

@Bean
public Step archiveStep(JobRepository repo, PlatformTransactionManager tx) {
    return new StepBuilder("archiveStep", repo)
        .tasklet((contribution, chunkContext) -> {
            jdbc.update("""
                INSERT INTO order_archive
                SELECT * FROM orders WHERE PROCESSED_IND = TRUE
                """);
            jdbc.update("DELETE FROM orders WHERE PROCESSED_IND = TRUE");
            return RepeatStatus.FINISHED;
        }, tx)
        .build();
}

처리가 끝난 row 를 일괄로 archive 해서 운영 부담을 줄여요.

Pattern 5: CompositeItemWriter + flag

@Bean
public CompositeItemWriter<Order> writerChain(
        JdbcBatchItemWriter<Order> primary,
        JdbcBatchItemWriter<Order> markProcessed) {
    CompositeItemWriter<Order> composite = new CompositeItemWriter<>();
    composite.setDelegates(List.of(primary, markProcessed));
    return composite;
}

기본 처리와 flag 변경을 같은 트랜잭션 안에서 원자적으로 묶어요.

시험 직전 한 번 더 — Reusing Services · Process Indicator 함정 압축 노트

  • ItemReaderAdapter = 기존 메서드 → ItemReader
  • setTargetObject + setTargetMethod
  • target method contract = read 와 동일 → null 반환 = 종료
  • null 반환 안 하면 → 무한 루프 또는 잘못된 종료
  • ItemWriterAdapter = 기존 메서드 → ItemWriter
  • item 하나씩 호출 (chunk batch X) → 대량 성능 X
  • PropertyExtractingDelegatingItemWriter = item property 추출 후 method 인자
  • setFieldsUsedAsTargetMethodArguments(["id", "name"]) 순서 = method 인자 순서
  • Adapter 적합 — 기존 메서드 contract 가 거의 fit
  • Adapter 부적합 — 추가 로직 / 대량 batch / 복잡 signature
  • Process Indicator = DB row 에 처리 flag column 추가
  • Reader SQL = WHERE PROCESSED_IND = FALSE
  • Writer = UPDATE ... SET PROCESSED_IND = TRUE
  • 장점 — 운영자 가시성 · 재실행 안전성 · 수동 개입 용이 · 동시 batch
  • saveState = false 와 결합 권장
  • saveState=false = ItemStream update() 가 ExecutionContext 저장 X
  • 위치 정보가 DB 에 있음 → ExecutionContext 불필요
  • saveState=false 활용 — short-lived · multi-threaded · static input · 외부 pull
  • 기본 = true (안전), 명시적 false 는 의도 확실할 때
  • Process Indicator 변형 — staging 테이블 이동 · timestamp 기반 · status 컬럼 (NEW·PROCESSING·DONE·FAILED)
  • 함정 — target method null 반환 안 함 → wrapper 또는 Custom
  • 함정 — Adapter 의 batch 성능 (개별 호출) → bulk Custom Writer
  • 함정 — PropertyExtracting 인자 순서 불일치
  • 함정 — Process Indicator multi-threaded race → FOR UPDATE SKIP LOCKED
  • 함정 — Adapter 로 wrap 된 Service 가 stateful → thread-safe wrapper 또는 partitioning
  • 함정 — saveState=false 인데 재시작 시도 → idempotent 보장
  • 패턴 — 표준 ItemReaderAdapter (기존 paging service wrap)
  • 패턴 — Process Indicator + SKIP LOCKED (동시 안전)
  • 패턴 — Timestamp + Idempotent + Late Binding
  • 패턴 — Archive Tasklet (처리 후 분리)
  • 패턴 — CompositeItemWriter + flag (원자적 결합)
  • Part 7 마무리 — 다음 글부터 Part 8 (Scaling · Parallel)

공식 문서: Reusing Existing Services · Preventing State Persistence 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!