청크 처리 — Reader·Processor·Writer 패턴

2026-05-03확률과 통계 마스터 노트

Spring Batch 마스터 노트 시리즈 3편. 대량 데이터 처리의 핵심 패턴 — read → process → write 청크 사이클, ItemReader의 null 종료 신호, ItemProcessor의 변환·필터링·검증 3가지 역할, JSR-303 BeanValidatingItemProcessor, 여러 프로세서를 잇는 CompositeItemProcessor, 트랜잭션 동작과 성능 튜닝까지.

이 글은 Spring Batch 마스터 노트 시리즈의 세 번째 편입니다. 2편(Job 설정)에서 도구를 잡았다면, 이번엔 Spring Batch의 가장 중요한 처리 패턴 — 청크 지향 처리.

대량 데이터를 한 번에 메모리에 올리는 건 불가능해요. 청크 처리는 데이터를 묶음 단위로 나눠 메모리 효율을 잡으면서 트랜잭션 보장도 함께 챙기는 도구입니다. 이번 편의 핵심은 ItemReader → ItemProcessor → ItemWriter 세 인터페이스의 정확한 동작 이해.

처음 청크 처리가 어렵게 느껴지는 이유

이유는 두 가지예요.

첫째, 세 인터페이스의 협력 방식이 한 번에 안 들어옵니다. Reader가 청크 크기만큼 채울 때까지 반복 호출되고, Processor가 각 아이템을 개별 처리하고, Writer가 묶음으로 한 번에 쓰는 — 이 3단계 사이클이 머릿속에 그림으로 잡혀야 디버깅이 가능해요.

둘째, null 반환의 의미가 두 자리에서 다릅니다. ItemReader의 null = "데이터 끝", ItemProcessor의 null = "이 아이템 필터링". 같은 null인데 의미가 정반대라 헷갈립니다.

해결법은 한 가지예요. 청크 사이클을 그림으로 외우세요. read·read·read·…(청크 크기까지) → process·process·process·… → write(묶음으로) → 커밋. 이 그림이 잡히면 모든 청크 동작이 자연스럽게 보여요.

청크 처리 vs Tasklet — 한 번 더 비교

구분 청크 처리 Tasklet
적합한 작업 대량 데이터 읽기/변환/쓰기 단순 작업 (파일 삭제·이메일)
구성 요소 ItemReader + (Processor) + Writer Tasklet 인터페이스 하나
트랜잭션 청크 단위 커밋 메서드 실행 단위
재시작 청크 단위 재시작 가능 처음부터 재실행
복잡도 높음 (3개 컴포넌트) 낮음
메모리 청크 크기만큼만 한꺼번에

청크 처리 흐름 — 한 번에 잡기

Step 시작
│
├─ [청크 반복 시작] ─────────────────────────────────────┐
│   │                                                   │
│   ├─ ItemReader.read() → item1                        │
│   ├─ ItemReader.read() → item2     (청크 크기만큼 반복)│
│   ├─ ItemReader.read() → item3                        │
│   └─ ItemReader.read() → null      (또는 청크 채움)   │
│                                                       │
│   ├─ ItemProcessor.process(item1) → processedItem1    │
│   ├─ ItemProcessor.process(item2) → null (필터링!)    │
│   └─ ItemProcessor.process(item3) → processedItem3    │
│                                                       │
│   ├─ ItemWriter.write([processedItem1, processedItem3])│
│   └─ Commit Transaction                               │
│                                                       │
└─ [청크 반복 종료 - read()가 null 반환 시] ─────────────┘
│
Step 완료

청크의 트랜잭션 동작

각 청크는 독립 트랜잭션. read → process → write 사이클 완료 → 커밋.

청크1 [item1, item2, item3] → COMMIT ✓
청크2 [item4, item5, item6] → COMMIT ✓
청크3 [item7, item8, item9] → 오류 발생! → ROLLBACK ✗
청크4 [item10, ...] → 재실행 시 청크3부터 재처리

여기서 정말 중요한 시험 함정 — 이미 커밋된 청크는 영향 받지 않습니다. 청크3에서 실패해도 청크1·2의 데이터는 그대로 보존돼요. 대량 데이터 중 일부 실패에도 부분 성공 데이터를 유지할 수 있는 게 청크 처리의 강점.

ItemReader — read()와 null

인터페이스

public interface ItemReader<T> {
    @Nullable
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

read()null을 반환하면 "데이터 끝" 신호. 현재 청크를 완료하고 Step 종료.

커스텀 ItemReader

public class ProductNameItemReader implements ItemReader<String> {
    
    private final List<String> productNames;
    private Iterator<String> iterator;
    
    public ProductNameItemReader() {
        this.productNames = Arrays.asList(
            "Phone", "TV", "Laptop", "Tablet", "Camera"
        );
        this.iterator = productNames.iterator();
    }
    
    @Override
    public String read() {
        if (iterator.hasNext()) {
            return iterator.next();
        }
        return null;  // 필수: null = 종료 신호
    }
}

여기서 정말 중요한 시험 함정 — null 반환 빠뜨리면 무한 루프예요. 데이터 다 소진된 후에도 계속 뭔가 반환하면 Step이 절대 끝나지 않아요. FlatFileItemReaderJdbcCursorItemReader 같은 내장 구현체는 자동 처리하지만, 커스텀 작성 시 항상 종료 조건 확인.

ItemProcessor — 변환·필터링·검증

인터페이스

public interface ItemProcessor<I, O> {
    @Nullable
    O process(I item) throws Exception;
}

입력 타입 I, 출력 타입 O — 다를 수 있음 (변환). null 반환 = 해당 아이템 필터링 — Writer로 안 전달.

선택사항 — 입출력 타입 같고 변환 불필요하면 생략 가능.

변환 (Transformation)

public class ProductTransformProcessor implements ItemProcessor<Product, OSProduct> {
    
    @Override
    public OSProduct process(Product item) throws Exception {
        OSProduct osProduct = new OSProduct();
        
        // 카테고리에 따른 세금 계산
        if (item.getProductCategory().equalsIgnoreCase("sports accessories")) {
            osProduct.setTaxPercent(5);
        } else {
            osProduct.setTaxPercent(18);
        }
        
        // SKU 코드 생성: 카테고리 앞 3자리 + 상품 ID
        String sku = item.getProductCategory().substring(0, 3).toUpperCase() 
                    + item.getProductId();
        osProduct.setSku(sku);
        osProduct.setProductName(item.getProductName());
        osProduct.setProductPrice(item.getProductPrice());
        
        return osProduct;
    }
}

필터링 (Filtering) — null 반환

public class ExpensiveProductFilter implements ItemProcessor<Product, Product> {
    
    private static final double PRICE_THRESHOLD = 100.0;
    
    @Override
    public Product process(Product item) throws Exception {
        if (item.getProductPrice() <= PRICE_THRESHOLD) {
            return item;  // 통과
        }
        return null;  // 필터링 — Writer에 안 전달
    }
}

여기서 시험 함정이 하나 있어요. 필터링된 아이템은 filterCount에 기록됩니다. skipCount(오류 스킵)와 다른 카운터예요. 필터링은 정상적인 비즈니스 결정이고, 스킵은 오류 처리. 두 카운터를 보면 어떤 이유로 아이템이 빠졌는지 구분 가능.

검증 (Validation)

커스텀 Validator

public class ProductValidator implements Validator<Product> {
    
    @Override
    public void validate(Product product) throws ValidationException {
        if (product.getProductName() == null || product.getProductName().isEmpty()) {
            throw new ValidationException("상품명이 비어있습니다: " + product);
        }
        if (product.getProductPrice() < 0) {
            throw new ValidationException("가격이 음수입니다: " + product);
        }
        if (product.getProductCategory() == null) {
            throw new ValidationException("카테고리가 null입니다: " + product);
        }
    }
}

@Bean
public ValidatingItemProcessor<Product> validatingProcessor() {
    ValidatingItemProcessor<Product> processor = new ValidatingItemProcessor<>(new ProductValidator());
    processor.setFilter(true);  // true: 검증 실패 시 필터링 / false: 예외 발생
    return processor;
}

setFilter(true) — 검증 실패 = null 반환(필터링). setFilter(false) — 검증 실패 = ValidationException 발생.

BeanValidatingItemProcessor — JSR-303

public class Product {
    
    @NotNull(message = "상품 ID는 필수")
    private Integer productId;
    
    @NotBlank(message = "상품명 비어있으면 안 됨")
    private String productName;
    
    @NotBlank(message = "카테고리 비어있으면 안 됨")
    private String productCategory;
    
    @Min(value = 0, message = "가격은 0 이상")
    private Double productPrice;
}

@Bean
public BeanValidatingItemProcessor<Product> beanValidatingProcessor() {
    BeanValidatingItemProcessor<Product> processor = new BeanValidatingItemProcessor<>();
    processor.setFilter(true);
    return processor;
}

JSR-303 어노테이션 기반 자동 검증. 도메인 모델에 @NotNull, @NotBlank, @Min, @Max 등 박으면 자동 검증.

CompositeItemProcessor — 여러 프로세서 체이닝

여러 ItemProcessor를 순서대로 연결.

@Bean
public CompositeItemProcessor<Product, OSProduct> compositeProcessor() {
    CompositeItemProcessor<Product, OSProduct> composite = new CompositeItemProcessor<>();
    
    List<ItemProcessor<?, ?>> processors = new ArrayList<>();
    processors.add(beanValidatingProcessor());      // 1. JSR-303 검증
    processors.add(expensiveProductFilter());        // 2. 가격 필터
    processors.add(productTransformProcessor());     // 3. 타입 변환
    
    composite.setDelegates(processors);
    return composite;
}

앞 프로세서의 출력 → 다음 프로세서의 입력. 중간에 null 반환되면 파이프라인 단축 종료(short-circuit).

Item → [Validator] → valid Item → [Filter] → null → (Transformer 실행 X) → 필터링
Item → [Validator] → valid Item → [Filter] → Item → [Transformer] → TransformedItem → Write

여기서 정말 중요한 시험 함정 — 순서가 성능에 직접 영향입니다. 가장 많이 필터링하는 프로세서를 앞에 배치하면 이후 프로세서 실행이 줄어요.

// 효율적 순서 — 많이 필터링하는 것 먼저
processors.add(quickValidationFilter());    // 빠르고 70% 필터링
processors.add(businessRuleFilter());       // 중간 처리량
processors.add(expensiveTransformation()); // 느린 변환

검증으로 70% 필터링되면, 변환 로직 실행 횟수가 70% 감소.

ItemWriter — 묶음으로 한 번에

인터페이스

public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

청크 크기만큼의 아이템 List를 한 번에 받아 처리. DB Bulk INSERT파일 일괄 쓰기 가 가능.

커스텀 ItemWriter

public class ProductItemWriter implements ItemWriter<Product> {
    
    @Override
    public void write(List<? extends Product> items) throws Exception {
        System.out.println("쓰기 시작 — 아이템 수: " + items.size());
        for (Product product : items) {
            System.out.println("저장: " + product.getProductName());
            // 실제 DB 저장 또는 파일 쓰기 로직
        }
    }
}

여기서 시험 함정이 하나 있어요. 모든 아이템이 Processor에서 null로 필터링되면 Writer는 빈 List를 받아요. items.size() == 0 케이스도 안전하게 처리해야 합니다. 내장 Writer는 자동 처리하지만 커스텀 Writer 작성 시 주의.

청크 처리 완전한 예시

도메인 모델

@Data
public class Product {
    private Integer productId;
    private String productName;
    private String productCategory;
    private Double productPrice;
    
    @Override
    public String toString() {
        return "Product{id=" + productId + 
               ", name=" + productName + 
               ", category=" + productCategory + 
               ", price=" + productPrice + "}";
    }
}

완전한 청크 Step

@Configuration
@EnableBatchProcessing
public class ProductBatchConfig {
    
    @Autowired private JobBuilderFactory jobBuilderFactory;
    @Autowired private StepBuilderFactory stepBuilderFactory;
    @Autowired private DataSource dataSource;
    
    // ItemReader
    @Bean
    public FlatFileItemReader<Product> productReader() {
        FlatFileItemReader<Product> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/products.csv"));
        
        DefaultLineMapper<Product> mapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("productId", "productName", "productCategory", "productPrice");
        mapper.setLineTokenizer(tokenizer);
        mapper.setFieldSetMapper(fieldSet -> {
            Product product = new Product();
            product.setProductId(fieldSet.readInt("productId"));
            product.setProductName(fieldSet.readString("productName"));
            product.setProductCategory(fieldSet.readString("productCategory"));
            product.setProductPrice(fieldSet.readDouble("productPrice"));
            return product;
        });
        reader.setLineMapper(mapper);
        return reader;
    }
    
    // ItemProcessor
    @Bean
    public ItemProcessor<Product, Product> productProcessor() {
        return item -> {
            if (item.getProductPrice() <= 0) {
                return null;  // 필터링
            }
            item.setProductName(item.getProductName().toUpperCase());
            return item;
        };
    }
    
    // ItemWriter
    @Bean
    public JdbcBatchItemWriter<Product> productWriter() {
        JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("INSERT INTO products (id, name, category, price) VALUES (?, ?, ?, ?)");
        writer.setItemPreparedStatementSetter((item, ps) -> {
            ps.setInt(1, item.getProductId());
            ps.setString(2, item.getProductName());
            ps.setString(3, item.getProductCategory());
            ps.setDouble(4, item.getProductPrice());
        });
        return writer;
    }
    
    // Step
    @Bean
    public Step productStep() {
        return stepBuilderFactory.get("productStep")
            .<Product, Product>chunk(10)
            .reader(productReader())
            .processor(productProcessor())
            .writer(productWriter())
            .build();
    }
    
    // Job
    @Bean
    public Job productJob() {
        return jobBuilderFactory.get("productJob")
            .incrementer(new RunIdIncrementer())
            .start(productStep())
            .build();
    }
}

데이터 흐름 단계별 상세

Read 단계

read() 호출 1: item1 반환 → readCount = 1
read() 호출 2: item2 반환 → readCount = 2
...
read() 호출 10: item10 반환 → readCount = 10 → 청크 완료 (크기 10)

또는

read() 호출 N: null 반환 → Step 종료

각 호출마다 StepExecution.readCount 1씩 증가.

Process 단계

process(item1) → processedItem1 → 목록 추가
process(item2) → null → 필터링, filterCount++
process(item3) → processedItem3 → 목록 추가

청크에 수집된 각 아이템에 대해 개별 호출.

Write 단계

처리된 아이템 List를 한 번에 받아 실행. 성공 시 writeCount 증가 + 트랜잭션 커밋. 예외 시 트랜잭션 롤백.

JdbcBatchItemWriterPreparedStatement.executeBatch() 사용 — 한 번의 DB 호출.

다양한 ItemProcessor 사용 패턴

로깅·모니터링용

@Bean
public ItemProcessor<Product, Product> loggingProcessor() {
    return item -> {
        log.debug("Processing: {}", item.getProductName());
        item.setProcessedAt(LocalDateTime.now());
        return item;
    };
}

외부 서비스 호출

@Component
public class EnrichProductProcessor implements ItemProcessor<Product, EnrichedProduct> {
    
    @Autowired
    private CategoryService categoryService;
    
    @Override
    public EnrichedProduct process(Product item) throws Exception {
        CategoryInfo categoryInfo = categoryService.getCategoryInfo(item.getProductCategory());
        
        EnrichedProduct enriched = new EnrichedProduct(item);
        enriched.setCategoryDescription(categoryInfo.getDescription());
        enriched.setTaxRate(categoryInfo.getTaxRate());
        return enriched;
    }
}

타입 변환

public class ProductToDtoProcessor implements ItemProcessor<Product, ProductDTO> {
    
    @Autowired
    private ModelMapper modelMapper;
    
    @Override
    public ProductDTO process(Product item) throws Exception {
        return modelMapper.map(item, ProductDTO.class);
    }
}

자주 묻는 질문

Q: ItemProcessor의 null 반환과 SkipPolicy의 skip은 어떻게 다른가요?

A: ItemProcessor의 null = "정상적인 비즈니스 결정으로 이 아이템 스킵" → filterCount 기록. SkipPolicy의 skip = "처리 중 예외 발생했지만 건너뛰고 계속" → skipCount 기록. 목적이 완전히 다른 두 메커니즘.

Q: 청크 크기를 어떻게 결정하나요?

A: 일반 가이드 — DB 트랜잭션 비용 높으면 키움, 메모리 제한이면 줄임, 실패 가능성 높으면 줄여서 재처리 범위 좁힘. 성능 테스트로 최적값 찾기. 시작은 100 정도가 무난.

Q: ItemProcessor 없이 Step 가능한가요?

A: 네. 입출력 타입 같고 변환 불필요하면 .processor() 생략. 청크 설정의 입출력 타입 동일 — .<Product, Product>chunk(10).

청크 처리 성능 최적화

청크 크기 튜닝

청크 크기 ↑ → DB 트랜잭션 ↓ → 성능 ↑ (단 메모리 ↑).

성능 측정 ChunkListener:

@Component
public class PerformanceChunkListener implements ChunkListener {
    
    private long chunkStartTime;
    private int chunkCount = 0;
    
    @Override
    public void beforeChunk(ChunkContext context) {
        chunkStartTime = System.currentTimeMillis();
    }
    
    @Override
    public void afterChunk(ChunkContext context) {
        long elapsed = System.currentTimeMillis() - chunkStartTime;
        chunkCount++;
        System.out.printf("청크 %d 처리 시간: %dms%n", chunkCount, elapsed);
    }
    
    @Override
    public void afterChunkError(ChunkContext context) {
        System.out.println("청크 오류 발생");
    }
}

Reader 최적화

  • JdbcCursorItemReader.setFetchSize() — DB에서 한 번에 가져올 행 수
  • JdbcPagingItemReader.pageSize ≈ 청크 크기 일치
  • 정렬 컬럼(sortKey)에 DB 인덱스

Writer 최적화

  • JdbcBatchItemWriter — 단건 INSERT 대신 Bulk INSERT
  • 청크 크기 ↑ → 트랜잭션 커밋 횟수 ↓

일반적 실수와 주의사항

ItemReader는 반드시 null 반환

데이터 소진되면 무조건 null. 안 그러면 Step 무한 루프.

CompositeItemProcessor 타입 체크

제네릭 타입은 첫 프로세서 입력 + 마지막 프로세서 출력. 중간 프로세서들은 List<ItemProcessor<?, ?>> — 런타임 체크.

Writer가 빈 List 받을 수 있음

모든 아이템이 Processor에서 필터링되면 빈 List. items.size() == 0 처리.

ItemProcessor 상태 유지 주의

기본 싱글톤 — 인스턴스 변수에 상태 저장하면 청크 간 공유. 카운터·누적 값은 ExecutionContext 사용.

병렬 처리 시 Thread Safety

ItemProcessor 가능하면 stateless. 공유 리소스 접근 시 동기화.

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 3편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • 청크 처리 = read·read·…·read → process·process·… → write → 커밋
  • 각 청크 = 독립 트랜잭션 (이미 커밋된 청크는 영향 X)
  • ItemReader.read() null = 데이터 끝 신호
  • null 빠뜨리면 무한 루프
  • ItemProcessor.process() null = 필터링 (Writer에 X)
  • 필터링은 filterCount / 스킵은 skipCount (다른 카운터)
  • ItemProcessor 3가지 역할 — 변환·필터링·검증
  • 검증 — 커스텀 Validator + ValidatingItemProcessor
  • JSR-303 = @NotNull, @NotBlank, @Min + BeanValidatingItemProcessor
  • setFilter(true) = 검증 실패 시 필터링 / false = 예외 발생
  • CompositeItemProcessor = 여러 프로세서 체이닝
  • 중간 null = short-circuit (이후 프로세서 실행 X)
  • 순서가 성능 영향 — 많이 필터링하는 것 먼저
  • ItemWriter.write() = List를 한 번에
  • 빈 List도 처리 가능해야 (모든 아이템 필터링 시)
  • 청크 크기 trade-off — 메모리 vs 트랜잭션 vs 재처리
  • ItemProcessor 상태 유지 주의 (싱글톤)
  • 누적 값 → ExecutionContext 사용
  • JdbcBatchItemWriter = executeBatch() Bulk INSERT
  • 병렬 처리 시 ItemProcessor stateless 권장
  • ChunkListener — beforeChunk/afterChunk/afterChunkError
  • 입출력 타입 같으면 ItemProcessor 생략 가능

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 톤으로 묶어 정리되어 있어요. 3편 청크 패턴이 잡히면 4·5편의 ItemReader/Writer 구현체가 같은 패턴 반복으로 보입니다.

공식 문서: Spring Batch Item Processing에서 모든 ItemProcessor 패턴을 확인할 수 있어요.

다음 글(4편)에서는 ItemReader 구현체의 본격 — FlatFileItemReader·JdbcCursorItemReader·JdbcPagingItemReader 차이, RowMapper와 FieldSetMapper, 페이지·커서 선택 가이드까지 풀어 갑니다.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!