Java Reactive Programming 핵심 정리 시리즈 4편. map/flatMap/concatMap/switchMap의 순서·동시성 차이, handle로 map+filter+error 통합, buffer/window/groupBy 묶음 처리, reduce vs scan 집계, filter→map 순서가 성능에 미치는 영향까지 — 파이프라인 중간 정수기 비유로 친절하게 풀어쓴 4편.
이 글은 Java Reactive Programming 핵심 정리 시리즈의 네 번째 편입니다. 2편·3편에서 Mono와 Flux라는 두 컨테이너 타입을 잡았다면, 이번 편에서는 그 사이에 꽂히는 연산자(Operator) 를 완전히 해부합니다.
연산자는 "파이프라인 중간 정수기·필터" 예요. 수원지(Flux)에서 흘러오는 물이 각 연산자를 통과할 때마다 걸러지고(filter), 정제되고(map), 섞이거나(flatMap), 모이는(reduce) 과정을 거칩니다. 에스프레소(원본 스트림)에 물을 더하면 아메리카노, 우유를 더하면 라떼가 되듯 — 연산자 조합이 비즈니스 로직을 만들어 냅니다.
이 시리즈는 Project Reactor 공식 문서, Reactive Streams 명세, 여러 비동기 프로그래밍 학습 자료 등 공개 자료를 참고해 한국어 학습 노트로 풀어쓴 자료입니다.
Project Reactor 공식 문서의 연산자 레퍼런스를 함께 보면 각 연산자의 마블 다이어그램을 시각적으로 확인할 수 있어요.
연산자가 처음엔 왜 어렵게 느껴질까요
이유는 네 가지예요.
첫째, flatMap·concatMap·switchMap의 차이를 모릅니다. 셋 다 "아이템을 받아 새 Publisher를 반환"하는데, 동시성과 순서 보장이 완전히 달라요. 잘못 쓰면 순서가 깨지거나 이전 요청이 취소됩니다.
둘째, reduce와 scan의 차이를 모릅니다. 둘 다 누산 연산자인데, 한 쪽은 Mono를 반환하고 다른 쪽은 Flux를 반환해요.
셋째, doOnNext에서 변환을 시도합니다. doOnNext(n -> n * 2)처럼 쓰면 반환값이 무시되어 아무 효과가 없는데, 이걸 변환 연산자로 착각하는 경우가 많아요.
넷째, 연산자 순서가 성능에 영향을 준다는 사실을 모릅니다. map → filter 순서와 filter → map 순서는 결과는 같지만 처리량이 달라요.
해결법은 한 가지예요. 각 연산자를 파이프라인의 역할로 기억하면 됩니다. 변환 파이프(map·flatMap), 필터(filter·take), 집계 탱크(reduce·scan), 관측 센서(doOn*)로 분류하면 자연스럽게 구분됩니다.
변환 연산자 — 파이프 안의 정수기
map — 1:1 동기 변환
map은 각 아이템을 동기적으로 다른 타입이나 값으로 변환해요. 가장 기본적인 연산자이지만 두 가지 주의사항이 있습니다.
// 기본 map: 숫자 → 문자열
Flux.range(1, 5)
.map(n -> "item-" + n)
.subscribe(log::info);
// 출력: item-1, item-2, item-3, item-4, item-5
// 체이닝
Mono.just(" Hello World ")
.map(String::trim)
.map(String::toLowerCase)
.map(s -> s.replace(" ", "_"))
.subscribe(log::info);
// 출력: hello_world
// map에서 null 반환하면 NullPointerException 발생!
Flux.range(1, 5)
.map(n -> n == 3 ? null : n) // 절대 금지
.subscribe(log::info);
flatMap vs concatMap vs switchMap — 가장 중요한 세 연산자
이 세 연산자가 이번 편의 핵심이에요. 한 표로 먼저 정리합니다.
| 연산자 | 동시성 | 순서 보장 | 이전 취소 | 주 사용처 |
|---|---|---|---|---|
flatMap | 병렬 실행 | 보장 안 됨 | 없음 | 병렬 DB·HTTP 호출 |
concatMap | 순차 실행 | 완전 보장 | 없음 | 순서 중요한 처리 |
switchMap | 최신 것만 | 최신 보장 | 이전 취소 | 검색어 자동완성 |
// flatMap: 동시에 여러 내부 Publisher 구독 (순서 보장 안 됨, 빠름)
Flux.range(1, 5)
.flatMap(n -> getDataFromDB(n)) // 5개 DB 쿼리가 동시에 실행됨
.subscribe(log::info);
// 출력 순서: 응답 도착 순서에 따라 랜덤 (1,3,2,5,4 등)
// concatMap: 하나 완료되면 다음 구독 (순서 보장, 느림)
Flux.range(1, 5)
.concatMap(n -> getDataFromDB(n))
.subscribe(log::info);
// 출력 순서: 반드시 1, 2, 3, 4, 5
// switchMap: 새 아이템 오면 이전 내부 Publisher 취소 (최신 값만)
searchInput.flatMap(query ->
Flux.interval(Duration.ofMillis(100))
.switchMap(n -> searchService.search(query))
)
.subscribe(log::info);
// 검색어가 바뀌면 이전 검색 요청 취소
여기서 시험 함정이 하나 있어요. flatMap은 병렬로 실행해서 빠르지만 순서가 보장되지 않고, concatMap은 순서를 보장하지만 순차 실행이라 느립니다. "빠른가 vs 순서가 맞는가"를 먼저 결정하세요. switchMap은 "이전 요청이 필요 없어졌을 때" — 검색어 자동완성이 대표적 예입니다.
handle — map + filter + error 통합 처리
handle()은 SynchronousSink를 활용해 변환·필터·에러 발생을 한 연산자에서 처리합니다. 복잡한 조건 분기가 필요할 때 유용해요.
Flux.range(1, 10)
.handle((item, sink) -> {
if (item == 7) {
sink.error(new RuntimeException("7은 처리 불가")); // 에러 발생
} else if (item % 2 == 0) {
// 짝수는 필터링 (sink 메서드 미호출 = 건너뜀)
} else {
sink.next(item * 10); // 홀수는 10배 변환
}
})
.subscribe(
v -> log.info("받음: {}", v),
e -> log.error("에러: {}", e.getMessage())
);
// 출력: 받음: 10, 받음: 30, 받음: 50, 에러: 7은 처리 불가
| 연산자 | 변환 | 필터 | 에러 발생 |
|---|---|---|---|
map | O | X | X |
filter | X | O | X |
handle | O | O | O |
필터 연산자 — 파이프의 거름망
filter, skip, distinct, distinctUntilChanged
// filter: 조건을 만족하는 아이템만 통과
Flux.range(1, 10)
.filter(n -> n % 2 == 0)
.subscribe(log::info); // 2, 4, 6, 8, 10
// skip: 처음 N개 건너뜀
Flux.range(1, 10)
.skip(3)
.subscribe(log::info); // 4, 5, 6, 7, 8, 9, 10
// distinct: 전체 중복 제거 (모든 이전 값과 비교)
Flux.just(1, 2, 2, 3, 3, 3, 4)
.distinct()
.subscribe(log::info); // 1, 2, 3, 4
// distinctUntilChanged: 연속 중복만 제거 (직전 값과만 비교)
Flux.just(1, 1, 2, 2, 1, 1, 3)
.distinctUntilChanged()
.subscribe(log::info); // 1, 2, 1, 3
distinct()는 내부적으로 HashSet을 유지하며 모든 이전 값과 비교해요. distinctUntilChanged()는 직전 값과만 비교해 메모리를 덜 사용합니다. 실시간 스트림에서는 distinctUntilChanged()가 더 적합한 경우가 많아요.
집계 연산자 — 탱크에 물 모으기
reduce vs scan
두 연산자 모두 누산이지만 결과를 언제 방출하느냐가 달라요.
// reduce: 모든 아이템을 하나의 값으로 누산 (최종 결과만 발행, Mono 반환)
Flux.range(1, 5)
.reduce(0, (acc, n) -> acc + n)
.subscribe(log::info);
// 출력: 15 (한 번만)
// scan: 중간 누산 값도 발행 (초기값 포함, Flux 반환)
Flux.range(1, 5)
.scan(0, (acc, n) -> acc + n)
.subscribe(log::info);
// 출력: 0, 1, 3, 6, 10, 15 (각 단계마다)
여기서 시험 함정이 하나 있어요. reduce는 Mono를 반환하고, scan은 Flux를 반환합니다. 그리고 scan의 첫 번째 방출 값은 초기값(0) 이에요 — 아직 아무 아이템도 처리하지 않은 시점의 누산값입니다.
// collectList: 모든 아이템을 List로 수집 (Mono<List<T>>)
Flux.range(1, 5)
.collectList()
.subscribe(list -> log.info("리스트: {}", list));
// 출력: 리스트: [1, 2, 3, 4, 5]
// collectMap: 키-값 쌍으로 Map 수집
Flux.just("alice", "bob", "charlie")
.collectMap(
name -> name.charAt(0), // 키: 첫 글자
String::toUpperCase // 값: 대문자
)
.subscribe(map -> log.info("맵: {}", map));
// 출력: 맵: {a=ALICE, b=BOB, c=CHARLIE}
// count: 총 아이템 수 (Mono<Long>)
Flux.range(1, 100)
.count()
.subscribe(log::info); // 100
부수효과 연산자 — 파이프의 관측 센서
doOn* 계열은 스트림을 변경하지 않고 특정 이벤트 시점에 부수효과만 실행해요. 로깅·메트릭 수집·디버깅에 사용합니다.
Flux.range(1, 4)
.doOnSubscribe(s -> log.info("구독 시작"))
.doOnRequest(n -> log.info("{}개 요청", n))
.doOnNext(item -> log.info("데이터 통과: {}", item)) // 변환 불가!
.doOnComplete(() -> log.info("완료"))
.doOnError(e -> log.error("에러: {}", e.getMessage()))
.doFinally(signal -> log.info("최종 종료: {}", signal)) // 취소 포함 항상
.subscribe(v -> log.info("받음: {}", v));
// 실제 활용: 저장 전 로깅
customerRepository.save(customer)
.doOnNext(saved -> log.info("저장 완료: {}", saved.getId()))
.doOnError(e -> log.error("저장 실패", e))
.subscribe();
doOnNext에서 변환 시도는 효과가 없습니다. doOnNext(n -> n * 2)처럼 쓰면 반환값이 완전히 무시돼요. 변환은 map, 부수효과는 doOnNext로 역할을 명확히 구분하세요.
빈 스트림 처리 — 물이 안 나올 때
// defaultIfEmpty: 스트림이 비어있으면 기본값 발행
Flux.range(1, 5)
.filter(n -> n > 10) // 아무것도 통과 못 함
.defaultIfEmpty(-1)
.subscribe(log::info); // -1
// switchIfEmpty: 스트림이 비어있으면 다른 Publisher로 전환
Flux.empty()
.switchIfEmpty(Flux.just("fallback1", "fallback2"))
.subscribe(log::info); // fallback1, fallback2
// 실용 예시: DB 조회 결과가 없으면 캐시에서 가져오기
userRepository.findById(userId)
.switchIfEmpty(Mono.defer(() -> cacheService.get(userId)))
.subscribe(user -> processUser(user));
에러 처리 연산자 — 파이프가 막혔을 때
// onErrorReturn: 에러 발생 시 기본값 반환 후 완료
Flux.just(1, 2, 0, 4, 5)
.map(n -> 10 / n) // 0으로 나누면 ArithmeticException
.onErrorReturn(-1) // 에러 시 -1 반환 후 스트림 종료
.subscribe(log::info);
// 출력: 10, 5, -1 (0 이후 종료)
// onErrorResume: 에러 발생 시 다른 Publisher로 전환
Flux.error(new RuntimeException("서비스 오류"))
.onErrorResume(e -> {
log.warn("에러 복구: {}", e.getMessage());
return Flux.just("FALLBACK1", "FALLBACK2");
})
.subscribe(log::info);
연산자 순서 — 성능 최적화의 핵심
여기서 시험 함정이 하나 있어요. filter와 map의 순서가 성능에 영향을 줍니다. 결과는 같지만 처리 비용이 달라요.
// 비효율적: 100만 개 모두 변환 후 필터
Flux.range(1, 1_000_000)
.map(n -> expensiveTransform(n)) // 100만 번 실행
.filter(n -> n > 0)
.subscribe(log::info);
// 효율적: 먼저 필터, 통과한 것만 변환
Flux.range(1, 1_000_000)
.filter(n -> n > 0) // 불필요한 것 먼저 제거
.map(n -> expensiveTransform(n)) // 통과한 것만 변환
.subscribe(log::info);
설계 원칙 5가지:
filter→map순서: 먼저 필터링, 그 후 변환- 비동기 변환: 항상
flatMap·concatMap사용 - 부수효과(로깅):
doOn*계열 사용 - 에러 처리: 에러 발생 연산자 이후에
onError*배치 handle은 복잡한 조건 분기가 필요할 때만
Reactive 연산자 — 시험 직전 압축 노트
- 연산자 = 기존 Publisher를 감싸는 새 Publisher 반환 — 원본 변경 없음
map— 동기 1:1 변환, null 반환 금지, Publisher 반환 금지flatMap— 병렬 실행, 순서 보장 안 됨 → 빠른 병렬 처리concatMap— 순차 실행, 완전 순서 보장 → flatMap보다 느림switchMap— 이전 내부 Publisher 취소, 최신 값만 처리 → 검색 자동완성handle— map + filter + error 통합 (SynchronousSink활용)filter→map순서가 성능에 유리distinct— 전체 중복 제거 (HashSet 유지),distinctUntilChanged— 연속 중복만reduce— Mono반환 , 최종 결과 하나만scan— Flux반환 , 초기값 포함 각 단계 결과 방출collectList()— 모든 아이템을Mono로- >
doOnNext— 변환 불가, 로깅·메트릭만 (반환값 무시)doFinally— 취소 포함 항상 실행defaultIfEmpty— 단일 기본값,switchIfEmpty— 대체 Publisher- 에러 처리 순서: 구체적 예외 타입 먼저, 넓은 타입은 나중에
log()— 스트림 이벤트 자동 INFO 레벨 로깅 (디버깅용)flatMap에서map처럼 일반 값 반환 =Flux타입 오류>
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.
- 1편 — Reactive Programming 입문
- 2편 — Mono 완전 정복
- 3편 — Flux 완전 정복
- 4편 — 연산자 (map·flatMap·filter·reduce 등) (현재 글)
- 5편 — Hot & Cold Publishers
- 6편 — Threading & Schedulers
- 7편 — Backpressure (배압)
- 8편 — Publisher 결합 (zip·merge·concat 등)
- 9편 — Batching·Windowing·Grouping
- 10편 — Repeat & Retry
- 11편 — Sinks
- 12편 — Context
- 13편 — 단위 테스트 (StepVerifier)