Reactive 결합 완전 정복 — Publisher 결합 핵심 정리

2026-05-03AWS SAA-C03 스터디

Java Reactive Programming 핵심 정리 시리즈 8편. Reactive 결합 연산자 완전 정복 — merge(병렬·순서X) vs concat(순차·순서보장) vs zip(인덱스 쌍·짧은 소스 기준) vs combineLatest(최신값 조합) 차이, switchMap·startWith·firstWithValue 패턴까지 여러 수도꼭지 비유로 친절하게 풀어쓴 8편.

📚 Java Reactive Programming 핵심 정리 · 8편 / 14편 — Publisher 결합 핵심 정리

이 글은 Java Reactive Programming 핵심 정리 시리즈의 여덟 번째 편입니다. 마이크로서비스 환경에서는 DB 조회·외부 HTTP·이벤트 스트림처럼 여러 소스의 데이터를 결합해야 하는 상황이 일상입니다. Project Reactor의 Reactive 결합 연산자들을 제대로 이해하면 병렬 API 호출, 순서 보장 처리, 실시간 조합 계산이 몇 줄 코드로 깔끔하게 정리됩니다.

Reactive 결합 단원이 처음엔 어렵게 느껴지는 이유

이유는 네 가지예요.

첫째, 비슷하게 생긴 연산자가 너무 많습니다. merge, concat, zip, combineLatest — 이름만 봐서는 뭐가 다른지 알기 어려워요. 실제 동작을 보기 전까지는 그냥 "데이터를 합치는 것들"로 보입니다.

둘째, 순서 보장 여부가 명확하지 않습니다. merge는 순서를 보장하지 않고, concat은 보장하며, zip은 인덱스 쌍으로 맞추는데 — 이 세 가지가 헷갈려 실무에서 잘못 쓰면 데이터가 뒤섞이거나 유실됩니다.

셋째, zip의 완료 조건이 직관에 반합니다. "가장 짧은 소스 기준으로 종료된다"는 걸 모르면 데이터가 조용히 버려지는 상황을 만납니다.

넷째, flatMapconcatMapswitchMap이 나란히 있습니다. 세 개 다 "각 아이템으로 새 Publisher를 만든다"는 구조인데, 동시성·순서·취소 동작이 모두 달라요.

해결법은 비유입니다. Reactive 결합 = "여러 수도꼭지에서 나온 물을 한 호스로 합치기" 로 상상하면 차이가 명확해집니다. 동시에 여는지(merge), 순서대로 여는지(concat), 짝지어 섞는지(zip), 아무 꼭지라도 새 물이 나오면 최신 조합을 내보내는지(combineLatest) — 각각의 결합 방식이 구분됩니다.

결합 연산자 전체 지도

먼저 어떤 연산자들이 있는지 한눈에 볼게요.

연산자소스 수순서동시성완료 조건
startWith1+prefixOX순차 완료
concat / concatWithNOX순차 완료
merge / mergeWithNXO모두 완료
zip / zipWithNO (쌍 매칭)O가장 짧은 완료
combineLatestNX (최신 조합)O모두 완료
flatMap1→NXO모두 완료
concatMap1→NOX모두 완료
switchMap1→N최신만O(이전 취소)최신 완료
firstWithSignalN가장 빠른 것O첫 완료

startWith — "일단 캐시부터, 그 다음 실시간"

startWith는 현재 스트림 에 데이터를 추가합니다. 캐시 데이터를 먼저 내보내고 실시간 스트림을 이어 붙이는 패턴에 딱 맞아요.

Flux<Integer> producer = Flux.just(1, 2, 3);

// 단일 값 앞에 추가
producer.startWith(0)
    .subscribe(System.out::println);
// 출력: 0, 1, 2, 3

// 실용 사례: 캐시 + 실시간 데이터
Flux<String> cached = Flux.fromIterable(cache.getAll());
Flux<String> realtime = realTimeService.getStream();

realtime
    .startWith(cached)  // 캐시 먼저, 그 후 실시간
    .subscribe(item -> processItem(item));

take(n)과 조합하면 startWith에서 n개를 가져가는 순간 이후 소스는 구독조차 하지 않아 효율적입니다.

한 줄 정리 — startWith는 앞에 prefix 추가, 캐시+실시간 패턴에 자주 쓰임.

concat — "첫 번째 끝나면 두 번째 시작"

concat은 소스1이 완료된 에 소스2를 구독합니다. 순서가 보장되지만, 소스1의 완료를 기다리는 동안 소스2는 구독되지 않아요.

Flux<Integer> source1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100));
Flux<Integer> source2 = Flux.just(51, 52, 53);

source1.concatWith(source2)
    .subscribe(System.out::println);
Thread.sleep(500);
// 출력: 1, 2, 3, 51, 52, 53 (source1 완료 후 source2)

// concatDelayError: 에러를 마지막에 처리
Flux<Integer> errorSource = Flux.error(new RuntimeException("에러"));

Flux.concatDelayError(source1, errorSource, source2)
    .subscribe(
        n -> System.out.println("받음: " + n),
        e -> System.out.println("에러: " + e.getMessage())
    );
// 출력: 1, 2, 3, 51, 52, 53, 에러: 에러

여기서 시험 함정이 하나 있어요. concat 중 에러가 발생하면 이후 소스는 실행되지 않습니다. 모든 소스를 처리하고 에러는 마지막에 받으려면 concatDelayError를 써야 합니다.

한 줄 정리 — concat은 순서 보장, 순차 구독, 에러 발생 시 이후 소스 차단.

merge — "모두 동시에, 먼저 오는 순서로"

merge는 모든 소스를 동시에 구독합니다. 데이터가 도착하는 순서대로 발행하기 때문에 순서가 보장되지 않아요. 하지만 병렬로 처리하기 때문에 concat보다 빠릅니다.

Flux<Integer> slow = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300));
Flux<Integer> fast = Flux.just(10, 20, 30).delayElements(Duration.ofMillis(100));

Flux.merge(slow, fast)
    .subscribe(n -> System.out.println("받음: " + n));
Thread.sleep(1000);
// 출력: 10, 20, 1, 30, 2, 3 (fast 먼저 도착, 순서 보장 안 됨)

여기서 시험 함정이 하나 있어요. merge 중 하나에서 에러가 발생하면 즉시 나머지 소스도 모두 취소됩니다. 에러 격리가 필요하면 각 소스에 onErrorResume을 먼저 달아야 합니다.

zip — "인덱스 짝 맞추기 — N번째끼리"

zip은 여러 소스에서 같은 인덱스의 아이템을 묶어 새 아이템으로 결합합니다. 소스1의 첫 번째와 소스2의 첫 번째, 소스1의 두 번째와 소스2의 두 번째 — 이렇게 짝지어 내보내요.

Flux<String> names = Flux.just("alice", "bob", "charlie");
Flux<Integer> ages = Flux.just(25, 30, 35);

// Tuple 형태로 결합
Flux.zip(names, ages)
    .subscribe(tuple -> System.out.println(tuple.getT1() + " = " + tuple.getT2()));
// 출력:
// alice = 25
// bob = 30
// charlie = 35

// combiner 함수로 직접 조합
Flux.zip(names, ages, (name, age) -> name + "(" + age + ")")
    .subscribe(System.out::println);
// 출력: alice(25), bob(30), charlie(35)

// 실용 사례: 여러 서비스 병렬 호출 후 결합
Mono<String> productName = productService.getName(productId);
Mono<String> review = reviewService.getReview(productId);
Mono<Integer> price = priceService.getPrice(productId);

Mono<Product> product = Mono.zip(productName, review, price)
    .map(tuple -> new Product(tuple.getT1(), tuple.getT2(), tuple.getT3()));

여기서 시험 함정이 하나 있어요. zip은 가장 짧은 소스 기준으로 완료됩니다. 소스 A가 2개, 소스 B가 3개라면 2쌍만 결합하고 나머지 1개는 조용히 버려져요. 데이터 개수가 불일치할 때 유실을 모르면 버그로 이어집니다.

Flux<String> names2 = Flux.just("alice", "bob");  // 2개
Flux<Integer> ages2 = Flux.just(25, 30, 35);      // 3개

Flux.zip(names2, ages2)
    .subscribe(tuple -> System.out.println(tuple.getT1() + " = " + tuple.getT2()));
// 출력: alice=25, bob=30 (35는 버려짐!)

한 줄 정리 — zip은 인덱스 쌍 결합, 가장 짧은 소스 기준 완료, 병렬 소스 동시 조회에 최적.

combineLatest — "어느 꼭지든 새 물이 나오면 최신 조합"

combineLatest는 소스 중 어느 하나라도 새 값을 발행하면 모든 소스의 최신 값을 조합해 내보냅니다. 실시간 가격·환율 조합처럼 두 스트림 중 하나가 바뀔 때마다 최신 조합이 필요한 상황에 씁니다.

Flux<String> source1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<Integer> source2 = Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(150));

Flux.combineLatest(source1, source2, (s, n) -> s + n)
    .subscribe(System.out::println);
Thread.sleep(700);
// 출력 예시: A1, B1, B2, C2, C3, C4

// 실용 사례: 실시간 주가 × 환율 원화 계산
Flux<Double> stockPrice = stockService.getPriceStream("AAPL");
Flux<Double> exchangeRate = forexService.getRateStream("USD/KRW");

Flux.combineLatest(stockPrice, exchangeRate,
    (price, rate) -> "AAPL: " + (price * rate) + "원")
    .subscribe(System.out::println);

여기서 시험 함정이 하나 있어요. combineLatest는 모든 소스에 최초 값이 생길 때까지 아무것도 내보내지 않습니다. 소스1에 값이 10개 쌓여도 소스2의 첫 번째 값이 도착하기 전까지는 결합이 시작되지 않아요.

zipcombineLatest의 핵심 차이:

  • zip: 인덱스 순서로 짝 맞춤 — 소스1의 1번째 + 소스2의 1번째
  • combineLatest: 최신 값 조합 — 소스1의 최신 + 소스2의 최신

한 줄 정리 — combineLatest는 어느 소스든 새 값 도착 시 최신 조합 발행.

flatMap vs concatMap vs switchMap

세 연산자 모두 "각 아이템으로 새 Publisher를 만든다"는 구조지만 동시성·순서·취소가 모두 달라요.

flatMapconcatMapswitchMap
동시성높음 (병렬)없음 (순차)최신 1개만
순서 보장XOX (최신만)
이전 소스 취소XXO
성능빠름느림중간
사용 사례병렬 API 호출순서 중요한 처리실시간 검색 자동완성
// flatMap: 병렬, 순서 보장 없음
userService.getUsers()
    .flatMap(user -> orderService.getOrders(user.getId()))
    .subscribe(order -> System.out.println("flatMap 주문: " + order));

// concatMap: 순차, 순서 보장
userService.getUsers()
    .concatMap(user -> orderService.getOrders(user.getId()))
    .subscribe(order -> System.out.println("concatMap 주문: " + order));

// switchMap: 새 아이템 오면 이전 내부 Publisher 취소 — 검색 자동완성
Flux.just("j", "ja", "jav", "java")
    .delayElements(Duration.ofMillis(50))
    .switchMap(query ->
        searchService.search(query)
            .delayElements(Duration.ofMillis(200))
    )
    .subscribe(result -> System.out.println("검색 결과: " + result));
// "java"에 대한 결과만 표시 (이전 검색어는 취소됨)

여기서 시험 함정이 하나 있어요. 순서가 중요한 처리(로그 저장, 결제 처리)에 flatMap을 쓰면 데이터가 뒤섞입니다. 순서 보장이 필요하면 반드시 concatMap을 써야 해요.

firstWithSignal — "가장 빠른 응답을 선택"

여러 소스 중 먼저 데이터를 발행한 소스만 선택하고 나머지는 취소합니다. 이중화된 서버나 지역 복제본 중 응답이 빠른 쪽을 자동으로 선택하는 패턴에 씁니다.

Flux<String> server1 = Flux.just("server1-data").delayElements(Duration.ofMillis(200));
Flux<String> server2 = Flux.just("server2-data").delayElements(Duration.ofMillis(100));
Flux<String> server3 = Flux.just("server3-data").delayElements(Duration.ofMillis(300));

Flux.firstWithSignal(server1, server2, server3)
    .subscribe(System.out::println);
Thread.sleep(500);
// 출력: server2-data (가장 빠른 서버의 데이터만)

// Mono.firstWithValue: 첫 번째 값을 방출하는 Mono 선택
Mono<String> replica1 = Mono.just("replica1").delayElement(Duration.ofMillis(200));
Mono<String> replica2 = Mono.just("replica2").delayElement(Duration.ofMillis(50));

Mono.firstWithValue(replica1, replica2)
    .subscribe(System.out::println);
// 출력: replica2

자세한 Project Reactor 결합 연산자 사양은 Project Reactor 공식 문서에서 확인할 수 있습니다.

결합 연산자 선택 가이드

상황연산자
순차적 스트림 연결 (순서 중요)concat
병렬 스트림 병합 (순서 무관)merge
N개 소스 인덱스별 쌍 결합zip
N개 소스 최신 값 실시간 조합combineLatest
앞에 데이터 추가startWith
A 결과로 B 비동기 조회 (병렬)flatMap
A 결과로 B 비동기 조회 (순차)concatMap
최신 소스만 유지 (검색 자동완성)switchMap
가장 빠른 소스 선택firstWithSignal
Mono → Flux 변환flatMapMany

Reactive 결합 완전 정복 — 압축 노트

여기까지가 Publisher 결합의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • Reactive 결합 = "여러 수도꼭지에서 나온 물을 한 호스로 합치기"
  • merge — 모두 동시 구독, 도착 순서 발행, 순서 보장 X
  • concat — 순차 구독, 순서 보장 O, 에러 시 이후 소스 차단
  • concatDelayError — 에러를 마지막에 모아 처리
  • zip — 인덱스 쌍 결합, 가장 짧은 소스 기준 완료, 데이터 유실 주의
  • combineLatest — 어느 소스든 새 값 도착 시 최신 조합 발행
  • zip vs combineLatest — zip은 인덱스 짝, combineLatest는 최신값 조합
  • merge vs concat — merge가 빠름(병렬), concat이 순서 보장(순차)
  • flatMap — 병렬, 순서 보장 X, 빠름
  • concatMap — 순차, 순서 보장 O, 느림
  • switchMap — 새 아이템 오면 이전 내부 Publisher 취소, 검색 자동완성
  • flatMapMany — Mono → Flux 변환
  • 독립 병렬 호출 → Mono.zip — 모두 완료 후 단일 결합 객체
  • 종속 순차 호출 → flatMap — A의 결과로 B 조회
  • 오류에도 모든 소스 처리 → concatDelayError
  • 가장 빠른 응답 선택 → firstWithSignal
  • startWith — 앞에 prefix 추가, 캐시+실시간 조합 패턴
  • zip에서 소스 길이 다름 → 짧은 것 기준, 나머지 조용히 버려짐
  • 순서 중요한 로직에 flatMap 쓰면 뒤섞임 — concatMap으로 교체
  • merge 중 에러 → 즉시 모든 소스 취소 — 에러 격리 필요 시 onErrorResume 먼저
  • combineLatest는 모든 소스에 최초 값 생길 때까지 아무것도 발행 안 함

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!