Spring Batch 입문 35편 — ItemProcessor · 변환 · 필터 · 검증

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 35편. read 와 write 사이 가공의 자리 — ItemProcessor. process(item) 의 contract, null 반환 = filter (skip 과 다름), 예외 throw = skip, CompositeItemProcessor 체인, ValidatingItemProcessor · BeanValidatingItemProcessor (JSR-303), idempotent 원칙·fault tolerance 시 reprocess 함정까지 정리한 학습 노트. Part 7 시작.

📚 Spring Batch 입문에서 운영까지 · 35편 — ItemProcessor · 변환 · 필터 · 검증

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 35편이에요. 34편 까지 Reader · Writer 양 끝을 봤다면, 이번 35편은 그 가운데 자리인 ItemProcessor 를 다뤄요. Part 7 (ItemProcessor · 재사용) 시작이에요.

왜 ItemProcessor 가 별도 필요한가

Reader 와 Writer 만으로 부족한 케이스가 몇 가지 있어요. 형태가 바뀌는 변환 (FooBar), 일부 record 만 write 로 보내는 필터링, invalid record 를 골라내는 검증, 추가 데이터를 끌어와 합치는 enrichment (보강 — 외부 데이터 결합), DTO 를 Entity 로 옮기는 type 변경 같은 작업이 그 자리예요.

이런 비즈니스 로직이 들어가는 자리예요. write 안에 박을 수도 있지만 따로 떼어 두면 재사용·테스트·체인 결합이 모두 쉬워져요.

ItemProcessor — 단 1개의 메서드

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

I 는 입력 type 으로 Reader 가 반환하는 것, O 는 출력 type 으로 Writer 가 입력으로 받는 거예요. IO 가 달라도 괜찮아요. Reader 가 Foo 를 반환하면 Processor 가 FooBar 로 바꿔 Writer 가 Bar 를 받는 식이죠.

가장 단순한 변환 예제

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    @Override
    public Bar process(Foo foo) throws Exception {
        return new Bar(foo);
    }
}

Step 연결

@Bean
public Step step1(JobRepository repo, PlatformTransactionManager tx) {
    return new StepBuilder("step1", repo)
        .<Foo, Bar>chunk(100, tx)
        .reader(fooReader())
        .processor(fooProcessor())
        .writer(barWriter())
        .build();
}

chunk type parameter (chunk 의 <I, O> 제네릭 지정) 가 processor 의 <I, O> 와 일치해야 해요. 컴파일 시점에 검증돼요.

ItemProcessor 의 contract — 3가지 반환

contract (인터페이스가 약속한 동작 규칙) 측면에서 process() 의 반환은 세 갈래로 나뉘어요.

1. Non-null 반환 → Writer 로 전달

return new Bar(foo);

정상 변환 결과로, Writer 의 chunk 에 포함돼요.

2. null 반환 → 필터링 (skip 과 다름)

return null;

해당 item 을 Writer 로 보내지 않고 조용히 빼버려요.

Filtering is an action distinct from skipping. Skipping indicates that a record is invalid, while filtering indicates that a record should not be written. — 공식 reference

skip filter
의미 record 가 invalid record 가 대상 아님
원인 데이터 오류·예외 비즈니스 규칙
통계 skipCount 증가 filterCount 증가
트리거 예외 throw null 반환
ItemWriter 호출? X X

예외를 던지면 skip, null 을 반환하면 filter — 이렇게 명확히 갈라져요.

3. 예외 throw → skip 로직

throw new IllegalStateException("Invalid record");

14편의 skip 로직으로 흡수돼요. SkipListener (18편) 가 추적해요.

Filtering 예제 — Insert/Update/Delete

한 파일에 INSERT·UPDATE·DELETE 3가지 record. DELETE 는 시스템에서 지원 안 함 → filter. — 공식 reference

public class DeleteFilterProcessor implements ItemProcessor<Record, Record> {
    @Override
    public Record process(Record record) {
        if ("DELETE".equals(record.getOperation())) {
            return null;            // ★ filter — Writer 안 보냄
        }
        return record;
    }
}

DELETE record 는 invalid 가 아니라 대상이 아니라서 null 을 반환해요. skip 통계에 영향을 주지 않고 깔끔하게 갈라져요.

CompositeItemProcessor — 체인

여러 ItemProcessor 를 순차로 실행하고 싶을 때 쓰는 게 CompositeItemProcessor 예요.

public class Foobar {
    public Foobar(Bar bar) { ... }
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
    @Override
    public Foobar process(Bar bar) {
        return new Foobar(bar);
    }
}

Foo → Bar → Foobar 변환 체인은 이렇게 묶어요.

@Bean
public CompositeItemProcessor<Foo, Foobar> compositeProcessor() {
    CompositeItemProcessor<Foo, Foobar> processor = new CompositeItemProcessor<>();
    processor.setDelegates(List.of(new FooProcessor(), new BarProcessor()));
    return processor;
}

Step 연결은 이렇게 해요.

.<Foo, Foobar>chunk(100, tx)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())

Chain 의 null 전파

체인 도중 어느 단계에서든 null 을 반환하면 그 자리에서 chain 전체가 끝나고 filter 로 처리돼요. 다음 단계는 실행되지 않아요.

Foo → FooProcessor → Bar
                       ↓
        null 반환?  → filter (BarProcessor 실행 X)
                       ↓
                  BarProcessor → Foobar
                       ↓
                  Writer 로 전달

ValidatingItemProcessor — 검증

Spring Batch 는 별도 validation framework 만들지 않고 Validator 인터페이스만 제공. — 공식 reference

public interface Validator<T> {
    void validate(T value) throws ValidationException;
}

SpringValidator 활용

public class TradeValidator implements org.springframework.validation.Validator {
    @Override
    public boolean supports(Class<?> clazz) {
        return Trade.class.isAssignableFrom(clazz);
    }
    @Override
    public void validate(Object target, Errors errors) {
        Trade trade = (Trade) target;
        if (trade.getQuantity() < 0) {
            errors.rejectValue("quantity", "negative", "Quantity must be non-negative");
        }
    }
}

@Bean
public SpringValidator<Trade> tradeValidator() {
    SpringValidator<Trade> validator = new SpringValidator<>();
    validator.setValidator(new TradeValidator());
    return validator;
}

@Bean
public ValidatingItemProcessor<Trade> validatingProcessor(Validator<Trade> validator) {
    ValidatingItemProcessor<Trade> processor = new ValidatingItemProcessor<>();
    processor.setValidator(validator);
    return processor;
}

Validator.validate() 가 예외를 던지면 skip, 정상으로 return 하면 pass 예요.

filter 옵션

processor.setFilter(true);

filter = true 로 설정하면 invalid item 을 예외 대신 null 로 반환해 filter 쪽으로 보내요. skip 통계는 올라가지 않아요.

BeanValidatingItemProcessor — JSR-303

JSR-303 (Java Bean Validation 표준 규격) annotation 을 그대로 활용할 수 있어요.

public class Person {
    @NotEmpty
    private String name;

    @Min(0)
    private int age;

    @Email
    private String email;

    // getter/setter
}
@Bean
public BeanValidatingItemProcessor<Person> validator() throws Exception {
    BeanValidatingItemProcessor<Person> processor = new BeanValidatingItemProcessor<>();
    processor.setFilter(true);          // invalid → filter
    return processor;
}

내부적으로 Hibernate Validator 같은 Bean Validation API 구현체를 Validator 로 끼워 써요. annotation 만 박아두면 자동으로 검증해 줘요.

표준 annotation 은 이렇게 정리돼요.

Annotation 의미
@NotNull·@NotEmpty·@NotBlank null/empty 검증
@Min·@Max 숫자 범위
@Size 길이
@Email·@URL 형식
@Pattern 정규식
@Past·@Future 날짜
@Valid 중첩 검증

Fault Tolerance 시 reprocess 함정

여기서 시험 함정이 하나 있어요. reprocess (실패 후 같은 item 을 다시 process) 가 일어나거든요.

When a chunk is rolled back, items that have been cached during reading may be reprocessed. — 공식 reference

chunk 시작
  read → item1·item2·...·item10
  process → item1·...·item10 변환
  write → 실패
  rollback
  retry → 같은 item1·...·item10 다시 process

같은 item 이 process() 를 여러 번 거칠 수 있다는 뜻이에요.

해결 — Idempotent 설계

Any ItemProcessor used should be implemented in a way that is idempotent. — 공식 reference

idempotent (몇 번 실행해도 같은 결과) 한 설계가 답이에요. 입력 item 을 mutate (객체 상태를 직접 변경) 하지 않도록 짜야 해요.

// ❌ 나쁜 예 — 입력 mutate
@Override
public Order process(Order order) {
    order.setAmount(order.getAmount() * 1.1);    // 원본 변경
    return order;
}

// retry 시 → 두 번째 process = amount * 1.1 * 1.1 → 잘못된 값
// ✅ 좋은 예 — 새 instance 반환
@Override
public Order process(Order order) {
    Order processed = new Order(order);
    processed.setAmount(order.getAmount() * 1.1);    // 새 instance 만 변경
    return processed;
}

// retry 시 → 입력은 그대로 → 결과 일정

함정 — DB 호출의 idempotency

Processor 안에서 DB INSERT 나 외부 호출을 한다면 거기도 같은 원칙이 들어가요. 여러 번 호출돼도 안전하게 만들어야 해요.

  • INSERT 대신 UPSERT (있으면 UPDATE, 없으면 INSERT)
  • 외부 API 는 idempotency key 를 같이 보내요
  • counter 를 늘리는 식 대신 외부 시스템에 최종 값을 저장해요 (delta 가 아니라)

ItemProcessor 옵션 — Step 안 위치

.<I, O>chunk(N, tx)
.reader(reader)
.processor(processor)              // 옵션 (생략 가능)
.writer(writer)

ItemProcessor 는 Step 의 옵션이에요. 생략하면 입력과 출력 type 이 같은 chunk 가 돼서 변환 없이 흘려보내는 stream (Reader → Writer 그대로 흐름) 이 돼요.

.<Foo, Foo>chunk(100, tx)           // processor 생략
.reader(fooReader())
.writer(fooWriter())

read 한 그대로 write 하는 단순 복사 batch 가 되는 셈이에요.

Spring Batch 6 — Functional 인터페이스

@Bean
public ItemProcessor<Foo, Bar> fooProcessor() {
    return foo -> new Bar(foo);                  // lambda
}

ItemProcessor 가 functional interface (메서드 하나짜리, lambda 가능) 예요. 짧은 변환은 익명 객체나 lambda 로 처리하는 걸 권장해요.

자주 만나는 사고

사고 1: type mismatch

원인은 chunk type 이 <Foo, Bar> 인데 processor 는 ItemProcessor<Foo, Foo> (반환이 Foo) 인 경우예요. chunk type 과 processor type 이 맞는지 확인하면 돼요. 컴파일 에러로 잡혀요.

사고 2: null 반환을 skip 으로 오해

invalid record 를 return null 로 처리하면 skip count 가 안 올라가요. invalid 면 예외를 던지고, 대상이 아니면 null 을 반환하는 식으로 의도를 분명히 하면 돼요.

사고 3: 입력 mutate 후 retry 잘못된 결과

Processor 가 입력 item 을 직접 바꾸는 게 원인이에요. 새 instance 를 반환하거나 Record (불변 데이터 객체) 를 쓰면 해결돼요.

사고 4: CompositeItemProcessor 의 null 전파 무시

중간 단계에서 null 이 나왔는데 다음 단계가 실행될 거라고 가정하는 거예요. null 이 나올 수 있는 단계는 체인의 마지막에 두거나 null 처리를 명시하면 돼요.

사고 5: BeanValidation annotation 인식 안 됨

Hibernate Validator dependency 가 안 들어가 있어서 그래요. org.hibernate.validator:hibernate-validator 를 추가해요 (Spring Boot 는 기본으로 들어가 있어요).

사고 6: Processor 안 DB 호출 N+1

각 item 마다 DB 조회 (예: customer 정보 추가) 를 돌리면 N+1 이 터져요. chunk 단위 batch fetch (ItemReader 에서 join SQL 로 한 번에) 나 캐싱으로 묶어요.

사고 7: Processor 가 무거운 작업

외부 API 호출이 건당 100ms 라면 chunk 100 짜리에서 process 만 10초가 깨지고 write 시간이 추가로 붙어요. partitioning (데이터를 쪼개 병렬 실행) 이나 multi-threaded Step, 또는 Processor 안에서 비동기 batch fetch 로 풀어요.

운영 권장 패턴

Pattern 1: 표준 변환

@Bean
public ItemProcessor<Customer, CustomerDto> dtoProcessor() {
    return customer -> new CustomerDto(
        customer.getId(),
        customer.getName().toUpperCase(),
        customer.getEmail()
    );
}

Entity → DTO 변환이에요. 가장 흔한 패턴이에요.

Pattern 2: Filter + 변환

@Bean
public ItemProcessor<Order, OrderDto> activeOrderProcessor() {
    return order -> {
        if (!"ACTIVE".equals(order.getStatus())) {
            return null;            // 비활성 주문 filter
        }
        return new OrderDto(order.getId(), order.getAmount());
    };
}

조건부 filter 와 변환을 한 자리에서 결합한 모양이에요.

Pattern 3: BeanValidating

public class Customer {
    @NotNull private Long id;
    @NotBlank private String name;
    @Email private String email;
    @Min(0) private long age;
}

@Bean
public BeanValidatingItemProcessor<Customer> validator() throws Exception {
    BeanValidatingItemProcessor<Customer> processor = new BeanValidatingItemProcessor<>();
    processor.setFilter(false);     // invalid = 예외 (skip 흡수)
    processor.afterPropertiesSet();
    return processor;
}

annotation 검증과 skip 정책을 같이 가져가는 구성이에요.

Pattern 4: 체인 (validate + 변환 + enrich)

@Bean
public CompositeItemProcessor<Customer, EnrichedCustomerDto> composite(
        BeanValidatingItemProcessor<Customer> validator,
        ItemProcessor<Customer, CustomerDto> dtoMapper,
        ItemProcessor<CustomerDto, EnrichedCustomerDto> enricher) {

    CompositeItemProcessor<Customer, EnrichedCustomerDto> processor = new CompositeItemProcessor<>();
    processor.setDelegates(List.of(validator, dtoMapper, enricher));
    return processor;
}

각 단계가 단일 책임이라 테스트와 재사용이 쉬워져요.

Pattern 5: Enrichment + Cache

public class CountryEnrichProcessor implements ItemProcessor<Customer, Customer> {

    private final Map<String, Country> cache = new ConcurrentHashMap<>();
    private final CountryRepository repo;

    @Override
    public Customer process(Customer customer) {
        Country country = cache.computeIfAbsent(
            customer.getCountryCode(),
            code -> repo.findByCode(code).orElse(Country.UNKNOWN)
        );
        return customer.withCountry(country);
    }
}

DB 조회를 캐싱으로 줄여 N+1 을 피해요.

Pattern 6: Idempotent (Record 활용)

public record Order(Long id, BigDecimal amount, String status) {
    public Order applyDiscount(BigDecimal rate) {
        return new Order(id, amount.multiply(BigDecimal.ONE.subtract(rate)), status);
    }
}

@Bean
public ItemProcessor<Order, Order> discountProcessor() {
    return order -> order.applyDiscount(new BigDecimal("0.1"));
}

Record 와 불변 transformation 을 묶으면 retry 가 와도 안전해요.

시험 직전 한 번 더 — ItemProcessor 함정 압축 노트

  • 인터페이스 = process(I item) → O 메서드 1개
  • IO 가능 — type 변환의 자리
  • chunk type parameter <I, O> 와 processor type 일치
  • 3가지 반환:
  • non-null → Writer 로 전달
  • null → filter (skip 과 다름!)
  • 예외 → skip 로직
  • skip vs filter — skip = invalid, filter = 대상 아님
  • skip = 예외 throw → skipCount
  • filter = null 반환 → filterCount
  • ItemProcessor 는 Step 의 옵션 — 생략 가능 (변환 없는 stream)
  • CompositeItemProcessor = 여러 processor 체인
  • 체인 중 어느 단계 null = 전체 chain 종료 + filter (다음 단계 X)
  • ValidatingItemProcessor = Validator 인터페이스 활용
  • Validator.validate() 예외 = skip
  • processor.setFilter(true) = invalid → null (filter)
  • BeanValidatingItemProcessor = JSR-303 annotation 활용
  • 표준 annotation = @NotNull·@NotEmpty·@NotBlank·@Min·@Max·@Size·@Email·@Pattern·@Past·@Future·@Valid
  • Hibernate Validator dependency (Spring Boot 기본 포함)
  • Fault tolerance reprocess — chunk rollback 시 같은 item 여러 번 process
  • 해결 = idempotent 설계 — 입력 mutate 금지 → 새 instance 반환
  • Record 활용 = 불변 transformation, retry 안전
  • DB 호출도 idempotent — UPSERT · idempotency key · 최종 값 저장
  • Spring Batch 6 — ItemProcessor functional interface (lambda 가능)
  • 함정 — type mismatch (compile error)
  • 함정 — null = skip 오해 (filterCount vs skipCount)
  • 함정 — 입력 mutate → retry 잘못된 결과
  • 함정 — Composite null 전파 무시
  • 함정 — BeanValidation dependency 누락
  • 함정 — Processor 안 DB N+1 → batch fetch 또는 cache
  • 함정 — 무거운 외부 호출 → partitioning · async
  • 패턴 — Entity → DTO 변환
  • 패턴 — Filter + 변환 결합
  • 패턴 — BeanValidating + skip 정책
  • 패턴 — Composite chain (validate → map → enrich)
  • 패턴 — Enrichment + cache
  • 패턴 — Record 기반 idempotent transformation

공식 문서: Item processing 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!