Spring Batch 입문 25편. Spring Batch 가 기본 제공하는 ItemReader · ItemWriter 구현체 한눈 카탈로그 — 6 Decorator (Synchronized · Peekable · MultiResource · Classifier · Mapping), Messaging (AMQP · JMS · Kafka), Database (Mongo · Repository · JdbcBatch · Jpa), Specialized (LDIF · Avro · Mail · Script) 까지 정리한 학습 노트.
이 글은 Spring Batch 입문에서 운영까지 시리즈 48편 중 25편이에요. 24편까지 인터페이스 자체를 봤다면, 이번 25편은 Spring Batch가 미리 만들어둔 구현체 전체를 카탈로그 형태로 한눈에 훑어봅니다.
표준 구현체의 4범주
| 범주 | 무엇 |
|---|---|
| Decorator | 기존 Reader/Writer 에 부가 기능 wrapping |
| Messaging | AMQP · JMS · Kafka 같은 메시지 시스템 |
| Database | RDB · MongoDB · Spring Data Repository |
| Specialized | LDIF · Avro · Mail · Script 같은 특수 case |
각 범주 안에서 상황별로 어떻게 골라 쓰는지 그림을 그려보겠습니다.
1. Decorator — 부가 기능 wrapping
기존 Reader·Writer 위에 재사용 가능한 한 겹을 더 씌우는 패턴입니다.
SynchronizedItemStreamReader
@Bean
public SynchronizedItemStreamReader<Person> reader() {
FlatFileItemReader<Person> flatFileReader = new FlatFileItemReaderBuilder<Person>()
// ...
.build();
return new SynchronizedItemStreamReaderBuilder<Person>()
.delegate(flatFileReader)
.build();
}
역할 — non-thread-safe Reader를 thread-safe 로 감쌉니다.
22편의 핵심 함정을 떠올려 보면, FlatFileItemReader·JdbcCursorItemReader·StaxEventItemReader는 모두 NOT thread-safe 였죠. 37편 Multi-threaded Step 환경에서 이들을 쓰려면 이 decorator 가 필수입니다.
SingleItemPeekableItemReader
@Bean
public SingleItemPeekableItemReader<Item> peekableReader(ItemReader<Item> delegate) {
return new SingleItemPeekableItemReaderBuilder<Item>()
.delegate(delegate)
.build();
}
역할 — peek() 메서드를 추가합니다. 다음 item 을 미리 보되 소비하지는 않고, 다음 read() 가 같은 item 을 반환합니다.
용도 — 그룹화 패턴에서 씁니다. 다음 item 의 키 값을 보고 현재 그룹을 끊을지 판단하는 식이에요.
while ((item = reader.read()) != null) {
currentGroup.add(item);
Item next = reader.peek();
if (next == null || !sameGroup(item, next)) {
flushGroup(currentGroup);
currentGroup = new ArrayList<>();
}
}
주의 — peek() 은 thread-safe 하지 않습니다. multi-threaded 환경에서는 쓰면 안 돼요.
SynchronizedItemStreamWriter
@Bean
public SynchronizedItemStreamWriter<Person> writer() {
FlatFileItemWriter<Person> flatFileWriter = new FlatFileItemWriterBuilder<Person>()
// ...
.build();
return new SynchronizedItemStreamWriterBuilder<Person>()
.delegate(flatFileWriter)
.build();
}
SynchronizedItemStreamReader 의 Writer 버전입니다. FlatFileItemWriter 자체가 thread-safe 하지 않기 때문에 multi-threaded 환경에서는 이렇게 감싸 줍니다.
MultiResourceItemWriter
@Bean
public MultiResourceItemWriter<Person> multiWriter(
FlatFileItemWriter<Person> delegate) {
return new MultiResourceItemWriterBuilder<Person>()
.name("multiWriter")
.delegate(delegate)
.resource(new FileSystemResource("output/"))
.resourceSuffixCreator(index -> "output-" + index + ".csv")
.itemCountLimitPerResource(1000)
.build();
}
역할 — itemCountLimitPerResource 를 넘으면 새 출력 파일을 자동으로 만듭니다. 대량 출력을 작은 파일로 쪼개고 싶을 때 쓰죠 (예: 1000건씩 chunked file 출력).
ClassifierCompositeItemWriter
@Bean
public ClassifierCompositeItemWriter<Order> classifierWriter(
JdbcBatchItemWriter<Order> regularWriter,
JdbcBatchItemWriter<Order> priorityWriter) {
Classifier<Order, ItemWriter<? super Order>> classifier =
order -> order.isPriority() ? priorityWriter : regularWriter;
return new ClassifierCompositeItemWriterBuilder<Order>()
.classifier(classifier)
.build();
}
역할 — item 마다 다른 Writer 로 라우팅합니다 (router pattern). 23편 ItemWriter 의 핵심 패턴이었어요.
thread-safety — 모든 delegate 가 thread-safe 면 자동으로 보장됩니다.
ClassifierCompositeItemProcessor
@Bean
public ClassifierCompositeItemProcessor<Order, Order> classifierProcessor(
ItemProcessor<Order, Order> proc1,
ItemProcessor<Order, Order> proc2) {
Classifier<Order, ItemProcessor<?, ? extends Order>> classifier =
order -> "TYPE_A".equals(order.getType()) ? proc1 : proc2;
return new ClassifierCompositeItemProcessorBuilder<Order, Order>()
.classifier(classifier)
.build();
}
ClassifierCompositeItemWriter 의 Processor 버전으로, item type 별로 다른 처리 로직을 태웁니다.
MappingItemWriter
Spring Batch 6.0+ 신규.
@Bean
public MappingItemWriter<Customer, String> mappingWriter(
ItemWriter<String> downstream) {
return new MappingItemWriter<>(
downstream,
customer -> customer.getEmail() // 변환 함수
);
}
역할 — 입력 type 을 변환한 뒤 downstream Writer 로 흘려보냅니다. CompositeItemWriter 와 결합하면 deconstruction 패턴이 되는데, 한 객체에서 여러 필드를 뽑아 각 필드를 다른 Writer 로 보내는 식이에요.
@Bean
public CompositeItemWriter<Customer> dualWriter(
ItemWriter<String> emailWriter,
ItemWriter<Long> idWriter) {
MappingItemWriter<Customer, String> emailExtract =
new MappingItemWriter<>(emailWriter, Customer::getEmail);
MappingItemWriter<Customer, Long> idExtract =
new MappingItemWriter<>(idWriter, Customer::getId);
CompositeItemWriter<Customer> composite = new CompositeItemWriter<>();
composite.setDelegates(List.of(emailExtract, idExtract));
return composite;
}
2. Messaging Readers · Writers
KafkaItemReader
@Bean
public KafkaItemReader<String, String> kafkaReader(
ConsumerFactory<String, String> consumerFactory) {
return new KafkaItemReaderBuilder<String, String>()
.name("kafkaReader")
.partitions(0, 1, 2)
.pollTimeout(Duration.ofSeconds(10))
.topic("input-topic")
.consumerProperties(...)
.build();
}
역할 — Kafka topic 의 여러 partition 에서 message 를 읽습니다. offset 을 ExecutionContext 에 저장하기 때문에 재시작에도 안전합니다.
KafkaItemWriter
@Bean
public KafkaItemWriter<String, Order> kafkaWriter(
KafkaTemplate<String, Order> template) {
return new KafkaItemWriterBuilder<String, Order>()
.kafkaTemplate(template)
.itemKeyMapper(Order::getOrderKey)
.delete(false)
.build();
}
역할 — chunk 의 items 를 default topic 으로 send 합니다.
AmqpItemReader · AmqpItemWriter
@Bean
public AmqpItemReader<Order> amqpReader(AmqpTemplate template) {
return new AmqpItemReaderBuilder<Order>()
.amqpTemplate(template)
.itemType(Order.class)
.build();
}
역할 — RabbitMQ 같은 AMQP (메시지 큐 표준 프로토콜) exchange 에서 read/send 합니다. AmqpTemplate 을 그대로 활용합니다.
JmsItemReader · JmsItemWriter
@Bean
public JmsItemReader<Order> jmsReader(JmsTemplate template) {
return new JmsItemReaderBuilder<Order>()
.jmsTemplate(template)
.itemType(Order.class)
.build();
}
역할 — JMS (자바 메시징 표준 API) queue/topic 에서 read/send 합니다. JmsTemplate 의 default destination 을 그대로 씁니다.
Messaging 의 transaction 특성
22편에서 정리한 Transactional Source 의 정의를 떠올려 보세요. rollback 시 같은 item 을 다시 read 할 수 있어야 한다는 거였죠. JMS·Kafka·AMQP 가 모두 transactional source 라서, retry·skip 로직과 자연스럽게 맞물립니다.
3. Database Readers
MongoPagingItemReader
@Bean
public MongoPagingItemReader<Customer> mongoReader(MongoTemplate template) {
return new MongoPagingItemReaderBuilder<Customer>()
.name("mongoReader")
.template(template)
.targetType(Customer.class)
.jsonQuery("{ active: true }")
.sorts(Map.of("createdAt", Sort.Direction.ASC))
.pageSize(1000)
.build();
}
역할 — MongoDB 에서 page 단위로 읽습니다.
MongoCursorItemReader
@Bean
public MongoCursorItemReader<Customer> mongoCursorReader(MongoTemplate template) {
return new MongoCursorItemReaderBuilder<Customer>()
.name("mongoCursorReader")
.template(template)
.targetType(Customer.class)
.jsonQuery("{ active: true }")
.sorts(Map.of("createdAt", Sort.Direction.ASC))
.build();
}
역할 — MongoDB 를 streaming (cursor) 으로 훑습니다. 22편에서 본 Cursor vs Paging 의 MongoDB 판이에요.
RepositoryItemReader
@Bean
public RepositoryItemReader<Customer> repoReader(CustomerRepository repo) {
return new RepositoryItemReaderBuilder<Customer>()
.name("repoReader")
.repository(repo)
.methodName("findByActive")
.arguments(Arrays.asList(true))
.sorts(Map.of("id", Sort.Direction.ASC))
.pageSize(1000)
.build();
}
역할 — Spring Data 의 PagingAndSortingRepository 를 활용해, 이미 만들어둔 Repository 메서드를 재사용합니다.
4. Database Writers
JdbcBatchItemWriter
@Bean
public JdbcBatchItemWriter<Customer> jdbcWriter(DataSource ds) {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(ds)
.sql("INSERT INTO customers (id, name, email) VALUES (:id, :name, :email)")
.beanMapped()
.build();
}
역할 — NamedParameterJdbcTemplate 의 batch update 를 활용해, chunk 전체를 한 batch 로 INSERT 합니다. DB writer 가운데 성능이 가장 좋습니다.
JpaItemWriter
@Bean
public JpaItemWriter<Customer> jpaWriter(EntityManagerFactory emf) {
return new JpaItemWriterBuilder<Customer>()
.entityManagerFactory(emf)
.build();
}
역할 — JPA 의 EntityManager.merge() 를 씁니다. persistence context 에 없는 entity 도 자동으로 merge 됩니다. JPA 환경의 표준이지만, batch insert 성능은 JdbcBatchItemWriter 보다 떨어집니다.
MongoItemWriter
@Bean
public MongoItemWriter<Customer> mongoWriter(MongoOperations ops) {
return new MongoItemWriterBuilder<Customer>()
.template(ops)
.collection("customers")
.build();
}
역할 — MongoDB 에 chunk 의 items 를 저장합니다. MongoOperations (Spring Data MongoDB) 를 그대로 활용합니다.
RepositoryItemWriter
@Bean
public RepositoryItemWriter<Customer> repoWriter(CustomerRepository repo) {
return new RepositoryItemWriterBuilder<Customer>()
.repository(repo)
.methodName("save")
.build();
}
역할 — Spring Data CrudRepository.save() 를 활용해 기존 Repository 를 재사용합니다.
5. Specialized Readers
LdifReader
@Bean
public LdifReader ldifReader() {
return new LdifReaderBuilder()
.name("ldifReader")
.resource(new FileSystemResource("users.ldif"))
.build();
}
역할 — LDIF (LDAP 디렉터리 데이터 교환 포맷) 파일을 읽습니다. 각 read 결과가 LdapAttribute 로 나옵니다.
MappingLdifReader
@Bean
public MappingLdifReader<UserDto> mappingLdifReader(
RecordMapper<UserDto> mapper) {
return new MappingLdifReaderBuilder<UserDto>()
.name("mappingLdifReader")
.resource(new FileSystemResource("users.ldif"))
.recordMapper(mapper)
.build();
}
역할 — LDIF 를 POJO 까지 바로 매핑합니다. RecordMapper 로 변환 로직을 주입하는 형태예요.
AvroItemReader
@Bean
public AvroItemReader<User> avroReader() {
return new AvroItemReaderBuilder<User>()
.resource(new FileSystemResource("users.avro"))
.type(User.class)
.embeddedSchema(true)
.build();
}
역할 — Avro (스키마 기반 이진 직렬화 포맷) 데이터를 읽습니다. schema embedded 옵션을 켤 수 있어요.
6. Specialized Writers
SimpleMailMessageItemWriter
@Bean
public SimpleMailMessageItemWriter mailWriter(MailSender sender) {
return new SimpleMailMessageItemWriterBuilder()
.mailSender(sender)
.build();
}
역할 — chunk 의 SimpleMailMessage 들을 batch mail 로 보냅니다. MailSender 에 위임하는 구조예요. 대량 알림 전송 batch 에 잘 맞습니다.
AvroItemWriter
@Bean
public AvroItemWriter<User> avroWriter() {
return new AvroItemWriterBuilder<User>()
.resource(new FileSystemResource("output.avro"))
.type(User.class)
.schema(new ClassPathResource("user.avsc"))
.build();
}
역할 — POJO 를 Avro 로 직렬화해 출력합니다.
7. Specialized Processors
ScriptItemProcessor
@Bean
public ScriptItemProcessor<Item, Item> scriptProcessor() {
return new ScriptItemProcessorBuilder<Item, Item>()
.scriptSource("item.value = item.value.toUpperCase(); item;", "javascript")
.build();
}
역할 — JavaScript·Groovy 같은 스크립트로 item 을 처리합니다. JSR-223 (자바 스크립트 엔진 표준) 위에서 동작하죠. runtime 에 변경 가능한 변환 로직이 필요할 때 유용합니다.
선택 가이드 — 어떤 상황에 어떤 구현체
Multi-threaded Step
| 기존 Reader | wrap 후 |
|---|---|
FlatFileItemReader |
SynchronizedItemStreamReader |
JdbcCursorItemReader |
SynchronizedItemStreamReader |
StaxEventItemReader |
SynchronizedItemStreamReader |
JdbcPagingItemReader |
(이미 thread-safe — wrap 불필요) |
KafkaItemReader |
(partition 단위 분리 — wrap 불필요) |
대량 출력 분할
MultiResourceItemWriter+FlatFileItemWriterdelegate
Grouping 패턴
SingleItemPeekableItemReader+ 사용자 그룹화 로직
조건부 처리
ClassifierCompositeItemProcessor(type 별 로직)ClassifierCompositeItemWriter(type 별 라우팅)
DB writer 성능 우선
JdbcBatchItemWriter(chunk 전체 batch INSERT)- avoid
JpaItemWriter(느림, 단 ORM 통합 필요 시)
Spring Data 재사용
RepositoryItemReader+RepositoryItemWriter
Messaging 환경
- Kafka —
KafkaItemReader·KafkaItemWriter - RabbitMQ —
AmqpItemReader·AmqpItemWriter - 표준 JMS —
JmsItemReader·JmsItemWriter
MongoDB
- Paging —
MongoPagingItemReader - Cursor (streaming) —
MongoCursorItemReader - Writer —
MongoItemWriter
함정 정리
함정 1: SynchronizedItemStreamReader 만 wrap, delegate 미등록
new StepBuilder(...)
.reader(syncReader)
// .stream(delegate) 빠뜨림 → 잘못된 패턴? NO!
SynchronizedItemStreamReader 가 내부에서 delegate 의 ItemStream 호출을 알아서 위임하기 때문에, 추가로 .stream() 을 걸어줄 필요가 없습니다. 22편에서 본 일반 Delegate Pattern 함정과는 결이 달라요.
함정 2: MultiResourceItemWriter 의 ExecutionContext
MultiResourceItemWriter 는 현재 resource index 와 위치를 ExecutionContext 에 저장하기 때문에 재시작에 안전합니다. 단 delegate 가 ItemStream 을 구현하고 name 이 설정돼 있어야 합니다.
함정 3: JpaItemWriter 의 batch insert 안 됨
JPA 자체가 개별 EntityManager.merge 를 호출하는 구조라, hibernate.jdbc.batch_size property 를 설정하고 entity 가 @GeneratedValue(strategy = SEQUENCE) 를 쓸 때에 한해서만 batch insert 가 활성화됩니다. 진짜 batch INSERT 가 필요하면 JdbcBatchItemWriter 를 쓰는 게 낫습니다.
함정 4: KafkaItemReader 의 partition 지정
KafkaItemReader 는 명시한 partition list 만 읽습니다. 자동 partition assignment 가 없어서, partition list 를 정확히 지정해 줘야 합니다.
함정 5: ClassifierComposite* 의 default branch
Classifier 가 모든 case 를 커버하지 못하면 NPE 가 터집니다. default delegate 를 두거나 전 case 를 명시해야 합니다.
시험 직전 한 번 더 — 구현체 카탈로그 함정 압축 노트
- 4범주 = Decorator · Messaging · Database · Specialized
- Decorator 6종 = SynchronizedItemStreamReader · SingleItemPeekable · SynchronizedItemStreamWriter · MultiResource · ClassifierComposite (Writer + Processor) · Mapping
- SynchronizedItemStreamReader/Writer = non-thread-safe wrap → multi-threaded 안전
- SingleItemPeekable = peek() 메서드, grouping 패턴, thread-safe X
- MultiResourceItemWriter = itemCountLimitPerResource 초과 시 새 파일
- ClassifierComposite = router pattern, item 별 다른 delegate
- MappingItemWriter = 변환 후 downstream Writer, deconstruction 패턴
- Messaging = AMQP · JMS · Kafka (transactional source — rollback 시 재 read)
- KafkaItemReader = offset ExecutionContext 저장, partition 명시 지정
- Database Readers = MongoPaging · MongoCursor · Repository
- Database Writers = MongoItemWriter · RepositoryItemWriter · JdbcBatchItemWriter · JpaItemWriter
- JdbcBatchItemWriter = NamedParameterJdbcTemplate batch, 최고 성능
- JpaItemWriter = EntityManager.merge, batch insert 약함
- Specialized Readers = LdifReader · MappingLdifReader · AvroItemReader
- Specialized Writers = SimpleMailMessageItemWriter · AvroItemWriter
- ScriptItemProcessor = JSR-223 script (JavaScript·Groovy)
- 함정 — Synchronized*Reader 는 자동 위임 → 추가
.stream()불필요 - 함정 — JpaItemWriter batch insert 는 hibernate.jdbc.batch_size 필요
- 함정 — KafkaItemReader 는 partition 명시
- 함정 — ClassifierComposite 의 null branch → default delegate
- thread-safe 기본 = JdbcPagingItemReader · 대부분 Writer
- thread-safe X = FlatFileItemReader · JdbcCursorItemReader · StaxEventItemReader · SingleItemPeekable.peek()
- 선택 — multi-threaded → Synchronized wrapper · 대량 분할 → MultiResource · 그룹화 → Peekable · 분기 → Classifier
- 선택 — DB 최고 성능 → JdbcBatch · ORM 통합 → Jpa · Spring Data 재사용 → Repository
공식 문서: Item Reader and Writer Implementations 에서 원문을 확인할 수 있어요.
시리즈 다른 편 (앞뒤 글 모음)
이전 글:
- 20편 — Flow Control · Decision · Split · 조건 분기
- 21편 — Late Binding · @StepScope · @JobScope · SpEL
- 22편 — ItemReader 인터페이스 종합 · Delegate Pattern
- 23편 — ItemWriter 인터페이스 종합
- 24편 — ItemStream 인터페이스 본격 풀이
다음 글: