Java Reactive Programming 핵심 정리 시리즈 3편. Flux.just/range/fromIterable/interval/generate/create/push 7가지 팩토리 메서드 차이, take·skip·distinct 필터, Flux ↔ Mono 변환까지 — 물이 계속 흐르는 호스 비유로 다건 비동기 스트림의 모든 패턴을 친절하게 풀어쓴 3편.
이 글은 Java Reactive Programming 핵심 정리 시리즈의 세 번째 편입니다. 2편에서 0~1개짜리 단건 타입 Mono를 완전히 잡았다면, 이번 편에서는 0~N개(무한 포함) 의 타입 Flux 를 파헤칩니다.
Flux는 "물이 계속 흐르는 호스" 예요. 밸브(구독)를 열면 물이 흐르기 시작하고, 수압에 따라 느리거나 빠르게 흐르고, 호스를 자르면(cancel) 물이 멈춥니다. 목록 조회, 이벤트 스트림, 주기적 타이머, 무한 데이터 — 결과가 여러 개이거나 끝이 없는 비동기 작업은 전부 Flux로 표현해요.
이 시리즈는 Project Reactor 공식 문서, Reactive Streams 명세, 여러 비동기 프로그래밍 학습 자료 등 공개 자료를 참고해 한국어 학습 노트로 풀어쓴 자료입니다.
Project Reactor 공식 문서를 옆에 열어 두고 직접 실행해 보면 훨씬 빠르게 체화됩니다.
Flux가 처음엔 왜 어렵게 느껴질까요
이유는 네 가지예요.
첫째, 팩토리 메서드가 너무 많습니다. just, range, fromIterable, interval, generate, create, push — 이름만 봐서는 언제 어떤 걸 써야 할지 감이 안 잡혀요.
둘째, generate와 create의 차이를 모릅니다. 둘 다 "동적으로 데이터를 만드는" 메서드인데, 내부 동작이 완전히 달라서 잘못 쓰면 IllegalStateException이 터집니다.
셋째, Flux.interval()이 왜 아무 것도 안 출력되는지 모릅니다. interval은 별도 스레드에서 실행되는데 메인 스레드가 먼저 종료돼 버려요.
넷째, takeWhile과 takeUntil의 차이가 헷갈립니다. 마지막 항목이 포함되느냐 안 되느냐 — 이게 결과를 완전히 바꾸는데, 처음엔 직관적으로 잘 안 잡혀요.
해결법은 한 가지예요. Flux를 "물이 계속 흐르는 호스" 로 생각하고, 각 팩토리 메서드를 "호스에 물을 공급하는 수원지의 종류"로 이해하면 됩니다. 수돗물(range), 타이머(interval), 손 펌프(generate), 전자 밸브(create) — 이 그림으로 처음부터 풀어 갑니다.
Flux 팩토리 메서드 — 어떤 수원지를 쓸 것인가
정적 값들 →
just/fromIterable/fromArray | 연속 정수 → range | 주기적 실행 → interval | 동기 순차 생성 → generate | 동적/멀티스레드 → create
Flux.just() / fromIterable() / fromArray() — 이미 있는 물
// just: 가변 인자로 여러 값 생성
Flux<String> flux = Flux.just("Java", "Kotlin", "Scala");
flux.subscribe(lang -> log.info("언어: {}", lang));
// 출력: 언어: Java, 언어: Kotlin, 언어: Scala, 완료!
// fromIterable: List, Set 등 컬렉션에서 생성
List<String> names = List.of("Alice", "Bob", "Charlie");
Flux<String> nameFlux = Flux.fromIterable(names);
// fromArray: 배열에서 생성
Integer[] nums = {1, 2, 3, 4, 5};
Flux.fromArray(nums)
.map(n -> n * 2)
.subscribe(log::info); // 2, 4, 6, 8, 10
Flux.fromStream() — Java Stream과의 통합
여기서 시험 함정이 하나 있어요. Flux.fromStream()에 Stream 인스턴스를 직접 전달하면 다중 구독이 불가능합니다. Java Stream은 일회용이라 이미 소비된 Stream을 다시 쓰면 IllegalStateException이 터져요.
List<Integer> list = List.of(1, 2, 3, 4, 5);
// 잘못된 방법: Stream 인스턴스 직접 전달 → 다중 구독 에러
Stream<Integer> stream = list.stream();
Flux<Integer> badFlux = Flux.fromStream(stream);
badFlux.subscribe(v -> log.info("sub1: {}", v)); // OK
badFlux.subscribe(v -> log.info("sub2: {}", v)); // ERROR! 스트림 이미 소비됨
// 올바른 방법: Supplier<Stream> 전달 → 구독마다 새 Stream 생성
Flux<Integer> goodFlux = Flux.fromStream(list::stream); // 메서드 참조 = Supplier
goodFlux.subscribe(v -> log.info("sub1: {}", v)); // OK
goodFlux.subscribe(v -> log.info("sub2: {}", v)); // OK (새 Stream 생성)
Flux.range() — 연속 정수 수돗물
여기서 시험 함정이 하나 있어요. Flux.range(start, count)의 두 번째 인자는 종료 숫자가 아니라 개수입니다. Flux.range(3, 5)는 3~8이 아니라 3, 4, 5, 6, 7 (3부터 5개)이에요.
Flux.range(1, 5).subscribe(log::info); // 1, 2, 3, 4, 5
Flux.range(3, 5).subscribe(log::info); // 3, 4, 5, 6, 7 (3부터 5개!)
// 리액티브 방식의 for 루프
Flux.range(1, 10)
.flatMap(i -> httpClient.get("/api/data/" + i))
.subscribe(log::info);
Flux.interval() — 타이머 호스
interval(Duration)은 지정 시간마다 0, 1, 2, ... 순서로 Long 값을 방출해요. 별도 스레드(parallel scheduler)에서 실행된다는 특징이 있어요.
Flux.interval(Duration.ofSeconds(1))
.subscribe(tick -> log.info("틱: {}", tick));
Thread.sleep(5000); // 메인 스레드가 종료되지 않도록 대기 필수!
// 출력: 틱: 0, 틱: 1, 틱: 2, 틱: 3, 틱: 4
// take와 결합하여 개수 제한
Flux.interval(Duration.ofMillis(500))
.take(3) // 처음 3개만 받고 구독 취소
.subscribe(log::info); // 0, 1, 2 출력 후 완료
Thread.sleep() 없이 바로 실행하면 메인 스레드가 종료돼 아무것도 출력되지 않아요. 테스트에서는 StepVerifier를 활용하세요.
generate vs create vs push — 세 가지 동적 생성
이 세 메서드가 가장 헷갈리는 부분이에요. 한 표로 먼저 정리합니다.
| 특성 | generate | create | push |
|---|---|---|---|
| 한 번에 방출 가능 수 | 반드시 하나 | 여러 개 | 여러 개 |
| 멀티스레드 안전 | N/A (동기) | YES | NO |
| 상태 관리 | 내부 람다 파라미터 | 외부에서 | 외부에서 |
| 적합한 시나리오 | 순차 상태 기반 생성 | 이벤트 리스너·멀티스레드 | 단일 스레드 이벤트 |
generate() — 손 펌프 (동기, 하나씩)
generate()는 SynchronousSink를 사용해서 한 번에 하나씩 동기적으로 아이템을 방출합니다. 각 호출에서 sink.next()를 딱 한 번만 호출해야 해요.
// 기본 사용 — 무한 생성 (take로 개수 제한)
Flux<String> flux = Flux.generate(synchronousSink -> {
synchronousSink.next("데이터");
// complete() 없으면 무한 반복
});
flux.take(5).subscribe(log::info);
// 상태(state)를 가진 generate
Flux<String> statefulFlux = Flux.generate(
() -> 0, // 초기 상태
(counter, sink) -> {
sink.next("Item " + counter);
if (counter >= 9) {
sink.complete();
}
return counter + 1; // 다음 상태 반환
}
);
statefulFlux.subscribe(log::info); // Item 0 ~ Item 9
여기서 시험 함정이 하나 있어요. generate()에서 sink.next()를 두 번 호출하면 IllegalStateException 이 발생합니다. generate는 "한 번에 반드시 하나만" 이 원칙이에요.
create() — 전자 밸브 (비동기, 멀티스레드 안전)
create()는 FluxSink를 사용해서 여러 아이템을 한꺼번에 방출할 수 있고, 멀티스레드에서 안전해요.
Flux.create(fluxSink -> {
fluxSink.next("데이터1");
fluxSink.next("데이터2");
fluxSink.next("데이터3");
fluxSink.complete(); // 완료 신호 필수!
}).subscribe(log::info);
// 배압 전략 설정 (버퍼 오버플로 시 처리 방식)
Flux.create(sink -> {
for (int i = 0; i < 1000; i++) {
sink.next(i);
}
sink.complete();
}, FluxSink.OverflowStrategy.DROP) // 버퍼 초과 시 DROP
.subscribe(log::info);
complete() 호출을 잊으면 구독자가 완료 신호를 받지 못해 프로그램이 종료되지 않을 수 있어요.
push() — 단일 스레드 전용 밸브
push()는 create()와 거의 동일하지만 단일 스레드에서만 안전합니다. 내부 동기화 비용이 없어 단일 스레드 이벤트 기반 API 통합에 적합해요.
Flux<String> flux = Flux.push(fluxSink -> {
// 단일 스레드에서만 호출해야 함
eventEmitter.on("data", data -> fluxSink.next(data));
eventEmitter.on("end", () -> fluxSink.complete());
eventEmitter.on("error", e -> fluxSink.error(e));
});
take 계열 연산자 — 호스를 언제 잠글 것인가
take 계열은 "얼마만큼 흘려보낼 것인가"를 제어하는 연산자예요.
| 연산자 | 동작 | 마지막 항목 포함 |
|---|---|---|
take(n) | 처음 n개 후 cancel | — |
takeLast(n) | 마지막 n개만 | — |
takeWhile(pred) | 조건이 true인 동안 | false 항목 미포함 |
takeUntil(pred) | 조건이 true까지 | true 항목 포함 |
// take(n): 처음 5개만
Flux.range(1, 100)
.take(5)
.subscribe(log::info); // 1, 2, 3, 4, 5
// takeWhile: 조건 false인 항목은 제외
Flux.range(1, 10)
.takeWhile(n -> n < 5)
.subscribe(log::info); // 1, 2, 3, 4 (5는 포함 안 됨!)
// takeUntil: 조건 true인 항목은 포함
Flux.range(1, 10)
.takeUntil(n -> n == 5)
.subscribe(log::info); // 1, 2, 3, 4, 5 (5도 포함됨!)
takeWhile과 takeUntil의 차이 — takeWhile(n -> n < 5)와 takeUntil(n -> n == 5)는 결과가 같아 보이지만, takeWhile(n -> n < 5)는 5를 만나면 5를 버리고 끝내고, takeUntil(n -> n == 5)는 5를 만나면 5를 포함하고 끝내요.
Cold Publisher와 다중 구독자
Flux는 기본적으로 Cold Publisher예요. 구독할 때마다 처음부터 새로 실행됩니다.
Flux<Integer> coldFlux = Flux.range(1, 5);
coldFlux.subscribe(v -> log.info("sub1: {}", v)); // 1,2,3,4,5
coldFlux.subscribe(v -> log.info("sub2: {}", v)); // 1,2,3,4,5 (독립적!)
// 두 구독자 모두 처음부터 1~5를 받음
여러 구독자가 같은 스트림을 공유하려면 Hot 변환이 필요해요. 이 내용은 5편(Hot & Cold Publishers)에서 자세히 다룹니다.
Flux ↔ Mono 변환
두 타입은 서로 변환할 수 있어요.
// Mono → Flux
Mono<String> mono = Mono.just("hello");
Flux<String> flux = mono.flux(); // 0~1개짜리 Flux
// Flux → Mono (첫 번째 아이템만)
Flux<Integer> numFlux = Flux.range(1, 10);
Mono<Integer> first = numFlux.next(); // 첫 번째 아이템만
// Flux → Mono (전체 수집)
Mono<List<Integer>> listMono = numFlux.collectList(); // 모든 아이템을 List로
Flux 완전 정복 — 시험 직전 압축 노트
- Flux = 0~N개 아이템을 비동기로 처리 — Stream의 비동기 버전
Flux.just(a,b,c)/fromIterable/fromArray— 이미 있는 값들fromStream(list.stream())→ 다중 구독 불가.fromStream(list::stream)← Supplier 필수!Flux.range(start, count)— 두 번째 인자는 종료가 아닌 개수Flux.interval(Duration)— 별도 스레드 실행, 메인 스레드 종료 주의generate— 동기적, 반드시 하나씩, 상태 기반 순차 생성create— 비동기 가능, 여러 개 한꺼번에, 멀티스레드 안전push—create와 동일하지만 단일 스레드만 안전- generate에서
sink.next()두 번 호출 = IllegalStateException - create에서
complete()누락 = 구독자 완료 신호 못 받음 take(n)— 처음 n개 후 canceltakeWhile(pred)— 조건 false인 항목 미포함 후 완료takeUntil(pred)— 조건 true인 항목 포함 후 완료- Flux는 기본 Cold Publisher — 구독마다 새로 실행
mono.flux()— Mono → Flux 변환flux.next()— Flux에서 첫 아이템만 Mono로flux.collectList()— 모든 아이템을Mono로 수집- >
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.
- 1편 — Reactive Programming 입문
- 2편 — Mono 완전 정복
- 3편 — Flux 완전 정복 (현재 글)
- 4편 — 연산자 (map·flatMap·filter·reduce 등)
- 5편 — Hot & Cold Publishers
- 6편 — Threading & Schedulers
- 7편 — Backpressure (배압)
- 8편 — Publisher 결합 (zip·merge·concat 등)
- 9편 — Batching·Windowing·Grouping
- 10편 — Repeat & Retry
- 11편 — Sinks
- 12편 — Context
- 13편 — 단위 테스트 (StepVerifier)