Spring WebFlux 핵심 정리 시리즈 12편. 리액티브 마이크로서비스 — Stock·Customer·Aggregator 3서비스 아키텍처, WebClient 서비스 클라이언트, Mono.zip() 병렬 처리, R2DBC @Transactional, 서비스 간 에러 전파, SSE 릴레이, switchIfEmpty 체인 순서까지 정리합니다.
이 글은 Spring WebFlux 핵심 정리 시리즈의 12번째 편입니다. 이번 편에서는 배운 모든 것을 통합해 리액티브 마이크로서비스 시스템을 구축하는 패턴을 다룹니다. 단순 예제가 아니라 실제 운영에서 쓰이는 구조 — Stock Service, Customer Service, Aggregator Service 세 개를 WebClient와 R2DBC로 연결하는 흐름입니다.
핵심 비유는 여러 부서가 우편으로 협업하는 회사입니다. 한 부서가 답장이 늦어도 다른 부서는 다른 일을 계속합니다. 리액티브 마이크로서비스는 이 구조예요 — 서비스 하나가 느려도 다른 서비스는 영향받지 않고, 결과가 도착하는 대로 조합합니다.
이 시리즈는 Spring 공식 문서, Project Reactor 공식 문서, Reactive Streams 명세, 여러 비동기 백엔드 학습 자료 등 공개 자료를 참고해 한국어 학습 노트로 풀어쓴 자료입니다.
12편은 앞선 편들의 총집합이에요. R2DBC(2·3편), CRUD API(4편), WebClient(8편), SSE(10편), 성능 최적화(11편)가 한 시스템 안에서 어떻게 맞물리는지 볼 수 있습니다.
전체 아키텍처 — 3서비스 구성
실습 시스템의 전체 구성은 이렇습니다.
[브라우저/앱]
↓ HTTP (8080)
[Aggregator Service]
├─→ HTTP (7070) → [Stock Service]
│ └─ GET /stock/{ticker}: 현재 주가
│ └─ SSE /stock/price/stream: 실시간 주가 스트림
└─→ HTTP (6060) → [Customer Service]
└─ GET /customer/{id}: 고객 정보
└─ POST /customer/{id}/trade: 매수/매도
└─ GET /customer/{id}/portfolio: 포트폴리오
Aggregator 패턴을 쓰는 이유는 명확해요. 클라이언트가 Stock Service와 Customer Service를 각각 직접 호출하면 서비스 URL이 노출되고, CORS 문제가 생기고, 각 서비스 변경 시 클라이언트 코드도 수정해야 합니다. Aggregator(BFF: Backend For Frontend)가 중간에서 여러 서비스를 호출하고 조합하면 클라이언트는 Aggregator 하나만 알면 됩니다.
Aggregator가 제공하는 API:
GET /aggregate/stocks/stream → Stock Service SSE를 브라우저에 릴레이
GET /aggregate/portfolio/{customerId} → 포트폴리오 + 현재가 합산
POST /aggregate/{customerId}/trade → 매수/매도 주문 처리
WebClient 서비스 클라이언트 패턴
각 서비스에 대한 WebClient 클라이언트를 컴포넌트로 분리하는 게 좋은 패턴이에요.
@Component
public class StockServiceClient {
private final WebClient webClient;
public StockServiceClient(WebClient.Builder builder) {
this.webClient = builder
.baseUrl("http://localhost:7070")
.build();
}
// 단건 주가 조회
public Mono<StockDto> getStockPrice(String ticker) {
return webClient.get()
.uri("/stock/{ticker}", ticker)
.retrieve()
.bodyToMono(StockDto.class);
}
// SSE 스트림 구독
public Flux<StockDto> getStockPriceStream() {
return webClient.get()
.uri("/stock/price/stream")
.accept(MediaType.TEXT_EVENT_STREAM) // SSE 미디어 타입 명시
.retrieve()
.bodyToFlux(StockDto.class);
}
}
@Component
public class CustomerServiceClient {
private final WebClient webClient;
public CustomerServiceClient(WebClient.Builder builder) {
this.webClient = builder
.baseUrl("http://localhost:6060")
.build();
}
public Mono<CustomerDto> getCustomer(Integer customerId) {
return webClient.get()
.uri("/customer/{customerId}", customerId)
.retrieve()
.onStatus(
HttpStatusCode::is4xxClientError,
response -> response.bodyToMono(String.class)
.map(body -> new CustomerNotFoundException(
"Customer not found: " + customerId))
)
.bodyToMono(CustomerDto.class);
}
public Mono<StockTradeResponse> trade(Integer customerId, StockTradeRequest request) {
return webClient.post()
.uri("/customer/{customerId}/trade", customerId)
.bodyValue(request)
.retrieve()
.onStatus(
HttpStatusCode::is4xxClientError,
response -> response.bodyToMono(ProblemDetail.class)
.map(pd -> new TradeException(pd.getDetail()))
)
.bodyToMono(StockTradeResponse.class);
}
}
Aggregator Service — Mono.zip()으로 병렬 처리
Aggregator의 핵심은 여러 서비스를 호출하는 결과를 병렬로 조합하는 거예요.
@Service
public class AggregatorService {
// 거래 처리: 현재 주가 조회 → 거래 요청 생성 → Customer Service에 전달
public Mono<StockTradeResponse> trade(Integer customerId,
TradeRequest tradeRequest) {
return stockServiceClient.getStockPrice(tradeRequest.getTicker())
.flatMap(stockDto -> {
StockTradeRequest request = StockTradeRequest.builder()
.ticker(tradeRequest.getTicker())
.price(stockDto.getPrice()) // 실시간 주가 반영
.quantity(tradeRequest.getQuantity())
.action(tradeRequest.getAction())
.build();
return customerServiceClient.trade(customerId, request);
});
}
// 포트폴리오 조회: 고객 정보 + 포트폴리오를 병렬 조회 후
// 각 종목의 현재가를 Stock Service에서 조회하여 합산
public Mono<PortfolioSummary> getPortfolioWithPrices(Integer customerId) {
Mono<CustomerDto> customerMono = customerServiceClient.getCustomer(customerId);
Flux<PortfolioItem> portfolioFlux = customerServiceClient.getPortfolio(customerId);
return Mono.zip(customerMono, portfolioFlux.collectList())
.flatMap(tuple -> {
CustomerDto customer = tuple.getT1();
List<PortfolioItem> portfolio = tuple.getT2();
return Flux.fromIterable(portfolio)
.flatMap(item ->
stockServiceClient.getStockPrice(item.getTicker())
.map(stock ->
PortfolioItemWithPrice.builder()
.ticker(item.getTicker())
.quantity(item.getQuantity())
.currentPrice(stock.getPrice())
.totalValue(stock.getPrice() * item.getQuantity())
.build()
),
5 // 최대 5개 병렬 주가 조회
)
.collectList()
.map(items -> PortfolioSummary.builder()
.customerId(customerId)
.customerName(customer.getName())
.balance(customer.getBalance())
.portfolio(items)
.totalPortfolioValue(
items.stream()
.mapToInt(PortfolioItemWithPrice::getTotalValue)
.sum()
)
.build());
});
}
}
Mono.zip()의 핵심 — 왜 flatMap을 쓰지 않는가
// flatMap: 순차 처리 — A 완료 후 B 시작 → 총 시간 = A시간 + B시간
monoA.flatMap(a -> monoB.map(b -> combine(a, b)));
// Mono.zip(): 병렬 처리 — A, B 동시 시작 → 총 시간 = max(A시간, B시간)
Mono.zip(monoA, monoB).map(tuple -> combine(tuple.getT1(), tuple.getT2()));
고객 저장(100ms)과 포트폴리오 저장(100ms)을 Mono.zip()으로 동시에 처리하면 200ms → 100ms로 단축됩니다.
여기서 시험 함정이 하나 있어요. Mono.zip()은 두 Mono가 모두 완료되면 결과를 조합합니다. 하나라도 빈 Mono(empty)를 반환하면 전체 zip도 완료되지 않아요. 두 작업이 독립적일 때 사용하고, 결과를 결합할 필요 없이 단순 병렬 실행만 필요하면 Mono.when()을 씁니다.
Customer Service — switchIfEmpty 체인 순서
매수 처리에서 switchIfEmpty를 체인하는 순서가 중요합니다.
@Transactional
public Mono<StockTradeResponse> buyStock(Integer customerId, StockTradeRequest request) {
Integer totalPrice = request.getPrice() * request.getQuantity();
return customerRepository.findById(customerId)
.switchIfEmpty(Mono.error(
new CustomerNotFoundException("Customer not found: " + customerId)
)) // 1단계: 고객 없음
.filter(customer -> customer.getBalance() >= totalPrice)
.switchIfEmpty(Mono.error(
new InsufficientBalanceException("Insufficient balance. Required: " + totalPrice)
)) // 2단계: 잔고 부족
.flatMap(customer -> {
return portfolioRepository
.findByCustomerIdAndTicker(customerId, request.getTicker())
.defaultIfEmpty(Portfolio.builder()
.customerId(customerId)
.ticker(request.getTicker())
.quantity(0)
.build())
.zipWith(Mono.just(customer));
})
.flatMap(tuple -> {
Portfolio portfolio = tuple.getT1();
Customer customer = tuple.getT2();
customer.setBalance(customer.getBalance() - totalPrice);
portfolio.setQuantity(portfolio.getQuantity() + request.getQuantity());
// 두 저장을 병렬로 실행 — 원자적 처리
return Mono.zip(
customerRepository.save(customer),
portfolioRepository.save(portfolio)
);
})
.map(tuple -> StockTradeResponse.builder()
// ... 응답 조립
.build());
}
여기서 시험 함정이 하나 있어요. switchIfEmpty 체인 순서가 오류 구분의 핵심입니다.
// BAD — 두 switchIfEmpty가 같은 레벨, "고객 없음"인지 "잔고 부족"인지 구분 불가
return customerRepository.findById(customerId)
.filter(c -> c.getBalance() >= totalPrice)
.switchIfEmpty(Mono.error(new RuntimeException("Error"))); // 뭐가 오류?
// GOOD — 단계별 오류 분리
return customerRepository.findById(customerId)
.switchIfEmpty(Mono.error(new CustomerNotFoundException("..."))) // 고객 없음
.filter(c -> c.getBalance() >= totalPrice)
.switchIfEmpty(Mono.error(new InsufficientBalanceException("..."))); // 잔고 부족
R2DBC에서 @Transactional
@Service
public class CustomerService {
@Transactional // R2DBC에서도 동작
public Mono<StockTradeResponse> buyStock(Integer customerId, StockTradeRequest request) {
// 이 메서드 내 모든 DB 작업이 하나의 트랜잭션
return customerRepository.findById(customerId)
.flatMap(customer -> {
// ...
return Mono.zip(
customerRepository.save(customer), // 트랜잭션 내
portfolioRepository.save(portfolio) // 트랜잭션 내
);
})
// 어느 하나라도 실패하면 전체 롤백
.map(tuple -> buildResponse(tuple));
}
}
여기서 시험 함정이 하나 있어요. R2DBC의 @Transactional은 Mono/Flux가 구독될 때 트랜잭션을 시작합니다. 스트림이 완료되면 커밋, 오류 신호면 롤백이에요. 핵심은 "매수 시 잔고 차감(customer 저장)과 포트폴리오 수량 증가(portfolio 저장)가 원자적이어야 한다"는 점 — 하나가 성공하고 다른 하나가 실패하면 데이터가 불일치합니다.
또 하나 — 두 작업을 별도 .subscribe()로 실행하면 각각 별도 트랜잭션이 됩니다.
// BAD — 별도 트랜잭션, 원자성 보장 안 됨
customerRepository.save(customer).subscribe(); // 트랜잭션 1
portfolioRepository.save(portfolio).subscribe(); // 트랜잭션 2
// GOOD — 단일 트랜잭션
Mono.zip(
customerRepository.save(customer),
portfolioRepository.save(portfolio)
);
SSE 릴레이 — Aggregator에서 Stock Service SSE를 클라이언트로 전달
@RestController
@RequestMapping("/aggregate")
public class AggregatorController {
// Stock Service의 SSE 스트림을 클라이언트에게 릴레이
// 클라이언트는 Stock Service를 직접 알 필요 없음
@GetMapping(value = "/stocks/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<StockDto> streamStockPrices() {
return stockServiceClient.getStockPriceStream();
}
}
여기서 시험 함정이 하나 있어요. 릴레이할 때도 produces = TEXT_EVENT_STREAM_VALUE 필수입니다. 이 어노테이션이 없으면 Aggregator가 Flux를 JSON 배열로 수집한 뒤 한 번에 응답해서 SSE 릴레이가 안 됩니다.
서비스 간 에러 전파
하위 서비스의 HTTP 오류를 어떻게 도메인 예외로 변환하는가?
public Mono<StockTradeResponse> trade(Integer customerId, StockTradeRequest request) {
return webClient.post()
.uri("/customer/{customerId}/trade", customerId)
.bodyValue(request)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, response ->
response.bodyToMono(String.class)
.map(body -> new CustomerNotFoundException(body))
)
.onStatus(HttpStatus.BAD_REQUEST::equals, response ->
response.bodyToMono(ProblemDetail.class)
.map(pd -> new TradeException(pd.getDetail()))
)
.bodyToMono(StockTradeResponse.class);
}
onStatus()로 HTTP 상태 코드를 도메인 예외로 변환하면, Aggregator에서 Customer Service의 HTTP 응답 세부 사항을 숨기고 비즈니스 예외로만 다룰 수 있어요.
회로 차단기(Circuit Breaker) 패턴
외부 서비스가 반복적으로 실패할 때 계속 요청하면 Aggregator도 같이 장애가 납니다. 간단한 fallback 패턴으로 1차 방어할 수 있어요.
public Mono<StockDto> getStockPriceWithFallback(String ticker) {
return stockServiceClient.getStockPrice(ticker)
.timeout(Duration.ofSeconds(3))
.onErrorReturn(new StockDto(ticker, -1)) // 서비스 장애 시 기본값
.doOnError(e -> log.error("Failed to get stock price for {}: {}", ticker, e.getMessage()));
}
완전한 Circuit Breaker가 필요하면 Resilience4j를 사용합니다. CLOSED(정상) → OPEN(차단, 실패율 임계값 초과) → HALF-OPEN(탐색, 일부 요청 허용) 3단계 상태 전환으로 장애 전파를 막아요. 리액티브용 어댑터(resilience4j-reactor 모듈)를 별도로 추가해야 합니다.
여기서 시험 함정이 하나 있어요. 분산 트랜잭션은 리액티브 마이크로서비스에서 쓰지 않습니다. 블로킹 본질의 2PC(Two-Phase Commit)는 리액티브 모델과 맞지 않아요. 대신 Saga 패턴 — 로컬 트랜잭션을 순서대로 실행하고, 실패 시 보상 트랜잭션으로 이전 작업을 되돌리는 방식을 씁니다.
자주 만나는 함정 — 시험 직전 압축 노트
12편의 핵심을 정리합니다.
- Aggregator 패턴(BFF) — 여러 서비스를 조합하여 클라이언트에 단일 API 제공
Mono.zip()— 독립적인 Mono를 병렬로 실행, 둘 다 완료돼야 결합Mono.zip()vsflatMap— zip은 병렬, flatMap은 순차 (zip이 더 빠름)@Transactional— R2DBC에서도 동작, Mono 구독 시 시작·완료 시 커밋·오류 시 롤백.subscribe()두 번 = 두 트랜잭션 → Mono.zip 사용해야 원자적switchIfEmpty순서 — 단계별로 오류 분리 (고객 없음 → 잔고 부족 순서)- SSE 릴레이 시
produces = TEXT_EVENT_STREAM_VALUE필수 - 서버 간 SSE 구독 —
accept(MediaType.TEXT_EVENT_STREAM)+bodyToFlux() onStatus()— HTTP 상태 코드 → 도메인 예외 변환- 분산 트랜잭션 금지 → Saga 패턴 (로컬 트랜잭션 + 보상 트랜잭션)
- Circuit Breaker Resilience4j — 리액티브 어댑터(
resilience4j-reactor) 별도 필요 - WebClient는 Bean으로 등록해서 재사용 (연결 풀 공유)
- 서비스 URL은
application.properties에서 외부화, Kubernetes에서 DNS 기반
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.