ItemWriter 마스터 — FlatFile·JdbcBatch·Composite

2026-05-03확률과 통계 마스터 노트

Spring Batch 마스터 노트 시리즈 5편. ItemWriter의 거울 구현 — FlatFileItemWriter의 LineAggregator·FieldExtractor 파이프라인, JdbcBatchItemWriter의 executeBatch 한 번 호출 Bulk INSERT, ?와 Named Parameter 두 방식, MySQL UPSERT, CompositeItemWriter로 DB+파일 동시 출력까지.

이 글은 Spring Batch 마스터 노트 시리즈의 다섯 번째 편입니다. 4편(ItemReader)이 데이터를 읽는 자리였다면, 이번엔 거울 — 데이터를 쓰는 자리.

ItemWriter는 청크 단위로 List를 한 번에 받아 처리해요. 이 구조 덕분에 DB Bulk INSERT버퍼 파일 쓰기 같은 최적화가 가능합니다. 이번 편의 핵심 — FlatFileItemWriter·JdbcBatchItemWriter·CompositeItemWriter 세 구현체.

처음 ItemWriter 선택이 어렵게 느껴지는 이유

이유는 두 가지예요.

첫째, JdbcBatchItemWriter의 두 가지 방식이 헷갈립니다. ItemPreparedStatementSetter (위치 기반 ?) vs ItemSqlParameterSourceProvider (이름 기반 :name) — 어느 쪽이 더 좋은지, 언제 어느 쪽을 쓸지가 첫 단계에서 안 보여요.

둘째, 트랜잭션 동작이 Reader와 Writer에서 다릅니다. Writer 실패 시 청크 전체가 롤백 — 99개 성공해도 1개 실패면 100개 모두. 이 동작이 직관과 어긋나서 처음엔 당황스러워요.

해결법은 한 가지예요. 두 가지 패턴 카드로 외우세요. (1) 파일 출력 = FlatFileItemWriter (LineAggregator + FieldExtractor). (2) DB 출력 = JdbcBatchItemWriter (Bulk INSERT). 다른 곳 동시 = Composite. 이 셋만 잡으면 90%.

ItemWriter 구현체 한눈에

구현체 출력 대상 특징
FlatFileItemWriter CSV, TXT 구분자 또는 고정 폭
JdbcBatchItemWriter JDBC DB PreparedStatement 배치
HibernateItemWriter Hibernate Session.saveOrUpdate()
JpaItemWriter JPA EntityManager.merge()
MongoItemWriter MongoDB BulkOperations
StaxEventItemWriter XML StAX
JsonFileItemWriter JSON Jackson
CompositeItemWriter 여러 대상 동시 출력
커스텀 ItemWriter 임의 직접 구현

FlatFileItemWriter — CSV/TXT 출력

처리 파이프라인

Java 객체 (Product)
    └→ LineAggregator (객체 → 텍스트)
           ├→ DelimitedLineAggregator (구분자로 필드 연결)
           │    └→ FieldExtractor (객체에서 필드 추출)
           │         └→ BeanWrapperFieldExtractor (getter 자동 추출)
           └→ FormatterLineAggregator (포맷 문자열)
파일에 텍스트 라인 쓰기

기본 설정

@Bean
public FlatFileItemWriter<Product> csvWriter() {
    FlatFileItemWriter<Product> writer = new FlatFileItemWriter<>();
    
    // 1. 출력 파일
    writer.setResource(new FileSystemResource("output/product_details_output.csv"));
    
    // 2. 덮어쓰기 / Append
    // writer.setAppendAllowed(true);
    
    // 3. LineAggregator
    DelimitedLineAggregator<Product> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter(",");
    
    // 4. FieldExtractor
    BeanWrapperFieldExtractor<Product> fieldExtractor = new BeanWrapperFieldExtractor<>();
    fieldExtractor.setNames(new String[]{"productId", "productName", "productCategory", "productPrice"});
    // 순서가 곧 CSV 컬럼 순서
    
    lineAggregator.setFieldExtractor(fieldExtractor);
    writer.setLineAggregator(lineAggregator);
    
    return writer;
}

BeanWrapperFieldExtractor

BeanWrapperFieldExtractor<Product> extractor = new BeanWrapperFieldExtractor<>();

// 아래 순서로 CSV 컬럼 생성
extractor.setNames(new String[]{
    "productId",      // → getProductId()
    "productName",    // → getProductName()
    "productCategory", // → getProductCategory()
    "productPrice"    // → getProductPrice()
});

여기서 시험 함정이 하나 있어요. setNames()의 이름은 getter 메서드와 정확히 대응해야 합니다. "productId"getProductId(). Lombok @Getter@Data 쓰면 자동 생성, 직접 작성 시 명명 규칙 확인.

헤더 추가

writer.setHeaderCallback(writer -> {
    writer.write("product_ID,product_name,product_category,product_price");
});

Append vs 덮어쓰기

writer.setAppendAllowed(true);   // 추가
writer.setAppendAllowed(false);  // 덮어쓰기 (기본)

배치를 여러 번 실행해 결과를 누적할 때 Append.

다양한 출력 형식

// TSV
DelimitedLineAggregator<Product> tsvAggregator = new DelimitedLineAggregator<>();
tsvAggregator.setDelimiter("\t");

// 파이프
DelimitedLineAggregator<Product> pipeAggregator = new DelimitedLineAggregator<>();
pipeAggregator.setDelimiter("|");

// 포맷 (printf 스타일)
FormatterLineAggregator<Product> formatterAggregator = new FormatterLineAggregator<>();
formatterAggregator.setFormat("%-5d %-30s %-20s %10.2f");
BeanWrapperFieldExtractor<Product> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[]{"productId", "productName", "productCategory", "productPrice"});
formatterAggregator.setFieldExtractor(extractor);

JdbcBatchItemWriter — DB Bulk INSERT

개요

PreparedStatement.executeBatch()로 청크 내 모든 아이템을 한 번의 DB 호출로 처리. 단건 INSERT보다 수십~수백 배 빠름.

위치 기반 (?) 방식

@Bean
public JdbcBatchItemWriter<Product> dbWriter() {
    JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
    
    writer.setDataSource(dataSource);
    
    writer.setSql("INSERT INTO product_details_output " +
                  "(product_id, product_name, product_category, product_price) " +
                  "VALUES (?, ?, ?, ?)");
    
    writer.setItemPreparedStatementSetter((item, ps) -> {
        ps.setInt(1, item.getProductId());
        ps.setString(2, item.getProductName());
        ps.setString(3, item.getProductCategory());
        ps.setDouble(4, item.getProductPrice());
    });
    
    return writer;
}

Named Parameter (:name) 방식

이름 기반 — 코드가 더 읽기 쉽고 순서 의존 X.

@Bean
public JdbcBatchItemWriter<Product> namedParamWriter() {
    JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
    writer.setDataSource(dataSource);
    
    writer.setSql("INSERT INTO product_details_output " +
                  "(product_id, product_name, product_category, product_price) " +
                  "VALUES (:productId, :productName, :productCategory, :productPrice)");
    
    // BeanProperty 자동 매핑 — 도메인 객체 필드명을 직접 사용
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    
    return writer;
}

여기서 시험 함정이 하나 있어요. ? 방식은 인덱스 1부터 시작합니다. ps.setInt(0, ...) 아니라 ps.setInt(1, ...). 인덱스 한 번 어긋나면 모든 컬럼이 잘못된 값으로 들어가요. 디버깅도 어렵고. 이름 기반(BeanProperty)이 더 안전합니다.

UPDATE 구문

@Bean
public JdbcBatchItemWriter<Product> updateWriter() {
    JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
    writer.setDataSource(dataSource);
    
    writer.setSql("UPDATE product_details " +
                  "SET product_name = ?, product_category = ?, product_price = ? " +
                  "WHERE product_id = ?");
    
    writer.setItemPreparedStatementSetter((item, ps) -> {
        ps.setString(1, item.getProductName());
        ps.setString(2, item.getProductCategory());
        ps.setDouble(3, item.getProductPrice());
        ps.setInt(4, item.getProductId());  // WHERE
    });
    
    return writer;
}

UPSERT (MySQL)

@Bean
public JdbcBatchItemWriter<Product> upsertWriter() {
    JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
    writer.setDataSource(dataSource);
    
    // MySQL ON DUPLICATE KEY UPDATE
    writer.setSql("INSERT INTO product_details " +
                  "(product_id, product_name, product_category, product_price) " +
                  "VALUES (?, ?, ?, ?) " +
                  "ON DUPLICATE KEY UPDATE " +
                  "product_name = VALUES(product_name), " +
                  "product_category = VALUES(product_category), " +
                  "product_price = VALUES(product_price)");
    
    writer.setItemPreparedStatementSetter((item, ps) -> {
        ps.setInt(1, item.getProductId());
        ps.setString(2, item.getProductName());
        ps.setString(3, item.getProductCategory());
        ps.setDouble(4, item.getProductPrice());
    });
    
    return writer;
}

PK 충돌 시 자동 UPDATE — 멱등 처리에 유용.

커스텀 ItemWriter

기본

public class ProductLogWriter implements ItemWriter<Product> {
    
    @Override
    public void write(List<? extends Product> items) throws Exception {
        System.out.println("=== 청크 쓰기 시작 (" + items.size() + "개 아이템) ===");
        for (Product product : items) {
            System.out.printf("저장: ID=%d, 이름=%s, 카테고리=%s, 가격=%.2f%n",
                product.getProductId(),
                product.getProductName(),
                product.getProductCategory(),
                product.getProductPrice());
        }
        System.out.println("=== 청크 쓰기 완료 ===");
    }
}

파일 출력

public class ProductFileWriter implements ItemWriter<Product> {
    
    private static final String OUTPUT_FILE = "output/products_log.txt";
    
    @Override
    public void write(List<? extends Product> items) throws Exception {
        try (FileWriter fw = new FileWriter(OUTPUT_FILE, true);
             BufferedWriter bw = new BufferedWriter(fw)) {
            
            for (Product product : items) {
                bw.write(product.toString());
                bw.newLine();
            }
        }
    }
}

REST API 호출

@Component
public class ApiItemWriter implements ItemWriter<Product> {
    
    @Autowired
    private RestTemplate restTemplate;
    
    @Value("${api.endpoint.url}")
    private String apiUrl;
    
    @Override
    public void write(List<? extends Product> items) throws Exception {
        for (Product product : items) {
            ResponseEntity<String> response = restTemplate.postForEntity(
                apiUrl, product, String.class
            );
            
            if (!response.getStatusCode().is2xxSuccessful()) {
                throw new RuntimeException("API 호출 실패: " + product.getProductId());
            }
        }
    }
}

CompositeItemWriter — 동시 다중 출력

같은 데이터를 DB와 파일에 동시 저장.

@Bean
public CompositeItemWriter<Product> compositeWriter() {
    CompositeItemWriter<Product> writer = new CompositeItemWriter<>();
    writer.setDelegates(Arrays.asList(
        csvWriter(),    // 1. CSV 파일
        dbWriter(),     // 2. DB
        logWriter()     // 3. 로그
    ));
    return writer;
}

모든 delegate가 순서대로 실행. 하나라도 실패 → 트랜잭션 롤백.

DB 테이블 설정

출력 테이블 (MySQL)

CREATE TABLE IF NOT EXISTS product_details_output (
    product_id      INT PRIMARY KEY,
    product_name    VARCHAR(100) NOT NULL,
    product_category VARCHAR(50),
    product_price   DECIMAL(10, 2)
);

TRUNCATE TABLE product_details_output;
SELECT * FROM product_details_output;

입력 테이블 + 테스트 데이터

CREATE TABLE IF NOT EXISTS product_details (
    product_id      INT AUTO_INCREMENT PRIMARY KEY,
    product_name    VARCHAR(100) NOT NULL,
    product_category VARCHAR(50),
    product_price   DECIMAL(10, 2)
);

INSERT INTO product_details (product_name, product_category, product_price) VALUES
('IPhone 14 Pro', 'smartphone', 120000),
('Samsung Galaxy S23', 'smartphone', 110000),
('Sony WH-1000XM5', 'headphone', 35000),
('Nike Air Max 270', 'sports accessories', 15000),
('Dell XPS 15', 'laptop', 200000);

FlatFile vs JdbcBatch — 한 번 더 비교

구분 FlatFileItemWriter JdbcBatchItemWriter
출력 대상 파일 (CSV·TXT) 관계형 DB
핵심 컴포넌트 LineAggregator + FieldExtractor SQL + PreparedStatementSetter
트랜잭션 파일 시스템 (OS) DB 트랜잭션
롤백 어려움 쉬움
재시작 파일 위치부터 마지막 커밋부터
적합 상황 보고서·로그·내보내기 DB 적재·마이그레이션
성능 버퍼 쓰기 Batch INSERT

언제 무엇을 선택?

FlatFileItemWriter — 외부 시스템과 파일 교환 / 사람이 읽는 보고서 / Excel·분석 도구로 처리할 파일.

JdbcBatchItemWriter — DB 영속화 / 트랜잭션 보장 필요 / 대용량 빠른 INSERT / 다른 Step에서 DB 조회로 사용.

전체 ETL 예시

CSV → DB

@Configuration
@EnableBatchProcessing
public class EtlBatchConfig {
    
    @Autowired private JobBuilderFactory jobBuilderFactory;
    @Autowired private StepBuilderFactory stepBuilderFactory;
    @Autowired private DataSource dataSource;
    
    @Bean
    public FlatFileItemReader<Product> csvReader() {
        FlatFileItemReader<Product> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/product_details.csv"));
        
        DefaultLineMapper<Product> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("product_ID", "product_name", "product_category", "product_price");
        
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> {
            Product p = new Product();
            p.setProductId(fieldSet.readInt("product_ID"));
            p.setProductName(fieldSet.readString("product_name"));
            p.setProductCategory(fieldSet.readString("product_category"));
            p.setProductPrice(fieldSet.readDouble("product_price"));
            return p;
        });
        reader.setLineMapper(lineMapper);
        return reader;
    }
    
    @Bean
    public JdbcBatchItemWriter<Product> dbWriter() {
        JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("INSERT INTO product_details_output " +
                      "(product_id, product_name, product_category, product_price) " +
                      "VALUES (?, ?, ?, ?)");
        writer.setItemPreparedStatementSetter((item, ps) -> {
            ps.setInt(1, item.getProductId());
            ps.setString(2, item.getProductName());
            ps.setString(3, item.getProductCategory());
            ps.setDouble(4, item.getProductPrice());
        });
        return writer;
    }
    
    @Bean
    public Step etlStep() {
        return stepBuilderFactory.get("etlStep")
            .<Product, Product>chunk(10)
            .reader(csvReader())
            .writer(dbWriter())
            .build();
    }
    
    @Bean
    public Job etlJob() {
        return jobBuilderFactory.get("etlJob")
            .incrementer(new RunIdIncrementer())
            .start(etlStep())
            .build();
    }
}

DB → CSV

@Bean
public JdbcCursorItemReader<Product> dbReader() {
    JdbcCursorItemReader<Product> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    reader.setSql("SELECT product_id, product_name, product_category, product_price " +
                 "FROM product_details ORDER BY product_id");
    reader.setRowMapper((rs, rowNum) -> {
        Product p = new Product();
        p.setProductId(rs.getInt("product_id"));
        p.setProductName(rs.getString("product_name"));
        p.setProductCategory(rs.getString("product_category"));
        p.setProductPrice(rs.getDouble("product_price"));
        return p;
    });
    return reader;
}

@Bean
public FlatFileItemWriter<Product> csvWriter() {
    FlatFileItemWriter<Product> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource("output/exported_products.csv"));
    
    writer.setHeaderCallback(w -> w.write("ID,Name,Category,Price"));
    
    DelimitedLineAggregator<Product> aggregator = new DelimitedLineAggregator<>();
    aggregator.setDelimiter(",");
    
    BeanWrapperFieldExtractor<Product> extractor = new BeanWrapperFieldExtractor<>();
    extractor.setNames(new String[]{"productId", "productName", "productCategory", "productPrice"});
    
    aggregator.setFieldExtractor(extractor);
    writer.setLineAggregator(aggregator);
    
    return writer;
}

일반적 실수와 주의사항

출력 디렉토리 미생성

FileSystemResource 경로의 디렉토리가 없으면 예외. Job 시작 전 디렉토리 생성 Tasklet.

@Bean
public Step createOutputDirStep() {
    return stepBuilderFactory.get("createOutputDirStep")
        .tasklet((contribution, chunkContext) -> {
            new File("output").mkdirs();
            return RepeatStatus.FINISHED;
        })
        .build();
}

BeanWrapperFieldExtractor 필드명

대소문자·이름 정확히 getter와 일치.

JdbcBatchItemWriter 트랜잭션

청크 트랜잭션 안에서 자동 동작. 명시적 트랜잭션 관리 X.

FlatFileItemWriter 재시작

기본 saveState(true) — 재시작 시 마지막 위치부터 이어쓰기. 처음부터 다시 만들고 싶으면 별도 처리 필요.

? 인덱스 1부터 시작

ps.setInt(0, ...) 아니라 ps.setInt(1, ...). Named Parameter 방식이 더 안전.

ItemWriter 선택 가이드

출력 대상이 CSV/TXT?
  → FlatFileItemWriter

출력 대상이 관계형 DB?
  → JdbcBatchItemWriter (Bulk INSERT)
  → JPA 사용 시 JpaItemWriter

여러 곳 동시?
  → CompositeItemWriter

특수한 출력?
  → 커스텀 ItemWriter

ItemWriter 성능 최적화

JdbcBatchItemWriter

// 1. 청크 크기 늘려 배치 INSERT 크기 증가
.<Product, Product>chunk(100)  // 10 → 100, DB 호출 감소

// 2. Named Parameter보다 ? 방식이 약간 빠를 수 있음 (상황 따라)

// 3. PreparedStatement 재사용 (자동)

FlatFileItemWriter

  • 버퍼 쓰기 (자동)
  • Append 모드 활용 (Step 시작 시 파일 한 번 열고 종료 시 닫음)
  • SSD에 출력 디렉토리 위치

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 5편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • ItemWriter — 청크 List를 한 번에 받아 처리
  • 구현체 — Flat/JdbcBatch/Hibernate/Jpa/Mongo/Stax/Json/Composite/커스텀
  • FlatFileItemWriter 파이프라인 = LineAggregator + FieldExtractor
  • DelimitedLineAggregator = 구분자 / FormatterLineAggregator = 포맷
  • BeanWrapperFieldExtractor = getter 자동 추출 (이름이 곧 컬럼 순서)
  • setNames() 이름 = getter 메서드와 정확히 대응
  • setHeaderCallback() = CSV 헤더 추가
  • setAppendAllowed(true) = Append, false = 덮어쓰기 (기본)
  • TSV (\t) / 파이프 (|) / 고정 폭 (%-5d %-30s) 다양한 형식
  • JdbcBatchItemWriter = executeBatch() Bulk INSERT
  • 두 방식 — ? (위치) vs :name (이름 기반)
  • ? 인덱스 1부터 시작 (0 X)
  • BeanPropertyItemSqlParameterSourceProvider = 도메인 자동 매핑
  • 이름 기반이 더 안전 (순서 의존 X)
  • UPDATE = SET ... WHERE ?
  • MySQL UPSERT = INSERT ... ON DUPLICATE KEY UPDATE
  • 커스텀 ItemWriter = write(List<T>) 직접 구현
  • CompositeItemWriter = 여러 Writer 동시 출력 (DB+파일+로그)
  • 하나라도 실패 시 청크 트랜잭션 롤백
  • 출력 디렉토리 미생성 시 예외 — Tasklet으로 미리 생성
  • 청크 크기 ↑ → DB 호출 ↓ → 성능 ↑
  • FlatFile 재시작 — 기본 마지막 위치부터 이어쓰기
  • ETL 패턴 — CSV Reader → JdbcBatch Writer (가장 흔함)

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 톤으로 묶어 정리되어 있어요. 5편까지 핵심 패턴이 잡혔으니 6편부터는 운영 영역으로 갑니다.

공식 문서: Spring Batch ItemWriter Reference에서 모든 구현체 옵션을 확인할 수 있어요.

다음 글(6편)에서는 Job Flow의 본격 — 조건부·병렬·Nested Job·6가지 리스너(Job/Step/Chunk/Read/Process/Write)·ExecutionContext·ExecutionContextPromotionListener까지 풀어 갑니다.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!