gRPC + Spring Boot — Server Streaming

2026-05-03확률과 통계 마스터 노트

gRPC + Spring Boot 마스터 노트 시리즈 4편. Server Streaming RPC가 1 요청 → N 응답으로 페이징·실시간 알림·로그 스트리밍에 적합한 이유, StreamObserver의 onNext 다중 호출, 서버 측 Flux 통합, 클라이언트 Iterator 패턴, 무한 스트림 종료·취소·deadline까지.

이 글은 gRPC + Spring Boot 마스터 노트 시리즈의 네 번째 편입니다. 3편(Unary)가 1:1이었다면, 이번엔 1:N — Server Streaming.

페이징의 더 우아한 대안. 실시간 알림. 로그 스트리밍. 한 요청에 여러 응답을 자연스럽게.

처음 Server Streaming이 어렵게 느껴지는 이유

처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, Unary와 코드가 비슷한데 어떻게 다른가 막연합니다. 둘째, 언제 Server Streaming을 쓸지 막연합니다.

해결법은 한 가지예요. "onNext 여러 번 = Server Streaming" 한 줄. Unary는 1번, Streaming은 N번. 이 차이만 잡으면 끝.

Proto 정의

service NotificationService {
  rpc Subscribe (Subscription) returns (stream Notification);
  rpc ListUsers (Filter) returns (stream User);
  rpc TailLogs (LogFilter) returns (stream LogEntry);
}

stream 키워드 — 응답이 streaming.

서버 구현

@GrpcService
public class NotificationServiceImpl extends NotificationServiceGrpc.NotificationServiceImplBase {

    @Override
    public void subscribe(Subscription request, StreamObserver<Notification> observer) {
        try {
            for (int i = 0; i < 10; i++) {
                Notification n = Notification.newBuilder()
                    .setMessage("Notification " + i)
                    .setTimestamp(Instant.now().toEpochMilli())
                    .build();
                
                observer.onNext(n);              // 여러 번 호출
                Thread.sleep(1000);
            }
            observer.onCompleted();
        } catch (InterruptedException e) {
            observer.onError(Status.CANCELLED.asRuntimeException());
        }
    }
}

onNext를 N번 → 클라이언트 N개 응답 받음. 마지막 onCompleted.

클라이언트 — BlockingStub (Iterator)

@GrpcClient("notification-service")
private NotificationServiceGrpc.NotificationServiceBlockingStub blockingStub;

public void subscribe() {
    Subscription req = Subscription.newBuilder().setUserId("123").build();
    
    Iterator<Notification> iterator = blockingStub.subscribe(req);
    
    while (iterator.hasNext()) {
        Notification n = iterator.next();
        log.info("Got: {}", n.getMessage());
    }
}

여기서 시험 함정이 하나 있어요. BlockingStub Server Streaming = Iterator. hasNext()·next()로 순회. 응답 없으면 블로킹.

클라이언트 — AsyncStub (콜백)

@GrpcClient("notification-service")
private NotificationServiceGrpc.NotificationServiceStub asyncStub;

public void subscribe() {
    Subscription req = Subscription.newBuilder().setUserId("123").build();
    
    asyncStub.subscribe(req, new StreamObserver<Notification>() {
        @Override
        public void onNext(Notification n) {
            log.info("Got: {}", n.getMessage());
        }
        
        @Override
        public void onCompleted() {
            log.info("Stream done");
        }
        
        @Override
        public void onError(Throwable t) {
            log.error("Failed", t);
        }
    });
}

논블로킹. 콜백마다 처리.

클라이언트 — Reactive Flux

@Autowired
private ReactorNotificationServiceGrpc.ReactorNotificationServiceStub reactorStub;

public Flux<Notification> subscribe() {
    Subscription req = Subscription.newBuilder().setUserId("123").build();
    return reactorStub.subscribe(req);   // Flux<Notification>
}

WebFlux 친화. flatMap·take·filter 자연스럽게.

서버 — Reactive Flux 통합

@GrpcService
public class NotificationServiceImpl extends ReactorNotificationServiceGrpc.NotificationServiceImplBase {

    @Override
    public Flux<Notification> subscribe(Mono<Subscription> request) {
        return request.flatMapMany(req ->
            Flux.interval(Duration.ofSeconds(1))
                .take(10)
                .map(i -> Notification.newBuilder()
                    .setMessage("Notification " + i)
                    .build())
        );
    }
}

reactor-grpc 사용 시 Flux 직접 반환. StreamObserver 추상화.

사용 사례

1. 페이징 대안

// REST 페이징
GET /users?page=1&size=20
GET /users?page=2&size=20
...

// gRPC Streaming
rpc ListUsers(Filter) returns (stream User);
// 한 번 호출 → 모든 사용자 스트리밍

여기서 정말 중요한 시험 함정 — 대용량 결과 = Server Streaming. 페이징 OK. Streaming 더 효율 (단일 호출).

2. 실시간 알림

rpc Subscribe(Subscription) returns (stream Notification);

WebSocket 대안. 클라이언트가 구독 → 서버가 알림 푸시.

3. 로그 스트리밍

rpc TailLogs(LogFilter) returns (stream LogEntry);

tail -f 같은 흐름.

4. 실시간 시세

rpc WatchPrice(Symbol) returns (stream Price);

주식·암호화폐 등.

무한 스트림 — 클라이언트 종료

// 클라이언트
ClientCall<Subscription, Notification> call = ...;

// 일정 시간 후 또는 조건 만족 시
call.cancel("Client done", null);

또는 Iterator 사용 시:

Iterator<Notification> it = blockingStub.subscribe(req);
int count = 0;
while (it.hasNext() && count < 100) {
    Notification n = it.next();
    process(n);
    count++;
}
// 100개 받고 종료 — gRPC 자동 cancel

서버 — 클라이언트 취소 감지

@Override
public void subscribe(Subscription req, StreamObserver<Notification> observer) {
    Context context = Context.current();
    
    Disposable subscription = realtimeService.notifications()
        .doOnNext(observer::onNext)
        .doOnComplete(observer::onCompleted)
        .doOnError(observer::onError)
        .subscribe();
    
    context.addListener(ctx -> {
        if (ctx.isCancelled()) {
            log.info("Client cancelled");
            subscription.dispose();
        }
    }, MoreExecutors.directExecutor());
}

여기서 정말 중요한 시험 함정 — 클라이언트 취소 시 서버도 정리. 안 하면 자원 누수. Context.addListener 또는 Reactive doOnCancel.

Deadline 적용

// 클라이언트
Iterator<Notification> it = blockingStub
    .withDeadlineAfter(60, TimeUnit.SECONDS)   // 60초 안 모두 받아야
    .subscribe(req);

전체 스트림 시간 제한. 무한 스트림이면 무의미·short-lived만.

백프레셔 (gRPC의 한계)

여기서 시험 함정이 하나 있어요. gRPC는 백프레셔 부분 지원. HTTP/2 flow control 기반. 명시적 request(N) 같은 메커니즘 X. RSocket과 결정적 차이.

// AsyncStub은 백프레셔 일부 가능
ServerCallStreamObserver<Notification> serverObserver = (ServerCallStreamObserver<Notification>) observer;
serverObserver.setOnReadyHandler(() -> {
    while (serverObserver.isReady()) {
        // 클라이언트가 받을 준비 됐을 때만 보냄
        observer.onNext(generateNext());
    }
});

isReady() 체크. 완벽한 백프레셔는 아님.

에러 처리

@Override
public void subscribe(Subscription req, StreamObserver<Notification> observer) {
    try {
        Flux.interval(Duration.ofSeconds(1))
            .take(10)
            .doOnNext(i -> {
                if (i == 5) throw new RuntimeException("Mid stream error");
                observer.onNext(generateNotification(i));
            })
            .doOnComplete(observer::onCompleted)
            .doOnError(e -> observer.onError(
                Status.INTERNAL.withCause(e).asRuntimeException()
            ))
            .subscribe();
    } catch (Exception e) {
        observer.onError(Status.UNKNOWN.asRuntimeException());
    }
}

스트림 중간 에러 = onError. 클라이언트 = StatusRuntimeException.

Server Streaming vs RSocket Request-Stream

거의 동일 모델. 차이:

측면 gRPC Server Streaming RSocket Request-Stream
백프레셔 부분 (HTTP/2 flow) 표준 (REQUEST_N)
Reactive 부분 네이티브
생태계 매우 큼 작음

Server Streaming vs WebSocket

측면 gRPC WebSocket
메시지 의미 Protobuf 강제 자유 (raw)
양방향 Bidirectional 모드만 항상
브라우저 gRPC-Web (제한) 네이티브
스키마 강력 없음

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 4편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • Server Streaming = 1 요청 → N 응답
  • Proto — returns (stream Type)
  • 서버 — onNext 여러 번 → onCompleted
  • 클라이언트 BlockingStub = Iterator (hasNext·next)
  • 클라이언트 AsyncStub = 콜백 (StreamObserver)
  • Reactive Flux (reactor-grpc) = WebFlux 친화
  • 서버 — Flux<Response> 직접 반환
  • 사용처 — 페이징 대안 / 실시간 알림 / 로그 / 시세
  • WebSocket·RSocket 대안
  • 무한 스트림 — 클라이언트 cancel() 또는 Iterator 종료
  • 서버 취소 감지Context.addListener 또는 doOnCancel
  • 자원 정리 필수
  • Deadline = 전체 스트림 시간 (short-lived만)
  • gRPC 백프레셔 = 부분 (HTTP/2 flow control)
  • isReady() + setOnReadyHandler로 부분 제어
  • RSocket Request-Stream과 거의 동일·백프레셔만 차이
  • 에러 — 스트림 중간 onErrorStatusRuntimeException
  • WebSocket vs gRPC — Protobuf·스키마·브라우저 친화도

시리즈 다른 편

공식 문서: gRPC Streaming RPC 에서 더 깊이.

다음 글(5편)에서는 Client Streaming — N:1 패턴, 대량 업로드·로그 수집·집계 시나리오까지 풀어 갑니다.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!