Spring WebFlux 핵심 정리 시리즈 10편. Server-Sent Events(SSE)의 HTTP 단방향 푸시 원리부터 Flux
이 글은 Spring WebFlux 핵심 정리 시리즈의 10번째 편입니다. 9편까지 WebClient·스트리밍 응답을 다뤘고, 이번 10편에서는 그 자연스러운 확장인 Server-Sent Events(SSE) 를 다룹니다. 실시간 대시보드, 알림 시스템, 주가 피드처럼 "서버가 클라이언트에게 계속 데이터를 보내야 하는 상황"에서 SSE가 어떻게 동작하는지, 그리고 Spring WebFlux가 이걸 얼마나 우아하게 지원하는지 살펴볼게요.
이번 편의 핵심 비유는 라디오 방송입니다. 채널을 맞추면 음악이 계속 흘러나오고, 청취자는 들을 수만 있고 방송국에 답장은 못 합니다. SSE가 딱 이 모델이에요. 단방향 — 서버에서 클라이언트로. WebSocket처럼 양방향이 아니고, 폴링처럼 반복 요청도 없습니다.
이 시리즈는 Spring 공식 문서, Project Reactor 공식 문서, Reactive Streams 명세, 여러 비동기 백엔드 학습 자료 등 공개 자료를 참고해 한국어 학습 노트로 풀어쓴 자료입니다.
SSE를 직접 손으로 만져 보려면 spring-boot-starter-webflux와 간단한 컨트롤러 하나면 충분해요. 브라우저 개발자 도구의 Network 탭을 열고 EventSource를 구독해 보면 데이터가 흘러오는 걸 눈으로 확인할 수 있습니다.
Server-Sent Events가 왜 만들어졌는가
웹에서 실시간 데이터를 보여주는 방법은 역사적으로 세 가지 경로를 거쳤어요.
첫 번째는 폴링(Polling) — 클라이언트가 "새 데이터 있나요?" 하고 주기적으로 서버를 두드리는 방식입니다. 구현이 단순하지만 빈 응답도 계속 날아오고, 간격이 짧을수록 서버 부하가 급증해요.
두 번째는 WebSocket — 연결을 한 번 열고 양방향으로 데이터를 주고받는 방식입니다. 채팅이나 게임처럼 클라이언트가 서버로 실시간 메시지를 보내야 할 때 적합해요. 단, 프로토콜이 HTTP와 다르고(ws://) 구현 복잡도가 올라갑니다.
세 번째가 Server-Sent Events — HTTP를 그대로 쓰면서 서버가 연결을 끊지 않고 계속 데이터를 보내는 방식입니다. W3C 표준이라 브라우저가 네이티브로 지원하고, 자동 재연결도 내장돼 있어요. 클라이언트가 서버로 메시지를 보낼 필요가 없는 상황 — 주가 피드, 알림, 실시간 대시보드 — 에서는 WebSocket보다 훨씬 간단합니다.
여기서 시험 함정이 하나 있어요. SSE는 HTTP 기반 단방향입니다. 브라우저 EventSource는 서버에서 보내주는 데이터만 받을 수 있고, 서버로 메시지를 보내려면 별도 HTTP 요청을 써야 해요. 양방향이 필요하면 WebSocket을 쓰면 됩니다.
SSE 메시지 형식 — 프로토콜 이해가 핵심
SSE는 text/event-stream 미디어 타입으로 응답하면서 연결을 유지합니다. 서버가 보내는 각 이벤트는 다음 형식을 따라요.
data: {"price": 150.25, "ticker": "AAPL"}
event: product-added
data: {"id": 1, "name": "New Widget"}
id: 1001
retry: 3000
필드별로 역할이 분명합니다.
data:— 이벤트 데이터 (필수)event:— 이벤트 타입 (없으면 기본값 "message")id:— 이벤트 ID. 재연결 시Last-Event-ID헤더로 전달되어 어디서부터 이어받을지 알 수 있음retry:— 재연결 대기 시간 (ms). 기본값은 브라우저마다 다르지만 보통 3초:— 주석 (클라이언트 무시, heartbeat로 활용)- 이벤트는 빈 줄로 구분 (
\n\n)
여기서 시험 함정이 하나 있어요. id 필드는 재연결 지원의 핵심입니다. 브라우저가 연결이 끊기면 자동으로 재연결을 시도하는데, 이때 Last-Event-ID 헤더에 마지막으로 받은 이벤트 ID를 담아서 보냅니다. 서버가 이 값을 읽으면 "아, 이 ID 이후부터 보내면 되는구나" 하고 이어서 스트리밍할 수 있어요. Flux 같은 단순 타입으로 SSE를 구현하면 id 필드를 제어할 수 없습니다 — 이 경우 Flux 래퍼를 써야 합니다.
Spring WebFlux SSE 기본 구현 — Flux vs ServerSentEvent
Spring WebFlux에서 SSE 응답을 만드는 방법은 두 가지예요.
방법 1: Flux + produces = MediaType.TEXT_EVENT_STREAM_VALUE
@RestController
@RequestMapping("/sse")
public class SseController {
@GetMapping(value = "/time", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamTime() {
return Flux.interval(Duration.ofSeconds(1))
.map(tick -> "Server time: " + LocalDateTime.now());
// 각 String이 "data: {값}\n\n" 형식으로 자동 직렬화됨
}
@GetMapping(value = "/products", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductDto> streamProducts() {
return productRepository.findAll()
.map(p -> new ProductDto(p.getId(), p.getName(), p.getPrice()))
.delayElements(Duration.ofMillis(500));
// ProductDto가 JSON으로 직렬화되어 data: 필드에 들어감
}
}
방법 2: Flux — id·event·retry 명시 제어 필요 시
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ProductDto>> streamEvents() {
AtomicLong counter = new AtomicLong();
return productRepository.findAll()
.map(p -> ServerSentEvent.<ProductDto>builder()
.id(String.valueOf(counter.incrementAndGet())) // 이벤트 ID
.event("product") // 이벤트 타입
.data(new ProductDto(p.getId(), p.getName(), p.getPrice()))
.retry(Duration.ofSeconds(3)) // 재연결 간격
.build());
}
두 방식의 차이를 정리하면 이렇습니다.
| 항목 | Flux | Flux |
|---|---|---|
data: 필드 | 자동 직렬화 | 명시 제어 |
event: 필드 | 기본값 "message" | 명시 지정 가능 |
id: 필드 | 없음 | 명시 지정 가능 |
retry: 필드 | 없음 | 명시 지정 가능 |
| 재연결 시 이어받기 | 불가 | 가능 (Last-Event-ID) |
| 클라이언트 이벤트 필터링 | 불가 | 가능 (addEventListener) |
여기서 시험 함정이 하나 있어요. produces = MediaType.TEXT_EVENT_STREAM_VALUE를 빼면 SSE가 동작하지 않습니다. 이 어노테이션이 없으면 Spring WebFlux가 Flux를 JSON 배열로 전체 수집 후 한 번에 응답해요. 브라우저 EventSource가 이벤트 스트림으로 인식하려면 반드시 이 헤더가 있어야 합니다.
Sinks — 명령형 코드에서 반응형 스트림으로 이벤트 주입
단순히 DB의 데이터를 스트리밍하는 건 쉬워요. 문제는 "미래에 발생할 이벤트를 구독자들에게 실시간으로 전달하는 것" 입니다. 예를 들어 새 제품이 추가될 때마다 현재 연결된 모든 SSE 클라이언트에게 알림을 보내야 한다면?
이때 필요한 게 Sinks (Project Reactor 3.4+)입니다. Sinks는 일반 메서드 호출(sink.tryEmitNext(data))로 반응형 스트림에 데이터를 주입하는 "프로그래머블 이벤트 소스"예요.
@Service
public class ProductService {
// replay().limit(20): 새로 연결한 구독자에게 최근 20개 이벤트 즉시 전달
private final Sinks.Many<ProductDto> productSink =
Sinks.many().replay().limit(20);
// 신제품 저장 + 모든 SSE 구독자에게 이벤트 발행
public Mono<ProductDto> saveProduct(ProductDto productDto) {
return productRepository.save(toEntity(productDto))
.map(this::toDto)
.doOnNext(dto -> {
Sinks.EmitResult result = productSink.tryEmitNext(dto);
if (result.isFailure()) {
log.warn("Failed to emit product event: {}", result);
}
});
}
// SSE 엔드포인트가 구독할 Flux (외부에서 Sink 직접 접근 불가)
public Flux<ProductDto> getProductStream() {
return productSink.asFlux();
}
}
@RestController
@RequestMapping("/product")
public class ProductController {
private final ProductService productService;
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<ProductDto> saveProduct(@RequestBody ProductDto productDto) {
return productService.saveProduct(productDto);
// 저장 성공 시 Sink를 통해 모든 SSE 구독자에게 자동 알림
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductDto> streamProducts() {
return productService.getProductStream();
}
}
이벤트 흐름을 그림으로 보면 이렇습니다.
[클라이언트 A] → GET /product/stream → SSE 연결 유지
[클라이언트 B] → GET /product/stream → SSE 연결 유지
[어떤 요청] → POST /product → saveProduct() → sink.tryEmitNext(dto)
↓
┌───────────────────────┴───────────────────────┐
↓ ↓
[클라이언트 A]에게 이벤트 전달 [클라이언트 B]에게 이벤트 전달
Sinks.Many 세 가지 타입 — 어떤 걸 언제 쓰는가
Sinks.Many는 크게 세 가지 타입이 있어요.
// unicast: 구독자 1명만 허용, 두 번째 구독 시 IllegalStateException
Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureBuffer();
// multicast: 여러 구독자 허용, 단 구독 이전 이벤트는 받지 못함
// 구독자가 없는 상태에서 발행된 이벤트는 소실됨
Sinks.Many<String> multicast = Sinks.many().multicast().onBackpressureBuffer();
// replay: 여러 구독자 허용, 나중에 구독해도 과거 이벤트 재생
Sinks.Many<String> replay = Sinks.many().replay().limit(10); // 최근 10개 재생
| 항목 | unicast | multicast | replay |
|---|---|---|---|
| 구독자 수 | 1명 | N명 | N명 |
| 과거 이벤트 | 없음 | 없음 (소실) | 있음 (제한적) |
| SSE 적합성 | 부적합 | 현재 연결된 클라이언트만 | 재연결 시 이어받기 가능 |
| 메모리 사용 | 낮음 | 낮음 | 중간 |
여기서 시험 함정이 하나 있어요. multicast는 구독자가 없을 때 발행된 이벤트가 소실됩니다. SSE 사용 중 클라이언트가 잠깐 끊기면 그 사이의 이벤트를 영영 못 받아요. replay().limit(n)을 쓰면 재연결 시 최근 n개를 즉시 전달받고, 그 이후부터 실시간으로 이어받을 수 있습니다. 대부분의 SSE 알림 시스템에서 replay가 더 적합한 이유입니다.
또 하나 — replay().all()은 모든 이벤트를 영구 보존하므로 메모리 누수 위험이 있어요. limit(n) 또는 limit(Duration) 으로 범위를 제한하는 게 안전합니다.
브라우저 EventSource API
SSE의 장점 중 하나는 브라우저가 네이티브로 지원한다는 점이에요. 별도 라이브러리 없이 JavaScript 한 줄이면 구독할 수 있습니다.
const eventSource = new EventSource('http://localhost:8080/product/stream');
// 기본 이벤트 수신 (event 타입이 "message"인 경우)
eventSource.onmessage = (event) => {
const product = JSON.parse(event.data);
console.log('New product:', product.name, product.price);
};
// 특정 이벤트 타입만 수신 (ServerSentEvent의 event() 필드 사용 시)
eventSource.addEventListener('product-added', (event) => {
console.log('Product added:', JSON.parse(event.data));
});
// 오류 처리 (연결 끊기면 브라우저가 자동으로 재연결 시도)
eventSource.onerror = (error) => {
if (eventSource.readyState === EventSource.CLOSED) {
console.log('Connection closed, will auto-reconnect');
}
};
// 명시적 종료 (컴포넌트 언마운트 시)
// eventSource.close();
여기서 시험 함정이 하나 있어요. EventSource는 자동 재연결이 내장되어 있습니다. 연결이 끊기면 브라우저가 스스로 재연결을 시도하고, 이때 Last-Event-ID 헤더에 마지막 이벤트 ID를 담아 보냅니다. 이 헤더를 서버가 읽어서 "어디부터 보내면 되는지" 알 수 있어요.
Last-Event-ID로 재연결 시 이어받기
@GetMapping(value = "/stream/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ProductDto>> streamWithResume(
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
long fromId = lastEventId != null ? Long.parseLong(lastEventId) : 0L;
return productRepository.findByIdGreaterThan(fromId)
.map(p -> ServerSentEvent.<ProductDto>builder()
.id(String.valueOf(p.getId())) // 반드시 id 설정
.event("product")
.data(new ProductDto(p.getId(), p.getName(), p.getPrice()))
.build());
}
클라이언트가 ID 1001까지 받고 연결이 끊겼다면, 재연결 시 Last-Event-ID: 1001 헤더를 보내고, 서버는 1001 이후 데이터만 스트리밍합니다. 놓친 이벤트 없이 이어받는 거예요.
Heartbeat 패턴 — 연결 유지
SSE는 데이터가 없으면 프록시 서버나 로드밸런서가 연결을 타임아웃으로 끊을 수 있어요. 주기적으로 "살아있어요" 신호를 보내서 연결을 유지하는 게 Heartbeat 패턴입니다.
@GetMapping(value = "/stream/heartbeat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamWithHeartbeat() {
// 실제 이벤트 스트림
Flux<ServerSentEvent<String>> eventStream = productService.getProductStream()
.map(dto -> ServerSentEvent.<String>builder()
.event("product")
.data(dto.toString())
.build());
// 30초마다 주석 이벤트 (클라이언트는 무시, 연결만 유지)
Flux<ServerSentEvent<String>> heartbeat = Flux.interval(Duration.ofSeconds(30))
.map(tick -> ServerSentEvent.<String>builder()
.comment("heartbeat")
.build());
return Flux.merge(eventStream, heartbeat);
}
흔한 실수 5가지
실수 1: produces = TEXT_EVENT_STREAM_VALUE 누락
// BAD — SSE가 아니라 JSON 배열로 한 번에 응답됨
@GetMapping("/stream")
public Flux<ProductDto> streamProducts() { ... }
// GOOD
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductDto> streamProducts() { ... }
실수 2: Sink를 요청 스코프로 생성
// BAD — 매 요청마다 새 Sink 생성, 이 Sink에 아무도 이벤트 발행 안 함
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductDto> streamProducts() {
Sinks.Many<ProductDto> sink = Sinks.many().replay().limit(10); // 요청마다 새 Sink!
return sink.asFlux();
}
// GOOD — 서비스에서 Sink를 필드로 관리 (싱글톤)
@Service
public class ProductService {
private final Sinks.Many<ProductDto> productSink =
Sinks.many().replay().limit(20); // 한 번만 생성
}
실수 3: CORS 미설정으로 브라우저 EventSource 차단
// 브라우저에서 EventSource가 차단되면 @CrossOrigin 확인
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@CrossOrigin(origins = "*") // 개발 시, 프로덕션에서는 구체적 도메인 지정
public Flux<ProductDto> streamProducts() { ... }
실수 4: replay().all() 사용으로 메모리 누수
// BAD — 모든 이벤트 영구 보존 → 메모리 계속 증가
Sinks.Many<ProductDto> leakySink = Sinks.many().replay().all();
// GOOD — 크기 또는 시간 제한
Sinks.Many<ProductDto> safeSink = Sinks.many().replay().limit(100);
실수 5: 트랜잭션 미커밋 상태에서 이벤트 발행
// BAD — 트랜잭션이 아직 커밋 안 된 상태에서 이벤트 발행
// 구독자가 이 이벤트를 받아 DB 조회하면 데이터가 없을 수 있음
.doOnNext(saved -> sink.tryEmitNext(saved)); // 커밋 전
// GOOD — doOnSuccess 사용 (Mono 완료 후 실행)
.doOnSuccess(saved -> sink.tryEmitNext(saved));
자주 만나는 함정 — 시험 직전 압축 노트
10편의 핵심을 정리합니다.
- SSE = HTTP 기반 서버 → 클라이언트 단방향 실시간 푸시
produces = MediaType.TEXT_EVENT_STREAM_VALUE필수 — 없으면 스트리밍 안 됨Flux— data 필드만, 단순 push /Flux— id·event·retry 명시 제어> ServerSentEvent필드:data(필수),event(타입),id(재연결 이어받기),retry(재연결 간격)- EventSource 자동 재연결 — 브라우저 내장,
Last-Event-ID헤더로 이어받기 - Sinks.Many.unicast — 구독자 1명 / multicast — N명, 과거 소실 / replay — N명, 과거 재생
replay().limit(n)— 재연결 지원에 적합,replay().all()사용 금지 (메모리 누수)tryEmitNext반환값EmitResult확인 — FAIL_ZERO_SUBSCRIBER·FAIL_OVERFLOW·FAIL_TERMINATED- Sink는 서비스에서 필드로 관리 (싱글톤), 요청마다 새로 만들면 이벤트 못 받음
- Heartbeat = 주기적 빈 이벤트로 프록시 타임아웃 방지
- SSE vs WebSocket — SSE는 단방향 (알림·대시보드), WebSocket은 양방향 (채팅·게임)
- 브라우저 EventSource CORS 주의 — WebFilter나 @CrossOrigin 설정 필요
- 서버 간 SSE 구독 = WebClient +
accept(MediaType.TEXT_EVENT_STREAM)+bodyToFlux()
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.
- 1편 — Spring WebFlux 입문
- 2편 — Spring Data R2DBC
- 3편 — R2DBC vs JPA
- 4편 — 리액티브 CRUD API
- 5편 — 입력 검증·예외 처리
- 6편 — WebFilter
- 7편 — 함수형 엔드포인트
- 8편 — WebClient
- 9편 — 스트리밍 응답
- 10편 — Server-Sent Events (SSE) (현재 글)
- 11편 — 성능 최적화
- 12편 — 리액티브 마이크로서비스
- 13편 — 다음 단계