Spring Batch 마스터 노트 시리즈 5편. ItemWriter의 거울 구현 — FlatFileItemWriter의 LineAggregator·FieldExtractor 파이프라인, JdbcBatchItemWriter의 executeBatch 한 번 호출 Bulk INSERT, ?와 Named Parameter 두 방식, MySQL UPSERT, CompositeItemWriter로 DB+파일 동시 출력까지.
이 글은 Spring Batch 마스터 노트 시리즈의 다섯 번째 편입니다. 4편(ItemReader)이 데이터를 읽는 자리였다면, 이번엔 거울 — 데이터를 쓰는 자리.
ItemWriter는 청크 단위로 List를 한 번에 받아 처리해요. 이 구조 덕분에 DB Bulk INSERT 나 버퍼 파일 쓰기 같은 최적화가 가능합니다. 이번 편의 핵심 — FlatFileItemWriter·JdbcBatchItemWriter·CompositeItemWriter 세 구현체.
처음 ItemWriter 선택이 어렵게 느껴지는 이유
이유는 두 가지예요.
첫째, JdbcBatchItemWriter의 두 가지 방식이 헷갈립니다. ItemPreparedStatementSetter (위치 기반 ?) vs ItemSqlParameterSourceProvider (이름 기반 :name) — 어느 쪽이 더 좋은지, 언제 어느 쪽을 쓸지가 첫 단계에서 안 보여요.
둘째, 트랜잭션 동작이 Reader와 Writer에서 다릅니다. Writer 실패 시 청크 전체가 롤백 — 99개 성공해도 1개 실패면 100개 모두. 이 동작이 직관과 어긋나서 처음엔 당황스러워요.
해결법은 한 가지예요. 두 가지 패턴 카드로 외우세요. (1) 파일 출력 = FlatFileItemWriter (LineAggregator + FieldExtractor). (2) DB 출력 = JdbcBatchItemWriter (Bulk INSERT). 다른 곳 동시 = Composite. 이 셋만 잡으면 90%.
ItemWriter 구현체 한눈에
| 구현체 | 출력 대상 | 특징 |
|---|---|---|
FlatFileItemWriter |
CSV, TXT | 구분자 또는 고정 폭 |
JdbcBatchItemWriter |
JDBC DB | PreparedStatement 배치 |
HibernateItemWriter |
Hibernate | Session.saveOrUpdate() |
JpaItemWriter |
JPA | EntityManager.merge() |
MongoItemWriter |
MongoDB | BulkOperations |
StaxEventItemWriter |
XML | StAX |
JsonFileItemWriter |
JSON | Jackson |
CompositeItemWriter |
여러 대상 | 동시 출력 |
| 커스텀 ItemWriter | 임의 | 직접 구현 |
FlatFileItemWriter — CSV/TXT 출력
처리 파이프라인
Java 객체 (Product)
└→ LineAggregator (객체 → 텍스트)
├→ DelimitedLineAggregator (구분자로 필드 연결)
│ └→ FieldExtractor (객체에서 필드 추출)
│ └→ BeanWrapperFieldExtractor (getter 자동 추출)
└→ FormatterLineAggregator (포맷 문자열)
파일에 텍스트 라인 쓰기
기본 설정
@Bean
public FlatFileItemWriter<Product> csvWriter() {
FlatFileItemWriter<Product> writer = new FlatFileItemWriter<>();
// 1. 출력 파일
writer.setResource(new FileSystemResource("output/product_details_output.csv"));
// 2. 덮어쓰기 / Append
// writer.setAppendAllowed(true);
// 3. LineAggregator
DelimitedLineAggregator<Product> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
// 4. FieldExtractor
BeanWrapperFieldExtractor<Product> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[]{"productId", "productName", "productCategory", "productPrice"});
// 순서가 곧 CSV 컬럼 순서
lineAggregator.setFieldExtractor(fieldExtractor);
writer.setLineAggregator(lineAggregator);
return writer;
}
BeanWrapperFieldExtractor
BeanWrapperFieldExtractor<Product> extractor = new BeanWrapperFieldExtractor<>();
// 아래 순서로 CSV 컬럼 생성
extractor.setNames(new String[]{
"productId", // → getProductId()
"productName", // → getProductName()
"productCategory", // → getProductCategory()
"productPrice" // → getProductPrice()
});
여기서 시험 함정이 하나 있어요. setNames()의 이름은 getter 메서드와 정확히 대응해야 합니다. "productId" → getProductId(). Lombok @Getter나 @Data 쓰면 자동 생성, 직접 작성 시 명명 규칙 확인.
헤더 추가
writer.setHeaderCallback(writer -> {
writer.write("product_ID,product_name,product_category,product_price");
});
Append vs 덮어쓰기
writer.setAppendAllowed(true); // 추가
writer.setAppendAllowed(false); // 덮어쓰기 (기본)
배치를 여러 번 실행해 결과를 누적할 때 Append.
다양한 출력 형식
// TSV
DelimitedLineAggregator<Product> tsvAggregator = new DelimitedLineAggregator<>();
tsvAggregator.setDelimiter("\t");
// 파이프
DelimitedLineAggregator<Product> pipeAggregator = new DelimitedLineAggregator<>();
pipeAggregator.setDelimiter("|");
// 포맷 (printf 스타일)
FormatterLineAggregator<Product> formatterAggregator = new FormatterLineAggregator<>();
formatterAggregator.setFormat("%-5d %-30s %-20s %10.2f");
BeanWrapperFieldExtractor<Product> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[]{"productId", "productName", "productCategory", "productPrice"});
formatterAggregator.setFieldExtractor(extractor);
JdbcBatchItemWriter — DB Bulk INSERT
개요
PreparedStatement.executeBatch()로 청크 내 모든 아이템을 한 번의 DB 호출로 처리. 단건 INSERT보다 수십~수백 배 빠름.
위치 기반 (?) 방식
@Bean
public JdbcBatchItemWriter<Product> dbWriter() {
JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO product_details_output " +
"(product_id, product_name, product_category, product_price) " +
"VALUES (?, ?, ?, ?)");
writer.setItemPreparedStatementSetter((item, ps) -> {
ps.setInt(1, item.getProductId());
ps.setString(2, item.getProductName());
ps.setString(3, item.getProductCategory());
ps.setDouble(4, item.getProductPrice());
});
return writer;
}
Named Parameter (:name) 방식
이름 기반 — 코드가 더 읽기 쉽고 순서 의존 X.
@Bean
public JdbcBatchItemWriter<Product> namedParamWriter() {
JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO product_details_output " +
"(product_id, product_name, product_category, product_price) " +
"VALUES (:productId, :productName, :productCategory, :productPrice)");
// BeanProperty 자동 매핑 — 도메인 객체 필드명을 직접 사용
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}
여기서 시험 함정이 하나 있어요. ? 방식은 인덱스 1부터 시작합니다. ps.setInt(0, ...) 아니라 ps.setInt(1, ...). 인덱스 한 번 어긋나면 모든 컬럼이 잘못된 값으로 들어가요. 디버깅도 어렵고. 이름 기반(BeanProperty)이 더 안전합니다.
UPDATE 구문
@Bean
public JdbcBatchItemWriter<Product> updateWriter() {
JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("UPDATE product_details " +
"SET product_name = ?, product_category = ?, product_price = ? " +
"WHERE product_id = ?");
writer.setItemPreparedStatementSetter((item, ps) -> {
ps.setString(1, item.getProductName());
ps.setString(2, item.getProductCategory());
ps.setDouble(3, item.getProductPrice());
ps.setInt(4, item.getProductId()); // WHERE
});
return writer;
}
UPSERT (MySQL)
@Bean
public JdbcBatchItemWriter<Product> upsertWriter() {
JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
// MySQL ON DUPLICATE KEY UPDATE
writer.setSql("INSERT INTO product_details " +
"(product_id, product_name, product_category, product_price) " +
"VALUES (?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " +
"product_name = VALUES(product_name), " +
"product_category = VALUES(product_category), " +
"product_price = VALUES(product_price)");
writer.setItemPreparedStatementSetter((item, ps) -> {
ps.setInt(1, item.getProductId());
ps.setString(2, item.getProductName());
ps.setString(3, item.getProductCategory());
ps.setDouble(4, item.getProductPrice());
});
return writer;
}
PK 충돌 시 자동 UPDATE — 멱등 처리에 유용.
커스텀 ItemWriter
기본
public class ProductLogWriter implements ItemWriter<Product> {
@Override
public void write(List<? extends Product> items) throws Exception {
System.out.println("=== 청크 쓰기 시작 (" + items.size() + "개 아이템) ===");
for (Product product : items) {
System.out.printf("저장: ID=%d, 이름=%s, 카테고리=%s, 가격=%.2f%n",
product.getProductId(),
product.getProductName(),
product.getProductCategory(),
product.getProductPrice());
}
System.out.println("=== 청크 쓰기 완료 ===");
}
}
파일 출력
public class ProductFileWriter implements ItemWriter<Product> {
private static final String OUTPUT_FILE = "output/products_log.txt";
@Override
public void write(List<? extends Product> items) throws Exception {
try (FileWriter fw = new FileWriter(OUTPUT_FILE, true);
BufferedWriter bw = new BufferedWriter(fw)) {
for (Product product : items) {
bw.write(product.toString());
bw.newLine();
}
}
}
}
REST API 호출
@Component
public class ApiItemWriter implements ItemWriter<Product> {
@Autowired
private RestTemplate restTemplate;
@Value("${api.endpoint.url}")
private String apiUrl;
@Override
public void write(List<? extends Product> items) throws Exception {
for (Product product : items) {
ResponseEntity<String> response = restTemplate.postForEntity(
apiUrl, product, String.class
);
if (!response.getStatusCode().is2xxSuccessful()) {
throw new RuntimeException("API 호출 실패: " + product.getProductId());
}
}
}
}
CompositeItemWriter — 동시 다중 출력
같은 데이터를 DB와 파일에 동시 저장.
@Bean
public CompositeItemWriter<Product> compositeWriter() {
CompositeItemWriter<Product> writer = new CompositeItemWriter<>();
writer.setDelegates(Arrays.asList(
csvWriter(), // 1. CSV 파일
dbWriter(), // 2. DB
logWriter() // 3. 로그
));
return writer;
}
모든 delegate가 순서대로 실행. 하나라도 실패 → 트랜잭션 롤백.
DB 테이블 설정
출력 테이블 (MySQL)
CREATE TABLE IF NOT EXISTS product_details_output (
product_id INT PRIMARY KEY,
product_name VARCHAR(100) NOT NULL,
product_category VARCHAR(50),
product_price DECIMAL(10, 2)
);
TRUNCATE TABLE product_details_output;
SELECT * FROM product_details_output;
입력 테이블 + 테스트 데이터
CREATE TABLE IF NOT EXISTS product_details (
product_id INT AUTO_INCREMENT PRIMARY KEY,
product_name VARCHAR(100) NOT NULL,
product_category VARCHAR(50),
product_price DECIMAL(10, 2)
);
INSERT INTO product_details (product_name, product_category, product_price) VALUES
('IPhone 14 Pro', 'smartphone', 120000),
('Samsung Galaxy S23', 'smartphone', 110000),
('Sony WH-1000XM5', 'headphone', 35000),
('Nike Air Max 270', 'sports accessories', 15000),
('Dell XPS 15', 'laptop', 200000);
FlatFile vs JdbcBatch — 한 번 더 비교
| 구분 | FlatFileItemWriter | JdbcBatchItemWriter |
|---|---|---|
| 출력 대상 | 파일 (CSV·TXT) | 관계형 DB |
| 핵심 컴포넌트 | LineAggregator + FieldExtractor | SQL + PreparedStatementSetter |
| 트랜잭션 | 파일 시스템 (OS) | DB 트랜잭션 |
| 롤백 | 어려움 | 쉬움 |
| 재시작 | 파일 위치부터 | 마지막 커밋부터 |
| 적합 상황 | 보고서·로그·내보내기 | DB 적재·마이그레이션 |
| 성능 | 버퍼 쓰기 | Batch INSERT |
언제 무엇을 선택?
FlatFileItemWriter — 외부 시스템과 파일 교환 / 사람이 읽는 보고서 / Excel·분석 도구로 처리할 파일.
JdbcBatchItemWriter — DB 영속화 / 트랜잭션 보장 필요 / 대용량 빠른 INSERT / 다른 Step에서 DB 조회로 사용.
전체 ETL 예시
CSV → DB
@Configuration
@EnableBatchProcessing
public class EtlBatchConfig {
@Autowired private JobBuilderFactory jobBuilderFactory;
@Autowired private StepBuilderFactory stepBuilderFactory;
@Autowired private DataSource dataSource;
@Bean
public FlatFileItemReader<Product> csvReader() {
FlatFileItemReader<Product> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new ClassPathResource("/data/product_details.csv"));
DefaultLineMapper<Product> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("product_ID", "product_name", "product_category", "product_price");
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSet -> {
Product p = new Product();
p.setProductId(fieldSet.readInt("product_ID"));
p.setProductName(fieldSet.readString("product_name"));
p.setProductCategory(fieldSet.readString("product_category"));
p.setProductPrice(fieldSet.readDouble("product_price"));
return p;
});
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public JdbcBatchItemWriter<Product> dbWriter() {
JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO product_details_output " +
"(product_id, product_name, product_category, product_price) " +
"VALUES (?, ?, ?, ?)");
writer.setItemPreparedStatementSetter((item, ps) -> {
ps.setInt(1, item.getProductId());
ps.setString(2, item.getProductName());
ps.setString(3, item.getProductCategory());
ps.setDouble(4, item.getProductPrice());
});
return writer;
}
@Bean
public Step etlStep() {
return stepBuilderFactory.get("etlStep")
.<Product, Product>chunk(10)
.reader(csvReader())
.writer(dbWriter())
.build();
}
@Bean
public Job etlJob() {
return jobBuilderFactory.get("etlJob")
.incrementer(new RunIdIncrementer())
.start(etlStep())
.build();
}
}
DB → CSV
@Bean
public JdbcCursorItemReader<Product> dbReader() {
JdbcCursorItemReader<Product> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT product_id, product_name, product_category, product_price " +
"FROM product_details ORDER BY product_id");
reader.setRowMapper((rs, rowNum) -> {
Product p = new Product();
p.setProductId(rs.getInt("product_id"));
p.setProductName(rs.getString("product_name"));
p.setProductCategory(rs.getString("product_category"));
p.setProductPrice(rs.getDouble("product_price"));
return p;
});
return reader;
}
@Bean
public FlatFileItemWriter<Product> csvWriter() {
FlatFileItemWriter<Product> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("output/exported_products.csv"));
writer.setHeaderCallback(w -> w.write("ID,Name,Category,Price"));
DelimitedLineAggregator<Product> aggregator = new DelimitedLineAggregator<>();
aggregator.setDelimiter(",");
BeanWrapperFieldExtractor<Product> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[]{"productId", "productName", "productCategory", "productPrice"});
aggregator.setFieldExtractor(extractor);
writer.setLineAggregator(aggregator);
return writer;
}
일반적 실수와 주의사항
출력 디렉토리 미생성
FileSystemResource 경로의 디렉토리가 없으면 예외. Job 시작 전 디렉토리 생성 Tasklet.
@Bean
public Step createOutputDirStep() {
return stepBuilderFactory.get("createOutputDirStep")
.tasklet((contribution, chunkContext) -> {
new File("output").mkdirs();
return RepeatStatus.FINISHED;
})
.build();
}
BeanWrapperFieldExtractor 필드명
대소문자·이름 정확히 getter와 일치.
JdbcBatchItemWriter 트랜잭션
청크 트랜잭션 안에서 자동 동작. 명시적 트랜잭션 관리 X.
FlatFileItemWriter 재시작
기본 saveState(true) — 재시작 시 마지막 위치부터 이어쓰기. 처음부터 다시 만들고 싶으면 별도 처리 필요.
? 인덱스 1부터 시작
ps.setInt(0, ...) 아니라 ps.setInt(1, ...). Named Parameter 방식이 더 안전.
ItemWriter 선택 가이드
출력 대상이 CSV/TXT?
→ FlatFileItemWriter
출력 대상이 관계형 DB?
→ JdbcBatchItemWriter (Bulk INSERT)
→ JPA 사용 시 JpaItemWriter
여러 곳 동시?
→ CompositeItemWriter
특수한 출력?
→ 커스텀 ItemWriter
ItemWriter 성능 최적화
JdbcBatchItemWriter
// 1. 청크 크기 늘려 배치 INSERT 크기 증가
.<Product, Product>chunk(100) // 10 → 100, DB 호출 감소
// 2. Named Parameter보다 ? 방식이 약간 빠를 수 있음 (상황 따라)
// 3. PreparedStatement 재사용 (자동)
FlatFileItemWriter
- 버퍼 쓰기 (자동)
- Append 모드 활용 (Step 시작 시 파일 한 번 열고 종료 시 닫음)
- SSD에 출력 디렉토리 위치
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 5편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- ItemWriter — 청크 List를 한 번에 받아 처리
- 구현체 — Flat/JdbcBatch/Hibernate/Jpa/Mongo/Stax/Json/Composite/커스텀
- FlatFileItemWriter 파이프라인 = LineAggregator + FieldExtractor
- DelimitedLineAggregator = 구분자 / FormatterLineAggregator = 포맷
- BeanWrapperFieldExtractor = getter 자동 추출 (이름이 곧 컬럼 순서)
setNames()이름 = getter 메서드와 정확히 대응setHeaderCallback()= CSV 헤더 추가setAppendAllowed(true)= Append, false = 덮어쓰기 (기본)- TSV (
\t) / 파이프 (|) / 고정 폭 (%-5d %-30s) 다양한 형식 - JdbcBatchItemWriter =
executeBatch()Bulk INSERT - 두 방식 —
?(위치) vs:name(이름 기반) ?인덱스 1부터 시작 (0 X)BeanPropertyItemSqlParameterSourceProvider= 도메인 자동 매핑- 이름 기반이 더 안전 (순서 의존 X)
- UPDATE =
SET ... WHERE ? - MySQL UPSERT =
INSERT ... ON DUPLICATE KEY UPDATE - 커스텀 ItemWriter =
write(List<T>)직접 구현 - CompositeItemWriter = 여러 Writer 동시 출력 (DB+파일+로그)
- 하나라도 실패 시 청크 트랜잭션 롤백
- 출력 디렉토리 미생성 시 예외 — Tasklet으로 미리 생성
- 청크 크기 ↑ → DB 호출 ↓ → 성능 ↑
- FlatFile 재시작 — 기본 마지막 위치부터 이어쓰기
- ETL 패턴 — CSV Reader → JdbcBatch Writer (가장 흔함)
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 톤으로 묶어 정리되어 있어요. 5편까지 핵심 패턴이 잡혔으니 6편부터는 운영 영역으로 갑니다.
- 1편 — Spring Batch 입문 (Job·Step·Chunk 모델)
- 2편 — Spring Batch Job 설정 (Tasklet과 Chunk Step)
- 3편 — 청크 처리 (Reader·Processor·Writer 패턴)
- 4편 — ItemReader 마스터 (CSV·JdbcCursor·Paging)
- 5편 — ItemWriter 마스터 (현재 글)
- 6편 — Job Flow와 리스너
- 7편 — 오류 처리 (Skip·Retry·SkipPolicy)
- 8편 — Spring Batch 5 마이그레이션
공식 문서: Spring Batch ItemWriter Reference에서 모든 구현체 옵션을 확인할 수 있어요.
다음 글(6편)에서는 Job Flow의 본격 — 조건부·병렬·Nested Job·6가지 리스너(Job/Step/Chunk/Read/Process/Write)·ExecutionContext·ExecutionContextPromotionListener까지 풀어 갑니다.