Reactive 연산자 완전 정복 — 핵심 정리

2026-05-03AWS SAA-C03 스터디

Java Reactive Programming 핵심 정리 시리즈 4편. map/flatMap/concatMap/switchMap의 순서·동시성 차이, handle로 map+filter+error 통합, buffer/window/groupBy 묶음 처리, reduce vs scan 집계, filter→map 순서가 성능에 미치는 영향까지 — 파이프라인 중간 정수기 비유로 친절하게 풀어쓴 4편.

📚 Java Reactive Programming 핵심 정리 · 4편 / 14편 — 핵심 정리

이 글은 Java Reactive Programming 핵심 정리 시리즈의 네 번째 편입니다. 2편·3편에서 Mono와 Flux라는 두 컨테이너 타입을 잡았다면, 이번 편에서는 그 사이에 꽂히는 연산자(Operator) 를 완전히 해부합니다.

연산자는 "파이프라인 중간 정수기·필터" 예요. 수원지(Flux)에서 흘러오는 물이 각 연산자를 통과할 때마다 걸러지고(filter), 정제되고(map), 섞이거나(flatMap), 모이는(reduce) 과정을 거칩니다. 에스프레소(원본 스트림)에 물을 더하면 아메리카노, 우유를 더하면 라떼가 되듯 — 연산자 조합이 비즈니스 로직을 만들어 냅니다.

📚 학습 노트

이 시리즈는 Project Reactor 공식 문서, Reactive Streams 명세, 여러 비동기 프로그래밍 학습 자료 등 공개 자료를 참고해 한국어 학습 노트로 풀어쓴 자료입니다.

Project Reactor 공식 문서의 연산자 레퍼런스를 함께 보면 각 연산자의 마블 다이어그램을 시각적으로 확인할 수 있어요.

연산자가 처음엔 왜 어렵게 느껴질까요

이유는 네 가지예요.

첫째, flatMap·concatMap·switchMap의 차이를 모릅니다. 셋 다 "아이템을 받아 새 Publisher를 반환"하는데, 동시성과 순서 보장이 완전히 달라요. 잘못 쓰면 순서가 깨지거나 이전 요청이 취소됩니다.

둘째, reducescan의 차이를 모릅니다. 둘 다 누산 연산자인데, 한 쪽은 Mono를 반환하고 다른 쪽은 Flux를 반환해요.

셋째, doOnNext에서 변환을 시도합니다. doOnNext(n -> n * 2)처럼 쓰면 반환값이 무시되어 아무 효과가 없는데, 이걸 변환 연산자로 착각하는 경우가 많아요.

넷째, 연산자 순서가 성능에 영향을 준다는 사실을 모릅니다. mapfilter 순서와 filtermap 순서는 결과는 같지만 처리량이 달라요.

해결법은 한 가지예요. 각 연산자를 파이프라인의 역할로 기억하면 됩니다. 변환 파이프(map·flatMap), 필터(filter·take), 집계 탱크(reduce·scan), 관측 센서(doOn*)로 분류하면 자연스럽게 구분됩니다.

변환 연산자 — 파이프 안의 정수기

map — 1:1 동기 변환

map은 각 아이템을 동기적으로 다른 타입이나 값으로 변환해요. 가장 기본적인 연산자이지만 두 가지 주의사항이 있습니다.

// 기본 map: 숫자 → 문자열
Flux.range(1, 5)
    .map(n -> "item-" + n)
    .subscribe(log::info);
// 출력: item-1, item-2, item-3, item-4, item-5

// 체이닝
Mono.just("  Hello World  ")
    .map(String::trim)
    .map(String::toLowerCase)
    .map(s -> s.replace(" ", "_"))
    .subscribe(log::info);
// 출력: hello_world

// map에서 null 반환하면 NullPointerException 발생!
Flux.range(1, 5)
    .map(n -> n == 3 ? null : n)  // 절대 금지
    .subscribe(log::info);

flatMap vs concatMap vs switchMap — 가장 중요한 세 연산자

이 세 연산자가 이번 편의 핵심이에요. 한 표로 먼저 정리합니다.

연산자동시성순서 보장이전 취소주 사용처
flatMap병렬 실행보장 안 됨없음병렬 DB·HTTP 호출
concatMap순차 실행완전 보장없음순서 중요한 처리
switchMap최신 것만최신 보장이전 취소검색어 자동완성
// flatMap: 동시에 여러 내부 Publisher 구독 (순서 보장 안 됨, 빠름)
Flux.range(1, 5)
    .flatMap(n -> getDataFromDB(n))  // 5개 DB 쿼리가 동시에 실행됨
    .subscribe(log::info);
// 출력 순서: 응답 도착 순서에 따라 랜덤 (1,3,2,5,4 등)

// concatMap: 하나 완료되면 다음 구독 (순서 보장, 느림)
Flux.range(1, 5)
    .concatMap(n -> getDataFromDB(n))
    .subscribe(log::info);
// 출력 순서: 반드시 1, 2, 3, 4, 5

// switchMap: 새 아이템 오면 이전 내부 Publisher 취소 (최신 값만)
searchInput.flatMap(query ->
    Flux.interval(Duration.ofMillis(100))
        .switchMap(n -> searchService.search(query))
)
.subscribe(log::info);
// 검색어가 바뀌면 이전 검색 요청 취소

여기서 시험 함정이 하나 있어요. flatMap은 병렬로 실행해서 빠르지만 순서가 보장되지 않고, concatMap은 순서를 보장하지만 순차 실행이라 느립니다. "빠른가 vs 순서가 맞는가"를 먼저 결정하세요. switchMap은 "이전 요청이 필요 없어졌을 때" — 검색어 자동완성이 대표적 예입니다.

한 줄 정리 — 병렬·빠름(순서X) → flatMap, 직렬·순서보장 → concatMap, 최신값만(이전취소) → switchMap

handle — map + filter + error 통합 처리

handle()SynchronousSink를 활용해 변환·필터·에러 발생을 한 연산자에서 처리합니다. 복잡한 조건 분기가 필요할 때 유용해요.

Flux.range(1, 10)
    .handle((item, sink) -> {
        if (item == 7) {
            sink.error(new RuntimeException("7은 처리 불가"));  // 에러 발생
        } else if (item % 2 == 0) {
            // 짝수는 필터링 (sink 메서드 미호출 = 건너뜀)
        } else {
            sink.next(item * 10);  // 홀수는 10배 변환
        }
    })
    .subscribe(
        v -> log.info("받음: {}", v),
        e -> log.error("에러: {}", e.getMessage())
    );
// 출력: 받음: 10, 받음: 30, 받음: 50, 에러: 7은 처리 불가
연산자변환필터에러 발생
mapOXX
filterXOX
handleOOO

필터 연산자 — 파이프의 거름망

filter, skip, distinct, distinctUntilChanged

// filter: 조건을 만족하는 아이템만 통과
Flux.range(1, 10)
    .filter(n -> n % 2 == 0)
    .subscribe(log::info);  // 2, 4, 6, 8, 10

// skip: 처음 N개 건너뜀
Flux.range(1, 10)
    .skip(3)
    .subscribe(log::info);  // 4, 5, 6, 7, 8, 9, 10

// distinct: 전체 중복 제거 (모든 이전 값과 비교)
Flux.just(1, 2, 2, 3, 3, 3, 4)
    .distinct()
    .subscribe(log::info);  // 1, 2, 3, 4

// distinctUntilChanged: 연속 중복만 제거 (직전 값과만 비교)
Flux.just(1, 1, 2, 2, 1, 1, 3)
    .distinctUntilChanged()
    .subscribe(log::info);  // 1, 2, 1, 3

distinct()는 내부적으로 HashSet을 유지하며 모든 이전 값과 비교해요. distinctUntilChanged()는 직전 값과만 비교해 메모리를 덜 사용합니다. 실시간 스트림에서는 distinctUntilChanged()가 더 적합한 경우가 많아요.

집계 연산자 — 탱크에 물 모으기

reduce vs scan

두 연산자 모두 누산이지만 결과를 언제 방출하느냐가 달라요.

// reduce: 모든 아이템을 하나의 값으로 누산 (최종 결과만 발행, Mono 반환)
Flux.range(1, 5)
    .reduce(0, (acc, n) -> acc + n)
    .subscribe(log::info);
// 출력: 15 (한 번만)

// scan: 중간 누산 값도 발행 (초기값 포함, Flux 반환)
Flux.range(1, 5)
    .scan(0, (acc, n) -> acc + n)
    .subscribe(log::info);
// 출력: 0, 1, 3, 6, 10, 15 (각 단계마다)

여기서 시험 함정이 하나 있어요. reduceMono를 반환하고, scanFlux를 반환합니다. 그리고 scan의 첫 번째 방출 값은 초기값(0) 이에요 — 아직 아무 아이템도 처리하지 않은 시점의 누산값입니다.

// collectList: 모든 아이템을 List로 수집 (Mono<List<T>>)
Flux.range(1, 5)
    .collectList()
    .subscribe(list -> log.info("리스트: {}", list));
// 출력: 리스트: [1, 2, 3, 4, 5]

// collectMap: 키-값 쌍으로 Map 수집
Flux.just("alice", "bob", "charlie")
    .collectMap(
        name -> name.charAt(0),  // 키: 첫 글자
        String::toUpperCase      // 값: 대문자
    )
    .subscribe(map -> log.info("맵: {}", map));
// 출력: 맵: {a=ALICE, b=BOB, c=CHARLIE}

// count: 총 아이템 수 (Mono<Long>)
Flux.range(1, 100)
    .count()
    .subscribe(log::info);  // 100
한 줄 정리 — 최종 결과만 → reduce(Mono 반환), 단계별 중간값도 → scan(Flux 반환, 초기값 포함)

부수효과 연산자 — 파이프의 관측 센서

doOn* 계열은 스트림을 변경하지 않고 특정 이벤트 시점에 부수효과만 실행해요. 로깅·메트릭 수집·디버깅에 사용합니다.

Flux.range(1, 4)
    .doOnSubscribe(s -> log.info("구독 시작"))
    .doOnRequest(n -> log.info("{}개 요청", n))
    .doOnNext(item -> log.info("데이터 통과: {}", item))   // 변환 불가!
    .doOnComplete(() -> log.info("완료"))
    .doOnError(e -> log.error("에러: {}", e.getMessage()))
    .doFinally(signal -> log.info("최종 종료: {}", signal))  // 취소 포함 항상
    .subscribe(v -> log.info("받음: {}", v));

// 실제 활용: 저장 전 로깅
customerRepository.save(customer)
    .doOnNext(saved -> log.info("저장 완료: {}", saved.getId()))
    .doOnError(e -> log.error("저장 실패", e))
    .subscribe();

doOnNext에서 변환 시도는 효과가 없습니다. doOnNext(n -> n * 2)처럼 쓰면 반환값이 완전히 무시돼요. 변환은 map, 부수효과는 doOnNext로 역할을 명확히 구분하세요.

빈 스트림 처리 — 물이 안 나올 때

// defaultIfEmpty: 스트림이 비어있으면 기본값 발행
Flux.range(1, 5)
    .filter(n -> n > 10)   // 아무것도 통과 못 함
    .defaultIfEmpty(-1)
    .subscribe(log::info);  // -1

// switchIfEmpty: 스트림이 비어있으면 다른 Publisher로 전환
Flux.empty()
    .switchIfEmpty(Flux.just("fallback1", "fallback2"))
    .subscribe(log::info);  // fallback1, fallback2

// 실용 예시: DB 조회 결과가 없으면 캐시에서 가져오기
userRepository.findById(userId)
    .switchIfEmpty(Mono.defer(() -> cacheService.get(userId)))
    .subscribe(user -> processUser(user));

에러 처리 연산자 — 파이프가 막혔을 때

// onErrorReturn: 에러 발생 시 기본값 반환 후 완료
Flux.just(1, 2, 0, 4, 5)
    .map(n -> 10 / n)         // 0으로 나누면 ArithmeticException
    .onErrorReturn(-1)         // 에러 시 -1 반환 후 스트림 종료
    .subscribe(log::info);
// 출력: 10, 5, -1 (0 이후 종료)

// onErrorResume: 에러 발생 시 다른 Publisher로 전환
Flux.error(new RuntimeException("서비스 오류"))
    .onErrorResume(e -> {
        log.warn("에러 복구: {}", e.getMessage());
        return Flux.just("FALLBACK1", "FALLBACK2");
    })
    .subscribe(log::info);

연산자 순서 — 성능 최적화의 핵심

여기서 시험 함정이 하나 있어요. filtermap의 순서가 성능에 영향을 줍니다. 결과는 같지만 처리 비용이 달라요.

// 비효율적: 100만 개 모두 변환 후 필터
Flux.range(1, 1_000_000)
    .map(n -> expensiveTransform(n))  // 100만 번 실행
    .filter(n -> n > 0)
    .subscribe(log::info);

// 효율적: 먼저 필터, 통과한 것만 변환
Flux.range(1, 1_000_000)
    .filter(n -> n > 0)               // 불필요한 것 먼저 제거
    .map(n -> expensiveTransform(n))  // 통과한 것만 변환
    .subscribe(log::info);

설계 원칙 5가지:

  1. filtermap 순서: 먼저 필터링, 그 후 변환
  2. 비동기 변환: 항상 flatMap·concatMap 사용
  3. 부수효과(로깅): doOn* 계열 사용
  4. 에러 처리: 에러 발생 연산자 이후에 onError* 배치
  5. handle은 복잡한 조건 분기가 필요할 때만

Reactive 연산자 — 시험 직전 압축 노트

  • 연산자 = 기존 Publisher를 감싸는 새 Publisher 반환 — 원본 변경 없음
  • map — 동기 1:1 변환, null 반환 금지, Publisher 반환 금지
  • flatMap병렬 실행, 순서 보장 안 됨 → 빠른 병렬 처리
  • concatMap순차 실행, 완전 순서 보장 → flatMap보다 느림
  • switchMap이전 내부 Publisher 취소, 최신 값만 처리 → 검색 자동완성
  • handle — map + filter + error 통합 (SynchronousSink 활용)
  • filtermap 순서가 성능에 유리
  • distinct — 전체 중복 제거 (HashSet 유지), distinctUntilChanged — 연속 중복만
  • reduceMono 반환, 최종 결과 하나만
  • scanFlux 반환, 초기값 포함 각 단계 결과 방출
  • collectList() — 모든 아이템을 Mono>
  • doOnNext변환 불가, 로깅·메트릭만 (반환값 무시)
  • doFinally — 취소 포함 항상 실행
  • defaultIfEmpty — 단일 기본값, switchIfEmpty — 대체 Publisher
  • 에러 처리 순서: 구체적 예외 타입 먼저, 넓은 타입은 나중에
  • log() — 스트림 이벤트 자동 INFO 레벨 로깅 (디버깅용)
  • flatMap에서 map처럼 일반 값 반환 = Flux> 타입 오류

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!