리액티브 마이크로서비스 — WebFlux 서비스 통합

2026-05-03AWS SAA-C03 스터디

Spring WebFlux 핵심 정리 시리즈 12편. 리액티브 마이크로서비스 — Stock·Customer·Aggregator 3서비스 아키텍처, WebClient 서비스 클라이언트, Mono.zip() 병렬 처리, R2DBC @Transactional, 서비스 간 에러 전파, SSE 릴레이, switchIfEmpty 체인 순서까지 정리합니다.

📚 Spring WebFlux 핵심 정리 · 12편 / 14편 — WebFlux 서비스 통합

이 글은 Spring WebFlux 핵심 정리 시리즈의 12번째 편입니다. 이번 편에서는 배운 모든 것을 통합해 리액티브 마이크로서비스 시스템을 구축하는 패턴을 다룹니다. 단순 예제가 아니라 실제 운영에서 쓰이는 구조 — Stock Service, Customer Service, Aggregator Service 세 개를 WebClient와 R2DBC로 연결하는 흐름입니다.

핵심 비유는 여러 부서가 우편으로 협업하는 회사입니다. 한 부서가 답장이 늦어도 다른 부서는 다른 일을 계속합니다. 리액티브 마이크로서비스는 이 구조예요 — 서비스 하나가 느려도 다른 서비스는 영향받지 않고, 결과가 도착하는 대로 조합합니다.

📚 학습 노트

이 시리즈는 Spring 공식 문서, Project Reactor 공식 문서, Reactive Streams 명세, 여러 비동기 백엔드 학습 자료 등 공개 자료를 참고해 한국어 학습 노트로 풀어쓴 자료입니다.

12편은 앞선 편들의 총집합이에요. R2DBC(2·3편), CRUD API(4편), WebClient(8편), SSE(10편), 성능 최적화(11편)가 한 시스템 안에서 어떻게 맞물리는지 볼 수 있습니다.

전체 아키텍처 — 3서비스 구성

실습 시스템의 전체 구성은 이렇습니다.

[브라우저/앱]
    ↓ HTTP (8080)
[Aggregator Service]
    ├─→ HTTP (7070) → [Stock Service]
    │       └─ GET /stock/{ticker}: 현재 주가
    │       └─ SSE /stock/price/stream: 실시간 주가 스트림
    └─→ HTTP (6060) → [Customer Service]
            └─ GET /customer/{id}: 고객 정보
            └─ POST /customer/{id}/trade: 매수/매도
            └─ GET /customer/{id}/portfolio: 포트폴리오

Aggregator 패턴을 쓰는 이유는 명확해요. 클라이언트가 Stock Service와 Customer Service를 각각 직접 호출하면 서비스 URL이 노출되고, CORS 문제가 생기고, 각 서비스 변경 시 클라이언트 코드도 수정해야 합니다. Aggregator(BFF: Backend For Frontend)가 중간에서 여러 서비스를 호출하고 조합하면 클라이언트는 Aggregator 하나만 알면 됩니다.

Aggregator가 제공하는 API:

GET  /aggregate/stocks/stream          → Stock Service SSE를 브라우저에 릴레이
GET  /aggregate/portfolio/{customerId} → 포트폴리오 + 현재가 합산
POST /aggregate/{customerId}/trade     → 매수/매도 주문 처리

WebClient 서비스 클라이언트 패턴

각 서비스에 대한 WebClient 클라이언트를 컴포넌트로 분리하는 게 좋은 패턴이에요.

@Component
public class StockServiceClient {

    private final WebClient webClient;

    public StockServiceClient(WebClient.Builder builder) {
        this.webClient = builder
                .baseUrl("http://localhost:7070")
                .build();
    }

    // 단건 주가 조회
    public Mono<StockDto> getStockPrice(String ticker) {
        return webClient.get()
                .uri("/stock/{ticker}", ticker)
                .retrieve()
                .bodyToMono(StockDto.class);
    }

    // SSE 스트림 구독
    public Flux<StockDto> getStockPriceStream() {
        return webClient.get()
                .uri("/stock/price/stream")
                .accept(MediaType.TEXT_EVENT_STREAM)  // SSE 미디어 타입 명시
                .retrieve()
                .bodyToFlux(StockDto.class);
    }
}
@Component
public class CustomerServiceClient {

    private final WebClient webClient;

    public CustomerServiceClient(WebClient.Builder builder) {
        this.webClient = builder
                .baseUrl("http://localhost:6060")
                .build();
    }

    public Mono<CustomerDto> getCustomer(Integer customerId) {
        return webClient.get()
                .uri("/customer/{customerId}", customerId)
                .retrieve()
                .onStatus(
                    HttpStatusCode::is4xxClientError,
                    response -> response.bodyToMono(String.class)
                            .map(body -> new CustomerNotFoundException(
                                "Customer not found: " + customerId))
                )
                .bodyToMono(CustomerDto.class);
    }

    public Mono<StockTradeResponse> trade(Integer customerId, StockTradeRequest request) {
        return webClient.post()
                .uri("/customer/{customerId}/trade", customerId)
                .bodyValue(request)
                .retrieve()
                .onStatus(
                    HttpStatusCode::is4xxClientError,
                    response -> response.bodyToMono(ProblemDetail.class)
                            .map(pd -> new TradeException(pd.getDetail()))
                )
                .bodyToMono(StockTradeResponse.class);
    }
}

Aggregator Service — Mono.zip()으로 병렬 처리

Aggregator의 핵심은 여러 서비스를 호출하는 결과를 병렬로 조합하는 거예요.

@Service
public class AggregatorService {

    // 거래 처리: 현재 주가 조회 → 거래 요청 생성 → Customer Service에 전달
    public Mono<StockTradeResponse> trade(Integer customerId,
                                          TradeRequest tradeRequest) {
        return stockServiceClient.getStockPrice(tradeRequest.getTicker())
                .flatMap(stockDto -> {
                    StockTradeRequest request = StockTradeRequest.builder()
                            .ticker(tradeRequest.getTicker())
                            .price(stockDto.getPrice())  // 실시간 주가 반영
                            .quantity(tradeRequest.getQuantity())
                            .action(tradeRequest.getAction())
                            .build();
                    return customerServiceClient.trade(customerId, request);
                });
    }

    // 포트폴리오 조회: 고객 정보 + 포트폴리오를 병렬 조회 후
    // 각 종목의 현재가를 Stock Service에서 조회하여 합산
    public Mono<PortfolioSummary> getPortfolioWithPrices(Integer customerId) {
        Mono<CustomerDto> customerMono = customerServiceClient.getCustomer(customerId);
        Flux<PortfolioItem> portfolioFlux = customerServiceClient.getPortfolio(customerId);

        return Mono.zip(customerMono, portfolioFlux.collectList())
                .flatMap(tuple -> {
                    CustomerDto customer = tuple.getT1();
                    List<PortfolioItem> portfolio = tuple.getT2();

                    return Flux.fromIterable(portfolio)
                            .flatMap(item ->
                                stockServiceClient.getStockPrice(item.getTicker())
                                        .map(stock ->
                                            PortfolioItemWithPrice.builder()
                                                    .ticker(item.getTicker())
                                                    .quantity(item.getQuantity())
                                                    .currentPrice(stock.getPrice())
                                                    .totalValue(stock.getPrice() * item.getQuantity())
                                                    .build()
                                        ),
                                5  // 최대 5개 병렬 주가 조회
                            )
                            .collectList()
                            .map(items -> PortfolioSummary.builder()
                                    .customerId(customerId)
                                    .customerName(customer.getName())
                                    .balance(customer.getBalance())
                                    .portfolio(items)
                                    .totalPortfolioValue(
                                        items.stream()
                                            .mapToInt(PortfolioItemWithPrice::getTotalValue)
                                            .sum()
                                    )
                                    .build());
                });
    }
}

Mono.zip()의 핵심 — 왜 flatMap을 쓰지 않는가

// flatMap: 순차 처리 — A 완료 후 B 시작 → 총 시간 = A시간 + B시간
monoA.flatMap(a -> monoB.map(b -> combine(a, b)));

// Mono.zip(): 병렬 처리 — A, B 동시 시작 → 총 시간 = max(A시간, B시간)
Mono.zip(monoA, monoB).map(tuple -> combine(tuple.getT1(), tuple.getT2()));

고객 저장(100ms)과 포트폴리오 저장(100ms)을 Mono.zip()으로 동시에 처리하면 200ms → 100ms로 단축됩니다.

여기서 시험 함정이 하나 있어요. Mono.zip()은 두 Mono가 모두 완료되면 결과를 조합합니다. 하나라도 빈 Mono(empty)를 반환하면 전체 zip도 완료되지 않아요. 두 작업이 독립적일 때 사용하고, 결과를 결합할 필요 없이 단순 병렬 실행만 필요하면 Mono.when()을 씁니다.

Customer Service — switchIfEmpty 체인 순서

매수 처리에서 switchIfEmpty를 체인하는 순서가 중요합니다.

@Transactional
public Mono<StockTradeResponse> buyStock(Integer customerId, StockTradeRequest request) {
    Integer totalPrice = request.getPrice() * request.getQuantity();

    return customerRepository.findById(customerId)
            .switchIfEmpty(Mono.error(
                new CustomerNotFoundException("Customer not found: " + customerId)
            ))  // 1단계: 고객 없음
            .filter(customer -> customer.getBalance() >= totalPrice)
            .switchIfEmpty(Mono.error(
                new InsufficientBalanceException("Insufficient balance. Required: " + totalPrice)
            ))  // 2단계: 잔고 부족
            .flatMap(customer -> {
                return portfolioRepository
                        .findByCustomerIdAndTicker(customerId, request.getTicker())
                        .defaultIfEmpty(Portfolio.builder()
                                .customerId(customerId)
                                .ticker(request.getTicker())
                                .quantity(0)
                                .build())
                        .zipWith(Mono.just(customer));
            })
            .flatMap(tuple -> {
                Portfolio portfolio = tuple.getT1();
                Customer customer = tuple.getT2();

                customer.setBalance(customer.getBalance() - totalPrice);
                portfolio.setQuantity(portfolio.getQuantity() + request.getQuantity());

                // 두 저장을 병렬로 실행 — 원자적 처리
                return Mono.zip(
                        customerRepository.save(customer),
                        portfolioRepository.save(portfolio)
                );
            })
            .map(tuple -> StockTradeResponse.builder()
                    // ... 응답 조립
                    .build());
}

여기서 시험 함정이 하나 있어요. switchIfEmpty 체인 순서가 오류 구분의 핵심입니다.

// BAD — 두 switchIfEmpty가 같은 레벨, "고객 없음"인지 "잔고 부족"인지 구분 불가
return customerRepository.findById(customerId)
        .filter(c -> c.getBalance() >= totalPrice)
        .switchIfEmpty(Mono.error(new RuntimeException("Error"))); // 뭐가 오류?

// GOOD — 단계별 오류 분리
return customerRepository.findById(customerId)
        .switchIfEmpty(Mono.error(new CustomerNotFoundException("..."))) // 고객 없음
        .filter(c -> c.getBalance() >= totalPrice)
        .switchIfEmpty(Mono.error(new InsufficientBalanceException("..."))); // 잔고 부족

R2DBC에서 @Transactional

@Service
public class CustomerService {

    @Transactional  // R2DBC에서도 동작
    public Mono<StockTradeResponse> buyStock(Integer customerId, StockTradeRequest request) {
        // 이 메서드 내 모든 DB 작업이 하나의 트랜잭션
        return customerRepository.findById(customerId)
                .flatMap(customer -> {
                    // ...
                    return Mono.zip(
                        customerRepository.save(customer),   // 트랜잭션 내
                        portfolioRepository.save(portfolio)  // 트랜잭션 내
                    );
                })
                // 어느 하나라도 실패하면 전체 롤백
                .map(tuple -> buildResponse(tuple));
    }
}

여기서 시험 함정이 하나 있어요. R2DBC의 @TransactionalMono/Flux가 구독될 때 트랜잭션을 시작합니다. 스트림이 완료되면 커밋, 오류 신호면 롤백이에요. 핵심은 "매수 시 잔고 차감(customer 저장)과 포트폴리오 수량 증가(portfolio 저장)가 원자적이어야 한다"는 점 — 하나가 성공하고 다른 하나가 실패하면 데이터가 불일치합니다.

또 하나 — 두 작업을 별도 .subscribe()로 실행하면 각각 별도 트랜잭션이 됩니다.

// BAD — 별도 트랜잭션, 원자성 보장 안 됨
customerRepository.save(customer).subscribe();  // 트랜잭션 1
portfolioRepository.save(portfolio).subscribe(); // 트랜잭션 2

// GOOD — 단일 트랜잭션
Mono.zip(
    customerRepository.save(customer),
    portfolioRepository.save(portfolio)
);

SSE 릴레이 — Aggregator에서 Stock Service SSE를 클라이언트로 전달

@RestController
@RequestMapping("/aggregate")
public class AggregatorController {

    // Stock Service의 SSE 스트림을 클라이언트에게 릴레이
    // 클라이언트는 Stock Service를 직접 알 필요 없음
    @GetMapping(value = "/stocks/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<StockDto> streamStockPrices() {
        return stockServiceClient.getStockPriceStream();
    }
}

여기서 시험 함정이 하나 있어요. 릴레이할 때도 produces = TEXT_EVENT_STREAM_VALUE 필수입니다. 이 어노테이션이 없으면 Aggregator가 Flux를 JSON 배열로 수집한 뒤 한 번에 응답해서 SSE 릴레이가 안 됩니다.

서비스 간 에러 전파

하위 서비스의 HTTP 오류를 어떻게 도메인 예외로 변환하는가?

public Mono<StockTradeResponse> trade(Integer customerId, StockTradeRequest request) {
    return webClient.post()
            .uri("/customer/{customerId}/trade", customerId)
            .bodyValue(request)
            .retrieve()
            .onStatus(HttpStatus.NOT_FOUND::equals, response ->
                response.bodyToMono(String.class)
                        .map(body -> new CustomerNotFoundException(body))
            )
            .onStatus(HttpStatus.BAD_REQUEST::equals, response ->
                response.bodyToMono(ProblemDetail.class)
                        .map(pd -> new TradeException(pd.getDetail()))
            )
            .bodyToMono(StockTradeResponse.class);
}

onStatus()로 HTTP 상태 코드를 도메인 예외로 변환하면, Aggregator에서 Customer Service의 HTTP 응답 세부 사항을 숨기고 비즈니스 예외로만 다룰 수 있어요.

회로 차단기(Circuit Breaker) 패턴

외부 서비스가 반복적으로 실패할 때 계속 요청하면 Aggregator도 같이 장애가 납니다. 간단한 fallback 패턴으로 1차 방어할 수 있어요.

public Mono<StockDto> getStockPriceWithFallback(String ticker) {
    return stockServiceClient.getStockPrice(ticker)
            .timeout(Duration.ofSeconds(3))
            .onErrorReturn(new StockDto(ticker, -1))  // 서비스 장애 시 기본값
            .doOnError(e -> log.error("Failed to get stock price for {}: {}", ticker, e.getMessage()));
}

완전한 Circuit Breaker가 필요하면 Resilience4j를 사용합니다. CLOSED(정상) → OPEN(차단, 실패율 임계값 초과) → HALF-OPEN(탐색, 일부 요청 허용) 3단계 상태 전환으로 장애 전파를 막아요. 리액티브용 어댑터(resilience4j-reactor 모듈)를 별도로 추가해야 합니다.

여기서 시험 함정이 하나 있어요. 분산 트랜잭션은 리액티브 마이크로서비스에서 쓰지 않습니다. 블로킹 본질의 2PC(Two-Phase Commit)는 리액티브 모델과 맞지 않아요. 대신 Saga 패턴 — 로컬 트랜잭션을 순서대로 실행하고, 실패 시 보상 트랜잭션으로 이전 작업을 되돌리는 방식을 씁니다.

자주 만나는 함정 — 시험 직전 압축 노트

12편의 핵심을 정리합니다.

  • Aggregator 패턴(BFF) — 여러 서비스를 조합하여 클라이언트에 단일 API 제공
  • Mono.zip() — 독립적인 Mono를 병렬로 실행, 둘 다 완료돼야 결합
  • Mono.zip() vs flatMap — zip은 병렬, flatMap은 순차 (zip이 더 빠름)
  • @Transactional — R2DBC에서도 동작, Mono 구독 시 시작·완료 시 커밋·오류 시 롤백
  • .subscribe() 두 번 = 두 트랜잭션 → Mono.zip 사용해야 원자적
  • switchIfEmpty 순서 — 단계별로 오류 분리 (고객 없음 → 잔고 부족 순서)
  • SSE 릴레이 시 produces = TEXT_EVENT_STREAM_VALUE 필수
  • 서버 간 SSE 구독 — accept(MediaType.TEXT_EVENT_STREAM) + bodyToFlux()
  • onStatus() — HTTP 상태 코드 → 도메인 예외 변환
  • 분산 트랜잭션 금지 → Saga 패턴 (로컬 트랜잭션 + 보상 트랜잭션)
  • Circuit Breaker Resilience4j — 리액티브 어댑터(resilience4j-reactor) 별도 필요
  • WebClient는 Bean으로 등록해서 재사용 (연결 풀 공유)
  • 서비스 URL은 application.properties에서 외부화, Kubernetes에서 DNS 기반

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!