Spring RSocket 마스터 노트 시리즈 4편. RSocketRequester 빌더로 클라이언트 만들기, route().data().retrieve* 메서드 패턴, 연결 관리와 자동 재연결, 재시도·타임아웃·서킷 브레이커, RSocketRequester.Builder의 ConnectMapping 핸들러, 연결 상태 모니터링, Spring Boot 자동 구성까지.
이 글은 Spring RSocket 마스터 노트 시리즈의 네 번째 편입니다. 3편(서버)에서 서버를 만들었다면, 이번엔 클라이언트 — RSocketRequester.
WebClient·RestTemplate과 비슷한 친화. 다만 4 모델 호출·연결 관리·양방향 처리가 RSocket 고유. 클라이언트도 라우트 노출하면 양방향 RPC.
처음 클라이언트가 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, retrieveMono·retrieveFlux·send 메서드가 헷갈립니다. 어느 게 어디? 둘째, 연결이 자동인지 수동인지 막연합니다.
해결법은 한 가지예요. 메서드 = 모델 매핑: retrieveMono (RR) / retrieveFlux (Stream/Channel) / send (FNF). 연결은 자동이지만 명시적 종료 가능. 이 매핑만 잡으면 끝.
RSocketRequester — 빌더
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.setupRoute("auth")
.setupData(authToken)
.setupMetadata(traceId, MimeTypeUtils.TEXT_PLAIN)
.transport(TcpClientTransport.create("server-host", 7000));
// WebSocket
.transport(WebsocketClientTransport.create(URI.create("ws://server/rsocket")))
Spring Boot 자동 구성
@Configuration
public class RSocketConfig {
@Bean
public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
return builder
.setupRoute("connection.init")
.setupData(authToken)
.tcp("localhost", 7000);
}
}
RSocketRequester.Builder는 자동 주입.
4 모델별 메서드
// 1. Request-Response
Mono<Result> result = requester.route("user.{id}", "123")
.data(request)
.retrieveMono(Result.class);
// 2. Fire-and-Forget
requester.route("event.log")
.data(event)
.send()
.subscribe();
// 3. Request-Stream
Flux<StockPrice> prices = requester.route("stocks.watch")
.data("AAPL")
.retrieveFlux(StockPrice.class);
// 4. Channel
Flux<TradeOut> output = requester.route("trade.session")
.data(inputFlux)
.retrieveFlux(TradeOut.class);
| 메서드 | 모델 |
|---|---|
retrieveMono(T) |
Request-Response |
retrieveFlux(T) |
Request-Stream / Channel |
send() |
Fire-and-Forget |
여기서 정말 중요한 시험 함정 — retrieveMono vs send. RR은 응답 받음 (Mono<T>), FNF는 응답 X (Mono<Void>). 잘못 사용 시 의미 다름.
라우트 — 패턴 변수
requester.route("user.{id}.posts", "123")
.data(req)
.retrieveMono(Posts.class);
// 다중 변수
requester.route("region.{r}.dept.{d}.users", "us", "engineering")
.retrieveFlux(User.class);
서버 측 @DestinationVariable과 매칭.
메타데이터 추가
requester.route("user.create")
.metadata("trace-123", MimeTypeUtils.TEXT_PLAIN)
.data(user)
.retrieveMono(User.class);
// 다중 메타데이터
requester.route("user.create")
.metadata(Tuple.of("trace-id", "trace-123"))
.metadata(Tuple.of("auth-token", "Bearer abc"))
.data(user)
.retrieveMono(User.class);
5편에서 자세히.
양방향 클라이언트 — @MessageMapping 노출
클라이언트도 라우트 노출:
@Controller
public class ClientHandler {
@MessageMapping("notification")
public void onNotification(String message) {
log.info("Got: {}", message);
}
}
RSocketRequester.Builder builder = RSocketRequester.builder()
.rsocketConnector(connector ->
connector.acceptor(messageHandler.responder()));
서버가 클라이언트로 호출 가능.
연결 관리
자동 연결
RSocketRequester requester = RSocketRequester.builder()
.tcp("localhost", 7000);
// 첫 호출 시 자동 연결
requester.route("ping").retrieveMono(String.class).subscribe();
명시적 종료
requester.dispose();
또는:
requester.rsocketClient().dispose();
재연결 — 자동
기본은 첫 호출 시 연결. 끊어지면 재호출 시 재연결.
RSocketRequester.builder()
.rsocketConnector(connector -> connector
.reconnect(Retry.fixedDelay(5, Duration.ofSeconds(1))))
.tcp("localhost", 7000);
5회까지 1초 간격 재시도.
여기서 시험 함정이 하나 있어요. 재연결은 같은 RSocketRequester 인스턴스로. 매번 새로 만들면 캐시·풀 효과 X.
재시도 패턴
단일 호출 재시도
requester.route("user.{id}", id)
.retrieveMono(User.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(t -> t instanceof TransientException))
.subscribe();
글로벌 재시도
RSocketStrategies의 ResponderHandler에 적용. 코드 일관성.
타임아웃
requester.route("slow.endpoint")
.data(req)
.retrieveMono(Result.class)
.timeout(Duration.ofSeconds(5))
.subscribe();
여기서 시험 함정이 하나 있어요. RSocket 자체엔 타임아웃 없음. Reactor의 .timeout() 사용. 또는 KEEPALIVE 프레임 lifetime 설정.
Resumability — 연결 복구
RSocketRequester.builder()
.rsocketConnector(connector -> connector
.resume(new Resume()
.sessionDuration(Duration.ofMinutes(10))
.retry(Retry.fixedDelay(5, Duration.ofSeconds(2)))))
.tcp("localhost", 7000);
서버 측도 resume 활성화 필요.
흐름:
1. 네트워크 끊김
2. 클라이언트가 resume token으로 재연결 시도
3. 서버가 끊어진 시점부터 재전송
RSocketRequester.Builder 옵션
RSocketRequester.builder()
.setupRoute("auth") // SETUP 라우트
.setupData(authPayload) // SETUP 데이터
.setupMetadata(...)
.rsocketStrategies(strategies) // 인코더·디코더
.rsocketConnector(connector -> connector
.keepAlive(Duration.ofSeconds(20), // 핑 간격
Duration.ofSeconds(90)) // 만료
.acceptor(serverResponder) // 양방향 핸들러
.reconnect(Retry...) // 재연결
.resume(...) // Resumability
)
.tcp("host", port);
연결 상태 모니터링
RSocketClient client = requester.rsocketClient();
client.source()
.doOnNext(rsocket -> {
rsocket.onClose()
.doOnTerminate(() -> log.warn("Connection closed"))
.subscribe();
})
.subscribe();
멀티 서버 — 로드 밸런싱
List<TransportFactory> targets = List.of(
() -> TcpClientTransport.create("server-1", 7000),
() -> TcpClientTransport.create("server-2", 7000),
() -> TcpClientTransport.create("server-3", 7000)
);
LoadbalanceTarget targets = LoadbalanceTarget.from(...);
자세한 건 7편(로드 밸런싱).
클라이언트 풀
@Service
public class UserClient {
private final RSocketRequester requester;
public UserClient(RSocketRequester.Builder builder) {
this.requester = builder.tcp("user-service", 7000);
}
public Mono<User> getUser(String id) {
return requester.route("user.{id}", id)
.retrieveMono(User.class);
}
public Flux<Order> getOrders(String userId) {
return requester.route("orders.user.{id}", userId)
.retrieveFlux(Order.class);
}
}
여기서 정말 중요한 시험 함정 — RSocketRequester는 Singleton 사용. Spring Bean으로 한 번 만들어 재사용. 매 호출 새로 만들면 연결 비용 폭발.
에러 처리
requester.route("user.{id}", id)
.retrieveMono(User.class)
.doOnError(ApplicationErrorException.class, e -> {
log.warn("Server error: {}", e.getMessage());
})
.doOnError(ConnectionErrorException.class, e -> {
log.error("Connection lost", e);
})
.onErrorResume(e -> Mono.empty())
.subscribe();
| 예외 | 의미 |
|---|---|
ApplicationErrorException |
서버 비즈니스 에러 |
ConnectionErrorException |
연결 문제 |
RejectedException |
서버가 거부 |
CanceledException |
취소됨 |
메타데이터 라우팅
여기서 시험 함정이 하나 있어요. 라우트는 메타데이터의 일부. 별도 헤더 X. route() 메서드가 자동으로 routing 메타데이터로 인코딩.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 4편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- RSocketRequester = 클라이언트 진입점
- Spring Boot 자동 —
RSocketRequester.Builder주입 - Transport —
tcp("host", port)/websocket(URI) - 4 모델 메서드 —
retrieveMono(RR) /retrieveFlux(Stream·Channel) /send(FNF) - 라우트 패턴 변수 —
route("user.{id}", id) - 메타데이터 —
metadata(value, mimeType) - 양방향 — 클라이언트도
@MessageMapping노출 +connector.acceptor() - 자동 연결 (첫 호출 시)
- 명시적 종료 —
requester.dispose() - 재연결 —
connector.reconnect(Retry...) - 같은 인스턴스로 재시도 (새로 만들기 X)
- 재시도 — Reactor
.retryWhen(Retry.backoff) - 타임아웃 —
.timeout(Duration)(RSocket 자체엔 없음) - Resumability —
connector.resume(...) - 끊어진 시점부터 이어서 (모바일·IoT)
- KeepAlive — 핑 간격 + 만료
- 연결 상태 —
rsocketClient().source().rsocket().onClose() - RSocketRequester는 Singleton (Bean으로 재사용)
- 매 호출 새로 만들기 X
- 예외 —
ApplicationErrorException(비즈니스) /ConnectionErrorException(연결) /RejectedException/CanceledException - 라우트 = 메타데이터의 일부 (자동 인코딩)
시리즈 다른 편
- 1편 — 기본 개념·프레임
- 2편 — 4 Interaction Models
- 3편 — Spring RSocket 서버
- 4편 — Spring RSocket 클라이언트 (현재 글)
- 5편 — 메타데이터·Composite Metadata
- 6편 — 보안·Spring Security RSocket·TLS
- 7편 — 로드 밸런싱·확장
- 8편 — 테스트
- 9편 — RSocket vs gRPC vs WebSocket
공식 문서: Spring RSocket Client 에서 더 깊이.
다음 글(5편)에서는 메타데이터·라우팅 — Composite Metadata, MIME Type, 인증·tracing·라우팅의 메타데이터 활용까지 풀어 갑니다.