Spring RSocket 마스터 — 클라이언트·RSocketRequester

2026-05-03확률과 통계 마스터 노트

Spring RSocket 마스터 노트 시리즈 4편. RSocketRequester 빌더로 클라이언트 만들기, route().data().retrieve* 메서드 패턴, 연결 관리와 자동 재연결, 재시도·타임아웃·서킷 브레이커, RSocketRequester.Builder의 ConnectMapping 핸들러, 연결 상태 모니터링, Spring Boot 자동 구성까지.

이 글은 Spring RSocket 마스터 노트 시리즈의 네 번째 편입니다. 3편(서버)에서 서버를 만들었다면, 이번엔 클라이언트RSocketRequester.

WebClient·RestTemplate과 비슷한 친화. 다만 4 모델 호출·연결 관리·양방향 처리가 RSocket 고유. 클라이언트도 라우트 노출하면 양방향 RPC.

처음 클라이언트가 어렵게 느껴지는 이유

처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, retrieveMono·retrieveFlux·send 메서드가 헷갈립니다. 어느 게 어디? 둘째, 연결이 자동인지 수동인지 막연합니다.

해결법은 한 가지예요. 메서드 = 모델 매핑: retrieveMono (RR) / retrieveFlux (Stream/Channel) / send (FNF). 연결은 자동이지만 명시적 종료 가능. 이 매핑만 잡으면 끝.

RSocketRequester — 빌더

RSocketRequester requester = RSocketRequester.builder()
    .rsocketStrategies(strategies)
    .setupRoute("auth")
    .setupData(authToken)
    .setupMetadata(traceId, MimeTypeUtils.TEXT_PLAIN)
    .transport(TcpClientTransport.create("server-host", 7000));
// WebSocket
.transport(WebsocketClientTransport.create(URI.create("ws://server/rsocket")))

Spring Boot 자동 구성

@Configuration
public class RSocketConfig {

    @Bean
    public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
        return builder
            .setupRoute("connection.init")
            .setupData(authToken)
            .tcp("localhost", 7000);
    }
}

RSocketRequester.Builder는 자동 주입.

4 모델별 메서드

// 1. Request-Response
Mono<Result> result = requester.route("user.{id}", "123")
    .data(request)
    .retrieveMono(Result.class);

// 2. Fire-and-Forget
requester.route("event.log")
    .data(event)
    .send()
    .subscribe();

// 3. Request-Stream
Flux<StockPrice> prices = requester.route("stocks.watch")
    .data("AAPL")
    .retrieveFlux(StockPrice.class);

// 4. Channel
Flux<TradeOut> output = requester.route("trade.session")
    .data(inputFlux)
    .retrieveFlux(TradeOut.class);
메서드 모델
retrieveMono(T) Request-Response
retrieveFlux(T) Request-Stream / Channel
send() Fire-and-Forget

여기서 정말 중요한 시험 함정 — retrieveMono vs send. RR은 응답 받음 (Mono<T>), FNF는 응답 X (Mono<Void>). 잘못 사용 시 의미 다름.

라우트 — 패턴 변수

requester.route("user.{id}.posts", "123")
    .data(req)
    .retrieveMono(Posts.class);

// 다중 변수
requester.route("region.{r}.dept.{d}.users", "us", "engineering")
    .retrieveFlux(User.class);

서버 측 @DestinationVariable과 매칭.

메타데이터 추가

requester.route("user.create")
    .metadata("trace-123", MimeTypeUtils.TEXT_PLAIN)
    .data(user)
    .retrieveMono(User.class);
// 다중 메타데이터
requester.route("user.create")
    .metadata(Tuple.of("trace-id", "trace-123"))
    .metadata(Tuple.of("auth-token", "Bearer abc"))
    .data(user)
    .retrieveMono(User.class);

5편에서 자세히.

양방향 클라이언트 — @MessageMapping 노출

클라이언트도 라우트 노출:

@Controller
public class ClientHandler {

    @MessageMapping("notification")
    public void onNotification(String message) {
        log.info("Got: {}", message);
    }
}
RSocketRequester.Builder builder = RSocketRequester.builder()
    .rsocketConnector(connector ->
        connector.acceptor(messageHandler.responder()));

서버가 클라이언트로 호출 가능.

연결 관리

자동 연결

RSocketRequester requester = RSocketRequester.builder()
    .tcp("localhost", 7000);

// 첫 호출 시 자동 연결
requester.route("ping").retrieveMono(String.class).subscribe();

명시적 종료

requester.dispose();

또는:

requester.rsocketClient().dispose();

재연결 — 자동

기본은 첫 호출 시 연결. 끊어지면 재호출 시 재연결.

RSocketRequester.builder()
    .rsocketConnector(connector -> connector
        .reconnect(Retry.fixedDelay(5, Duration.ofSeconds(1))))
    .tcp("localhost", 7000);

5회까지 1초 간격 재시도.

여기서 시험 함정이 하나 있어요. 재연결은 같은 RSocketRequester 인스턴스로. 매번 새로 만들면 캐시·풀 효과 X.

재시도 패턴

단일 호출 재시도

requester.route("user.{id}", id)
    .retrieveMono(User.class)
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
        .filter(t -> t instanceof TransientException))
    .subscribe();

글로벌 재시도

RSocketStrategies의 ResponderHandler에 적용. 코드 일관성.

타임아웃

requester.route("slow.endpoint")
    .data(req)
    .retrieveMono(Result.class)
    .timeout(Duration.ofSeconds(5))
    .subscribe();

여기서 시험 함정이 하나 있어요. RSocket 자체엔 타임아웃 없음. Reactor의 .timeout() 사용. 또는 KEEPALIVE 프레임 lifetime 설정.

Resumability — 연결 복구

RSocketRequester.builder()
    .rsocketConnector(connector -> connector
        .resume(new Resume()
            .sessionDuration(Duration.ofMinutes(10))
            .retry(Retry.fixedDelay(5, Duration.ofSeconds(2)))))
    .tcp("localhost", 7000);

서버 측도 resume 활성화 필요.

흐름:

1. 네트워크 끊김
2. 클라이언트가 resume token으로 재연결 시도
3. 서버가 끊어진 시점부터 재전송

RSocketRequester.Builder 옵션

RSocketRequester.builder()
    .setupRoute("auth")                           // SETUP 라우트
    .setupData(authPayload)                       // SETUP 데이터
    .setupMetadata(...)
    .rsocketStrategies(strategies)                // 인코더·디코더
    .rsocketConnector(connector -> connector
        .keepAlive(Duration.ofSeconds(20),         // 핑 간격
                   Duration.ofSeconds(90))          // 만료
        .acceptor(serverResponder)                 // 양방향 핸들러
        .reconnect(Retry...)                       // 재연결
        .resume(...)                               // Resumability
    )
    .tcp("host", port);

연결 상태 모니터링

RSocketClient client = requester.rsocketClient();

client.source()
    .doOnNext(rsocket -> {
        rsocket.onClose()
            .doOnTerminate(() -> log.warn("Connection closed"))
            .subscribe();
    })
    .subscribe();

멀티 서버 — 로드 밸런싱

List<TransportFactory> targets = List.of(
    () -> TcpClientTransport.create("server-1", 7000),
    () -> TcpClientTransport.create("server-2", 7000),
    () -> TcpClientTransport.create("server-3", 7000)
);

LoadbalanceTarget targets = LoadbalanceTarget.from(...);

자세한 건 7편(로드 밸런싱).

클라이언트 풀

@Service
public class UserClient {

    private final RSocketRequester requester;

    public UserClient(RSocketRequester.Builder builder) {
        this.requester = builder.tcp("user-service", 7000);
    }

    public Mono<User> getUser(String id) {
        return requester.route("user.{id}", id)
            .retrieveMono(User.class);
    }

    public Flux<Order> getOrders(String userId) {
        return requester.route("orders.user.{id}", userId)
            .retrieveFlux(Order.class);
    }
}

여기서 정말 중요한 시험 함정 — RSocketRequester는 Singleton 사용. Spring Bean으로 한 번 만들어 재사용. 매 호출 새로 만들면 연결 비용 폭발.

에러 처리

requester.route("user.{id}", id)
    .retrieveMono(User.class)
    .doOnError(ApplicationErrorException.class, e -> {
        log.warn("Server error: {}", e.getMessage());
    })
    .doOnError(ConnectionErrorException.class, e -> {
        log.error("Connection lost", e);
    })
    .onErrorResume(e -> Mono.empty())
    .subscribe();
예외 의미
ApplicationErrorException 서버 비즈니스 에러
ConnectionErrorException 연결 문제
RejectedException 서버가 거부
CanceledException 취소됨

메타데이터 라우팅

여기서 시험 함정이 하나 있어요. 라우트는 메타데이터의 일부. 별도 헤더 X. route() 메서드가 자동으로 routing 메타데이터로 인코딩.

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 4편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • RSocketRequester = 클라이언트 진입점
  • Spring Boot 자동 — RSocketRequester.Builder 주입
  • Transport — tcp("host", port) / websocket(URI)
  • 4 모델 메서드 — retrieveMono (RR) / retrieveFlux (Stream·Channel) / send (FNF)
  • 라우트 패턴 변수 — route("user.{id}", id)
  • 메타데이터metadata(value, mimeType)
  • 양방향 — 클라이언트도 @MessageMapping 노출 + connector.acceptor()
  • 자동 연결 (첫 호출 시)
  • 명시적 종료 — requester.dispose()
  • 재연결connector.reconnect(Retry...)
  • 같은 인스턴스로 재시도 (새로 만들기 X)
  • 재시도 — Reactor .retryWhen(Retry.backoff)
  • 타임아웃.timeout(Duration) (RSocket 자체엔 없음)
  • Resumabilityconnector.resume(...)
  • 끊어진 시점부터 이어서 (모바일·IoT)
  • KeepAlive — 핑 간격 + 만료
  • 연결 상태 — rsocketClient().source().rsocket().onClose()
  • RSocketRequester는 Singleton (Bean으로 재사용)
  • 매 호출 새로 만들기 X
  • 예외 — ApplicationErrorException (비즈니스) / ConnectionErrorException (연결) / RejectedException / CanceledException
  • 라우트 = 메타데이터의 일부 (자동 인코딩)

시리즈 다른 편

공식 문서: Spring RSocket Client 에서 더 깊이.

다음 글(5편)에서는 메타데이터·라우팅 — Composite Metadata, MIME Type, 인증·tracing·라우팅의 메타데이터 활용까지 풀어 갑니다.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!