Hot Publisher 완전 정복 — Cold vs Hot 핵심 정리

2026-05-03AWS SAA-C03 스터디

Java Reactive Programming 핵심 정리 시리즈 5편. Cold(구독마다 새 실행) vs Hot(공유 실행) 차이, share()/publish()/replay()/autoConnect()/refCount()/ConnectableFlux 6가지 변환 연산자 — 각자 개별 DVD 재생(Cold) vs TV 생방송(Hot) 비유로 친절하게 풀어쓴 5편.

📚 Java Reactive Programming 핵심 정리 · 5편 / 14편 — Cold vs Hot 핵심 정리

이 글은 Java Reactive Programming 핵심 정리 시리즈의 다섯 번째 편입니다. 지금까지 Mono·Flux·연산자를 익혔다면, 이번 편에서는 리액티브 스트림의 구독 모델 자체를 바꾸는 개념 — Cold vs Hot Publisher 를 완전히 파헤칩니다.

비유로 먼저 잡겠습니다.

  • Cold Publisher = 각자 개별 DVD를 처음부터 재생. 구독자마다 독립적인 실행이 시작됩니다.
  • Hot Publisher = TV 생방송. 방송은 계속 진행 중이고, 나중에 채널을 켜면 그 시점부터 볼 수 있어요.

단건 DB 조회, HTTP 요청 — 대부분의 리액티브 코드는 Cold입니다. 여러 서비스가 같은 실시간 스트림(주식 가격, 웹소켓 이벤트)을 공유해야 할 때 Hot 변환이 필요해요.

📚 학습 노트

이 시리즈는 Project Reactor 공식 문서, Reactive Streams 명세, 여러 비동기 프로그래밍 학습 자료 등 공개 자료를 참고해 한국어 학습 노트로 풀어쓴 자료입니다.

Project Reactor 공식 문서의 Hot & Cold 섹션을 함께 보면 ConnectableFlux 동작을 시각적으로 확인할 수 있어요.

Hot & Cold가 처음엔 왜 어렵게 느껴질까요

이유는 네 가지예요.

첫째, Mono.just()도 Cold라는 사실이 직관에 반합니다. 캐싱되어 있는 것처럼 보여서 Hot인 줄 알기 쉬운데, 실제로는 구독마다 파이프라인을 새로 실행하는 Cold예요.

둘째, share()·publish()·autoConnect()·refCount()·cache()의 관계가 헷갈립니다. 모두 Hot 변환과 관련이 있는데, 미묘하게 다른 시나리오를 위한 도구들이에요.

셋째, publish().refCount(1)share()와 동일하다는 걸 모릅니다. 두 코드를 처음 보면 완전히 다른 것처럼 느껴져요.

넷째, autoConnectrefCount의 차이를 모릅니다. 마지막 구독자가 취소한 뒤 동작이 다른데, 이걸 이해하지 못하면 재구독 시 예상치 못한 동작이 나옵니다.

해결법은 한 가지예요. DVD(Cold) vs TV 생방송(Hot) 비유를 기준으로, 각 연산자를 "어떤 TV 채널 운영 방식이냐"로 분류하면 자연스럽게 정리됩니다.

Cold Publisher — 각자 DVD 재생

Project Reactor의 Mono, Flux는 기본적으로 모두 Cold Publisher입니다. 구독이 발생할 때마다 새로운 데이터 스트림을 처음부터 시작해요.

Flux<Integer> coldFlux = Flux.range(1, 5);

log.info("첫 번째 구독:");
coldFlux.subscribe(n -> log.info("sub1: {}", n));
// 출력: 1 2 3 4 5

log.info("두 번째 구독:");
coldFlux.subscribe(n -> log.info("sub2: {}", n));
// 출력: 1 2 3 4 5 (독립적으로 처음부터!)

실무에서의 함정 — 같은 Flux를 두 서비스에서 각각 구독하면 소스(DB 쿼리, HTTP 요청)가 두 번 실행됩니다.

// 비효율적: 같은 DB 쿼리가 두 번 실행됨
Flux<User> users = Flux.fromIterable(userRepository.findAll());  // Cold!
users.subscribe(u -> processForService1(u));  // DB 쿼리 실행 1
users.subscribe(u -> processForService2(u));  // DB 쿼리 실행 2 (낭비!)

// 해결: share()로 Hot 변환 → 한 번만 실행
Flux<User> sharedUsers = Flux.fromIterable(userRepository.findAll()).share();
sharedUsers.subscribe(u -> processForService1(u));  // DB 쿼리 실행 1번
sharedUsers.subscribe(u -> processForService2(u));  // 같은 스트림 참여

여기서 시험 함정이 하나 있어요. Mono.just()도 Cold입니다. 값이 메모리에 캐싱되어 있어서 Hot처럼 보이지만, 실제로 구독마다 파이프라인 전체가 다시 실행돼요. Mono.just("hello")는 항상 같은 값을 반환하므로 실질적 차이는 없지만, Mono.just(expensiveCall())이라면 expensiveCall()이 Mono 생성 시 한 번 실행되고 이후 구독마다 그 값이 재사용됩니다 — 이건 Cold의 동작이에요.

한 줄 정리 — Mono, Flux 모두 기본 Cold. 여러 구독자가 공유해야 하면 Hot 변환 필요

Hot Publisher — TV 생방송

Hot Publisher는 하나의 데이터 스트림을 여러 구독자가 공유해요. 나중에 구독한 구독자는 이미 지나간 데이터는 받지 못하고 현재 시점부터 받습니다.

Hot 변환 연산자는 6가지가 있고, 각각 다른 시나리오에 맞습니다.

연산자자동 시작구독자 0명 시재구독 시이전 데이터
publish().connect()수동계속 발행현재부터없음
publish().autoConnect(n)n명 구독 시계속 발행현재부터없음
publish().refCount(n)n명 구독 시소스 해제처음부터 재시작없음
share()1명 구독 시소스 해제처음부터 재시작없음
cache()즉시캐시 유지캐시부터있음
cache(n)즉시캐시 유지최근 n개최근 n개

publish() + connect() — 수동 방송 시작

publish()ConnectableFlux를 반환해요. connect()를 직접 호출하기 전까지 데이터가 흐르지 않습니다.

ConnectableFlux<Integer> connectableFlux = Flux.range(1, 5)
    .delayElements(Duration.ofMillis(100))
    .publish();

// 구독자 등록 (아직 데이터 발행 안 함)
connectableFlux.subscribe(n -> log.info("구독자1: {}", n));
connectableFlux.subscribe(n -> log.info("구독자2: {}", n));

Thread.sleep(200);

connectableFlux.connect();  // 이 시점부터 데이터 발행 시작!
Thread.sleep(1000);

// connect() 이후에 구독하면 이전 데이터는 놓침
connectableFlux.subscribe(n -> log.info("구독자3(늦음): {}", n));

autoConnect(n) — n명 모이면 자동 시작

Flux<Integer> hotFlux = Flux.range(1, 5)
    .delayElements(Duration.ofMillis(300))
    .publish()
    .autoConnect(2);  // 2명이 구독하면 자동 시작

// 1번째 구독 (아직 시작 안 됨)
hotFlux.subscribe(n -> log.info("구독자1: {}", n));
log.info("1명 구독, 아직 시작 안 됨");

Thread.sleep(100);

// 2번째 구독 → 2명 됐으므로 자동 시작!
hotFlux.subscribe(n -> log.info("구독자2: {}", n));
log.info("2명 구독, 자동 시작!");

Thread.sleep(200);

// 3번째 구독: 중간부터 참여, 이전 데이터 놓침
hotFlux.subscribe(n -> log.info("구독자3(늦음): {}", n));

autoConnect(0) 은 구독자가 없어도 즉시 시작됩니다. 실시간 데이터 피드를 항상 켜 두어야 할 때 사용해요.

refCount(n) / share() — 구독자 수 기반 관리

여기서 시험 함정이 하나 있어요. publish().refCount(1)share()와 완전히 동일합니다.

// 두 코드는 완전히 같음
Flux<Long> a = Flux.interval(Duration.ofMillis(100)).publish().refCount(1);
Flux<Long> b = Flux.interval(Duration.ofMillis(100)).share();  // = publish().refCount(1)

share()의 핵심 동작 — 마지막 구독자가 취소하면 소스 연결이 해제됩니다. 재구독하면 소스가 처음부터 다시 시작돼요.

Flux<Long> sharedFlux = Flux.interval(Duration.ofMillis(300)).share();

Disposable sub1 = sharedFlux.subscribe(n -> log.info("구독자1: {}", n));
Thread.sleep(500);

Disposable sub2 = sharedFlux.subscribe(n -> log.info("구독자2: {}", n));
Thread.sleep(500);

sub1.dispose();    // 구독자1 취소
Thread.sleep(500);

sub2.dispose();    // 마지막 구독자 취소 → 소스 연결 해제!
Thread.sleep(500);

// 재구독: 소스가 리셋되어 처음부터 다시 시작
sharedFlux.subscribe(n -> log.info("구독자3(재시작): {}", n));
// 출력: 구독자3: 0, 1, 2, ... (연속적이지 않음!)

autoConnect vs refCount/share — 핵심 차이

여기서 시험 함정이 하나 있어요. autoConnect는 구독자가 0명이 되어도 소스를 계속 실행하고, refCount(=share) 는 0명이 되면 소스를 해제합니다.

// refCount (= share): 0명이 되면 소스 해제 → 재구독 시 처음부터
Flux<Long> refCounted = Flux.interval(Duration.ofMillis(100)).share();
Disposable d = refCounted.subscribe(n -> log.info("refCount: {}", n));
Thread.sleep(500);  // 0,1,2,3,4 발행
d.dispose();        // 소스 연결 해제
Thread.sleep(500);  // 이 시간 동안 발행 없음
refCounted.subscribe(n -> log.info("refCount 재구독: {}", n));
// 출력: 재구독: 0, 1, 2, ... (처음부터!)

// autoConnect: 구독자 없어도 계속 실행 → 재구독 시 현재 값부터
Flux<Long> autoConnected = Flux.interval(Duration.ofMillis(100))
    .publish()
    .autoConnect(1);
Disposable d2 = autoConnected.subscribe(n -> log.info("auto: {}", n));
Thread.sleep(500);  // 0,1,2,3,4 발행
d2.dispose();       // 소스는 계속 실행 중!
Thread.sleep(500);  // 내부적으로 5,6,7,8,9 발행됨 (구독자 없어도)
autoConnected.subscribe(n -> log.info("auto 재구독: {}", n));
// 출력: 재구독: 10, 11, ... (중간부터!)

cache() — 늦게 온 시청자도 다시 볼 수 있는 VOD

cache()는 Hot Publisher이면서 이전에 발행된 데이터를 캐싱해서 늦게 구독한 구독자에게도 제공해요.

Flux<Integer> cachedFlux = Flux.range(1, 5)
    .delayElements(Duration.ofMillis(200))
    .cache();  // 모든 아이템 캐시

cachedFlux.subscribe(n -> log.info("구독자1: {}", n));
Thread.sleep(700);  // 1,2,3 발행 완료

// 두 번째 구독: 캐시된 1,2,3 즉시 받고, 이후 4,5도 받음
cachedFlux.subscribe(n -> log.info("구독자2: {}", n));
Thread.sleep(1000);

// cache(n): 최근 n개만 캐싱 (메모리 절약)
Flux<Long> recentCache = Flux.interval(Duration.ofMillis(100))
    .cache(3);  // 최근 3개만 캐시

메모리 주의: 무한 스트림에 cache()를 쓰면 메모리가 계속 증가해요. 무한 스트림에는 cache(n) 또는 cache(Duration)을 사용하세요.

한 줄 정리 — 늦은 구독자도 이전 데이터 필요 → cache(), 실시간 공유만 → share() 또는 publish().autoConnect()

실제 사용 패턴 — 주식 가격 스트림 공유

// 비효율적: 구독마다 별도 HTTP 연결
Flux<String> coldStock = webClient.get()
    .uri("/api/stock/AAPL")
    .retrieve()
    .bodyToFlux(String.class);

coldStock.subscribe(p -> displayChart(p));   // HTTP 연결 #1
coldStock.subscribe(p -> alertOnChange(p));  // HTTP 연결 #2 (같은 데이터를 두 번!)

// 효율적: share()로 하나의 스트림 공유
Flux<String> hotStock = webClient.get()
    .uri("/api/stock/AAPL")
    .retrieve()
    .bodyToFlux(String.class)
    .share();

hotStock.subscribe(p -> displayChart(p));    // HTTP 연결 #1 시작
hotStock.subscribe(p -> alertOnChange(p));   // 같은 스트림 참여 (추가 연결 없음!)

Hot Publisher 완전 정복 — 시험 직전 압축 노트

  • Cold Publisher = 구독마다 새 실행 (DVD 각자 재생). Mono, Flux 기본값
  • Hot Publisher = 공유 실행 (TV 생방송). 늦은 구독자는 이전 데이터 못 받음
  • Mono.just()도 Cold — 캐싱되어 보이지만 파이프라인은 구독마다 실행
  • publish()ConnectableFlux 반환, connect() 호출 전까지 데이터 없음
  • publish().autoConnect(n) — n명 구독 시 자동 시작, 구독자 0명이어도 소스 계속 실행
  • publish().refCount(n) — n명 구독 시 시작, 0명이면 소스 해제
  • share() = publish().refCount(1) ← 시험 단골
  • share() 재구독 시 → 처음부터 재시작 (소스가 해제됐으므로)
  • autoConnect 재구독 시 → 현재 시점부터 (소스가 계속 실행 중)
  • cache() — Hot + 이전 데이터 캐싱, 무한 스트림에 무제한 캐시 금지
  • cache(n) — 최근 n개만 캐싱
  • Cold Publisher 다중 구독 = 소스 N번 실행 → 비싼 작업이면 share() 또는 cache()
  • 실시간 이벤트(웹소켓, 가격 피드) → share() 또는 publish().autoConnect()
  • 비싼 연산을 공유하되 이전 데이터도 필요 → cache()
  • cache() 무한 스트림 = OOM 위험cache(n) 사용

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!