Reactor Sinks 완전 정복 — unicast·multicast·replay

2026-05-03AWS SAA-C03 스터디

Java Reactive Programming 핵심 정리 시리즈 11편. Reactor Sinks 완전 정복 — Sinks.One / Many.unicast / multicast / replay 타입 선택 기준, EmitFailureHandler 멀티스레드 안전 패턴, tryEmitNext vs emitValue 차이, 이벤트 버스 실용 패턴까지 비유와 시험 함정을 곁들여 정리.

📚 Java Reactive Programming 핵심 정리 · 11편 / 14편 — unicast·multicast·replay

이 글은 Java Reactive Programming 핵심 정리 시리즈의 열한 번째 편입니다. 지금까지 배운 Mono, Flux, 연산자들은 모두 "선언된 시점의 데이터 소스"를 기반으로 파이프라인을 흘렸어요. 그런데 실무에서는 "내가 원하는 시점에 직접 데이터를 파이프라인에 밀어 넣어야" 하는 상황이 자주 생깁니다. 버튼 클릭, 외부 이벤트 수신, 수동 완료 신호 — 이게 Sinks가 해결하는 문제예요.

📚 학습 노트

이 시리즈는 Project Reactor 공식 문서와 Reactive Streams 명세를 포함한 공개 학습 자료를 참고해 한국어 학습 노트로 풀어쓴 자료입니다.

IDE에 reactor-core 의존성을 추가하고 예제 코드를 직접 실행해 보세요.

왜 Reactor Sinks가 처음엔 어렵게 느껴질까요

이유는 네 가지예요.

첫째, 기존 Flux.create와 뭐가 다른지 모호합니다. Flux.create도 수동으로 데이터를 밀어넣을 수 있는데 Sinks가 따로 필요한 이유가 처음엔 보이지 않아요.

둘째, unicast / multicast / replay — 세 종류의 차이가 한눈에 안 들어옵니다. 각각 언제 쓰는지 예시 없이는 구분하기 어려워요.

셋째, tryEmitNextemitValue가 왜 따로 있는지 헷갈립니다. 둘 다 값을 발행하는데 어느 걸 써야 하는지 처음엔 감이 안 잡혀요.

넷째, 멀티스레드 환경에서 데이터가 조용히 사라집니다. tryEmitNext를 여러 스레드에서 동시에 호출하면 FAIL_NON_SERIALIZED 결과가 반환되면서 데이터가 손실되는데, 아무 예외도 던지지 않아서 원인을 찾기 어려워요.

비유로 잡으면 명확해요. Sinks = 프로그래머가 직접 손으로 물을 따르는 수도꼭지입니다. 기존 Flux는 파이프가 연결돼 있어서 수원지(소스)가 알아서 흘려주는 구조라면, Sinks는 빈 수도꼭지에 내가 직접 물을 부어 넣는 구조예요. 구독과 완전히 독립적으로, 내가 원하는 시점에 원하는 양을 주입할 수 있습니다.

Sinks.One — 단일 값 또는 완료/에러 주입

Sinks.One은 0개 또는 1개의 값을 발행하는 Sink입니다. Mono의 프로그래밍 방식 버전이에요.

// Sinks.One 기본 사용
Sinks.One<String> sink = Sinks.one();
Mono<String> mono = sink.asMono();

// 구독 먼저
mono.subscribe(value -> System.out.println("sam: " + value));
mono.subscribe(value -> System.out.println("mike: " + value));

// 나중에 값 주입 — 모든 구독자에게 전달
sink.tryEmitValue("hello world");
// 출력: sam: hello world, mike: hello world

// 값 없이 완료 신호
Sinks.One<String> completeSink = Sinks.one();
completeSink.asMono().subscribe(
    v -> System.out.println("값: " + v),
    e -> System.out.println("에러: " + e),
    () -> System.out.println("완료")
);
completeSink.tryEmitEmpty();
// 출력: 완료

// 에러 신호 주입
Sinks.One<String> errorSink = Sinks.one();
errorSink.asMono().subscribe(
    v -> System.out.println("값: " + v),
    e -> System.out.println("에러: " + e.getMessage())
);
errorSink.tryEmitError(new RuntimeException("강제 에러"));
// 출력: 에러: 강제 에러

tryEmitValueEmitResult를 반환합니다. 성공이면 OK, 이미 값을 발행한 적 있으면 FAIL_TERMINATED가 돌아와요. 반환값을 무시하면 안 됩니다.

Sinks.One<String> sink = Sinks.one();
sink.asMono().subscribe(System.out::println);

Sinks.EmitResult result1 = sink.tryEmitValue("first");
System.out.println("첫 번째: " + result1);  // OK

Sinks.EmitResult result2 = sink.tryEmitValue("second");
System.out.println("두 번째: " + result2);  // FAIL_TERMINATED

여기서 시험 함정이 하나 있어요. Sinks.One에는 값을 한 번만 발행할 수 있습니다. 두 번째 tryEmitValue 호출은 조용히 무시되며 FAIL_TERMINATED만 반환해요.

Reactor Sinks.Many — unicast / multicast / replay

Sinks.Many는 여러 개의 값을 발행하는 Flux 버전 Sink입니다. 구독자 수와 이전 데이터 재전송 여부에 따라 세 가지 타입으로 나뉘어요.

unicast — 단일 구독자 전용

// unicast: 한 명의 구독자만 허용, 버퍼링 가능
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> flux = unicastSink.asFlux();

// 구독 전에 데이터 발행 (버퍼에 저장)
unicastSink.tryEmitNext("버퍼 메시지 1");
unicastSink.tryEmitNext("버퍼 메시지 2");

// 첫 번째 구독 (버퍼의 데이터도 받음)
flux.subscribe(s -> System.out.println("sam: " + s));
// 출력: sam: 버퍼 메시지 1, sam: 버퍼 메시지 2

// 두 번째 구독 시도 → 예외 발생!
flux.subscribe(s -> System.out.println("mike: " + s));
// IllegalStateException: allows only a single Subscriber

multicast — 다중 구독자 브로드캐스트

// multicast.onBackpressureBuffer: 여러 구독자, 버퍼링 지원
Sinks.Many<String> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> flux = multicastSink.asFlux();

flux.subscribe(s -> System.out.println("sam: " + s));
flux.subscribe(s -> System.out.println("mike: " + s));

multicastSink.tryEmitNext("hi");          // sam: hi, mike: hi
multicastSink.tryEmitNext("how are you"); // sam: how are you, mike: how are you

여기서 시험 함정이 하나 있어요. multicast는 첫 구독 이후 발행된 데이터만 새 구독자에게 실시간으로 전달합니다. 늦게 구독한 구독자는 이전에 발행된 데이터를 놓칠 수 있어요. 이전 데이터도 받아야 한다면 replay를 써야 합니다.

replay — 이전 데이터 재전송

// replay().all(): 모든 이전 데이터를 새 구독자에게
Sinks.Many<String> replaySink = Sinks.many().replay().all();
Flux<String> replayFlux = replaySink.asFlux();

replaySink.tryEmitNext("msg1");
replaySink.tryEmitNext("msg2");
replaySink.tryEmitNext("msg3");

// 나중에 구독해도 이전 데이터 모두 받음
replayFlux.subscribe(s -> System.out.println("늦은 구독자: " + s));
// 출력: msg1, msg2, msg3

// replay().limit(n): 최근 n개만 재전송
Sinks.Many<String> limitedReplay = Sinks.many().replay().limit(2);
Flux<String> limitedFlux = limitedReplay.asFlux();

limitedReplay.tryEmitNext("old1");
limitedReplay.tryEmitNext("old2");
limitedReplay.tryEmitNext("old3");

limitedFlux.subscribe(s -> System.out.println("새 구독자: " + s));
// 출력: old2, old3 (최근 2개만)

여기서 시험 함정이 하나 있어요. replay().all()은 무한 스트림에 쓰면 OOM(Out of Memory)이 됩니다. 모든 데이터를 메모리에 쌓기 때문에 지속적으로 데이터가 발행되는 경우엔 반드시 replay().limit(n) 또는 replay().limit(Duration)으로 범위를 제한해야 해요.

✅ H2 끝 정리

Sinks.Many 타입 선택 기준:

  • 단일 소비자 큐 → unicast().onBackpressureBuffer()
  • 실시간 브로드캐스트 → multicast().onBackpressureBuffer()
  • 이전 데이터도 새 구독자에게 → replay().limit(n)
  • 손실 허용 빠른 브로드캐스트 → multicast().directBestEffort()

멀티스레드 환경에서의 Reactor Sinks 안전 패턴

tryEmitNext를 여러 스레드에서 동시에 호출하면 FAIL_NON_SERIALIZED가 반환되며 데이터가 조용히 사라집니다. 예외가 발생하지 않아서 놓치기 쉬운 버그예요.

// 위험한 코드: 멀티스레드에서 일부 데이터 손실
Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
List<Integer> received = new ArrayList<>();
sink.asFlux().subscribe(received::add);

for (int i = 0; i < 1000; i++) {
    final int value = i;
    CompletableFuture.runAsync(() -> sink.tryEmitNext(value));
}
Thread.sleep(1000);
System.out.println("수신 수: " + received.size()); // 1000보다 작을 수 있음!

// 안전한 코드: FAIL_NON_SERIALIZED 시 재시도
for (int i = 0; i < 1000; i++) {
    final int value = i;
    CompletableFuture.runAsync(() -> {
        Sinks.EmitResult result;
        do {
            result = sink.tryEmitNext(value);
        } while (result == Sinks.EmitResult.FAIL_NON_SERIALIZED); // 충돌 시 재시도
    });
}
Thread.sleep(1000);
System.out.println("수신 수: " + received.size()); // 1000 (정확)

emitValue / emitNext 메서드에 EmitFailureHandler를 전달해도 같은 효과입니다.

sink.emitNext(value, (signalType, emitResult) -> {
    return emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED; // true면 재시도
});

실용 패턴 — 이벤트 버스 구현

Sinks의 가장 전형적인 실무 활용은 앱 전체 이벤트 버스입니다.

public class EventBus {
    private static final Sinks.Many<Event> bus =
        Sinks.many().multicast().onBackpressureBuffer();

    public static void publish(Event event) {
        bus.tryEmitNext(event);
    }

    public static Flux<Event> subscribe(Class<? extends Event> eventType) {
        return bus.asFlux()
            .filter(e -> eventType.isInstance(e))
            .cast(eventType);
    }
}

// 사용
EventBus.subscribe(UserCreatedEvent.class)
    .subscribe(event -> sendWelcomeEmail(event.getUserId()));

EventBus.publish(new UserCreatedEvent(userId));

자세한 Sinks 동작과 옵션은 Project Reactor 공식 문서에서 확인할 수 있습니다.

핵심 압축 노트 — 시험 직전 20개

여기까지가 Sinks 편의 핵심입니다. 빠르게 복습할 수 있게 압축 노트로 마무리합니다.

  • Sinks = 프로그래머가 직접 파이프라인에 데이터를 주입하는 수도꼭지
  • Flux.create는 구독 시에만 주입 가능, Sinks는 언제든 주입 가능
  • Processor는 deprecated → Sinks 사용 권장
  • Sinks.One — 0~1개 값 발행, Mono 프로그래밍 버전
  • Sinks.One에 값은 한 번만 발행 가능 (두 번째 = FAIL_TERMINATED)
  • tryEmitValue / tryEmitNextEmitResult 반환, 처리 책임은 호출자
  • emitValue / emitNextEmitFailureHandler 전달, 실패 처리 내장
  • unicast().onBackpressureBuffer()단일 구독자만, 버퍼링 지원
  • unicast에 두 번째 구독 시 IllegalStateException 발생
  • multicast().onBackpressureBuffer()다중 구독자, 버퍼 후 전달
  • multicast는 첫 구독 이후 데이터만 실시간 전달 (이전 데이터 없음)
  • replay().all() — 모든 이전 데이터 캐싱 → 무한 스트림엔 OOM 주의
  • replay().limit(n) — 최근 n개만 캐싱 (메모리 안전)
  • replay().limit(Duration) — 최근 N시간 데이터만 캐싱
  • 멀티스레드 tryEmitNext = FAIL_NON_SERIALIZED 가능 (데이터 손실!)
  • 안전 패턴: do-while 루프로 FAIL_NON_SERIALIZED 재시도
  • EmitResult.OK — 성공, 무시, FAIL_OVERFLOW — 배압 전략 재검토
  • multicast().directBestEffort() — 구독자 없거나 느리면 드롭
  • Sinks.Many.asFlux() 로 일반 Flux처럼 사용 가능
  • 이벤트 버스 패턴 → multicast().onBackpressureBuffer() 가 적합

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!