Spring RSocket 마스터 — 테스트·StepVerifier·통합 테스트

2026-05-03확률과 통계 마스터 노트

Spring RSocket 마스터 노트 시리즈 8편. RSocket 단위 테스트와 통합 테스트의 결정적 차이, StepVerifier로 Mono/Flux 검증, Embedded RSocket Server로 통합 테스트, 4 Interaction Models별 테스트 패턴, Mock RSocketRequester, 양방향 통신 테스트, Testcontainers 활용까지.

이 글은 Spring RSocket 마스터 노트 시리즈의 여덟 번째 편입니다. 1~7편이 구현이었다면, 이번엔 그것을 검증 — 테스트.

Reactive 흐름 + 4 Models + 양방향 + 백프레셔. 일반 JUnit으론 부족. StepVerifier·Embedded Server·Mock 3 도구가 핵심.

처음 RSocket 테스트가 어렵게 느껴지는 이유

처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, Reactive 검증이 익숙하지 않습니다. assertEquals로 Mono/Flux를 어떻게? 둘째, 단위 vs 통합 테스트 경계가 막연합니다.

해결법은 한 가지예요. "Mono/Flux = StepVerifier / 통합 = Embedded Server". 컨트롤러 단위 = Mock으로 빠르게, 종단 = Embedded Server로 진짜. 이 둘만 잡으면 끝.

StepVerifier — Reactive 검증

import reactor.test.StepVerifier;

Mono<String> mono = service.process();

StepVerifier.create(mono)
    .expectNext("expected")
    .verifyComplete();
Flux<Integer> flux = Flux.range(1, 5);

StepVerifier.create(flux)
    .expectNext(1, 2, 3, 4, 5)
    .verifyComplete();

에러 검증

StepVerifier.create(monoWithError)
    .expectError(IllegalArgumentException.class)
    .verify();

시간 기반

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofHours(1)))
    .thenAwait(Duration.ofHours(1))
    .expectNextCount(1)
    .verifyComplete();

가상 시간으로 즉시. 실제 1시간 대기 X.

여기서 정말 중요한 시험 함정 — .subscribe() 대신 StepVerifier.create(). subscribe만 하면 비동기 실행되어 테스트 종료 시 검증 X. StepVerifier가 동기 검증.

단위 테스트 — 컨트롤러

의존성 Mock

@ExtendWith(MockitoExtension.class)
class UserControllerTest {

    @Mock
    private UserService service;

    @InjectMocks
    private UserController controller;

    @Test
    void getUser() {
        User user = new User("1", "Alice");
        when(service.findById("1")).thenReturn(Mono.just(user));

        StepVerifier.create(controller.getUser("1"))
            .expectNext(user)
            .verifyComplete();
    }
}

빠름. 외부 의존 X.

여기서 시험 함정이 하나 있어요. 단위 테스트는 라우팅·메타데이터 검증 X. RSocket 흐름 검증은 통합 테스트.

통합 테스트 — Embedded RSocket Server

Spring Boot Test

@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = "spring.rsocket.server.port=0")
class UserRSocketIntegrationTest {

    @LocalRSocketServerPort
    private Integer port;

    @Autowired
    private RSocketRequester.Builder builder;

    private RSocketRequester requester;

    @BeforeEach
    void setup() {
        requester = builder.tcp("localhost", port);
    }

    @AfterEach
    void cleanup() {
        requester.dispose();
    }

    @Test
    void getUser() {
        StepVerifier.create(
            requester.route("user.{id}", "1")
                .retrieveMono(User.class)
        )
        .expectNextMatches(u -> u.getName().equals("Alice"))
        .verifyComplete();
    }
}

@LocalRSocketServerPort = 무작위 포트 주입.

4 Interaction Models 테스트

Request-Response

@Test
void requestResponse() {
    StepVerifier.create(
        requester.route("rr.test")
            .data(request)
            .retrieveMono(Result.class)
    )
    .expectNextMatches(r -> r.isOk())
    .verifyComplete();
}

Fire-and-Forget

@Test
void fireAndForget() {
    StepVerifier.create(
        requester.route("event.log")
            .data(event)
            .send()
    )
    .verifyComplete();

    // 부수 효과 검증 (DB·Mock)
    verify(eventService).persist(event);
}

Request-Stream

@Test
void requestStream() {
    StepVerifier.create(
        requester.route("stream.numbers")
            .data(10)
            .retrieveFlux(Integer.class)
            .take(5)
    )
    .expectNext(0, 1, 2, 3, 4)
    .verifyComplete();
}

Channel

@Test
void channel() {
    Flux<String> input = Flux.just("hello", "world", "test");

    StepVerifier.create(
        requester.route("channel.echo")
            .data(input)
            .retrieveFlux(String.class)
    )
    .expectNext("HELLO", "WORLD", "TEST")
    .verifyComplete();
}

Mock RSocketRequester

서버 의존 없이 클라이언트 측 로직 테스트:

@ExtendWith(MockitoExtension.class)
class UserClientTest {

    @Mock
    private RSocketRequester requester;
    @Mock
    private RSocketRequester.RequestSpec requestSpec;
    @Mock
    private RSocketRequester.RetrieveSpec retrieveSpec;

    @Test
    void getUser() {
        when(requester.route("user.{id}", "1")).thenReturn(requestSpec);
        when(requestSpec.retrieveMono(User.class)).thenReturn(Mono.just(new User("1", "Alice")));

        UserClient client = new UserClient(requester);

        StepVerifier.create(client.getUser("1"))
            .expectNextMatches(u -> u.getName().equals("Alice"))
            .verifyComplete();
    }
}

복잡. 큰 흐름은 통합 테스트 권장.

백프레셔 테스트

@Test
void backpressure() {
    AtomicInteger requested = new AtomicInteger();

    StepVerifier.create(
        requester.route("infinite.stream")
            .retrieveFlux(Integer.class)
            .doOnRequest(n -> requested.addAndGet((int) n))
            .take(10),
        1                           // 처음 1개만 request
    )
    .thenRequest(9)                 // 9개 더
    .expectNextCount(10)
    .verifyComplete();

    // 클라이언트가 요청한 수만 서버가 보냄
    assertThat(requested.get()).isLessThanOrEqualTo(10);
}

보안 테스트

@Test
void unauthenticatedAccess() {
    StepVerifier.create(
        requester.route("admin.list")
            .retrieveMono(String.class)
    )
    .expectErrorMatches(e -> e instanceof RejectedSetupException)
    .verify();
}

@Test
void authenticatedAccess() {
    UsernamePasswordMetadata creds = new UsernamePasswordMetadata("alice", "pass");

    StepVerifier.create(
        requester.route("user.list")
            .metadata(creds, BASIC_AUTHENTICATION_MIME_TYPE)
            .retrieveFlux(User.class)
    )
    .expectNextCount(3)
    .verifyComplete();
}

TestPublisher — 입력 시퀀스 제어

import reactor.test.publisher.TestPublisher;

TestPublisher<String> input = TestPublisher.create();

Flux<String> result = service.process(input.flux());

StepVerifier.create(result)
    .then(() -> input.next("hello"))
    .expectNext("HELLO")
    .then(() -> input.next("world"))
    .expectNext("WORLD")
    .then(() -> input.complete())
    .verifyComplete();

Channel 모델 테스트에 유용.

Testcontainers

@Testcontainers
@SpringBootTest
class IntegrationTest {

    @Container
    static GenericContainer<?> redisContainer = new GenericContainer<>("redis:7-alpine")
        .withExposedPorts(6379);

    @DynamicPropertySource
    static void redisProps(DynamicPropertyRegistry registry) {
        registry.add("spring.data.redis.port", redisContainer::getFirstMappedPort);
    }

    @Test
    void test() {
        // 실제 Redis와 RSocket 함께 테스트
    }
}

DB·메시징 시스템 함께 통합 테스트.

@WebFluxTest는?

여기서 정말 중요한 시험 함정 — @WebFluxTest는 RSocket 미지원. WebFlux HTTP만. RSocket = @SpringBootTest + @LocalRSocketServerPort.

Spy로 검증

@Test
void verifyMessageMappingCalled() {
    requester.route("user.create")
        .data(user)
        .retrieveMono(User.class)
        .block();

    verify(controller).create(user);
}

성능 테스트

@Test
@Tag("performance")
void throughput() {
    long start = System.currentTimeMillis();
    int count = 10000;

    Flux.range(0, count)
        .flatMap(i ->
            requester.route("rr.test")
                .data("data-" + i)
                .retrieveMono(String.class), 16)
        .blockLast();

    long duration = System.currentTimeMillis() - start;
    log.info("TPS: {}", count * 1000.0 / duration);
}

별도 태그로 분리. CI 일반 테스트에서 제외.

통합 테스트 패턴 정리

@SpringBootTest(properties = {
    "spring.rsocket.server.port=0",
    "spring.rsocket.server.transport=tcp",
    "logging.level.io.rsocket=DEBUG"
})
class FullIntegrationTest {

    @LocalRSocketServerPort
    Integer port;

    @Autowired
    RSocketRequester.Builder builder;

    RSocketRequester requester;

    @BeforeEach
    void setup() {
        requester = builder
            .setupMetadata(new UsernamePasswordMetadata("alice", "pass"),
                          UsernamePasswordMetadata.BASIC_AUTHENTICATION_MIME_TYPE)
            .tcp("localhost", port);
    }

    @Test
    void endToEnd() {
        // 전체 흐름 검증
    }
}

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 8편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • StepVerifier = Reactive 검증 표준
  • expectNext / expectNextMatches / expectError / verifyComplete
  • 시간 기반 = withVirtualTime
  • .subscribe() 대신 StepVerifier.create() (테스트 동기 검증)
  • 단위 테스트 — Mock + StepVerifier
  • 라우팅·메타데이터는 통합 테스트만
  • Embedded Server@SpringBootTest + @LocalRSocketServerPort
  • 무작위 포트 주입
  • 4 Models 테스트 — retrieveMono / send / retrieveFlux / Flux 입력
  • Mock RSocketRequester = 복잡, 큰 흐름은 통합 테스트
  • 백프레셔StepVerifier.create(flux, 1) + thenRequest(9)
  • 보안 테스트 — RejectedSetupException 검증
  • 인증 메타데이터 추가
  • TestPublisher = 입력 시퀀스 제어 (Channel 테스트)
  • Testcontainers = 외부 의존 (Redis·Kafka 등) 함께
  • @WebFluxTest RSocket 미지원@SpringBootTest
  • 성능 테스트는 별도 태그
  • 통합 테스트 표준 — Embedded Server + RSocketRequester + StepVerifier

시리즈 다른 편

공식 문서: Reactor Test / Spring Boot Testing 에서 더 깊이.

다음 글(9편, 마지막)에서는 RSocket vs gRPC vs WebSocket 비교 — 결정적 차이, 선택 기준, 함께 사용 패턴까지 시리즈 마무리.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!