gRPC + Spring Boot 마스터 노트 시리즈 5편. Client Streaming RPC가 N:1 패턴(N 메시지 → 1 응답)으로 대량 업로드·로그 수집·집계 시나리오에 적합한 이유, 클라이언트의 onNext 다중·서버의 단일 응답, Reactive Mono 통합, 청크 업로드 패턴, 부분 실패 처리까지.
이 글은 gRPC + Spring Boot 마스터 노트 시리즈의 다섯 번째 편입니다. 4편(Server Streaming)이 1:N이었다면, 이번엔 N:1 — Client Streaming.
대량 업로드·로그 수집·통계 집계에 적합. 클라이언트가 N 메시지를 보내고, 서버가 모든 메시지를 받은 후 1 응답.
처음 Client Streaming이 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, 클라이언트도 StreamObserver라는 점이 헷갈립니다. 둘째, 언제 끝났다고 알리나 막연합니다.
해결법은 한 가지예요. "클라이언트도 onNext·onCompleted·onError 호출". 클라이언트가 보낼 게 끝나면 onCompleted → 서버가 응답. 이 흐름만 잡으면 끝.
Proto 정의
service AnalyticsService {
rpc UploadEvents (stream Event) returns (UploadResult);
rpc CollectMetrics (stream Metric) returns (Summary);
rpc UploadFile (stream FileChunk) returns (UploadStatus);
}
요청에 stream, 응답엔 X. N:1.
서버 구현
@GrpcService
public class AnalyticsServiceImpl extends AnalyticsServiceGrpc.AnalyticsServiceImplBase {
@Override
public StreamObserver<Event> uploadEvents(StreamObserver<UploadResult> responseObserver) {
AtomicInteger counter = new AtomicInteger();
AtomicLong totalBytes = new AtomicLong();
return new StreamObserver<Event>() {
@Override
public void onNext(Event event) {
// 클라이언트가 보낸 각 메시지 처리
eventService.persist(event);
counter.incrementAndGet();
totalBytes.addAndGet(event.getSerializedSize());
}
@Override
public void onCompleted() {
// 클라이언트가 모두 보낸 후 — 응답 1번
UploadResult result = UploadResult.newBuilder()
.setEventCount(counter.get())
.setTotalBytes(totalBytes.get())
.build();
responseObserver.onNext(result);
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
log.error("Client error", t);
}
};
}
}
여기서 정말 중요한 시험 함정 — 반환 타입이 StreamObserver<Event>. 서버가 클라이언트 메시지 받을 observer를 반환. responseObserver는 응답용 (Unary와 비슷).
클라이언트 — AsyncStub
@GrpcClient("analytics-service")
private AnalyticsServiceGrpc.AnalyticsServiceStub asyncStub;
public void uploadEvents(List<Event> events) {
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<UploadResult> responseObserver = new StreamObserver<UploadResult>() {
@Override
public void onNext(UploadResult result) {
log.info("Uploaded {} events, {} bytes",
result.getEventCount(), result.getTotalBytes());
}
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable t) {
log.error("Failed", t);
latch.countDown();
}
};
StreamObserver<Event> requestObserver = asyncStub.uploadEvents(responseObserver);
try {
for (Event event : events) {
requestObserver.onNext(event); // 메시지 보내기
}
requestObserver.onCompleted(); // 끝났음 알림
} catch (Exception e) {
requestObserver.onError(e);
}
latch.await(30, TimeUnit.SECONDS);
}
흐름:
- 응답 observer 정의
asyncStub.uploadEvents(responseObserver)→ 요청 observer 받음- 요청 observer로 메시지 N개 →
onCompleted - 서버 응답 받음
여기서 정말 중요한 시험 함정 — 클라이언트도 onCompleted 호출 필수. 안 하면 서버가 영원히 더 받기 대기.
클라이언트 — BlockingStub은 X
여기서 시험 함정이 하나 있어요. Client Streaming은 BlockingStub 미지원. 응답이 모든 입력 후에 오므로 동기 호출 불가. AsyncStub만.
Reactive Mono 통합
@Autowired
private ReactorAnalyticsServiceGrpc.ReactorAnalyticsServiceStub reactorStub;
public Mono<UploadResult> upload(Flux<Event> events) {
return reactorStub.uploadEvents(events);
}
// 사용
Flux<Event> events = Flux.fromIterable(eventList);
upload(events)
.subscribe(result -> log.info("Done: {}", result));
WebFlux 친화. 자연스러운 흐름.
서버 — Reactive Mono
@GrpcService
public class AnalyticsServiceImpl extends ReactorAnalyticsServiceGrpc.AnalyticsServiceImplBase {
@Override
public Mono<UploadResult> uploadEvents(Flux<Event> request) {
return request
.doOnNext(eventService::persist)
.count()
.map(count -> UploadResult.newBuilder()
.setEventCount(count.intValue())
.build());
}
}
Flux로 받아 처리·집계 → Mono 응답.
사용 사례
1. 대량 업로드
rpc UploadEvents(stream Event) returns (UploadResult);
수백·수천 이벤트 한 번에. 개별 호출보다 효율.
2. 청크 파일 업로드
message FileChunk {
bytes data = 1;
string filename = 2;
}
rpc UploadFile(stream FileChunk) returns (UploadStatus);
큰 파일을 작은 청크로 나눠 업로드.
public Mono<UploadStatus> uploadFile(File file) {
return Flux.using(
() -> new FileInputStream(file),
is -> Flux.generate(sink -> {
byte[] buffer = new byte[64 * 1024];
try {
int read = is.read(buffer);
if (read == -1) {
sink.complete();
} else {
sink.next(FileChunk.newBuilder()
.setData(ByteString.copyFrom(buffer, 0, read))
.build());
}
} catch (IOException e) {
sink.error(e);
}
}),
is -> { try { is.close(); } catch (IOException e) {} }
)
.as(reactorStub::uploadFile);
}
3. 메트릭 집계
rpc CollectMetrics(stream Metric) returns (Summary);
여러 노드에서 메트릭 수집 → 서버가 집계.
4. 통계 계산
rpc CalculateStatistics(stream DataPoint) returns (Statistics);
평균·중앙값·표준편차 등.
부분 실패 — 일부 메시지 실패해도 계속
@Override
public StreamObserver<Event> uploadEvents(StreamObserver<UploadResult> observer) {
AtomicInteger success = new AtomicInteger();
AtomicInteger failed = new AtomicInteger();
return new StreamObserver<Event>() {
@Override
public void onNext(Event event) {
try {
eventService.persist(event);
success.incrementAndGet();
} catch (Exception e) {
failed.incrementAndGet();
// 에러 시 onError 안 부르고 계속
}
}
@Override
public void onCompleted() {
UploadResult result = UploadResult.newBuilder()
.setSuccess(success.get())
.setFailed(failed.get())
.build();
observer.onNext(result);
observer.onCompleted();
}
@Override
public void onError(Throwable t) {}
};
}
여기서 시험 함정이 하나 있어요. 개별 메시지 실패 ≠ onError. onError 부르면 전체 스트림 종료. 부분 실패는 카운트만, 응답에서 보고.
백프레셔 (제한적)
@Override
public StreamObserver<Event> uploadEvents(StreamObserver<UploadResult> observer) {
ServerCallStreamObserver<UploadResult> serverObserver = (ServerCallStreamObserver) observer;
serverObserver.setOnReadyHandler(() -> {
// 응답 보낼 준비 됐을 때
});
return new StreamObserver<Event>() {
// 처리
};
}
서버가 받기 너무 빨라 처리 못 따라가면? 명시적 백프레셔 X. 처리 큐 사용 또는 Reactive로.
타임아웃·deadline
StreamObserver<Event> requestObserver = asyncStub
.withDeadlineAfter(5, TimeUnit.MINUTES) // 5분 안 모두 보내야
.uploadEvents(responseObserver);
전체 스트림 시간. 길게 설정 가능.
큰 파일 vs S3 직접
gRPC Client Streaming = 작은~중간 (수MB ~ 수십MB)
S3 Direct Upload = 큰 파일 (수백MB+)
여기서 정말 중요한 시험 함정 — gRPC Streaming은 큰 파일 부적합. 메모리 사용·타임아웃 문제. 큰 파일 = S3 등 객체 스토리지 직접.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 5편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Client Streaming = N 메시지 → 1 응답
- Proto —
rpc Method (stream Req) returns (Res) - 서버 — 반환 타입이
StreamObserver<Req> - responseObserver는 응답용 (Unary와 비슷)
- 클라이언트 —
asyncStub.method(responseObserver)→ 요청 observer - 클라이언트가
onNext여러 번 →onCompleted필수 onCompleted안 하면 서버 영원 대기- BlockingStub 미지원 — AsyncStub만
- Reactive Mono (
reactor-grpc) —Flux<Req>→Mono<Res> - WebFlux 친화
- 사용처 — 대량 업로드 / 청크 파일 / 메트릭 집계 / 통계
- 청크 업로드 — 64KB 등 단위로 나눔
- 부분 실패 ≠ onError (전체 종료) — 카운트만, 응답에 보고
- 백프레셔 부분 —
ServerCallStreamObserver.setOnReadyHandler - Deadline = 전체 스트림 시간
- 큰 파일 부적합 (수백MB+) — S3 직접 권장
- 작은~중간 파일·이벤트 묶음에만
시리즈 다른 편
- 1편 — 기본 개념·HTTP/2·4 RPC 모드
- 2편 — Protocol Buffers
- 3편 — Unary RPC
- 4편 — Server Streaming
- 5편 — Client Streaming (현재 글)
- 6편 — Bidirectional Streaming
- 7편 — Interceptors
- 8편 — Error Handling
- 9편 — Security
- 10편 — 고급 (Reflection·Health·LB·gRPC-Web)
공식 문서: gRPC Client Streaming 에서 더 깊이.
다음 글(6편)에서는 Bidirectional Streaming — 양방향 N:N 통신, 채팅·실시간 게임·트레이딩 시스템까지 풀어 갑니다.