gRPC + Spring Boot 마스터 노트 시리즈 4편. Server Streaming RPC가 1 요청 → N 응답으로 페이징·실시간 알림·로그 스트리밍에 적합한 이유, StreamObserver의 onNext 다중 호출, 서버 측 Flux 통합, 클라이언트 Iterator 패턴, 무한 스트림 종료·취소·deadline까지.
이 글은 gRPC + Spring Boot 마스터 노트 시리즈의 네 번째 편입니다. 3편(Unary)가 1:1이었다면, 이번엔 1:N — Server Streaming.
페이징의 더 우아한 대안. 실시간 알림. 로그 스트리밍. 한 요청에 여러 응답을 자연스럽게.
처음 Server Streaming이 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, Unary와 코드가 비슷한데 어떻게 다른가 막연합니다. 둘째, 언제 Server Streaming을 쓸지 막연합니다.
해결법은 한 가지예요. "onNext 여러 번 = Server Streaming" 한 줄. Unary는 1번, Streaming은 N번. 이 차이만 잡으면 끝.
Proto 정의
service NotificationService {
rpc Subscribe (Subscription) returns (stream Notification);
rpc ListUsers (Filter) returns (stream User);
rpc TailLogs (LogFilter) returns (stream LogEntry);
}
stream 키워드 — 응답이 streaming.
서버 구현
@GrpcService
public class NotificationServiceImpl extends NotificationServiceGrpc.NotificationServiceImplBase {
@Override
public void subscribe(Subscription request, StreamObserver<Notification> observer) {
try {
for (int i = 0; i < 10; i++) {
Notification n = Notification.newBuilder()
.setMessage("Notification " + i)
.setTimestamp(Instant.now().toEpochMilli())
.build();
observer.onNext(n); // 여러 번 호출
Thread.sleep(1000);
}
observer.onCompleted();
} catch (InterruptedException e) {
observer.onError(Status.CANCELLED.asRuntimeException());
}
}
}
onNext를 N번 → 클라이언트 N개 응답 받음. 마지막 onCompleted.
클라이언트 — BlockingStub (Iterator)
@GrpcClient("notification-service")
private NotificationServiceGrpc.NotificationServiceBlockingStub blockingStub;
public void subscribe() {
Subscription req = Subscription.newBuilder().setUserId("123").build();
Iterator<Notification> iterator = blockingStub.subscribe(req);
while (iterator.hasNext()) {
Notification n = iterator.next();
log.info("Got: {}", n.getMessage());
}
}
여기서 시험 함정이 하나 있어요. BlockingStub Server Streaming = Iterator. hasNext()·next()로 순회. 응답 없으면 블로킹.
클라이언트 — AsyncStub (콜백)
@GrpcClient("notification-service")
private NotificationServiceGrpc.NotificationServiceStub asyncStub;
public void subscribe() {
Subscription req = Subscription.newBuilder().setUserId("123").build();
asyncStub.subscribe(req, new StreamObserver<Notification>() {
@Override
public void onNext(Notification n) {
log.info("Got: {}", n.getMessage());
}
@Override
public void onCompleted() {
log.info("Stream done");
}
@Override
public void onError(Throwable t) {
log.error("Failed", t);
}
});
}
논블로킹. 콜백마다 처리.
클라이언트 — Reactive Flux
@Autowired
private ReactorNotificationServiceGrpc.ReactorNotificationServiceStub reactorStub;
public Flux<Notification> subscribe() {
Subscription req = Subscription.newBuilder().setUserId("123").build();
return reactorStub.subscribe(req); // Flux<Notification>
}
WebFlux 친화. flatMap·take·filter 자연스럽게.
서버 — Reactive Flux 통합
@GrpcService
public class NotificationServiceImpl extends ReactorNotificationServiceGrpc.NotificationServiceImplBase {
@Override
public Flux<Notification> subscribe(Mono<Subscription> request) {
return request.flatMapMany(req ->
Flux.interval(Duration.ofSeconds(1))
.take(10)
.map(i -> Notification.newBuilder()
.setMessage("Notification " + i)
.build())
);
}
}
reactor-grpc 사용 시 Flux 직접 반환. StreamObserver 추상화.
사용 사례
1. 페이징 대안
// REST 페이징
GET /users?page=1&size=20
GET /users?page=2&size=20
...
// gRPC Streaming
rpc ListUsers(Filter) returns (stream User);
// 한 번 호출 → 모든 사용자 스트리밍
여기서 정말 중요한 시험 함정 — 대용량 결과 = Server Streaming. 페이징 OK. Streaming 더 효율 (단일 호출).
2. 실시간 알림
rpc Subscribe(Subscription) returns (stream Notification);
WebSocket 대안. 클라이언트가 구독 → 서버가 알림 푸시.
3. 로그 스트리밍
rpc TailLogs(LogFilter) returns (stream LogEntry);
tail -f 같은 흐름.
4. 실시간 시세
rpc WatchPrice(Symbol) returns (stream Price);
주식·암호화폐 등.
무한 스트림 — 클라이언트 종료
// 클라이언트
ClientCall<Subscription, Notification> call = ...;
// 일정 시간 후 또는 조건 만족 시
call.cancel("Client done", null);
또는 Iterator 사용 시:
Iterator<Notification> it = blockingStub.subscribe(req);
int count = 0;
while (it.hasNext() && count < 100) {
Notification n = it.next();
process(n);
count++;
}
// 100개 받고 종료 — gRPC 자동 cancel
서버 — 클라이언트 취소 감지
@Override
public void subscribe(Subscription req, StreamObserver<Notification> observer) {
Context context = Context.current();
Disposable subscription = realtimeService.notifications()
.doOnNext(observer::onNext)
.doOnComplete(observer::onCompleted)
.doOnError(observer::onError)
.subscribe();
context.addListener(ctx -> {
if (ctx.isCancelled()) {
log.info("Client cancelled");
subscription.dispose();
}
}, MoreExecutors.directExecutor());
}
여기서 정말 중요한 시험 함정 — 클라이언트 취소 시 서버도 정리. 안 하면 자원 누수. Context.addListener 또는 Reactive doOnCancel.
Deadline 적용
// 클라이언트
Iterator<Notification> it = blockingStub
.withDeadlineAfter(60, TimeUnit.SECONDS) // 60초 안 모두 받아야
.subscribe(req);
전체 스트림 시간 제한. 무한 스트림이면 무의미·short-lived만.
백프레셔 (gRPC의 한계)
여기서 시험 함정이 하나 있어요. gRPC는 백프레셔 부분 지원. HTTP/2 flow control 기반. 명시적 request(N) 같은 메커니즘 X. RSocket과 결정적 차이.
// AsyncStub은 백프레셔 일부 가능
ServerCallStreamObserver<Notification> serverObserver = (ServerCallStreamObserver<Notification>) observer;
serverObserver.setOnReadyHandler(() -> {
while (serverObserver.isReady()) {
// 클라이언트가 받을 준비 됐을 때만 보냄
observer.onNext(generateNext());
}
});
isReady() 체크. 완벽한 백프레셔는 아님.
에러 처리
@Override
public void subscribe(Subscription req, StreamObserver<Notification> observer) {
try {
Flux.interval(Duration.ofSeconds(1))
.take(10)
.doOnNext(i -> {
if (i == 5) throw new RuntimeException("Mid stream error");
observer.onNext(generateNotification(i));
})
.doOnComplete(observer::onCompleted)
.doOnError(e -> observer.onError(
Status.INTERNAL.withCause(e).asRuntimeException()
))
.subscribe();
} catch (Exception e) {
observer.onError(Status.UNKNOWN.asRuntimeException());
}
}
스트림 중간 에러 = onError. 클라이언트 = StatusRuntimeException.
Server Streaming vs RSocket Request-Stream
거의 동일 모델. 차이:
| 측면 | gRPC Server Streaming | RSocket Request-Stream |
|---|---|---|
| 백프레셔 | 부분 (HTTP/2 flow) | 표준 (REQUEST_N) |
| Reactive | 부분 | 네이티브 |
| 생태계 | 매우 큼 | 작음 |
Server Streaming vs WebSocket
| 측면 | gRPC | WebSocket |
|---|---|---|
| 메시지 의미 | Protobuf 강제 | 자유 (raw) |
| 양방향 | Bidirectional 모드만 | 항상 |
| 브라우저 | gRPC-Web (제한) | 네이티브 |
| 스키마 | 강력 | 없음 |
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 4편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Server Streaming = 1 요청 → N 응답
- Proto —
returns (stream Type) - 서버 —
onNext여러 번 →onCompleted - 클라이언트 BlockingStub = Iterator (
hasNext·next) - 클라이언트 AsyncStub = 콜백 (StreamObserver)
- Reactive Flux (
reactor-grpc) = WebFlux 친화 - 서버 —
Flux<Response>직접 반환 - 사용처 — 페이징 대안 / 실시간 알림 / 로그 / 시세
- WebSocket·RSocket 대안
- 무한 스트림 — 클라이언트
cancel()또는 Iterator 종료 - 서버 취소 감지 —
Context.addListener또는doOnCancel - 자원 정리 필수
- Deadline = 전체 스트림 시간 (short-lived만)
- gRPC 백프레셔 = 부분 (HTTP/2 flow control)
isReady()+setOnReadyHandler로 부분 제어- RSocket Request-Stream과 거의 동일·백프레셔만 차이
- 에러 — 스트림 중간
onError→StatusRuntimeException - WebSocket vs gRPC — Protobuf·스키마·브라우저 친화도
시리즈 다른 편
- 1편 — 기본 개념·HTTP/2·4 RPC 모드
- 2편 — Protocol Buffers
- 3편 — Unary RPC
- 4편 — Server Streaming (현재 글)
- 5편 — Client Streaming
- 6편 — Bidirectional Streaming
- 7편 — Interceptors
- 8편 — Error Handling
- 9편 — Security
- 10편 — 고급 (Reflection·Health·LB·gRPC-Web)
공식 문서: gRPC Streaming RPC 에서 더 깊이.
다음 글(5편)에서는 Client Streaming — N:1 패턴, 대량 업로드·로그 수집·집계 시나리오까지 풀어 갑니다.