Java Reactive Programming 핵심 정리 시리즈 7편. Backpressure(배압) 메커니즘 — request(n) 흐름, onBackpressureBuffer/Drop/Latest/Error 4전략, limitRate, BaseSubscriber 직접 구현까지 수도꼭지와 컵 비유로 친절하게 풀어쓴 배압 완전 정복.
이 글은 Java Reactive Programming 핵심 정리 시리즈의 일곱 번째 편입니다. 1편에서 Reactive Streams의 핵심이 request(n) — 소비자가 처리할 수 있는 만큼만 요청하는 메커니즘이라고 짚었어요. 이번 편에서는 그 Backpressure(배압) 를 실전에서 어떻게 다루는지, 오버플로우 전략 4가지는 언제 쓰는지까지 깊게 파고듭니다.
Backpressure 단원이 처음엔 어렵게 느껴지는 이유
이유는 네 가지예요.
첫째, 단일 스레드 환경에서는 배압이 체감되지 않습니다. 생산자와 소비자가 같은 스레드에서 돌면 소비자가 처리를 마치기 전에 생산자가 다음 아이템을 못 내보내거든요. 멀티스레드에서야 비로소 "생산은 빠른데 소비는 느린" 불균형이 눈에 보입니다.
둘째, 기본 Flux는 unbounded request를 씁니다. Reactor가 내부적으로 Long.MAX_VALUE 만큼을 한 번에 요청해 버려서, "배압이 없는 것처럼" 보일 수 있어요.
셋째, 전략 이름이 행동이 아니라 결과를 묘사합니다. Drop이 "소비자 입장에서 버리는 건지, 생산자 입장에서 버리는 건지"가 처음엔 헷갈립니다.
넷째, **limitRate와 onBackpressure* 의 차이가 모호합니다.** 둘 다 "배압을 다루는 연산자"라 설명이 겹치는 것 같아 보이지만 작동 방식이 완전히 다릅니다.
해결법은 비유입니다. Backpressure = "수도꼭지와 컵의 수신호" 로 이해하면 명확해집니다. 컵이 차기 전에 "잠깐, 조금만 줘"라고 신호를 보내는 게 배압의 본질이에요. 컵이 넘치기 시작할 때 어떻게 할지 선택하는 게 오버플로우 전략입니다. 이 글은 그 비유를 따라 풀어 갑니다.
배압이 왜 필요한가 — 불균형 속도 문제
배압은 생산자와 소비자가 서로 다른 스레드에서 실행될 때 주로 발생합니다. 같은 스레드라면 소비자가 처리를 마쳐야 생산자가 다음 아이템을 보낼 수 있어서 자연스럽게 속도가 맞춰집니다. publishOn으로 소비자를 다른 스레드로 분리하는 순간, 생산자는 계속 데이터를 쏟아 내는데 소비자는 처리가 느려 큐에 아이템이 쌓이기 시작해요.
// 배압이 발생하지 않는 경우 (같은 스레드)
Flux.range(1, 10)
.subscribe(n -> {
try { Thread.sleep(100); } catch (Exception e) {}
System.out.println("처리: " + n);
});
// 소비자가 처리 완료 후 다음 생산 — 배압 없음
// 배압 발생 (다른 스레드로 분리)
Flux.range(1, 1000)
.publishOn(Schedulers.parallel()) // 소비자를 parallel 스레드로
.subscribe(n -> {
try { Thread.sleep(10); } catch (Exception e) {} // 소비가 느림
System.out.println("처리: " + n);
});
// 생산자: main 스레드에서 1000개 빠르게 생성
// 소비자: parallel 스레드에서 10ms마다 1개 처리 → 큐에 쌓임
처리되지 않은 데이터가 메모리에 계속 쌓이면 결국 OutOfMemoryError가 터집니다. Reactive Streams 명세의 request(n) 메커니즘은 소비자가 "지금 n개만 보내줘"라고 신호를 보내는 장치예요.
여기서 시험 함정이 하나 있어요. 기본 Flux subscribe는 Long.MAX_VALUE를 요청합니다. 따라서 Reactor 기본 연산자(Flux.range, Flux.generate 등)는 자동으로 배압을 처리하지만, Flux.create는 개발자가 직접 관리해야 해요. "기본 Flux는 배압이 자동이다"고만 기억하면 절반만 맞습니다.
한 줄 정리 — 배압은 멀티스레드 환경에서 생산/소비 속도 불균형을 다루는 메커니즘.
Reactor 내부 자동 배압 — 75% 규칙
Reactor는 생산자와 소비자 사이에 내부 큐를 유지합니다. 기본 큐 크기는 256이에요. 큐가 가득 차면 생산이 일시 중단되고, 소비자가 큐의 75%를 처리하면 생산이 재개됩니다. Flux.range, Flux.generate 같은 기본 연산자에서 이 흐름이 자동으로 처리됩니다.
여기서 시험 함정이 하나 있어요. limitRate(N)이 내부적으로 이 75% 규칙을 사용합니다. limitRate(100)을 쓰면 처음에 100개를 요청하고, 75개가 처리되면 다음 75개를 요청해요. limitRate(N) 안에서는 N이 아니라 N×0.75가 "다음 요청 임계값"입니다.
limitRate — 생산 속도 자체를 제한
limitRate(n)은 소비자가 생산자에게 한 번에 최대 n개만 요청하도록 강제합니다. "수도꼭지를 직접 잠그는" 방식이에요.
// limitRate(5): 한 번에 5개씩만 요청
Flux.create(sink -> {
for (int i = 1; i <= 50; i++) {
System.out.println("생성: " + i);
sink.next(i);
}
sink.complete();
})
.limitRate(5) // 5개씩만 요청
.publishOn(Schedulers.boundedElastic())
.subscribe(n -> {
try { Thread.sleep(100); } catch (Exception e) {}
System.out.println("처리: " + n);
});
Thread.sleep(10000);
중요한 점이 있어요. Flux.create에서 limitRate만 붙여도 생산 루프 자체는 멈추지 않습니다. 소비 속도를 제한하지만, sink.next()가 이미 전부 호출된 이후라면 버퍼에 쌓이는 건 막을 수 없어요. 진짜로 생산 속도를 제어하려면 sink.onRequest()를 써야 합니다.
// 요청받은 만큼만 생산하는 올바른 패턴
Flux.create(sink -> {
sink.onRequest(n -> {
System.out.println("요청: " + n + "개");
for (long i = 0; i < n; i++) {
sink.next((int) i);
}
});
})
.limitRate(5)
.subscribe(System.out::println);
한 줄 정리 — limitRate는 소비 속도 제한, 진짜 생산 속도 제어는 sink.onRequest() 필요.
4가지 오버플로우 전략 — 컵이 넘치기 시작했을 때
수도꼭지를 잠가도 이미 컵이 넘치기 시작했다면? 그때 쓸 전략이 4가지입니다.
| 전략 | 연산자 | 큐 가득 찼을 때 | 데이터 유실 | 사용 사례 |
|---|---|---|---|---|
| Buffer | onBackpressureBuffer() | 버퍼에 모두 저장 | 없음 | 모든 데이터 필요 |
| Drop | onBackpressureDrop() | 새 데이터 버림 | 있음 (새 데이터) | 최신성 낮은 데이터 |
| Latest | onBackpressureLatest() | 최신 값만 유지 | 있음 (중간 데이터) | 최신 상태만 필요 |
| Error | onBackpressureError() | 에러 발생 후 종료 | 없음 (에러로 종료) | 빠른 실패 감지 |
onBackpressureBuffer — "버퍼 박스에 차곡차곡"
// 무제한 버퍼: 모든 데이터 저장 (메모리 주의!)
Flux.create(sink -> {
for (int i = 1; i <= 1000; i++) {
sink.next(i);
}
sink.complete();
})
.onBackpressureBuffer() // 무제한 버퍼
.publishOn(Schedulers.boundedElastic())
.subscribe(n -> {
try { Thread.sleep(10); } catch (Exception e) {}
System.out.println("처리: " + n);
});
Thread.sleep(15000);
// 1000개 모두 처리됨 — 데이터 유실 없음
// 제한 버퍼: 100개 초과 시 가장 오래된 것 버림
Flux.create(sink -> {
for (int i = 1; i <= 1000; i++) {
sink.next(i);
}
sink.complete();
})
.onBackpressureBuffer(
100,
dropped -> System.out.println("버퍼 초과, 버림: " + dropped),
BufferOverflowStrategy.DROP_OLDEST
)
.publishOn(Schedulers.boundedElastic())
.subscribe(n -> System.out.println("처리: " + n));
여기서 시험 함정이 하나 있어요. 무제한 onBackpressureBuffer()를 무한 스트림에 쓰면 OOM입니다. 끝없이 쏟아지는 데이터를 전부 버퍼에 담으려 하면 결국 메모리가 고갈돼요. 무한 스트림에는 크기 제한 버퍼 + 드롭 전략을 조합해야 합니다.
onBackpressureDrop — "늦게 온 택배는 반송"
처리 대기 중에 새로 들어온 데이터를 버립니다. 기존 버퍼에 있는 데이터는 유지하고 새 데이터를 버리는 것이에요.
Flux.create(sink -> {
for (int i = 1; i <= 500; i++) {
try { Thread.sleep(1); } catch (Exception e) {}
sink.next(i);
}
sink.complete();
})
.onBackpressureDrop(dropped -> System.out.println("버려짐: " + dropped))
.publishOn(Schedulers.boundedElastic())
.subscribe(n -> {
try { Thread.sleep(10); } catch (Exception e) {} // 느린 소비자
System.out.println("처리: " + n);
});
로그 이벤트처럼 일부 유실이 괜찮은 상황에 씁니다. 가장 최근 데이터가 아니라 가장 오래된 데이터를 보존하는 점이 Latest와 다릅니다.
onBackpressureLatest — "가장 최신 상태만"
새 값이 오면 기존 대기 중인 값을 교체합니다. 센서 최신값, UI 상태 갱신처럼 중간 상태는 의미 없고 현재 상태만 중요한 케이스에 맞아요.
Flux.create(sink -> {
for (int i = 1; i <= 500; i++) {
try { Thread.sleep(1); } catch (Exception e) {}
sink.next(i);
}
sink.complete();
})
.onBackpressureLatest() // 최신 값만 유지
.publishOn(Schedulers.boundedElastic())
.subscribe(n -> {
try { Thread.sleep(50); } catch (Exception e) {} // 매우 느린 소비자
System.out.println("처리: " + n);
});
// 출력 예: 처음 몇 개 + 마지막 최신 값 (중간 건너뜀)
onBackpressureError — "빠른 실패, 즉시 알림"
소비자 요청을 초과하면 즉시 OverflowException을 발생시키고 스트림을 종료합니다. 배압 위반을 즉각 감지하고 싶은 디버깅 또는 엄격한 속도 제어가 필요한 상황에 씁니다.
Flux.create(sink -> {
for (int i = 1; i <= 1000; i++) {
sink.next(i);
}
sink.complete();
})
.onBackpressureError()
.publishOn(Schedulers.boundedElastic())
.subscribe(
n -> System.out.println("처리: " + n),
e -> System.out.println("에러 발생: " + e.getMessage())
);
Cold vs Hot Publisher와 배압
여기서 중요한 시험 함정이 하나 있어요. Cold Publisher만 자연스러운 배압을 지원합니다. Cold Publisher는 구독자가 요청하는 만큼만 데이터를 생산하도록 설계될 수 있어요. 반면 Hot Publisher는 구독자 요청과 무관하게 데이터를 발행하기 때문에 onBackpressureBuffer/Drop/Latest 같은 오버플로우 전략이 필수입니다.
// Hot Publisher: Sinks
Sinks.Many<Integer> hotSource = Sinks.many().multicast()
.onBackpressureBuffer(100); // OverflowStrategy 필수
// 빠른 생산자
for (int i = 0; i < 1000; i++) {
hotSource.tryEmitNext(i);
}
BaseSubscriber로 직접 배압 제어
Reactor가 제공하는 BaseSubscriber를 확장하면 request(n) 호출 시점을 직접 결정할 수 있습니다.
class SlowSubscriber extends BaseSubscriber<Integer> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 처음에 1개만 요청
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("처리: " + value + " @ " + Thread.currentThread().getName());
try { Thread.sleep(100); } catch (Exception e) {}
// 하나 처리 후 다음 1개 요청 — 배압 완전 수동 제어
request(1);
}
@Override
protected void hookOnComplete() {
System.out.println("완료");
}
@Override
protected void hookOnError(Throwable throwable) {
System.err.println("에러: " + throwable.getMessage());
}
}
// 사용
Flux.range(1, 10)
.subscribe(new SlowSubscriber());
BaseSubscriber는 Subscriber 인터페이스를 직접 구현하는 것보다 안전합니다. onSubscribe·onNext·onError·onComplete 규약을 알아서 지켜 주기 때문에 중요한 로직(hookOnNext 등)만 오버라이드하면 돼요.
자세한 Project Reactor Backpressure 사양은 Project Reactor 공식 문서에서 확인할 수 있어요.
한 줄 정리 — BaseSubscriber는 request(n) 호출 시점을 직접 제어하는 가장 유연한 방법.
전략 선택 가이드 요약
| 전략 | 데이터 손실 | 메모리 | 적합한 상황 |
|---|---|---|---|
onBackpressureBuffer() | 없음 | 많이 사용 | 모든 데이터 필요 (금융 거래) |
onBackpressureBuffer(n) | 가능 | 제한적 | 적당한 버퍼 + 일부 손실 허용 |
onBackpressureDrop() | 새 데이터 | 일정 유지 | 로그, 이벤트 (오래된 데이터 우선) |
onBackpressureLatest() | 중간 데이터 | 최소 | 센서 최신값, UI 상태 |
onBackpressureError() | 없음 (종료) | 최소 | 빠른 실패, 디버깅 |
limitRate(n) | 없음 | 일정 유지 | 생산 속도 자체 제한 |
limitRate와 onBackpressure 의 근본 차이: limitRate는 생산 속도 자체를 줄이는 "근본 해결"이고, onBackpressure는 오버플로우가 이미 발생한 뒤의 "사후 처리"입니다.
Backpressure 완전 정복 — 압축 노트
여기까지가 Backpressure의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Backpressure = "수도꼭지와 컵의 수신호" — 컵이 차면 잠그라고 알림
- 배압 발생 조건 — 생산자·소비자가 다른 스레드 + 생산 속도 > 소비 속도
- 기본 Flux subscribe =
request(Long.MAX_VALUE)— unbounded, 사실상 배압 없음 - Cold Publisher = 자연스러운 배압 지원 가능
- Hot Publisher = 구독자 요청 무관 발행 →
onBackpressure*전략 필수 - 내부 큐 기본 256개 — 소비자가 75% 처리 시 생산 재개
limitRate(N)— N개씩 요청, 내부적으로 N×0.75 임계값- **
limitRatevsonBackpressure*** — 전자는 근본 해결, 후자는 사후 처리 onBackpressureBuffer()— 데이터 유실 없음, OOM 위험onBackpressureDrop()— 새 데이터 버림, 오래된 데이터 우선 보존onBackpressureLatest()— 최신 값만 유지, 센서·UI 상태에 적합onBackpressureError()— 즉시 에러, 디버깅·빠른 실패Flux.create는 자동 배압 없음 —sink.onRequest()또는sink.isCancelled()활용limitRate만으로는 Flux.create의 생산 루프 제어 불가 —sink.onRequest()병행 필요BaseSubscriber= request(n) 수동 제어 — hookOnNext에서 직접 요청- 단일 스레드에서는
onBackpressureBuffer효과 없음 —publishOn없으면 배압 발생 안 함 - 무제한 버퍼 + 무한 스트림 조합 = OOM 위험 — 크기 제한 버퍼 필수
request(Long.MAX_VALUE)호출 시 배압 신호 사실상 해제 — Reactor 내부 최적화됨- 금융 거래·결제 → Buffer (데이터 유실 불가)
- 로그·모니터링 메트릭 → Drop (일부 유실 허용, 최신성 낮음)
- 주식 시세·센서 온도 → Latest (중간 값 불필요, 최신만 의미 있음)
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.
- 1편 — Reactive Programming 입문
- 2편 — Mono 완전 정복
- 3편 — Flux 완전 정복
- 4편 — 연산자 (map·flatMap·filter·reduce 등)
- 5편 — Hot & Cold Publishers
- 6편 — Threading & Schedulers
- 7편 — Backpressure (배압) (현재 글)
- 8편 — Publisher 결합 (zip·merge·concat 등)
- 9편 — Batching·Windowing·Grouping
- 10편 — Repeat & Retry
- 11편 — Sinks
- 12편 — Context
- 13편 — 단위 테스트 (StepVerifier)