gRPC + Spring Boot 마스터 노트 시리즈 6편. Bidirectional Streaming이 N:N 양방향 통신으로 채팅·실시간 게임·트레이딩에 적합한 이유, 클라이언트·서버 모두 StreamObserver 다중 호출, full-duplex 동작 원리, Reactive Flux:Flux 통합, WebSocket과의 결정적 차이까지.
이 글은 gRPC + Spring Boot 마스터 노트 시리즈의 여섯 번째 편입니다. 5편(Client Streaming)이 N:1이었다면, 이번엔 N:N — Bidirectional Streaming.
가장 강력한 모드. 채팅·실시간 게임·트레이딩 시스템. 클라이언트와 서버가 동시에 메시지 흐름을 주고받음. WebSocket의 강력한 대체.
처음 Bidirectional이 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, 양쪽 모두 StreamObserver라 코드가 복잡해 보입니다. 둘째, 언제 종료되는지 막연합니다.
해결법은 한 가지예요. "두 스트림이 독립". 클라이언트 스트림과 서버 스트림이 별도. 한쪽이 끝나도 다른쪽은 계속 가능. Full-duplex.
Proto 정의
service ChatService {
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
service GameService {
rpc PlayGame (stream PlayerAction) returns (stream GameUpdate);
}
service TradingService {
rpc Trade (stream TradeRequest) returns (stream TradeUpdate);
}
요청·응답 모두 stream.
서버 구현
@GrpcService
public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase {
private final Map<String, StreamObserver<ChatMessage>> activeUsers = new ConcurrentHashMap<>();
@Override
public StreamObserver<ChatMessage> chat(StreamObserver<ChatMessage> responseObserver) {
String[] userId = {null};
return new StreamObserver<ChatMessage>() {
@Override
public void onNext(ChatMessage message) {
if (userId[0] == null) {
userId[0] = message.getUserId();
activeUsers.put(userId[0], responseObserver);
}
// 다른 모든 사용자에게 broadcast
activeUsers.forEach((id, observer) -> {
if (!id.equals(userId[0])) {
observer.onNext(message);
}
});
}
@Override
public void onCompleted() {
activeUsers.remove(userId[0]);
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
activeUsers.remove(userId[0]);
log.error("Chat error", t);
}
};
}
}
여기서 정말 중요한 시험 함정 — 양방향 = 두 StreamObserver 동시. 요청 받기 + 응답 보내기 둘 다 비동기. 동기화 주의.
클라이언트 구현
@GrpcClient("chat-service")
private ChatServiceGrpc.ChatServiceStub asyncStub;
public void chat() {
StreamObserver<ChatMessage> responseObserver = new StreamObserver<ChatMessage>() {
@Override
public void onNext(ChatMessage message) {
// 다른 사용자 메시지 수신
log.info("[{}] {}", message.getUserId(), message.getContent());
}
@Override
public void onCompleted() {
log.info("Chat ended");
}
@Override
public void onError(Throwable t) {
log.error("Chat error", t);
}
};
StreamObserver<ChatMessage> requestObserver = asyncStub.chat(responseObserver);
// 사용자 입력 받아 전송
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (line.equals("quit")) {
requestObserver.onCompleted();
break;
}
ChatMessage msg = ChatMessage.newBuilder()
.setUserId("alice")
.setContent(line)
.build();
requestObserver.onNext(msg);
}
}
asyncStub.chat(responseObserver) → 요청 observer 받음. 양쪽 모두 active.
Reactive Flux:Flux
@GrpcService
public class ChatServiceImpl extends ReactorChatServiceGrpc.ChatServiceImplBase {
private final Sinks.Many<ChatMessage> broadcastSink =
Sinks.many().multicast().onBackpressureBuffer();
@Override
public Flux<ChatMessage> chat(Flux<ChatMessage> incoming) {
return incoming
.doOnNext(msg -> broadcastSink.tryEmitNext(msg))
.switchMap(msg -> broadcastSink.asFlux().filter(m -> !m.equals(msg)));
}
}
// 클라이언트
public Flux<ChatMessage> chat(Flux<ChatMessage> userInput) {
return reactorStub.chat(userInput);
}
훨씬 깔끔. WebFlux 친화.
사용 사례
1. 채팅
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
여러 사용자 동시 채팅. 서버가 broadcast.
2. 실시간 게임
rpc PlayGame(stream PlayerAction) returns (stream GameUpdate);
플레이어 액션 → 서버 → 모든 플레이어 게임 상태.
3. 트레이딩 시스템
rpc Trade(stream TradeRequest) returns (stream TradeUpdate);
주문 흐름 + 시세·체결 알림 양방향.
4. 협업 편집
rpc Edit(stream EditOp) returns (stream DocumentUpdate);
Google Docs 같은 실시간 협업.
5. IoT 양방향 텔레메트리
rpc Telemetry(stream Telemetry) returns (stream Command);
디바이스 → 서버 (센서 데이터), 서버 → 디바이스 (제어 명령).
채팅 룸 — 다중 사용자
@Component
public class ChatRoom {
private final Map<String, Sinks.Many<ChatMessage>> userSinks = new ConcurrentHashMap<>();
public Flux<ChatMessage> join(String userId) {
Sinks.Many<ChatMessage> sink = Sinks.many().multicast().onBackpressureBuffer();
userSinks.put(userId, sink);
return sink.asFlux()
.doFinally(s -> userSinks.remove(userId));
}
public void broadcast(ChatMessage msg) {
userSinks.forEach((id, sink) -> {
if (!id.equals(msg.getUserId())) {
sink.tryEmitNext(msg);
}
});
}
}
@GrpcService
public class ChatServiceImpl extends ReactorChatServiceGrpc.ChatServiceImplBase {
@Autowired
private ChatRoom room;
@Override
public Flux<ChatMessage> chat(Flux<ChatMessage> incoming) {
return Flux.<ChatMessage>create(emitter -> {
String[] userId = {null};
incoming
.doOnNext(msg -> {
if (userId[0] == null) {
userId[0] = msg.getUserId();
room.join(userId[0])
.doOnNext(emitter::next)
.subscribe();
}
room.broadcast(msg);
})
.doOnComplete(emitter::complete)
.doOnError(emitter::error)
.subscribe();
});
}
}
Bidirectional vs WebSocket
| 측면 | gRPC Bidi | WebSocket |
|---|---|---|
| 메시지 의미 | Protobuf | 자유 (raw) |
| 스키마 | 강력 | 없음 |
| 4 모드 | 4 RPC 모드 통합 | 양방향만 |
| 브라우저 | gRPC-Web (제한) | 네이티브 |
| 성능 | 매우 빠름 | 빠름 |
| 코드 자동 생성 | O (모든 언어) | X |
여기서 정말 중요한 시험 함정 — 마이크로서비스 양방향 = gRPC Bidi. 브라우저 양방향 = WebSocket. 서버 ↔ 서버는 gRPC가 강력.
Bidirectional vs RSocket Channel
거의 동일 모델. 차이:
- gRPC = Protobuf 강제, 큰 생태계
- RSocket = Reactive 네이티브, 백프레셔 표준
WebFlux + 백프레셔 = RSocket / 다국어 + 강력 스키마 = gRPC.
종료 시나리오
정상 종료 — 양쪽 모두 onCompleted
Client → onCompleted() → Server
Server → onCompleted() → Client
일방 종료 — 한쪽만 종료
Client → onCompleted() (더 이상 안 보냄)
Server → 계속 응답 가능 (또는 종료)
여기서 시험 함정이 하나 있어요. 양쪽 독립. 클라이언트가 안 보내도 서버는 계속 보낼 수 있음. 서버가 끝낼 시점 결정.
에러 — onError
Client → onError(t)
모든 스트림 종료
또는 서버 측에서 onError → 클라이언트 StatusRuntimeException.
동시성·Thread Safety
여기서 정말 중요한 시험 함정 — StreamObserver는 thread-safe X. 여러 스레드에서 동시 onNext 호출 X. 서버가 broadcast 시 동기화 필요 (ConcurrentHashMap·Sinks 등).
백프레셔 (제한적)
ServerCallStreamObserver<Response> serverObserver = (ServerCallStreamObserver) responseObserver;
serverObserver.setOnReadyHandler(() -> {
while (serverObserver.isReady()) {
serverObserver.onNext(generateMessage());
}
});
isReady() 체크. 클라이언트 처리 못 따라가면 false → 보내지 않음.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 6편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Bidirectional = N:N 양방향
- 가장 강력한 모드
- Proto —
rpc Method (stream Req) returns (stream Res) - 서버·클라이언트 양쪽 StreamObserver 다중 호출
- 두 스트림 독립 (한쪽 끝나도 다른쪽 계속)
- Reactive Flux:Flux = WebFlux 친화
- 서버 —
Flux<Req>→Flux<Res> - 사용처 — 채팅 / 실시간 게임 / 트레이딩 / 협업 편집 / IoT 텔레메트리
- 다중 사용자 =
Sinks.Many+ broadcast - gRPC Bidi vs WebSocket — Protobuf·스키마·브라우저 친화도
- 마이크로서비스 양방향 = gRPC / 브라우저 = WebSocket
- gRPC Bidi vs RSocket Channel — 백프레셔·Reactive·생태계
- WebFlux + 백프레셔 = RSocket / 다국어 + 스키마 = gRPC
- 종료 — 양쪽 onCompleted 또는 일방
- 양쪽 독립 — 한쪽만 끝낼 수 있음
- 에러 — onError → 모든 스트림 종료
StreamObserverthread-safe X → 동기화 필요- ConcurrentHashMap·Sinks 사용
- 백프레셔 부분 —
isReady()+setOnReadyHandler
시리즈 다른 편
- 1편 — 기본 개념·HTTP/2·4 RPC 모드
- 2편 — Protocol Buffers
- 3편 — Unary RPC
- 4편 — Server Streaming
- 5편 — Client Streaming
- 6편 — Bidirectional Streaming (현재 글)
- 7편 — Interceptors
- 8편 — Error Handling
- 9편 — Security
- 10편 — 고급 (Reflection·Health·LB·gRPC-Web)
공식 문서: gRPC Bidirectional Streaming 에서 더 깊이.
다음 글(7편)에서는 Interceptors — 인증·로깅·메트릭·재시도를 횡단 관심사로, 서버·클라이언트 인터셉터, 체인 구성까지 풀어 갑니다.