Spring Batch 마스터 노트 시리즈 3편. 대량 데이터 처리의 핵심 패턴 — read → process → write 청크 사이클, ItemReader의 null 종료 신호, ItemProcessor의 변환·필터링·검증 3가지 역할, JSR-303 BeanValidatingItemProcessor, 여러 프로세서를 잇는 CompositeItemProcessor, 트랜잭션 동작과 성능 튜닝까지.
이 글은 Spring Batch 마스터 노트 시리즈의 세 번째 편입니다. 2편(Job 설정)에서 도구를 잡았다면, 이번엔 Spring Batch의 가장 중요한 처리 패턴 — 청크 지향 처리.
대량 데이터를 한 번에 메모리에 올리는 건 불가능해요. 청크 처리는 데이터를 묶음 단위로 나눠 메모리 효율을 잡으면서 트랜잭션 보장도 함께 챙기는 도구입니다. 이번 편의 핵심은 ItemReader → ItemProcessor → ItemWriter 세 인터페이스의 정확한 동작 이해.
처음 청크 처리가 어렵게 느껴지는 이유
이유는 두 가지예요.
첫째, 세 인터페이스의 협력 방식이 한 번에 안 들어옵니다. Reader가 청크 크기만큼 채울 때까지 반복 호출되고, Processor가 각 아이템을 개별 처리하고, Writer가 묶음으로 한 번에 쓰는 — 이 3단계 사이클이 머릿속에 그림으로 잡혀야 디버깅이 가능해요.
둘째, null 반환의 의미가 두 자리에서 다릅니다. ItemReader의 null = "데이터 끝", ItemProcessor의 null = "이 아이템 필터링". 같은 null인데 의미가 정반대라 헷갈립니다.
해결법은 한 가지예요. 청크 사이클을 그림으로 외우세요. read·read·read·…(청크 크기까지) → process·process·process·… → write(묶음으로) → 커밋. 이 그림이 잡히면 모든 청크 동작이 자연스럽게 보여요.
청크 처리 vs Tasklet — 한 번 더 비교
| 구분 | 청크 처리 | Tasklet |
|---|---|---|
| 적합한 작업 | 대량 데이터 읽기/변환/쓰기 | 단순 작업 (파일 삭제·이메일) |
| 구성 요소 | ItemReader + (Processor) + Writer | Tasklet 인터페이스 하나 |
| 트랜잭션 | 청크 단위 커밋 | 메서드 실행 단위 |
| 재시작 | 청크 단위 재시작 가능 | 처음부터 재실행 |
| 복잡도 | 높음 (3개 컴포넌트) | 낮음 |
| 메모리 | 청크 크기만큼만 | 한꺼번에 |
청크 처리 흐름 — 한 번에 잡기
Step 시작
│
├─ [청크 반복 시작] ─────────────────────────────────────┐
│ │ │
│ ├─ ItemReader.read() → item1 │
│ ├─ ItemReader.read() → item2 (청크 크기만큼 반복)│
│ ├─ ItemReader.read() → item3 │
│ └─ ItemReader.read() → null (또는 청크 채움) │
│ │
│ ├─ ItemProcessor.process(item1) → processedItem1 │
│ ├─ ItemProcessor.process(item2) → null (필터링!) │
│ └─ ItemProcessor.process(item3) → processedItem3 │
│ │
│ ├─ ItemWriter.write([processedItem1, processedItem3])│
│ └─ Commit Transaction │
│ │
└─ [청크 반복 종료 - read()가 null 반환 시] ─────────────┘
│
Step 완료
청크의 트랜잭션 동작
각 청크는 독립 트랜잭션. read → process → write 사이클 완료 → 커밋.
청크1 [item1, item2, item3] → COMMIT ✓
청크2 [item4, item5, item6] → COMMIT ✓
청크3 [item7, item8, item9] → 오류 발생! → ROLLBACK ✗
청크4 [item10, ...] → 재실행 시 청크3부터 재처리
여기서 정말 중요한 시험 함정 — 이미 커밋된 청크는 영향 받지 않습니다. 청크3에서 실패해도 청크1·2의 데이터는 그대로 보존돼요. 대량 데이터 중 일부 실패에도 부분 성공 데이터를 유지할 수 있는 게 청크 처리의 강점.
ItemReader — read()와 null
인터페이스
public interface ItemReader<T> {
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
read()가 null을 반환하면 "데이터 끝" 신호. 현재 청크를 완료하고 Step 종료.
커스텀 ItemReader
public class ProductNameItemReader implements ItemReader<String> {
private final List<String> productNames;
private Iterator<String> iterator;
public ProductNameItemReader() {
this.productNames = Arrays.asList(
"Phone", "TV", "Laptop", "Tablet", "Camera"
);
this.iterator = productNames.iterator();
}
@Override
public String read() {
if (iterator.hasNext()) {
return iterator.next();
}
return null; // 필수: null = 종료 신호
}
}
여기서 정말 중요한 시험 함정 — null 반환 빠뜨리면 무한 루프예요. 데이터 다 소진된 후에도 계속 뭔가 반환하면 Step이 절대 끝나지 않아요. FlatFileItemReader나 JdbcCursorItemReader 같은 내장 구현체는 자동 처리하지만, 커스텀 작성 시 항상 종료 조건 확인.
ItemProcessor — 변환·필터링·검증
인터페이스
public interface ItemProcessor<I, O> {
@Nullable
O process(I item) throws Exception;
}
입력 타입 I, 출력 타입 O — 다를 수 있음 (변환).
null 반환 = 해당 아이템 필터링 — Writer로 안 전달.
선택사항 — 입출력 타입 같고 변환 불필요하면 생략 가능.
변환 (Transformation)
public class ProductTransformProcessor implements ItemProcessor<Product, OSProduct> {
@Override
public OSProduct process(Product item) throws Exception {
OSProduct osProduct = new OSProduct();
// 카테고리에 따른 세금 계산
if (item.getProductCategory().equalsIgnoreCase("sports accessories")) {
osProduct.setTaxPercent(5);
} else {
osProduct.setTaxPercent(18);
}
// SKU 코드 생성: 카테고리 앞 3자리 + 상품 ID
String sku = item.getProductCategory().substring(0, 3).toUpperCase()
+ item.getProductId();
osProduct.setSku(sku);
osProduct.setProductName(item.getProductName());
osProduct.setProductPrice(item.getProductPrice());
return osProduct;
}
}
필터링 (Filtering) — null 반환
public class ExpensiveProductFilter implements ItemProcessor<Product, Product> {
private static final double PRICE_THRESHOLD = 100.0;
@Override
public Product process(Product item) throws Exception {
if (item.getProductPrice() <= PRICE_THRESHOLD) {
return item; // 통과
}
return null; // 필터링 — Writer에 안 전달
}
}
여기서 시험 함정이 하나 있어요. 필터링된 아이템은 filterCount에 기록됩니다. skipCount(오류 스킵)와 다른 카운터예요. 필터링은 정상적인 비즈니스 결정이고, 스킵은 오류 처리. 두 카운터를 보면 어떤 이유로 아이템이 빠졌는지 구분 가능.
검증 (Validation)
커스텀 Validator
public class ProductValidator implements Validator<Product> {
@Override
public void validate(Product product) throws ValidationException {
if (product.getProductName() == null || product.getProductName().isEmpty()) {
throw new ValidationException("상품명이 비어있습니다: " + product);
}
if (product.getProductPrice() < 0) {
throw new ValidationException("가격이 음수입니다: " + product);
}
if (product.getProductCategory() == null) {
throw new ValidationException("카테고리가 null입니다: " + product);
}
}
}
@Bean
public ValidatingItemProcessor<Product> validatingProcessor() {
ValidatingItemProcessor<Product> processor = new ValidatingItemProcessor<>(new ProductValidator());
processor.setFilter(true); // true: 검증 실패 시 필터링 / false: 예외 발생
return processor;
}
setFilter(true) — 검증 실패 = null 반환(필터링).
setFilter(false) — 검증 실패 = ValidationException 발생.
BeanValidatingItemProcessor — JSR-303
public class Product {
@NotNull(message = "상품 ID는 필수")
private Integer productId;
@NotBlank(message = "상품명 비어있으면 안 됨")
private String productName;
@NotBlank(message = "카테고리 비어있으면 안 됨")
private String productCategory;
@Min(value = 0, message = "가격은 0 이상")
private Double productPrice;
}
@Bean
public BeanValidatingItemProcessor<Product> beanValidatingProcessor() {
BeanValidatingItemProcessor<Product> processor = new BeanValidatingItemProcessor<>();
processor.setFilter(true);
return processor;
}
JSR-303 어노테이션 기반 자동 검증. 도메인 모델에 @NotNull, @NotBlank, @Min, @Max 등 박으면 자동 검증.
CompositeItemProcessor — 여러 프로세서 체이닝
여러 ItemProcessor를 순서대로 연결.
@Bean
public CompositeItemProcessor<Product, OSProduct> compositeProcessor() {
CompositeItemProcessor<Product, OSProduct> composite = new CompositeItemProcessor<>();
List<ItemProcessor<?, ?>> processors = new ArrayList<>();
processors.add(beanValidatingProcessor()); // 1. JSR-303 검증
processors.add(expensiveProductFilter()); // 2. 가격 필터
processors.add(productTransformProcessor()); // 3. 타입 변환
composite.setDelegates(processors);
return composite;
}
앞 프로세서의 출력 → 다음 프로세서의 입력. 중간에 null 반환되면 파이프라인 단축 종료(short-circuit).
Item → [Validator] → valid Item → [Filter] → null → (Transformer 실행 X) → 필터링
Item → [Validator] → valid Item → [Filter] → Item → [Transformer] → TransformedItem → Write
여기서 정말 중요한 시험 함정 — 순서가 성능에 직접 영향입니다. 가장 많이 필터링하는 프로세서를 앞에 배치하면 이후 프로세서 실행이 줄어요.
// 효율적 순서 — 많이 필터링하는 것 먼저
processors.add(quickValidationFilter()); // 빠르고 70% 필터링
processors.add(businessRuleFilter()); // 중간 처리량
processors.add(expensiveTransformation()); // 느린 변환
검증으로 70% 필터링되면, 변환 로직 실행 횟수가 70% 감소.
ItemWriter — 묶음으로 한 번에
인터페이스
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
청크 크기만큼의 아이템 List를 한 번에 받아 처리. DB Bulk INSERT 나 파일 일괄 쓰기 가 가능.
커스텀 ItemWriter
public class ProductItemWriter implements ItemWriter<Product> {
@Override
public void write(List<? extends Product> items) throws Exception {
System.out.println("쓰기 시작 — 아이템 수: " + items.size());
for (Product product : items) {
System.out.println("저장: " + product.getProductName());
// 실제 DB 저장 또는 파일 쓰기 로직
}
}
}
여기서 시험 함정이 하나 있어요. 모든 아이템이 Processor에서 null로 필터링되면 Writer는 빈 List를 받아요. items.size() == 0 케이스도 안전하게 처리해야 합니다. 내장 Writer는 자동 처리하지만 커스텀 Writer 작성 시 주의.
청크 처리 완전한 예시
도메인 모델
@Data
public class Product {
private Integer productId;
private String productName;
private String productCategory;
private Double productPrice;
@Override
public String toString() {
return "Product{id=" + productId +
", name=" + productName +
", category=" + productCategory +
", price=" + productPrice + "}";
}
}
완전한 청크 Step
@Configuration
@EnableBatchProcessing
public class ProductBatchConfig {
@Autowired private JobBuilderFactory jobBuilderFactory;
@Autowired private StepBuilderFactory stepBuilderFactory;
@Autowired private DataSource dataSource;
// ItemReader
@Bean
public FlatFileItemReader<Product> productReader() {
FlatFileItemReader<Product> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new ClassPathResource("/data/products.csv"));
DefaultLineMapper<Product> mapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("productId", "productName", "productCategory", "productPrice");
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(fieldSet -> {
Product product = new Product();
product.setProductId(fieldSet.readInt("productId"));
product.setProductName(fieldSet.readString("productName"));
product.setProductCategory(fieldSet.readString("productCategory"));
product.setProductPrice(fieldSet.readDouble("productPrice"));
return product;
});
reader.setLineMapper(mapper);
return reader;
}
// ItemProcessor
@Bean
public ItemProcessor<Product, Product> productProcessor() {
return item -> {
if (item.getProductPrice() <= 0) {
return null; // 필터링
}
item.setProductName(item.getProductName().toUpperCase());
return item;
};
}
// ItemWriter
@Bean
public JdbcBatchItemWriter<Product> productWriter() {
JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO products (id, name, category, price) VALUES (?, ?, ?, ?)");
writer.setItemPreparedStatementSetter((item, ps) -> {
ps.setInt(1, item.getProductId());
ps.setString(2, item.getProductName());
ps.setString(3, item.getProductCategory());
ps.setDouble(4, item.getProductPrice());
});
return writer;
}
// Step
@Bean
public Step productStep() {
return stepBuilderFactory.get("productStep")
.<Product, Product>chunk(10)
.reader(productReader())
.processor(productProcessor())
.writer(productWriter())
.build();
}
// Job
@Bean
public Job productJob() {
return jobBuilderFactory.get("productJob")
.incrementer(new RunIdIncrementer())
.start(productStep())
.build();
}
}
데이터 흐름 단계별 상세
Read 단계
read() 호출 1: item1 반환 → readCount = 1
read() 호출 2: item2 반환 → readCount = 2
...
read() 호출 10: item10 반환 → readCount = 10 → 청크 완료 (크기 10)
또는
read() 호출 N: null 반환 → Step 종료
각 호출마다 StepExecution.readCount 1씩 증가.
Process 단계
process(item1) → processedItem1 → 목록 추가
process(item2) → null → 필터링, filterCount++
process(item3) → processedItem3 → 목록 추가
청크에 수집된 각 아이템에 대해 개별 호출.
Write 단계
처리된 아이템 List를 한 번에 받아 실행. 성공 시 writeCount 증가 + 트랜잭션 커밋. 예외 시 트랜잭션 롤백.
JdbcBatchItemWriter는 PreparedStatement.executeBatch() 사용 — 한 번의 DB 호출.
다양한 ItemProcessor 사용 패턴
로깅·모니터링용
@Bean
public ItemProcessor<Product, Product> loggingProcessor() {
return item -> {
log.debug("Processing: {}", item.getProductName());
item.setProcessedAt(LocalDateTime.now());
return item;
};
}
외부 서비스 호출
@Component
public class EnrichProductProcessor implements ItemProcessor<Product, EnrichedProduct> {
@Autowired
private CategoryService categoryService;
@Override
public EnrichedProduct process(Product item) throws Exception {
CategoryInfo categoryInfo = categoryService.getCategoryInfo(item.getProductCategory());
EnrichedProduct enriched = new EnrichedProduct(item);
enriched.setCategoryDescription(categoryInfo.getDescription());
enriched.setTaxRate(categoryInfo.getTaxRate());
return enriched;
}
}
타입 변환
public class ProductToDtoProcessor implements ItemProcessor<Product, ProductDTO> {
@Autowired
private ModelMapper modelMapper;
@Override
public ProductDTO process(Product item) throws Exception {
return modelMapper.map(item, ProductDTO.class);
}
}
자주 묻는 질문
Q: ItemProcessor의 null 반환과 SkipPolicy의 skip은 어떻게 다른가요?
A: ItemProcessor의 null = "정상적인 비즈니스 결정으로 이 아이템 스킵" → filterCount 기록. SkipPolicy의 skip = "처리 중 예외 발생했지만 건너뛰고 계속" → skipCount 기록. 목적이 완전히 다른 두 메커니즘.
Q: 청크 크기를 어떻게 결정하나요?
A: 일반 가이드 — DB 트랜잭션 비용 높으면 키움, 메모리 제한이면 줄임, 실패 가능성 높으면 줄여서 재처리 범위 좁힘. 성능 테스트로 최적값 찾기. 시작은 100 정도가 무난.
Q: ItemProcessor 없이 Step 가능한가요?
A: 네. 입출력 타입 같고 변환 불필요하면 .processor() 생략. 청크 설정의 입출력 타입 동일 — .<Product, Product>chunk(10).
청크 처리 성능 최적화
청크 크기 튜닝
청크 크기 ↑ → DB 트랜잭션 ↓ → 성능 ↑ (단 메모리 ↑).
성능 측정 ChunkListener:
@Component
public class PerformanceChunkListener implements ChunkListener {
private long chunkStartTime;
private int chunkCount = 0;
@Override
public void beforeChunk(ChunkContext context) {
chunkStartTime = System.currentTimeMillis();
}
@Override
public void afterChunk(ChunkContext context) {
long elapsed = System.currentTimeMillis() - chunkStartTime;
chunkCount++;
System.out.printf("청크 %d 처리 시간: %dms%n", chunkCount, elapsed);
}
@Override
public void afterChunkError(ChunkContext context) {
System.out.println("청크 오류 발생");
}
}
Reader 최적화
JdbcCursorItemReader.setFetchSize()— DB에서 한 번에 가져올 행 수JdbcPagingItemReader.pageSize≈ 청크 크기 일치- 정렬 컬럼(sortKey)에 DB 인덱스
Writer 최적화
JdbcBatchItemWriter— 단건 INSERT 대신 Bulk INSERT- 청크 크기 ↑ → 트랜잭션 커밋 횟수 ↓
일반적 실수와 주의사항
ItemReader는 반드시 null 반환
데이터 소진되면 무조건 null. 안 그러면 Step 무한 루프.
CompositeItemProcessor 타입 체크
제네릭 타입은 첫 프로세서 입력 + 마지막 프로세서 출력. 중간 프로세서들은 List<ItemProcessor<?, ?>> — 런타임 체크.
Writer가 빈 List 받을 수 있음
모든 아이템이 Processor에서 필터링되면 빈 List. items.size() == 0 처리.
ItemProcessor 상태 유지 주의
기본 싱글톤 — 인스턴스 변수에 상태 저장하면 청크 간 공유. 카운터·누적 값은 ExecutionContext 사용.
병렬 처리 시 Thread Safety
ItemProcessor 가능하면 stateless. 공유 리소스 접근 시 동기화.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 3편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- 청크 처리 = read·read·…·read → process·process·… → write → 커밋
- 각 청크 = 독립 트랜잭션 (이미 커밋된 청크는 영향 X)
- ItemReader.read() null = 데이터 끝 신호
- null 빠뜨리면 무한 루프
- ItemProcessor.process() null = 필터링 (Writer에 X)
- 필터링은
filterCount/ 스킵은skipCount(다른 카운터) - ItemProcessor 3가지 역할 — 변환·필터링·검증
- 검증 — 커스텀 Validator + ValidatingItemProcessor
- JSR-303 =
@NotNull,@NotBlank,@Min+ BeanValidatingItemProcessor setFilter(true)= 검증 실패 시 필터링 /false= 예외 발생- CompositeItemProcessor = 여러 프로세서 체이닝
- 중간 null = short-circuit (이후 프로세서 실행 X)
- 순서가 성능 영향 — 많이 필터링하는 것 먼저
- ItemWriter.write() = List를 한 번에
- 빈 List도 처리 가능해야 (모든 아이템 필터링 시)
- 청크 크기 trade-off — 메모리 vs 트랜잭션 vs 재처리
- ItemProcessor 상태 유지 주의 (싱글톤)
- 누적 값 → ExecutionContext 사용
JdbcBatchItemWriter=executeBatch()Bulk INSERT- 병렬 처리 시 ItemProcessor stateless 권장
- ChunkListener — beforeChunk/afterChunk/afterChunkError
- 입출력 타입 같으면 ItemProcessor 생략 가능
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 톤으로 묶어 정리되어 있어요. 3편 청크 패턴이 잡히면 4·5편의 ItemReader/Writer 구현체가 같은 패턴 반복으로 보입니다.
- 1편 — Spring Batch 입문 (Job·Step·Chunk 모델)
- 2편 — Spring Batch Job 설정 (Tasklet과 Chunk Step)
- 3편 — 청크 처리 (현재 글)
- 4편 — ItemReader 마스터 (CSV·JdbcCursor·Paging)
- 5편 — ItemWriter 마스터 (FlatFile·JdbcBatch·Composite)
- 6편 — Job Flow와 리스너
- 7편 — 오류 처리 (Skip·Retry·SkipPolicy)
- 8편 — Spring Batch 5 마이그레이션
공식 문서: Spring Batch Item Processing에서 모든 ItemProcessor 패턴을 확인할 수 있어요.
다음 글(4편)에서는 ItemReader 구현체의 본격 — FlatFileItemReader·JdbcCursorItemReader·JdbcPagingItemReader 차이, RowMapper와 FieldSetMapper, 페이지·커서 선택 가이드까지 풀어 갑니다.