Spring Batch 입문 35편. read 와 write 사이 가공의 자리 — ItemProcessor. process(item) 의 contract, null 반환 = filter (skip 과 다름), 예외 throw = skip, CompositeItemProcessor 체인, ValidatingItemProcessor · BeanValidatingItemProcessor (JSR-303), idempotent 원칙·fault tolerance 시 reprocess 함정까지 정리한 학습 노트. Part 7 시작.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 35편이에요. 34편 까지 Reader · Writer 양 끝을 봤다면, 이번 35편은 그 가운데 자리인 ItemProcessor 를 다뤄요. Part 7 (ItemProcessor · 재사용) 시작이에요.
왜 ItemProcessor 가 별도 필요한가
Reader 와 Writer 만으로 부족한 케이스가 몇 가지 있어요. 형태가 바뀌는 변환 (Foo → Bar), 일부 record 만 write 로 보내는 필터링, invalid record 를 골라내는 검증, 추가 데이터를 끌어와 합치는 enrichment (보강 — 외부 데이터 결합), DTO 를 Entity 로 옮기는 type 변경 같은 작업이 그 자리예요.
이런 비즈니스 로직이 들어가는 자리예요. write 안에 박을 수도 있지만 따로 떼어 두면 재사용·테스트·체인 결합이 모두 쉬워져요.
ItemProcessor — 단 1개의 메서드
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
I 는 입력 type 으로 Reader 가 반환하는 것, O 는 출력 type 으로 Writer 가 입력으로 받는 거예요. I 와 O 가 달라도 괜찮아요. Reader 가 Foo 를 반환하면 Processor 가 Foo → Bar 로 바꿔 Writer 가 Bar 를 받는 식이죠.
가장 단순한 변환 예제
public class FooProcessor implements ItemProcessor<Foo, Bar> {
@Override
public Bar process(Foo foo) throws Exception {
return new Bar(foo);
}
}
Step 연결
@Bean
public Step step1(JobRepository repo, PlatformTransactionManager tx) {
return new StepBuilder("step1", repo)
.<Foo, Bar>chunk(100, tx)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
chunk type parameter (chunk 의 <I, O> 제네릭 지정) 가 processor 의 <I, O> 와 일치해야 해요. 컴파일 시점에 검증돼요.
ItemProcessor 의 contract — 3가지 반환
contract (인터페이스가 약속한 동작 규칙) 측면에서 process() 의 반환은 세 갈래로 나뉘어요.
1. Non-null 반환 → Writer 로 전달
return new Bar(foo);
정상 변환 결과로, Writer 의 chunk 에 포함돼요.
2. null 반환 → 필터링 (skip 과 다름)
return null;
해당 item 을 Writer 로 보내지 않고 조용히 빼버려요.
Filtering is an action distinct from skipping. Skipping indicates that a record is invalid, while filtering indicates that a record should not be written. — 공식 reference
| skip | filter | |
|---|---|---|
| 의미 | record 가 invalid | record 가 대상 아님 |
| 원인 | 데이터 오류·예외 | 비즈니스 규칙 |
| 통계 | skipCount 증가 |
filterCount 증가 |
| 트리거 | 예외 throw | null 반환 |
| ItemWriter 호출? | X | X |
예외를 던지면 skip, null 을 반환하면 filter — 이렇게 명확히 갈라져요.
3. 예외 throw → skip 로직
throw new IllegalStateException("Invalid record");
14편의 skip 로직으로 흡수돼요. SkipListener (18편) 가 추적해요.
Filtering 예제 — Insert/Update/Delete
한 파일에 INSERT·UPDATE·DELETE 3가지 record. DELETE 는 시스템에서 지원 안 함 → filter. — 공식 reference
public class DeleteFilterProcessor implements ItemProcessor<Record, Record> {
@Override
public Record process(Record record) {
if ("DELETE".equals(record.getOperation())) {
return null; // ★ filter — Writer 안 보냄
}
return record;
}
}
DELETE record 는 invalid 가 아니라 대상이 아니라서 null 을 반환해요. skip 통계에 영향을 주지 않고 깔끔하게 갈라져요.
CompositeItemProcessor — 체인
여러 ItemProcessor 를 순차로 실행하고 싶을 때 쓰는 게 CompositeItemProcessor 예요.
public class Foobar {
public Foobar(Bar bar) { ... }
}
public class BarProcessor implements ItemProcessor<Bar, Foobar> {
@Override
public Foobar process(Bar bar) {
return new Foobar(bar);
}
}
Foo → Bar → Foobar 변환 체인은 이렇게 묶어요.
@Bean
public CompositeItemProcessor<Foo, Foobar> compositeProcessor() {
CompositeItemProcessor<Foo, Foobar> processor = new CompositeItemProcessor<>();
processor.setDelegates(List.of(new FooProcessor(), new BarProcessor()));
return processor;
}
Step 연결은 이렇게 해요.
.<Foo, Foobar>chunk(100, tx)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
Chain 의 null 전파
체인 도중 어느 단계에서든 null 을 반환하면 그 자리에서 chain 전체가 끝나고 filter 로 처리돼요. 다음 단계는 실행되지 않아요.
Foo → FooProcessor → Bar
↓
null 반환? → filter (BarProcessor 실행 X)
↓
BarProcessor → Foobar
↓
Writer 로 전달
ValidatingItemProcessor — 검증
Spring Batch 는 별도 validation framework 만들지 않고 Validator 인터페이스만 제공. — 공식 reference
public interface Validator<T> {
void validate(T value) throws ValidationException;
}
SpringValidator 활용
public class TradeValidator implements org.springframework.validation.Validator {
@Override
public boolean supports(Class<?> clazz) {
return Trade.class.isAssignableFrom(clazz);
}
@Override
public void validate(Object target, Errors errors) {
Trade trade = (Trade) target;
if (trade.getQuantity() < 0) {
errors.rejectValue("quantity", "negative", "Quantity must be non-negative");
}
}
}
@Bean
public SpringValidator<Trade> tradeValidator() {
SpringValidator<Trade> validator = new SpringValidator<>();
validator.setValidator(new TradeValidator());
return validator;
}
@Bean
public ValidatingItemProcessor<Trade> validatingProcessor(Validator<Trade> validator) {
ValidatingItemProcessor<Trade> processor = new ValidatingItemProcessor<>();
processor.setValidator(validator);
return processor;
}
Validator.validate() 가 예외를 던지면 skip, 정상으로 return 하면 pass 예요.
filter 옵션
processor.setFilter(true);
filter = true 로 설정하면 invalid item 을 예외 대신 null 로 반환해 filter 쪽으로 보내요. skip 통계는 올라가지 않아요.
BeanValidatingItemProcessor — JSR-303
JSR-303 (Java Bean Validation 표준 규격) annotation 을 그대로 활용할 수 있어요.
public class Person {
@NotEmpty
private String name;
@Min(0)
private int age;
@Email
private String email;
// getter/setter
}
@Bean
public BeanValidatingItemProcessor<Person> validator() throws Exception {
BeanValidatingItemProcessor<Person> processor = new BeanValidatingItemProcessor<>();
processor.setFilter(true); // invalid → filter
return processor;
}
내부적으로 Hibernate Validator 같은 Bean Validation API 구현체를 Validator 로 끼워 써요. annotation 만 박아두면 자동으로 검증해 줘요.
표준 annotation 은 이렇게 정리돼요.
| Annotation | 의미 |
|---|---|
@NotNull·@NotEmpty·@NotBlank |
null/empty 검증 |
@Min·@Max |
숫자 범위 |
@Size |
길이 |
@Email·@URL |
형식 |
@Pattern |
정규식 |
@Past·@Future |
날짜 |
@Valid |
중첩 검증 |
Fault Tolerance 시 reprocess 함정
여기서 시험 함정이 하나 있어요. reprocess (실패 후 같은 item 을 다시 process) 가 일어나거든요.
When a chunk is rolled back, items that have been cached during reading may be reprocessed. — 공식 reference
chunk 시작
read → item1·item2·...·item10
process → item1·...·item10 변환
write → 실패
rollback
retry → 같은 item1·...·item10 다시 process
같은 item 이 process() 를 여러 번 거칠 수 있다는 뜻이에요.
해결 — Idempotent 설계
Any ItemProcessor used should be implemented in a way that is idempotent. — 공식 reference
idempotent (몇 번 실행해도 같은 결과) 한 설계가 답이에요. 입력 item 을 mutate (객체 상태를 직접 변경) 하지 않도록 짜야 해요.
// ❌ 나쁜 예 — 입력 mutate
@Override
public Order process(Order order) {
order.setAmount(order.getAmount() * 1.1); // 원본 변경
return order;
}
// retry 시 → 두 번째 process = amount * 1.1 * 1.1 → 잘못된 값
// ✅ 좋은 예 — 새 instance 반환
@Override
public Order process(Order order) {
Order processed = new Order(order);
processed.setAmount(order.getAmount() * 1.1); // 새 instance 만 변경
return processed;
}
// retry 시 → 입력은 그대로 → 결과 일정
함정 — DB 호출의 idempotency
Processor 안에서 DB INSERT 나 외부 호출을 한다면 거기도 같은 원칙이 들어가요. 여러 번 호출돼도 안전하게 만들어야 해요.
- INSERT 대신 UPSERT (있으면 UPDATE, 없으면 INSERT)
- 외부 API 는 idempotency key 를 같이 보내요
- counter 를 늘리는 식 대신 외부 시스템에 최종 값을 저장해요 (delta 가 아니라)
ItemProcessor 옵션 — Step 안 위치
.<I, O>chunk(N, tx)
.reader(reader)
.processor(processor) // 옵션 (생략 가능)
.writer(writer)
ItemProcessor 는 Step 의 옵션이에요. 생략하면 입력과 출력 type 이 같은 chunk 가 돼서 변환 없이 흘려보내는 stream (Reader → Writer 그대로 흐름) 이 돼요.
.<Foo, Foo>chunk(100, tx) // processor 생략
.reader(fooReader())
.writer(fooWriter())
read 한 그대로 write 하는 단순 복사 batch 가 되는 셈이에요.
Spring Batch 6 — Functional 인터페이스
@Bean
public ItemProcessor<Foo, Bar> fooProcessor() {
return foo -> new Bar(foo); // lambda
}
ItemProcessor 가 functional interface (메서드 하나짜리, lambda 가능) 예요. 짧은 변환은 익명 객체나 lambda 로 처리하는 걸 권장해요.
자주 만나는 사고
사고 1: type mismatch
원인은 chunk type 이 <Foo, Bar> 인데 processor 는 ItemProcessor<Foo, Foo> (반환이 Foo) 인 경우예요. chunk type 과 processor type 이 맞는지 확인하면 돼요. 컴파일 에러로 잡혀요.
사고 2: null 반환을 skip 으로 오해
invalid record 를 return null 로 처리하면 skip count 가 안 올라가요. invalid 면 예외를 던지고, 대상이 아니면 null 을 반환하는 식으로 의도를 분명히 하면 돼요.
사고 3: 입력 mutate 후 retry 잘못된 결과
Processor 가 입력 item 을 직접 바꾸는 게 원인이에요. 새 instance 를 반환하거나 Record (불변 데이터 객체) 를 쓰면 해결돼요.
사고 4: CompositeItemProcessor 의 null 전파 무시
중간 단계에서 null 이 나왔는데 다음 단계가 실행될 거라고 가정하는 거예요. null 이 나올 수 있는 단계는 체인의 마지막에 두거나 null 처리를 명시하면 돼요.
사고 5: BeanValidation annotation 인식 안 됨
Hibernate Validator dependency 가 안 들어가 있어서 그래요. org.hibernate.validator:hibernate-validator 를 추가해요 (Spring Boot 는 기본으로 들어가 있어요).
사고 6: Processor 안 DB 호출 N+1
각 item 마다 DB 조회 (예: customer 정보 추가) 를 돌리면 N+1 이 터져요. chunk 단위 batch fetch (ItemReader 에서 join SQL 로 한 번에) 나 캐싱으로 묶어요.
사고 7: Processor 가 무거운 작업
외부 API 호출이 건당 100ms 라면 chunk 100 짜리에서 process 만 10초가 깨지고 write 시간이 추가로 붙어요. partitioning (데이터를 쪼개 병렬 실행) 이나 multi-threaded Step, 또는 Processor 안에서 비동기 batch fetch 로 풀어요.
운영 권장 패턴
Pattern 1: 표준 변환
@Bean
public ItemProcessor<Customer, CustomerDto> dtoProcessor() {
return customer -> new CustomerDto(
customer.getId(),
customer.getName().toUpperCase(),
customer.getEmail()
);
}
Entity → DTO 변환이에요. 가장 흔한 패턴이에요.
Pattern 2: Filter + 변환
@Bean
public ItemProcessor<Order, OrderDto> activeOrderProcessor() {
return order -> {
if (!"ACTIVE".equals(order.getStatus())) {
return null; // 비활성 주문 filter
}
return new OrderDto(order.getId(), order.getAmount());
};
}
조건부 filter 와 변환을 한 자리에서 결합한 모양이에요.
Pattern 3: BeanValidating
public class Customer {
@NotNull private Long id;
@NotBlank private String name;
@Email private String email;
@Min(0) private long age;
}
@Bean
public BeanValidatingItemProcessor<Customer> validator() throws Exception {
BeanValidatingItemProcessor<Customer> processor = new BeanValidatingItemProcessor<>();
processor.setFilter(false); // invalid = 예외 (skip 흡수)
processor.afterPropertiesSet();
return processor;
}
annotation 검증과 skip 정책을 같이 가져가는 구성이에요.
Pattern 4: 체인 (validate + 변환 + enrich)
@Bean
public CompositeItemProcessor<Customer, EnrichedCustomerDto> composite(
BeanValidatingItemProcessor<Customer> validator,
ItemProcessor<Customer, CustomerDto> dtoMapper,
ItemProcessor<CustomerDto, EnrichedCustomerDto> enricher) {
CompositeItemProcessor<Customer, EnrichedCustomerDto> processor = new CompositeItemProcessor<>();
processor.setDelegates(List.of(validator, dtoMapper, enricher));
return processor;
}
각 단계가 단일 책임이라 테스트와 재사용이 쉬워져요.
Pattern 5: Enrichment + Cache
public class CountryEnrichProcessor implements ItemProcessor<Customer, Customer> {
private final Map<String, Country> cache = new ConcurrentHashMap<>();
private final CountryRepository repo;
@Override
public Customer process(Customer customer) {
Country country = cache.computeIfAbsent(
customer.getCountryCode(),
code -> repo.findByCode(code).orElse(Country.UNKNOWN)
);
return customer.withCountry(country);
}
}
DB 조회를 캐싱으로 줄여 N+1 을 피해요.
Pattern 6: Idempotent (Record 활용)
public record Order(Long id, BigDecimal amount, String status) {
public Order applyDiscount(BigDecimal rate) {
return new Order(id, amount.multiply(BigDecimal.ONE.subtract(rate)), status);
}
}
@Bean
public ItemProcessor<Order, Order> discountProcessor() {
return order -> order.applyDiscount(new BigDecimal("0.1"));
}
Record 와 불변 transformation 을 묶으면 retry 가 와도 안전해요.
시험 직전 한 번 더 — ItemProcessor 함정 압축 노트
- 인터페이스 =
process(I item) → O메서드 1개 I≠O가능 — type 변환의 자리- chunk type parameter
<I, O>와 processor type 일치 - 3가지 반환:
- non-null → Writer 로 전달
- null → filter (skip 과 다름!)
- 예외 → skip 로직
- skip vs filter — skip = invalid, filter = 대상 아님
- skip = 예외 throw → skipCount
- filter = null 반환 → filterCount
- ItemProcessor 는 Step 의 옵션 — 생략 가능 (변환 없는 stream)
- CompositeItemProcessor = 여러 processor 체인
- 체인 중 어느 단계 null = 전체 chain 종료 + filter (다음 단계 X)
- ValidatingItemProcessor =
Validator인터페이스 활용 Validator.validate()예외 = skipprocessor.setFilter(true)= invalid → null (filter)- BeanValidatingItemProcessor = JSR-303 annotation 활용
- 표준 annotation =
@NotNull·@NotEmpty·@NotBlank·@Min·@Max·@Size·@Email·@Pattern·@Past·@Future·@Valid - Hibernate Validator dependency (Spring Boot 기본 포함)
- Fault tolerance reprocess — chunk rollback 시 같은 item 여러 번 process
- 해결 = idempotent 설계 — 입력 mutate 금지 → 새 instance 반환
- Record 활용 = 불변 transformation, retry 안전
- DB 호출도 idempotent — UPSERT · idempotency key · 최종 값 저장
- Spring Batch 6 —
ItemProcessorfunctional interface (lambda 가능) - 함정 — type mismatch (compile error)
- 함정 — null = skip 오해 (filterCount vs skipCount)
- 함정 — 입력 mutate → retry 잘못된 결과
- 함정 — Composite null 전파 무시
- 함정 — BeanValidation dependency 누락
- 함정 — Processor 안 DB N+1 → batch fetch 또는 cache
- 함정 — 무거운 외부 호출 → partitioning · async
- 패턴 — Entity → DTO 변환
- 패턴 — Filter + 변환 결합
- 패턴 — BeanValidating + skip 정책
- 패턴 — Composite chain (validate → map → enrich)
- 패턴 — Enrichment + cache
- 패턴 — Record 기반 idempotent transformation
공식 문서: Item processing 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 30편 — FlatFileItemWriter · LineAggregator · FieldExtractor
- 31편 — XML Reader · Writer · StAX 기반 streaming
- 32편 — JSON Reader · Writer · Jackson · Gson
- 33편 — Multi-File Input · MultiResourceItemReader
- 34편 — Database Reader · Writer · Cursor vs Paging
다음 글: