Java Reactive Programming 핵심 정리 시리즈 8편. Reactive 결합 연산자 완전 정복 — merge(병렬·순서X) vs concat(순차·순서보장) vs zip(인덱스 쌍·짧은 소스 기준) vs combineLatest(최신값 조합) 차이, switchMap·startWith·firstWithValue 패턴까지 여러 수도꼭지 비유로 친절하게 풀어쓴 8편.
이 글은 Java Reactive Programming 핵심 정리 시리즈의 여덟 번째 편입니다. 마이크로서비스 환경에서는 DB 조회·외부 HTTP·이벤트 스트림처럼 여러 소스의 데이터를 결합해야 하는 상황이 일상입니다. Project Reactor의 Reactive 결합 연산자들을 제대로 이해하면 병렬 API 호출, 순서 보장 처리, 실시간 조합 계산이 몇 줄 코드로 깔끔하게 정리됩니다.
Reactive 결합 단원이 처음엔 어렵게 느껴지는 이유
이유는 네 가지예요.
첫째, 비슷하게 생긴 연산자가 너무 많습니다. merge, concat, zip, combineLatest — 이름만 봐서는 뭐가 다른지 알기 어려워요. 실제 동작을 보기 전까지는 그냥 "데이터를 합치는 것들"로 보입니다.
둘째, 순서 보장 여부가 명확하지 않습니다. merge는 순서를 보장하지 않고, concat은 보장하며, zip은 인덱스 쌍으로 맞추는데 — 이 세 가지가 헷갈려 실무에서 잘못 쓰면 데이터가 뒤섞이거나 유실됩니다.
셋째, zip의 완료 조건이 직관에 반합니다. "가장 짧은 소스 기준으로 종료된다"는 걸 모르면 데이터가 조용히 버려지는 상황을 만납니다.
넷째, flatMap과 concatMap과 switchMap이 나란히 있습니다. 세 개 다 "각 아이템으로 새 Publisher를 만든다"는 구조인데, 동시성·순서·취소 동작이 모두 달라요.
해결법은 비유입니다. Reactive 결합 = "여러 수도꼭지에서 나온 물을 한 호스로 합치기" 로 상상하면 차이가 명확해집니다. 동시에 여는지(merge), 순서대로 여는지(concat), 짝지어 섞는지(zip), 아무 꼭지라도 새 물이 나오면 최신 조합을 내보내는지(combineLatest) — 각각의 결합 방식이 구분됩니다.
결합 연산자 전체 지도
먼저 어떤 연산자들이 있는지 한눈에 볼게요.
| 연산자 | 소스 수 | 순서 | 동시성 | 완료 조건 |
|---|---|---|---|---|
startWith | 1+prefix | O | X | 순차 완료 |
concat / concatWith | N | O | X | 순차 완료 |
merge / mergeWith | N | X | O | 모두 완료 |
zip / zipWith | N | O (쌍 매칭) | O | 가장 짧은 완료 |
combineLatest | N | X (최신 조합) | O | 모두 완료 |
flatMap | 1→N | X | O | 모두 완료 |
concatMap | 1→N | O | X | 모두 완료 |
switchMap | 1→N | 최신만 | O(이전 취소) | 최신 완료 |
firstWithSignal | N | 가장 빠른 것 | O | 첫 완료 |
startWith — "일단 캐시부터, 그 다음 실시간"
startWith는 현재 스트림 앞에 데이터를 추가합니다. 캐시 데이터를 먼저 내보내고 실시간 스트림을 이어 붙이는 패턴에 딱 맞아요.
Flux<Integer> producer = Flux.just(1, 2, 3);
// 단일 값 앞에 추가
producer.startWith(0)
.subscribe(System.out::println);
// 출력: 0, 1, 2, 3
// 실용 사례: 캐시 + 실시간 데이터
Flux<String> cached = Flux.fromIterable(cache.getAll());
Flux<String> realtime = realTimeService.getStream();
realtime
.startWith(cached) // 캐시 먼저, 그 후 실시간
.subscribe(item -> processItem(item));
take(n)과 조합하면 startWith에서 n개를 가져가는 순간 이후 소스는 구독조차 하지 않아 효율적입니다.
한 줄 정리 — startWith는 앞에 prefix 추가, 캐시+실시간 패턴에 자주 쓰임.
concat — "첫 번째 끝나면 두 번째 시작"
concat은 소스1이 완료된 후에 소스2를 구독합니다. 순서가 보장되지만, 소스1의 완료를 기다리는 동안 소스2는 구독되지 않아요.
Flux<Integer> source1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100));
Flux<Integer> source2 = Flux.just(51, 52, 53);
source1.concatWith(source2)
.subscribe(System.out::println);
Thread.sleep(500);
// 출력: 1, 2, 3, 51, 52, 53 (source1 완료 후 source2)
// concatDelayError: 에러를 마지막에 처리
Flux<Integer> errorSource = Flux.error(new RuntimeException("에러"));
Flux.concatDelayError(source1, errorSource, source2)
.subscribe(
n -> System.out.println("받음: " + n),
e -> System.out.println("에러: " + e.getMessage())
);
// 출력: 1, 2, 3, 51, 52, 53, 에러: 에러
여기서 시험 함정이 하나 있어요. concat 중 에러가 발생하면 이후 소스는 실행되지 않습니다. 모든 소스를 처리하고 에러는 마지막에 받으려면 concatDelayError를 써야 합니다.
한 줄 정리 — concat은 순서 보장, 순차 구독, 에러 발생 시 이후 소스 차단.
merge — "모두 동시에, 먼저 오는 순서로"
merge는 모든 소스를 동시에 구독합니다. 데이터가 도착하는 순서대로 발행하기 때문에 순서가 보장되지 않아요. 하지만 병렬로 처리하기 때문에 concat보다 빠릅니다.
Flux<Integer> slow = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300));
Flux<Integer> fast = Flux.just(10, 20, 30).delayElements(Duration.ofMillis(100));
Flux.merge(slow, fast)
.subscribe(n -> System.out.println("받음: " + n));
Thread.sleep(1000);
// 출력: 10, 20, 1, 30, 2, 3 (fast 먼저 도착, 순서 보장 안 됨)
여기서 시험 함정이 하나 있어요. merge 중 하나에서 에러가 발생하면 즉시 나머지 소스도 모두 취소됩니다. 에러 격리가 필요하면 각 소스에 onErrorResume을 먼저 달아야 합니다.
zip — "인덱스 짝 맞추기 — N번째끼리"
zip은 여러 소스에서 같은 인덱스의 아이템을 묶어 새 아이템으로 결합합니다. 소스1의 첫 번째와 소스2의 첫 번째, 소스1의 두 번째와 소스2의 두 번째 — 이렇게 짝지어 내보내요.
Flux<String> names = Flux.just("alice", "bob", "charlie");
Flux<Integer> ages = Flux.just(25, 30, 35);
// Tuple 형태로 결합
Flux.zip(names, ages)
.subscribe(tuple -> System.out.println(tuple.getT1() + " = " + tuple.getT2()));
// 출력:
// alice = 25
// bob = 30
// charlie = 35
// combiner 함수로 직접 조합
Flux.zip(names, ages, (name, age) -> name + "(" + age + ")")
.subscribe(System.out::println);
// 출력: alice(25), bob(30), charlie(35)
// 실용 사례: 여러 서비스 병렬 호출 후 결합
Mono<String> productName = productService.getName(productId);
Mono<String> review = reviewService.getReview(productId);
Mono<Integer> price = priceService.getPrice(productId);
Mono<Product> product = Mono.zip(productName, review, price)
.map(tuple -> new Product(tuple.getT1(), tuple.getT2(), tuple.getT3()));
여기서 시험 함정이 하나 있어요. zip은 가장 짧은 소스 기준으로 완료됩니다. 소스 A가 2개, 소스 B가 3개라면 2쌍만 결합하고 나머지 1개는 조용히 버려져요. 데이터 개수가 불일치할 때 유실을 모르면 버그로 이어집니다.
Flux<String> names2 = Flux.just("alice", "bob"); // 2개
Flux<Integer> ages2 = Flux.just(25, 30, 35); // 3개
Flux.zip(names2, ages2)
.subscribe(tuple -> System.out.println(tuple.getT1() + " = " + tuple.getT2()));
// 출력: alice=25, bob=30 (35는 버려짐!)
한 줄 정리 — zip은 인덱스 쌍 결합, 가장 짧은 소스 기준 완료, 병렬 소스 동시 조회에 최적.
combineLatest — "어느 꼭지든 새 물이 나오면 최신 조합"
combineLatest는 소스 중 어느 하나라도 새 값을 발행하면 모든 소스의 최신 값을 조합해 내보냅니다. 실시간 가격·환율 조합처럼 두 스트림 중 하나가 바뀔 때마다 최신 조합이 필요한 상황에 씁니다.
Flux<String> source1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<Integer> source2 = Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(150));
Flux.combineLatest(source1, source2, (s, n) -> s + n)
.subscribe(System.out::println);
Thread.sleep(700);
// 출력 예시: A1, B1, B2, C2, C3, C4
// 실용 사례: 실시간 주가 × 환율 원화 계산
Flux<Double> stockPrice = stockService.getPriceStream("AAPL");
Flux<Double> exchangeRate = forexService.getRateStream("USD/KRW");
Flux.combineLatest(stockPrice, exchangeRate,
(price, rate) -> "AAPL: " + (price * rate) + "원")
.subscribe(System.out::println);
여기서 시험 함정이 하나 있어요. combineLatest는 모든 소스에 최초 값이 생길 때까지 아무것도 내보내지 않습니다. 소스1에 값이 10개 쌓여도 소스2의 첫 번째 값이 도착하기 전까지는 결합이 시작되지 않아요.
zip과 combineLatest의 핵심 차이:
zip: 인덱스 순서로 짝 맞춤 — 소스1의 1번째 + 소스2의 1번째combineLatest: 최신 값 조합 — 소스1의 최신 + 소스2의 최신
한 줄 정리 — combineLatest는 어느 소스든 새 값 도착 시 최신 조합 발행.
flatMap vs concatMap vs switchMap
세 연산자 모두 "각 아이템으로 새 Publisher를 만든다"는 구조지만 동시성·순서·취소가 모두 달라요.
flatMap | concatMap | switchMap | |
|---|---|---|---|
| 동시성 | 높음 (병렬) | 없음 (순차) | 최신 1개만 |
| 순서 보장 | X | O | X (최신만) |
| 이전 소스 취소 | X | X | O |
| 성능 | 빠름 | 느림 | 중간 |
| 사용 사례 | 병렬 API 호출 | 순서 중요한 처리 | 실시간 검색 자동완성 |
// flatMap: 병렬, 순서 보장 없음
userService.getUsers()
.flatMap(user -> orderService.getOrders(user.getId()))
.subscribe(order -> System.out.println("flatMap 주문: " + order));
// concatMap: 순차, 순서 보장
userService.getUsers()
.concatMap(user -> orderService.getOrders(user.getId()))
.subscribe(order -> System.out.println("concatMap 주문: " + order));
// switchMap: 새 아이템 오면 이전 내부 Publisher 취소 — 검색 자동완성
Flux.just("j", "ja", "jav", "java")
.delayElements(Duration.ofMillis(50))
.switchMap(query ->
searchService.search(query)
.delayElements(Duration.ofMillis(200))
)
.subscribe(result -> System.out.println("검색 결과: " + result));
// "java"에 대한 결과만 표시 (이전 검색어는 취소됨)
여기서 시험 함정이 하나 있어요. 순서가 중요한 처리(로그 저장, 결제 처리)에 flatMap을 쓰면 데이터가 뒤섞입니다. 순서 보장이 필요하면 반드시 concatMap을 써야 해요.
firstWithSignal — "가장 빠른 응답을 선택"
여러 소스 중 먼저 데이터를 발행한 소스만 선택하고 나머지는 취소합니다. 이중화된 서버나 지역 복제본 중 응답이 빠른 쪽을 자동으로 선택하는 패턴에 씁니다.
Flux<String> server1 = Flux.just("server1-data").delayElements(Duration.ofMillis(200));
Flux<String> server2 = Flux.just("server2-data").delayElements(Duration.ofMillis(100));
Flux<String> server3 = Flux.just("server3-data").delayElements(Duration.ofMillis(300));
Flux.firstWithSignal(server1, server2, server3)
.subscribe(System.out::println);
Thread.sleep(500);
// 출력: server2-data (가장 빠른 서버의 데이터만)
// Mono.firstWithValue: 첫 번째 값을 방출하는 Mono 선택
Mono<String> replica1 = Mono.just("replica1").delayElement(Duration.ofMillis(200));
Mono<String> replica2 = Mono.just("replica2").delayElement(Duration.ofMillis(50));
Mono.firstWithValue(replica1, replica2)
.subscribe(System.out::println);
// 출력: replica2
자세한 Project Reactor 결합 연산자 사양은 Project Reactor 공식 문서에서 확인할 수 있습니다.
결합 연산자 선택 가이드
| 상황 | 연산자 |
|---|---|
| 순차적 스트림 연결 (순서 중요) | concat |
| 병렬 스트림 병합 (순서 무관) | merge |
| N개 소스 인덱스별 쌍 결합 | zip |
| N개 소스 최신 값 실시간 조합 | combineLatest |
| 앞에 데이터 추가 | startWith |
| A 결과로 B 비동기 조회 (병렬) | flatMap |
| A 결과로 B 비동기 조회 (순차) | concatMap |
| 최신 소스만 유지 (검색 자동완성) | switchMap |
| 가장 빠른 소스 선택 | firstWithSignal |
| Mono → Flux 변환 | flatMapMany |
Reactive 결합 완전 정복 — 압축 노트
여기까지가 Publisher 결합의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Reactive 결합 = "여러 수도꼭지에서 나온 물을 한 호스로 합치기"
merge— 모두 동시 구독, 도착 순서 발행, 순서 보장 Xconcat— 순차 구독, 순서 보장 O, 에러 시 이후 소스 차단concatDelayError— 에러를 마지막에 모아 처리zip— 인덱스 쌍 결합, 가장 짧은 소스 기준 완료, 데이터 유실 주의combineLatest— 어느 소스든 새 값 도착 시 최신 조합 발행zipvscombineLatest— zip은 인덱스 짝, combineLatest는 최신값 조합mergevsconcat— merge가 빠름(병렬), concat이 순서 보장(순차)flatMap— 병렬, 순서 보장 X, 빠름concatMap— 순차, 순서 보장 O, 느림switchMap— 새 아이템 오면 이전 내부 Publisher 취소, 검색 자동완성flatMapMany— Mono → Flux 변환- 독립 병렬 호출 →
Mono.zip— 모두 완료 후 단일 결합 객체 - 종속 순차 호출 →
flatMap— A의 결과로 B 조회 - 오류에도 모든 소스 처리 →
concatDelayError - 가장 빠른 응답 선택 →
firstWithSignal startWith— 앞에 prefix 추가, 캐시+실시간 조합 패턴- zip에서 소스 길이 다름 → 짧은 것 기준, 나머지 조용히 버려짐
- 순서 중요한 로직에 flatMap 쓰면 뒤섞임 — concatMap으로 교체
- merge 중 에러 → 즉시 모든 소스 취소 — 에러 격리 필요 시 onErrorResume 먼저
- combineLatest는 모든 소스에 최초 값 생길 때까지 아무것도 발행 안 함
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.
- 1편 — Reactive Programming 입문
- 2편 — Mono 완전 정복
- 3편 — Flux 완전 정복
- 4편 — 연산자 (map·flatMap·filter·reduce 등)
- 5편 — Hot & Cold Publishers
- 6편 — Threading & Schedulers
- 7편 — Backpressure (배압)
- 8편 — Publisher 결합 (zip·merge·concat 등) (현재 글)
- 9편 — Batching·Windowing·Grouping
- 10편 — Repeat & Retry
- 11편 — Sinks
- 12편 — Context
- 13편 — 단위 테스트 (StepVerifier)