Spring RSocket 마스터 노트 시리즈 8편. RSocket 단위 테스트와 통합 테스트의 결정적 차이, StepVerifier로 Mono/Flux 검증, Embedded RSocket Server로 통합 테스트, 4 Interaction Models별 테스트 패턴, Mock RSocketRequester, 양방향 통신 테스트, Testcontainers 활용까지.
이 글은 Spring RSocket 마스터 노트 시리즈의 여덟 번째 편입니다. 1~7편이 구현이었다면, 이번엔 그것을 검증 — 테스트.
Reactive 흐름 + 4 Models + 양방향 + 백프레셔. 일반 JUnit으론 부족. StepVerifier·Embedded Server·Mock 3 도구가 핵심.
처음 RSocket 테스트가 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, Reactive 검증이 익숙하지 않습니다. assertEquals로 Mono/Flux를 어떻게? 둘째, 단위 vs 통합 테스트 경계가 막연합니다.
해결법은 한 가지예요. "Mono/Flux = StepVerifier / 통합 = Embedded Server". 컨트롤러 단위 = Mock으로 빠르게, 종단 = Embedded Server로 진짜. 이 둘만 잡으면 끝.
StepVerifier — Reactive 검증
import reactor.test.StepVerifier;
Mono<String> mono = service.process();
StepVerifier.create(mono)
.expectNext("expected")
.verifyComplete();
Flux<Integer> flux = Flux.range(1, 5);
StepVerifier.create(flux)
.expectNext(1, 2, 3, 4, 5)
.verifyComplete();
에러 검증
StepVerifier.create(monoWithError)
.expectError(IllegalArgumentException.class)
.verify();
시간 기반
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofHours(1)))
.thenAwait(Duration.ofHours(1))
.expectNextCount(1)
.verifyComplete();
가상 시간으로 즉시. 실제 1시간 대기 X.
여기서 정말 중요한 시험 함정 — .subscribe() 대신 StepVerifier.create(). subscribe만 하면 비동기 실행되어 테스트 종료 시 검증 X. StepVerifier가 동기 검증.
단위 테스트 — 컨트롤러
의존성 Mock
@ExtendWith(MockitoExtension.class)
class UserControllerTest {
@Mock
private UserService service;
@InjectMocks
private UserController controller;
@Test
void getUser() {
User user = new User("1", "Alice");
when(service.findById("1")).thenReturn(Mono.just(user));
StepVerifier.create(controller.getUser("1"))
.expectNext(user)
.verifyComplete();
}
}
빠름. 외부 의존 X.
여기서 시험 함정이 하나 있어요. 단위 테스트는 라우팅·메타데이터 검증 X. RSocket 흐름 검증은 통합 테스트.
통합 테스트 — Embedded RSocket Server
Spring Boot Test
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = "spring.rsocket.server.port=0")
class UserRSocketIntegrationTest {
@LocalRSocketServerPort
private Integer port;
@Autowired
private RSocketRequester.Builder builder;
private RSocketRequester requester;
@BeforeEach
void setup() {
requester = builder.tcp("localhost", port);
}
@AfterEach
void cleanup() {
requester.dispose();
}
@Test
void getUser() {
StepVerifier.create(
requester.route("user.{id}", "1")
.retrieveMono(User.class)
)
.expectNextMatches(u -> u.getName().equals("Alice"))
.verifyComplete();
}
}
@LocalRSocketServerPort = 무작위 포트 주입.
4 Interaction Models 테스트
Request-Response
@Test
void requestResponse() {
StepVerifier.create(
requester.route("rr.test")
.data(request)
.retrieveMono(Result.class)
)
.expectNextMatches(r -> r.isOk())
.verifyComplete();
}
Fire-and-Forget
@Test
void fireAndForget() {
StepVerifier.create(
requester.route("event.log")
.data(event)
.send()
)
.verifyComplete();
// 부수 효과 검증 (DB·Mock)
verify(eventService).persist(event);
}
Request-Stream
@Test
void requestStream() {
StepVerifier.create(
requester.route("stream.numbers")
.data(10)
.retrieveFlux(Integer.class)
.take(5)
)
.expectNext(0, 1, 2, 3, 4)
.verifyComplete();
}
Channel
@Test
void channel() {
Flux<String> input = Flux.just("hello", "world", "test");
StepVerifier.create(
requester.route("channel.echo")
.data(input)
.retrieveFlux(String.class)
)
.expectNext("HELLO", "WORLD", "TEST")
.verifyComplete();
}
Mock RSocketRequester
서버 의존 없이 클라이언트 측 로직 테스트:
@ExtendWith(MockitoExtension.class)
class UserClientTest {
@Mock
private RSocketRequester requester;
@Mock
private RSocketRequester.RequestSpec requestSpec;
@Mock
private RSocketRequester.RetrieveSpec retrieveSpec;
@Test
void getUser() {
when(requester.route("user.{id}", "1")).thenReturn(requestSpec);
when(requestSpec.retrieveMono(User.class)).thenReturn(Mono.just(new User("1", "Alice")));
UserClient client = new UserClient(requester);
StepVerifier.create(client.getUser("1"))
.expectNextMatches(u -> u.getName().equals("Alice"))
.verifyComplete();
}
}
복잡. 큰 흐름은 통합 테스트 권장.
백프레셔 테스트
@Test
void backpressure() {
AtomicInteger requested = new AtomicInteger();
StepVerifier.create(
requester.route("infinite.stream")
.retrieveFlux(Integer.class)
.doOnRequest(n -> requested.addAndGet((int) n))
.take(10),
1 // 처음 1개만 request
)
.thenRequest(9) // 9개 더
.expectNextCount(10)
.verifyComplete();
// 클라이언트가 요청한 수만 서버가 보냄
assertThat(requested.get()).isLessThanOrEqualTo(10);
}
보안 테스트
@Test
void unauthenticatedAccess() {
StepVerifier.create(
requester.route("admin.list")
.retrieveMono(String.class)
)
.expectErrorMatches(e -> e instanceof RejectedSetupException)
.verify();
}
@Test
void authenticatedAccess() {
UsernamePasswordMetadata creds = new UsernamePasswordMetadata("alice", "pass");
StepVerifier.create(
requester.route("user.list")
.metadata(creds, BASIC_AUTHENTICATION_MIME_TYPE)
.retrieveFlux(User.class)
)
.expectNextCount(3)
.verifyComplete();
}
TestPublisher — 입력 시퀀스 제어
import reactor.test.publisher.TestPublisher;
TestPublisher<String> input = TestPublisher.create();
Flux<String> result = service.process(input.flux());
StepVerifier.create(result)
.then(() -> input.next("hello"))
.expectNext("HELLO")
.then(() -> input.next("world"))
.expectNext("WORLD")
.then(() -> input.complete())
.verifyComplete();
Channel 모델 테스트에 유용.
Testcontainers
@Testcontainers
@SpringBootTest
class IntegrationTest {
@Container
static GenericContainer<?> redisContainer = new GenericContainer<>("redis:7-alpine")
.withExposedPorts(6379);
@DynamicPropertySource
static void redisProps(DynamicPropertyRegistry registry) {
registry.add("spring.data.redis.port", redisContainer::getFirstMappedPort);
}
@Test
void test() {
// 실제 Redis와 RSocket 함께 테스트
}
}
DB·메시징 시스템 함께 통합 테스트.
@WebFluxTest는?
여기서 정말 중요한 시험 함정 — @WebFluxTest는 RSocket 미지원. WebFlux HTTP만. RSocket = @SpringBootTest + @LocalRSocketServerPort.
Spy로 검증
@Test
void verifyMessageMappingCalled() {
requester.route("user.create")
.data(user)
.retrieveMono(User.class)
.block();
verify(controller).create(user);
}
성능 테스트
@Test
@Tag("performance")
void throughput() {
long start = System.currentTimeMillis();
int count = 10000;
Flux.range(0, count)
.flatMap(i ->
requester.route("rr.test")
.data("data-" + i)
.retrieveMono(String.class), 16)
.blockLast();
long duration = System.currentTimeMillis() - start;
log.info("TPS: {}", count * 1000.0 / duration);
}
별도 태그로 분리. CI 일반 테스트에서 제외.
통합 테스트 패턴 정리
@SpringBootTest(properties = {
"spring.rsocket.server.port=0",
"spring.rsocket.server.transport=tcp",
"logging.level.io.rsocket=DEBUG"
})
class FullIntegrationTest {
@LocalRSocketServerPort
Integer port;
@Autowired
RSocketRequester.Builder builder;
RSocketRequester requester;
@BeforeEach
void setup() {
requester = builder
.setupMetadata(new UsernamePasswordMetadata("alice", "pass"),
UsernamePasswordMetadata.BASIC_AUTHENTICATION_MIME_TYPE)
.tcp("localhost", port);
}
@Test
void endToEnd() {
// 전체 흐름 검증
}
}
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 8편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- StepVerifier = Reactive 검증 표준
expectNext/expectNextMatches/expectError/verifyComplete- 시간 기반 =
withVirtualTime .subscribe()대신StepVerifier.create()(테스트 동기 검증)- 단위 테스트 — Mock + StepVerifier
- 라우팅·메타데이터는 통합 테스트만
- Embedded Server —
@SpringBootTest+@LocalRSocketServerPort - 무작위 포트 주입
- 4 Models 테스트 —
retrieveMono/send/retrieveFlux/ Flux 입력 - Mock RSocketRequester = 복잡, 큰 흐름은 통합 테스트
- 백프레셔 —
StepVerifier.create(flux, 1)+thenRequest(9) - 보안 테스트 —
RejectedSetupException검증 - 인증 메타데이터 추가
TestPublisher= 입력 시퀀스 제어 (Channel 테스트)- Testcontainers = 외부 의존 (Redis·Kafka 등) 함께
@WebFluxTestRSocket 미지원 →@SpringBootTest- 성능 테스트는 별도 태그
- 통합 테스트 표준 — Embedded Server + RSocketRequester + StepVerifier
시리즈 다른 편
- 1편 — 기본 개념·프레임
- 2편 — 4 Interaction Models
- 3편 — Spring RSocket 서버
- 4편 — Spring RSocket 클라이언트
- 5편 — 메타데이터·Composite Metadata
- 6편 — 보안·Spring Security RSocket·TLS
- 7편 — 로드 밸런싱·확장
- 8편 — 테스트 (현재 글)
- 9편 — RSocket vs gRPC vs WebSocket
공식 문서: Reactor Test / Spring Boot Testing 에서 더 깊이.
다음 글(9편, 마지막)에서는 RSocket vs gRPC vs WebSocket 비교 — 결정적 차이, 선택 기준, 함께 사용 패턴까지 시리즈 마무리.