gRPC + Spring Boot — Bidirectional Streaming

2026-05-03확률과 통계 마스터 노트

gRPC + Spring Boot 마스터 노트 시리즈 6편. Bidirectional Streaming이 N:N 양방향 통신으로 채팅·실시간 게임·트레이딩에 적합한 이유, 클라이언트·서버 모두 StreamObserver 다중 호출, full-duplex 동작 원리, Reactive Flux:Flux 통합, WebSocket과의 결정적 차이까지.

이 글은 gRPC + Spring Boot 마스터 노트 시리즈의 여섯 번째 편입니다. 5편(Client Streaming)이 N:1이었다면, 이번엔 N:N — Bidirectional Streaming.

가장 강력한 모드. 채팅·실시간 게임·트레이딩 시스템. 클라이언트와 서버가 동시에 메시지 흐름을 주고받음. WebSocket의 강력한 대체.

처음 Bidirectional이 어렵게 느껴지는 이유

처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, 양쪽 모두 StreamObserver라 코드가 복잡해 보입니다. 둘째, 언제 종료되는지 막연합니다.

해결법은 한 가지예요. "두 스트림이 독립". 클라이언트 스트림과 서버 스트림이 별도. 한쪽이 끝나도 다른쪽은 계속 가능. Full-duplex.

Proto 정의

service ChatService {
  rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

service GameService {
  rpc PlayGame (stream PlayerAction) returns (stream GameUpdate);
}

service TradingService {
  rpc Trade (stream TradeRequest) returns (stream TradeUpdate);
}

요청·응답 모두 stream.

서버 구현

@GrpcService
public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase {

    private final Map<String, StreamObserver<ChatMessage>> activeUsers = new ConcurrentHashMap<>();

    @Override
    public StreamObserver<ChatMessage> chat(StreamObserver<ChatMessage> responseObserver) {
        String[] userId = {null};
        
        return new StreamObserver<ChatMessage>() {
            @Override
            public void onNext(ChatMessage message) {
                if (userId[0] == null) {
                    userId[0] = message.getUserId();
                    activeUsers.put(userId[0], responseObserver);
                }
                
                // 다른 모든 사용자에게 broadcast
                activeUsers.forEach((id, observer) -> {
                    if (!id.equals(userId[0])) {
                        observer.onNext(message);
                    }
                });
            }
            
            @Override
            public void onCompleted() {
                activeUsers.remove(userId[0]);
                responseObserver.onCompleted();
            }
            
            @Override
            public void onError(Throwable t) {
                activeUsers.remove(userId[0]);
                log.error("Chat error", t);
            }
        };
    }
}

여기서 정말 중요한 시험 함정 — 양방향 = 두 StreamObserver 동시. 요청 받기 + 응답 보내기 둘 다 비동기. 동기화 주의.

클라이언트 구현

@GrpcClient("chat-service")
private ChatServiceGrpc.ChatServiceStub asyncStub;

public void chat() {
    StreamObserver<ChatMessage> responseObserver = new StreamObserver<ChatMessage>() {
        @Override
        public void onNext(ChatMessage message) {
            // 다른 사용자 메시지 수신
            log.info("[{}] {}", message.getUserId(), message.getContent());
        }
        
        @Override
        public void onCompleted() {
            log.info("Chat ended");
        }
        
        @Override
        public void onError(Throwable t) {
            log.error("Chat error", t);
        }
    };
    
    StreamObserver<ChatMessage> requestObserver = asyncStub.chat(responseObserver);
    
    // 사용자 입력 받아 전송
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLine()) {
        String line = scanner.nextLine();
        if (line.equals("quit")) {
            requestObserver.onCompleted();
            break;
        }
        
        ChatMessage msg = ChatMessage.newBuilder()
            .setUserId("alice")
            .setContent(line)
            .build();
        requestObserver.onNext(msg);
    }
}

asyncStub.chat(responseObserver) → 요청 observer 받음. 양쪽 모두 active.

Reactive Flux:Flux

@GrpcService
public class ChatServiceImpl extends ReactorChatServiceGrpc.ChatServiceImplBase {

    private final Sinks.Many<ChatMessage> broadcastSink = 
        Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Flux<ChatMessage> chat(Flux<ChatMessage> incoming) {
        return incoming
            .doOnNext(msg -> broadcastSink.tryEmitNext(msg))
            .switchMap(msg -> broadcastSink.asFlux().filter(m -> !m.equals(msg)));
    }
}
// 클라이언트
public Flux<ChatMessage> chat(Flux<ChatMessage> userInput) {
    return reactorStub.chat(userInput);
}

훨씬 깔끔. WebFlux 친화.

사용 사례

1. 채팅

rpc Chat(stream ChatMessage) returns (stream ChatMessage);

여러 사용자 동시 채팅. 서버가 broadcast.

2. 실시간 게임

rpc PlayGame(stream PlayerAction) returns (stream GameUpdate);

플레이어 액션 → 서버 → 모든 플레이어 게임 상태.

3. 트레이딩 시스템

rpc Trade(stream TradeRequest) returns (stream TradeUpdate);

주문 흐름 + 시세·체결 알림 양방향.

4. 협업 편집

rpc Edit(stream EditOp) returns (stream DocumentUpdate);

Google Docs 같은 실시간 협업.

5. IoT 양방향 텔레메트리

rpc Telemetry(stream Telemetry) returns (stream Command);

디바이스 → 서버 (센서 데이터), 서버 → 디바이스 (제어 명령).

채팅 룸 — 다중 사용자

@Component
public class ChatRoom {
    private final Map<String, Sinks.Many<ChatMessage>> userSinks = new ConcurrentHashMap<>();
    
    public Flux<ChatMessage> join(String userId) {
        Sinks.Many<ChatMessage> sink = Sinks.many().multicast().onBackpressureBuffer();
        userSinks.put(userId, sink);
        return sink.asFlux()
            .doFinally(s -> userSinks.remove(userId));
    }
    
    public void broadcast(ChatMessage msg) {
        userSinks.forEach((id, sink) -> {
            if (!id.equals(msg.getUserId())) {
                sink.tryEmitNext(msg);
            }
        });
    }
}

@GrpcService
public class ChatServiceImpl extends ReactorChatServiceGrpc.ChatServiceImplBase {
    
    @Autowired
    private ChatRoom room;
    
    @Override
    public Flux<ChatMessage> chat(Flux<ChatMessage> incoming) {
        return Flux.<ChatMessage>create(emitter -> {
            String[] userId = {null};
            
            incoming
                .doOnNext(msg -> {
                    if (userId[0] == null) {
                        userId[0] = msg.getUserId();
                        room.join(userId[0])
                            .doOnNext(emitter::next)
                            .subscribe();
                    }
                    room.broadcast(msg);
                })
                .doOnComplete(emitter::complete)
                .doOnError(emitter::error)
                .subscribe();
        });
    }
}

Bidirectional vs WebSocket

측면 gRPC Bidi WebSocket
메시지 의미 Protobuf 자유 (raw)
스키마 강력 없음
4 모드 4 RPC 모드 통합 양방향만
브라우저 gRPC-Web (제한) 네이티브
성능 매우 빠름 빠름
코드 자동 생성 O (모든 언어) X

여기서 정말 중요한 시험 함정 — 마이크로서비스 양방향 = gRPC Bidi. 브라우저 양방향 = WebSocket. 서버 ↔ 서버는 gRPC가 강력.

Bidirectional vs RSocket Channel

거의 동일 모델. 차이:

  • gRPC = Protobuf 강제, 큰 생태계
  • RSocket = Reactive 네이티브, 백프레셔 표준

WebFlux + 백프레셔 = RSocket / 다국어 + 강력 스키마 = gRPC.

종료 시나리오

정상 종료 — 양쪽 모두 onCompleted

Client → onCompleted() → Server
Server → onCompleted() → Client

일방 종료 — 한쪽만 종료

Client → onCompleted() (더 이상 안 보냄)
Server → 계속 응답 가능 (또는 종료)

여기서 시험 함정이 하나 있어요. 양쪽 독립. 클라이언트가 안 보내도 서버는 계속 보낼 수 있음. 서버가 끝낼 시점 결정.

에러 — onError

Client → onError(t)
모든 스트림 종료

또는 서버 측에서 onError → 클라이언트 StatusRuntimeException.

동시성·Thread Safety

여기서 정말 중요한 시험 함정 — StreamObserver는 thread-safe X. 여러 스레드에서 동시 onNext 호출 X. 서버가 broadcast 시 동기화 필요 (ConcurrentHashMap·Sinks 등).

백프레셔 (제한적)

ServerCallStreamObserver<Response> serverObserver = (ServerCallStreamObserver) responseObserver;
serverObserver.setOnReadyHandler(() -> {
    while (serverObserver.isReady()) {
        serverObserver.onNext(generateMessage());
    }
});

isReady() 체크. 클라이언트 처리 못 따라가면 false → 보내지 않음.

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 6편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • Bidirectional = N:N 양방향
  • 가장 강력한 모드
  • Proto — rpc Method (stream Req) returns (stream Res)
  • 서버·클라이언트 양쪽 StreamObserver 다중 호출
  • 두 스트림 독립 (한쪽 끝나도 다른쪽 계속)
  • Reactive Flux:Flux = WebFlux 친화
  • 서버 — Flux<Req>Flux<Res>
  • 사용처 — 채팅 / 실시간 게임 / 트레이딩 / 협업 편집 / IoT 텔레메트리
  • 다중 사용자 = Sinks.Many + broadcast
  • gRPC Bidi vs WebSocket — Protobuf·스키마·브라우저 친화도
  • 마이크로서비스 양방향 = gRPC / 브라우저 = WebSocket
  • gRPC Bidi vs RSocket Channel — 백프레셔·Reactive·생태계
  • WebFlux + 백프레셔 = RSocket / 다국어 + 스키마 = gRPC
  • 종료 — 양쪽 onCompleted 또는 일방
  • 양쪽 독립 — 한쪽만 끝낼 수 있음
  • 에러 — onError → 모든 스트림 종료
  • StreamObserver thread-safe X → 동기화 필요
  • ConcurrentHashMap·Sinks 사용
  • 백프레셔 부분 — isReady() + setOnReadyHandler

시리즈 다른 편

공식 문서: gRPC Bidirectional Streaming 에서 더 깊이.

다음 글(7편)에서는 Interceptors — 인증·로깅·메트릭·재시도를 횡단 관심사로, 서버·클라이언트 인터셉터, 체인 구성까지 풀어 갑니다.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!