gRPC + Spring Boot — Client Streaming

2026-05-03확률과 통계 마스터 노트

gRPC + Spring Boot 마스터 노트 시리즈 5편. Client Streaming RPC가 N:1 패턴(N 메시지 → 1 응답)으로 대량 업로드·로그 수집·집계 시나리오에 적합한 이유, 클라이언트의 onNext 다중·서버의 단일 응답, Reactive Mono 통합, 청크 업로드 패턴, 부분 실패 처리까지.

이 글은 gRPC + Spring Boot 마스터 노트 시리즈의 다섯 번째 편입니다. 4편(Server Streaming)이 1:N이었다면, 이번엔 N:1 — Client Streaming.

대량 업로드·로그 수집·통계 집계에 적합. 클라이언트가 N 메시지를 보내고, 서버가 모든 메시지를 받은 후 1 응답.

처음 Client Streaming이 어렵게 느껴지는 이유

처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, 클라이언트도 StreamObserver라는 점이 헷갈립니다. 둘째, 언제 끝났다고 알리나 막연합니다.

해결법은 한 가지예요. "클라이언트도 onNext·onCompleted·onError 호출". 클라이언트가 보낼 게 끝나면 onCompleted → 서버가 응답. 이 흐름만 잡으면 끝.

Proto 정의

service AnalyticsService {
  rpc UploadEvents (stream Event) returns (UploadResult);
  rpc CollectMetrics (stream Metric) returns (Summary);
  rpc UploadFile (stream FileChunk) returns (UploadStatus);
}

요청에 stream, 응답엔 X. N:1.

서버 구현

@GrpcService
public class AnalyticsServiceImpl extends AnalyticsServiceGrpc.AnalyticsServiceImplBase {

    @Override
    public StreamObserver<Event> uploadEvents(StreamObserver<UploadResult> responseObserver) {
        AtomicInteger counter = new AtomicInteger();
        AtomicLong totalBytes = new AtomicLong();
        
        return new StreamObserver<Event>() {
            @Override
            public void onNext(Event event) {
                // 클라이언트가 보낸 각 메시지 처리
                eventService.persist(event);
                counter.incrementAndGet();
                totalBytes.addAndGet(event.getSerializedSize());
            }
            
            @Override
            public void onCompleted() {
                // 클라이언트가 모두 보낸 후 — 응답 1번
                UploadResult result = UploadResult.newBuilder()
                    .setEventCount(counter.get())
                    .setTotalBytes(totalBytes.get())
                    .build();
                responseObserver.onNext(result);
                responseObserver.onCompleted();
            }
            
            @Override
            public void onError(Throwable t) {
                log.error("Client error", t);
            }
        };
    }
}

여기서 정말 중요한 시험 함정 — 반환 타입이 StreamObserver<Event>. 서버가 클라이언트 메시지 받을 observer를 반환. responseObserver는 응답용 (Unary와 비슷).

클라이언트 — AsyncStub

@GrpcClient("analytics-service")
private AnalyticsServiceGrpc.AnalyticsServiceStub asyncStub;

public void uploadEvents(List<Event> events) {
    CountDownLatch latch = new CountDownLatch(1);
    
    StreamObserver<UploadResult> responseObserver = new StreamObserver<UploadResult>() {
        @Override
        public void onNext(UploadResult result) {
            log.info("Uploaded {} events, {} bytes", 
                result.getEventCount(), result.getTotalBytes());
        }
        
        @Override
        public void onCompleted() {
            latch.countDown();
        }
        
        @Override
        public void onError(Throwable t) {
            log.error("Failed", t);
            latch.countDown();
        }
    };
    
    StreamObserver<Event> requestObserver = asyncStub.uploadEvents(responseObserver);
    
    try {
        for (Event event : events) {
            requestObserver.onNext(event);   // 메시지 보내기
        }
        requestObserver.onCompleted();         // 끝났음 알림
    } catch (Exception e) {
        requestObserver.onError(e);
    }
    
    latch.await(30, TimeUnit.SECONDS);
}

흐름:

  1. 응답 observer 정의
  2. asyncStub.uploadEvents(responseObserver) → 요청 observer 받음
  3. 요청 observer로 메시지 N개 → onCompleted
  4. 서버 응답 받음

여기서 정말 중요한 시험 함정 — 클라이언트도 onCompleted 호출 필수. 안 하면 서버가 영원히 더 받기 대기.

클라이언트 — BlockingStub은 X

여기서 시험 함정이 하나 있어요. Client Streaming은 BlockingStub 미지원. 응답이 모든 입력 후에 오므로 동기 호출 불가. AsyncStub만.

Reactive Mono 통합

@Autowired
private ReactorAnalyticsServiceGrpc.ReactorAnalyticsServiceStub reactorStub;

public Mono<UploadResult> upload(Flux<Event> events) {
    return reactorStub.uploadEvents(events);
}
// 사용
Flux<Event> events = Flux.fromIterable(eventList);
upload(events)
    .subscribe(result -> log.info("Done: {}", result));

WebFlux 친화. 자연스러운 흐름.

서버 — Reactive Mono

@GrpcService
public class AnalyticsServiceImpl extends ReactorAnalyticsServiceGrpc.AnalyticsServiceImplBase {

    @Override
    public Mono<UploadResult> uploadEvents(Flux<Event> request) {
        return request
            .doOnNext(eventService::persist)
            .count()
            .map(count -> UploadResult.newBuilder()
                .setEventCount(count.intValue())
                .build());
    }
}

Flux로 받아 처리·집계 → Mono 응답.

사용 사례

1. 대량 업로드

rpc UploadEvents(stream Event) returns (UploadResult);

수백·수천 이벤트 한 번에. 개별 호출보다 효율.

2. 청크 파일 업로드

message FileChunk {
  bytes data = 1;
  string filename = 2;
}

rpc UploadFile(stream FileChunk) returns (UploadStatus);

큰 파일을 작은 청크로 나눠 업로드.

public Mono<UploadStatus> uploadFile(File file) {
    return Flux.using(
        () -> new FileInputStream(file),
        is -> Flux.generate(sink -> {
            byte[] buffer = new byte[64 * 1024];
            try {
                int read = is.read(buffer);
                if (read == -1) {
                    sink.complete();
                } else {
                    sink.next(FileChunk.newBuilder()
                        .setData(ByteString.copyFrom(buffer, 0, read))
                        .build());
                }
            } catch (IOException e) {
                sink.error(e);
            }
        }),
        is -> { try { is.close(); } catch (IOException e) {} }
    )
    .as(reactorStub::uploadFile);
}

3. 메트릭 집계

rpc CollectMetrics(stream Metric) returns (Summary);

여러 노드에서 메트릭 수집 → 서버가 집계.

4. 통계 계산

rpc CalculateStatistics(stream DataPoint) returns (Statistics);

평균·중앙값·표준편차 등.

부분 실패 — 일부 메시지 실패해도 계속

@Override
public StreamObserver<Event> uploadEvents(StreamObserver<UploadResult> observer) {
    AtomicInteger success = new AtomicInteger();
    AtomicInteger failed = new AtomicInteger();
    
    return new StreamObserver<Event>() {
        @Override
        public void onNext(Event event) {
            try {
                eventService.persist(event);
                success.incrementAndGet();
            } catch (Exception e) {
                failed.incrementAndGet();
                // 에러 시 onError 안 부르고 계속
            }
        }
        
        @Override
        public void onCompleted() {
            UploadResult result = UploadResult.newBuilder()
                .setSuccess(success.get())
                .setFailed(failed.get())
                .build();
            observer.onNext(result);
            observer.onCompleted();
        }
        
        @Override
        public void onError(Throwable t) {}
    };
}

여기서 시험 함정이 하나 있어요. 개별 메시지 실패 ≠ onError. onError 부르면 전체 스트림 종료. 부분 실패는 카운트만, 응답에서 보고.

백프레셔 (제한적)

@Override
public StreamObserver<Event> uploadEvents(StreamObserver<UploadResult> observer) {
    ServerCallStreamObserver<UploadResult> serverObserver = (ServerCallStreamObserver) observer;
    
    serverObserver.setOnReadyHandler(() -> {
        // 응답 보낼 준비 됐을 때
    });
    
    return new StreamObserver<Event>() {
        // 처리
    };
}

서버가 받기 너무 빨라 처리 못 따라가면? 명시적 백프레셔 X. 처리 큐 사용 또는 Reactive로.

타임아웃·deadline

StreamObserver<Event> requestObserver = asyncStub
    .withDeadlineAfter(5, TimeUnit.MINUTES)    // 5분 안 모두 보내야
    .uploadEvents(responseObserver);

전체 스트림 시간. 길게 설정 가능.

큰 파일 vs S3 직접

gRPC Client Streaming = 작은~중간 (수MB ~ 수십MB)
S3 Direct Upload = 큰 파일 (수백MB+)

여기서 정말 중요한 시험 함정 — gRPC Streaming은 큰 파일 부적합. 메모리 사용·타임아웃 문제. 큰 파일 = S3 등 객체 스토리지 직접.

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 5편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • Client Streaming = N 메시지 → 1 응답
  • Proto — rpc Method (stream Req) returns (Res)
  • 서버 — 반환 타입이 StreamObserver<Req>
  • responseObserver는 응답용 (Unary와 비슷)
  • 클라이언트 — asyncStub.method(responseObserver) → 요청 observer
  • 클라이언트가 onNext 여러 번 → onCompleted 필수
  • onCompleted 안 하면 서버 영원 대기
  • BlockingStub 미지원 — AsyncStub만
  • Reactive Mono (reactor-grpc) — Flux<Req>Mono<Res>
  • WebFlux 친화
  • 사용처 — 대량 업로드 / 청크 파일 / 메트릭 집계 / 통계
  • 청크 업로드 — 64KB 등 단위로 나눔
  • 부분 실패 ≠ onError (전체 종료) — 카운트만, 응답에 보고
  • 백프레셔 부분 — ServerCallStreamObserver.setOnReadyHandler
  • Deadline = 전체 스트림 시간
  • 큰 파일 부적합 (수백MB+) — S3 직접 권장
  • 작은~중간 파일·이벤트 묶음에만

시리즈 다른 편

공식 문서: gRPC Client Streaming 에서 더 깊이.

다음 글(6편)에서는 Bidirectional Streaming — 양방향 N:N 통신, 채팅·실시간 게임·트레이딩 시스템까지 풀어 갑니다.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!