Spring Batch 입문 25편 — Reader · Writer 구현체 카탈로그

2026-05-17Spring Batch 입문에서 운영까지

Spring Batch 입문 25편. Spring Batch 가 기본 제공하는 ItemReader · ItemWriter 구현체 한눈 카탈로그 — 6 Decorator (Synchronized · Peekable · MultiResource · Classifier · Mapping), Messaging (AMQP · JMS · Kafka), Database (Mongo · Repository · JdbcBatch · Jpa), Specialized (LDIF · Avro · Mail · Script) 까지 정리한 학습 노트.

📚 Spring Batch 입문에서 운영까지 · 25편 — Reader · Writer 구현체 카탈로그

이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 25편이에요. 24편까지 인터페이스 자체를 봤다면, 이번 25편은 Spring Batch가 미리 만들어둔 구현체 전체를 카탈로그 형태로 한눈에 훑어봅니다.

표준 구현체의 4범주

범주 무엇
Decorator 기존 Reader/Writer 에 부가 기능 wrapping
Messaging AMQP · JMS · Kafka 같은 메시지 시스템
Database RDB · MongoDB · Spring Data Repository
Specialized LDIF · Avro · Mail · Script 같은 특수 case

각 범주 안에서 상황별로 어떻게 골라 쓰는지 그림을 그려보겠습니다.

1. Decorator — 부가 기능 wrapping

기존 Reader·Writer 위에 재사용 가능한 한 겹을 더 씌우는 패턴입니다.

SynchronizedItemStreamReader

@Bean
public SynchronizedItemStreamReader<Person> reader() {
    FlatFileItemReader<Person> flatFileReader = new FlatFileItemReaderBuilder<Person>()
        // ...
        .build();

    return new SynchronizedItemStreamReaderBuilder<Person>()
        .delegate(flatFileReader)
        .build();
}

역할 — non-thread-safe Reader를 thread-safe 로 감쌉니다.

22편의 핵심 함정을 떠올려 보면, FlatFileItemReader·JdbcCursorItemReader·StaxEventItemReader는 모두 NOT thread-safe 였죠. 37편 Multi-threaded Step 환경에서 이들을 쓰려면 이 decorator 가 필수입니다.

SingleItemPeekableItemReader

@Bean
public SingleItemPeekableItemReader<Item> peekableReader(ItemReader<Item> delegate) {
    return new SingleItemPeekableItemReaderBuilder<Item>()
        .delegate(delegate)
        .build();
}

역할peek() 메서드를 추가합니다. 다음 item 을 미리 보되 소비하지는 않고, 다음 read() 가 같은 item 을 반환합니다.

용도 — 그룹화 패턴에서 씁니다. 다음 item 의 키 값을 보고 현재 그룹을 끊을지 판단하는 식이에요.

while ((item = reader.read()) != null) {
    currentGroup.add(item);

    Item next = reader.peek();
    if (next == null || !sameGroup(item, next)) {
        flushGroup(currentGroup);
        currentGroup = new ArrayList<>();
    }
}

주의peek() 은 thread-safe 하지 않습니다. multi-threaded 환경에서는 쓰면 안 돼요.

SynchronizedItemStreamWriter

@Bean
public SynchronizedItemStreamWriter<Person> writer() {
    FlatFileItemWriter<Person> flatFileWriter = new FlatFileItemWriterBuilder<Person>()
        // ...
        .build();

    return new SynchronizedItemStreamWriterBuilder<Person>()
        .delegate(flatFileWriter)
        .build();
}

SynchronizedItemStreamReader 의 Writer 버전입니다. FlatFileItemWriter 자체가 thread-safe 하지 않기 때문에 multi-threaded 환경에서는 이렇게 감싸 줍니다.

MultiResourceItemWriter

@Bean
public MultiResourceItemWriter<Person> multiWriter(
        FlatFileItemWriter<Person> delegate) {
    return new MultiResourceItemWriterBuilder<Person>()
        .name("multiWriter")
        .delegate(delegate)
        .resource(new FileSystemResource("output/"))
        .resourceSuffixCreator(index -> "output-" + index + ".csv")
        .itemCountLimitPerResource(1000)
        .build();
}

역할itemCountLimitPerResource 를 넘으면 새 출력 파일을 자동으로 만듭니다. 대량 출력을 작은 파일로 쪼개고 싶을 때 쓰죠 (예: 1000건씩 chunked file 출력).

ClassifierCompositeItemWriter

@Bean
public ClassifierCompositeItemWriter<Order> classifierWriter(
        JdbcBatchItemWriter<Order> regularWriter,
        JdbcBatchItemWriter<Order> priorityWriter) {

    Classifier<Order, ItemWriter<? super Order>> classifier =
        order -> order.isPriority() ? priorityWriter : regularWriter;

    return new ClassifierCompositeItemWriterBuilder<Order>()
        .classifier(classifier)
        .build();
}

역할 — item 마다 다른 Writer 로 라우팅합니다 (router pattern). 23편 ItemWriter 의 핵심 패턴이었어요.

thread-safety — 모든 delegate 가 thread-safe 면 자동으로 보장됩니다.

ClassifierCompositeItemProcessor

@Bean
public ClassifierCompositeItemProcessor<Order, Order> classifierProcessor(
        ItemProcessor<Order, Order> proc1,
        ItemProcessor<Order, Order> proc2) {

    Classifier<Order, ItemProcessor<?, ? extends Order>> classifier =
        order -> "TYPE_A".equals(order.getType()) ? proc1 : proc2;

    return new ClassifierCompositeItemProcessorBuilder<Order, Order>()
        .classifier(classifier)
        .build();
}

ClassifierCompositeItemWriter 의 Processor 버전으로, item type 별로 다른 처리 로직을 태웁니다.

MappingItemWriter

Spring Batch 6.0+ 신규.

@Bean
public MappingItemWriter<Customer, String> mappingWriter(
        ItemWriter<String> downstream) {
    return new MappingItemWriter<>(
        downstream,
        customer -> customer.getEmail()     // 변환 함수
    );
}

역할 — 입력 type 을 변환한 뒤 downstream Writer 로 흘려보냅니다. CompositeItemWriter 와 결합하면 deconstruction 패턴이 되는데, 한 객체에서 여러 필드를 뽑아 각 필드를 다른 Writer 로 보내는 식이에요.

@Bean
public CompositeItemWriter<Customer> dualWriter(
        ItemWriter<String> emailWriter,
        ItemWriter<Long> idWriter) {

    MappingItemWriter<Customer, String> emailExtract =
        new MappingItemWriter<>(emailWriter, Customer::getEmail);
    MappingItemWriter<Customer, Long> idExtract =
        new MappingItemWriter<>(idWriter, Customer::getId);

    CompositeItemWriter<Customer> composite = new CompositeItemWriter<>();
    composite.setDelegates(List.of(emailExtract, idExtract));
    return composite;
}

2. Messaging Readers · Writers

KafkaItemReader

@Bean
public KafkaItemReader<String, String> kafkaReader(
        ConsumerFactory<String, String> consumerFactory) {
    return new KafkaItemReaderBuilder<String, String>()
        .name("kafkaReader")
        .partitions(0, 1, 2)
        .pollTimeout(Duration.ofSeconds(10))
        .topic("input-topic")
        .consumerProperties(...)
        .build();
}

역할 — Kafka topic 의 여러 partition 에서 message 를 읽습니다. offset 을 ExecutionContext 에 저장하기 때문에 재시작에도 안전합니다.

KafkaItemWriter

@Bean
public KafkaItemWriter<String, Order> kafkaWriter(
        KafkaTemplate<String, Order> template) {
    return new KafkaItemWriterBuilder<String, Order>()
        .kafkaTemplate(template)
        .itemKeyMapper(Order::getOrderKey)
        .delete(false)
        .build();
}

역할 — chunk 의 items 를 default topic 으로 send 합니다.

AmqpItemReader · AmqpItemWriter

@Bean
public AmqpItemReader<Order> amqpReader(AmqpTemplate template) {
    return new AmqpItemReaderBuilder<Order>()
        .amqpTemplate(template)
        .itemType(Order.class)
        .build();
}

역할 — RabbitMQ 같은 AMQP (메시지 큐 표준 프로토콜) exchange 에서 read/send 합니다. AmqpTemplate 을 그대로 활용합니다.

JmsItemReader · JmsItemWriter

@Bean
public JmsItemReader<Order> jmsReader(JmsTemplate template) {
    return new JmsItemReaderBuilder<Order>()
        .jmsTemplate(template)
        .itemType(Order.class)
        .build();
}

역할 — JMS (자바 메시징 표준 API) queue/topic 에서 read/send 합니다. JmsTemplate 의 default destination 을 그대로 씁니다.

Messaging 의 transaction 특성

22편에서 정리한 Transactional Source 의 정의를 떠올려 보세요. rollback 시 같은 item 을 다시 read 할 수 있어야 한다는 거였죠. JMS·Kafka·AMQP 가 모두 transactional source 라서, retry·skip 로직과 자연스럽게 맞물립니다.

3. Database Readers

MongoPagingItemReader

@Bean
public MongoPagingItemReader<Customer> mongoReader(MongoTemplate template) {
    return new MongoPagingItemReaderBuilder<Customer>()
        .name("mongoReader")
        .template(template)
        .targetType(Customer.class)
        .jsonQuery("{ active: true }")
        .sorts(Map.of("createdAt", Sort.Direction.ASC))
        .pageSize(1000)
        .build();
}

역할 — MongoDB 에서 page 단위로 읽습니다.

MongoCursorItemReader

@Bean
public MongoCursorItemReader<Customer> mongoCursorReader(MongoTemplate template) {
    return new MongoCursorItemReaderBuilder<Customer>()
        .name("mongoCursorReader")
        .template(template)
        .targetType(Customer.class)
        .jsonQuery("{ active: true }")
        .sorts(Map.of("createdAt", Sort.Direction.ASC))
        .build();
}

역할 — MongoDB 를 streaming (cursor) 으로 훑습니다. 22편에서 본 Cursor vs Paging 의 MongoDB 판이에요.

RepositoryItemReader

@Bean
public RepositoryItemReader<Customer> repoReader(CustomerRepository repo) {
    return new RepositoryItemReaderBuilder<Customer>()
        .name("repoReader")
        .repository(repo)
        .methodName("findByActive")
        .arguments(Arrays.asList(true))
        .sorts(Map.of("id", Sort.Direction.ASC))
        .pageSize(1000)
        .build();
}

역할 — Spring Data 의 PagingAndSortingRepository 를 활용해, 이미 만들어둔 Repository 메서드를 재사용합니다.

4. Database Writers

JdbcBatchItemWriter

@Bean
public JdbcBatchItemWriter<Customer> jdbcWriter(DataSource ds) {
    return new JdbcBatchItemWriterBuilder<Customer>()
        .dataSource(ds)
        .sql("INSERT INTO customers (id, name, email) VALUES (:id, :name, :email)")
        .beanMapped()
        .build();
}

역할NamedParameterJdbcTemplate 의 batch update 를 활용해, chunk 전체를 한 batch 로 INSERT 합니다. DB writer 가운데 성능이 가장 좋습니다.

JpaItemWriter

@Bean
public JpaItemWriter<Customer> jpaWriter(EntityManagerFactory emf) {
    return new JpaItemWriterBuilder<Customer>()
        .entityManagerFactory(emf)
        .build();
}

역할 — JPA 의 EntityManager.merge() 를 씁니다. persistence context 에 없는 entity 도 자동으로 merge 됩니다. JPA 환경의 표준이지만, batch insert 성능은 JdbcBatchItemWriter 보다 떨어집니다.

MongoItemWriter

@Bean
public MongoItemWriter<Customer> mongoWriter(MongoOperations ops) {
    return new MongoItemWriterBuilder<Customer>()
        .template(ops)
        .collection("customers")
        .build();
}

역할 — MongoDB 에 chunk 의 items 를 저장합니다. MongoOperations (Spring Data MongoDB) 를 그대로 활용합니다.

RepositoryItemWriter

@Bean
public RepositoryItemWriter<Customer> repoWriter(CustomerRepository repo) {
    return new RepositoryItemWriterBuilder<Customer>()
        .repository(repo)
        .methodName("save")
        .build();
}

역할 — Spring Data CrudRepository.save() 를 활용해 기존 Repository 를 재사용합니다.

5. Specialized Readers

LdifReader

@Bean
public LdifReader ldifReader() {
    return new LdifReaderBuilder()
        .name("ldifReader")
        .resource(new FileSystemResource("users.ldif"))
        .build();
}

역할 — LDIF (LDAP 디렉터리 데이터 교환 포맷) 파일을 읽습니다. 각 read 결과가 LdapAttribute 로 나옵니다.

MappingLdifReader

@Bean
public MappingLdifReader<UserDto> mappingLdifReader(
        RecordMapper<UserDto> mapper) {
    return new MappingLdifReaderBuilder<UserDto>()
        .name("mappingLdifReader")
        .resource(new FileSystemResource("users.ldif"))
        .recordMapper(mapper)
        .build();
}

역할 — LDIF 를 POJO 까지 바로 매핑합니다. RecordMapper 로 변환 로직을 주입하는 형태예요.

AvroItemReader

@Bean
public AvroItemReader<User> avroReader() {
    return new AvroItemReaderBuilder<User>()
        .resource(new FileSystemResource("users.avro"))
        .type(User.class)
        .embeddedSchema(true)
        .build();
}

역할 — Avro (스키마 기반 이진 직렬화 포맷) 데이터를 읽습니다. schema embedded 옵션을 켤 수 있어요.

6. Specialized Writers

SimpleMailMessageItemWriter

@Bean
public SimpleMailMessageItemWriter mailWriter(MailSender sender) {
    return new SimpleMailMessageItemWriterBuilder()
        .mailSender(sender)
        .build();
}

역할 — chunk 의 SimpleMailMessage 들을 batch mail 로 보냅니다. MailSender 에 위임하는 구조예요. 대량 알림 전송 batch 에 잘 맞습니다.

AvroItemWriter

@Bean
public AvroItemWriter<User> avroWriter() {
    return new AvroItemWriterBuilder<User>()
        .resource(new FileSystemResource("output.avro"))
        .type(User.class)
        .schema(new ClassPathResource("user.avsc"))
        .build();
}

역할 — POJO 를 Avro 로 직렬화해 출력합니다.

7. Specialized Processors

ScriptItemProcessor

@Bean
public ScriptItemProcessor<Item, Item> scriptProcessor() {
    return new ScriptItemProcessorBuilder<Item, Item>()
        .scriptSource("item.value = item.value.toUpperCase(); item;", "javascript")
        .build();
}

역할 — JavaScript·Groovy 같은 스크립트로 item 을 처리합니다. JSR-223 (자바 스크립트 엔진 표준) 위에서 동작하죠. runtime 에 변경 가능한 변환 로직이 필요할 때 유용합니다.

선택 가이드 — 어떤 상황에 어떤 구현체

Multi-threaded Step

기존 Reader wrap 후
FlatFileItemReader SynchronizedItemStreamReader
JdbcCursorItemReader SynchronizedItemStreamReader
StaxEventItemReader SynchronizedItemStreamReader
JdbcPagingItemReader (이미 thread-safe — wrap 불필요)
KafkaItemReader (partition 단위 분리 — wrap 불필요)

대량 출력 분할

  • MultiResourceItemWriter + FlatFileItemWriter delegate

Grouping 패턴

  • SingleItemPeekableItemReader + 사용자 그룹화 로직

조건부 처리

  • ClassifierCompositeItemProcessor (type 별 로직)
  • ClassifierCompositeItemWriter (type 별 라우팅)

DB writer 성능 우선

  • JdbcBatchItemWriter (chunk 전체 batch INSERT)
  • avoid JpaItemWriter (느림, 단 ORM 통합 필요 시)

Spring Data 재사용

  • RepositoryItemReader + RepositoryItemWriter

Messaging 환경

  • Kafka — KafkaItemReader·KafkaItemWriter
  • RabbitMQ — AmqpItemReader·AmqpItemWriter
  • 표준 JMS — JmsItemReader·JmsItemWriter

MongoDB

  • Paging — MongoPagingItemReader
  • Cursor (streaming) — MongoCursorItemReader
  • Writer — MongoItemWriter

함정 정리

함정 1: SynchronizedItemStreamReader 만 wrap, delegate 미등록

new StepBuilder(...)
    .reader(syncReader)
    // .stream(delegate) 빠뜨림 → 잘못된 패턴? NO!

SynchronizedItemStreamReader 가 내부에서 delegate 의 ItemStream 호출을 알아서 위임하기 때문에, 추가로 .stream() 을 걸어줄 필요가 없습니다. 22편에서 본 일반 Delegate Pattern 함정과는 결이 달라요.

함정 2: MultiResourceItemWriter 의 ExecutionContext

MultiResourceItemWriter 는 현재 resource index 와 위치를 ExecutionContext 에 저장하기 때문에 재시작에 안전합니다. 단 delegate 가 ItemStream 을 구현하고 name 이 설정돼 있어야 합니다.

함정 3: JpaItemWriter 의 batch insert 안 됨

JPA 자체가 개별 EntityManager.merge 를 호출하는 구조라, hibernate.jdbc.batch_size property 를 설정하고 entity 가 @GeneratedValue(strategy = SEQUENCE) 를 쓸 때에 한해서만 batch insert 가 활성화됩니다. 진짜 batch INSERT 가 필요하면 JdbcBatchItemWriter 를 쓰는 게 낫습니다.

함정 4: KafkaItemReader 의 partition 지정

KafkaItemReader 는 명시한 partition list 만 읽습니다. 자동 partition assignment 가 없어서, partition list 를 정확히 지정해 줘야 합니다.

함정 5: ClassifierComposite* 의 default branch

Classifier 가 모든 case 를 커버하지 못하면 NPE 가 터집니다. default delegate 를 두거나 전 case 를 명시해야 합니다.

시험 직전 한 번 더 — 구현체 카탈로그 함정 압축 노트

  • 4범주 = Decorator · Messaging · Database · Specialized
  • Decorator 6종 = SynchronizedItemStreamReader · SingleItemPeekable · SynchronizedItemStreamWriter · MultiResource · ClassifierComposite (Writer + Processor) · Mapping
  • SynchronizedItemStreamReader/Writer = non-thread-safe wrap → multi-threaded 안전
  • SingleItemPeekable = peek() 메서드, grouping 패턴, thread-safe X
  • MultiResourceItemWriter = itemCountLimitPerResource 초과 시 새 파일
  • ClassifierComposite = router pattern, item 별 다른 delegate
  • MappingItemWriter = 변환 후 downstream Writer, deconstruction 패턴
  • Messaging = AMQP · JMS · Kafka (transactional source — rollback 시 재 read)
  • KafkaItemReader = offset ExecutionContext 저장, partition 명시 지정
  • Database Readers = MongoPaging · MongoCursor · Repository
  • Database Writers = MongoItemWriter · RepositoryItemWriter · JdbcBatchItemWriter · JpaItemWriter
  • JdbcBatchItemWriter = NamedParameterJdbcTemplate batch, 최고 성능
  • JpaItemWriter = EntityManager.merge, batch insert 약함
  • Specialized Readers = LdifReader · MappingLdifReader · AvroItemReader
  • Specialized Writers = SimpleMailMessageItemWriter · AvroItemWriter
  • ScriptItemProcessor = JSR-223 script (JavaScript·Groovy)
  • 함정 — Synchronized*Reader 는 자동 위임 → 추가 .stream() 불필요
  • 함정 — JpaItemWriter batch insert 는 hibernate.jdbc.batch_size 필요
  • 함정 — KafkaItemReader 는 partition 명시
  • 함정 — ClassifierComposite 의 null branch → default delegate
  • thread-safe 기본 = JdbcPagingItemReader · 대부분 Writer
  • thread-safe X = FlatFileItemReader · JdbcCursorItemReader · StaxEventItemReader · SingleItemPeekable.peek()
  • 선택 — multi-threaded → Synchronized wrapper · 대량 분할 → MultiResource · 그룹화 → Peekable · 분기 → Classifier
  • 선택 — DB 최고 성능 → JdbcBatch · ORM 통합 → Jpa · Spring Data 재사용 → Repository

공식 문서: Item Reader and Writer Implementations 에서 원문을 확인할 수 있어요.

시리즈 다른 편 (앞뒤 글 모음)

이전 글:

다음 글:

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!