Reactive GraphQL — Subscription·실시간 구독

2026-05-03확률과 통계 마스터 노트

Reactive GraphQL 마스터 노트 시리즈 3편. Subscription이 실시간 양방향 통신 도구인 이유, WebSocket vs SSE 전송 선택, Spring GraphQL의 Flux 반환, Sinks로 메시지 발행, Pub/Sub 패턴 통합(Redis·Kafka), graphql-ws 프로토콜, 채팅·라이브 알림 구현까지.

이 글은 Reactive GraphQL 마스터 노트 시리즈의 세 번째 편입니다. 2편(Query/Mutation)에서 단방향이었다면, 이번엔 양방향 — Subscription.

채팅·라이브 알림·실시간 시세에. WebSocket 위 Reactive Stream. Flux 반환만으로 자동 처리.

처음 Subscription이 어렵게 느껴지는 이유

처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, WebSocket 설정이 막연합니다. 둘째, Pub/Sub 패턴이 어떻게 통합되는지 안 보입니다.

해결법은 한 가지예요. "Subscription = Flux 반환" 한 줄. Spring GraphQL이 자동 처리. WebSocket·이벤트 발행·구독 모두 추상화.

Subscription Schema

type Subscription {
  postCreated: Post!
  userMessageReceived(userId: ID!): Message!
  stockPriceChanged(symbol: String!): Stock!
}

Subscription 타입에 정의. 인자 가능.

Spring GraphQL Subscription

@Controller
public class PostController {
    
    private final Sinks.Many<Post> sink = Sinks.many().multicast().onBackpressureBuffer();
    
    @SubscriptionMapping
    public Flux<Post> postCreated() {
        return sink.asFlux();
    }
    
    @MutationMapping
    public Post createPost(@Argument CreatePostInput input) {
        Post post = postService.create(input);
        sink.tryEmitNext(post);   # 모든 구독자에게 발행
        return post;
    }
}

핵심:

  • @SubscriptionMapping — 구독 핸들러
  • Flux 반환 — 자동 스트리밍
  • Sinks — 발행 채널

여기서 정말 중요한 시험 함정 — Subscription = Flux 반환만. 나머지 자동. WebSocket·메시지 직렬화·연결 관리 모두 Spring GraphQL이.

Sinks 종류

// Multicast — 여러 구독자 동시 (기본)
Sinks.Many<Post> sink = Sinks.many().multicast().onBackpressureBuffer();

// Replay — 새 구독자에게 과거 데이터도
Sinks.Many<Post> sink = Sinks.many().replay().limit(100);

// Unicast — 한 구독자만
Sinks.Many<Post> sink = Sinks.many().unicast().onBackpressureBuffer();

여기서 시험 함정이 하나 있어요. Multicast = 일반 권장. 채팅·알림 등. Unicast는 1:1.

Backpressure

Sinks.many().multicast().onBackpressureBuffer()      # 큐 버퍼
Sinks.many().multicast().onBackpressureBuffer(1000)  # 한도 1000
Sinks.many().multicast().directBestEffort()           # 손실 가능

구독자 처리 못 따라가면? 버퍼 vs 손실.

WebSocket 활성

spring:
  graphql:
    websocket:
      path: /graphql

자동 활성. ws://localhost:8080/graphql 접속.

# graphql-ws 클라이언트 (TypeScript)
import { createClient } from 'graphql-ws';

const client = createClient({
  url: 'ws://localhost:8080/graphql',
});

client.subscribe(
  { query: 'subscription { postCreated { id title } }' },
  { next: data => console.log(data) }
);

graphql-ws 프로토콜

표준 GraphQL over WebSocket. 4 메시지:

ConnectionInit  — 연결 시작·인증
ConnectionAck   — 서버 인증 완료
Subscribe       — 구독 시작
Next            — 데이터 스트림
Complete        — 구독 종료

여기서 시험 함정이 하나 있어요. subscriptions-transport-ws (옛) vs graphql-ws (신). 구버전은 deprecated. 새 프로젝트 = graphql-ws.

SSE — 단방향 (대안)

spring:
  graphql:
    http:
      path: /graphql
    sse:
      path: /graphql/sse

Server-Sent Events. 단방향 (서버 → 클라이언트). WebSocket보다 단순.

const eventSource = new EventSource('/graphql/sse');
eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
};

여기서 시험 함정이 하나 있어요. SSE = HTTP 기반·단방향. WebSocket보다 친화적 (방화벽). 다만 단방향.

인자 + Subscription

type Subscription {
  userMessageReceived(userId: ID!): Message!
}
@SubscriptionMapping
public Flux<Message> userMessageReceived(@Argument String userId) {
    return messageSink.asFlux()
        .filter(msg -> msg.getUserId().equals(userId));
}

특정 사용자 메시지만 필터.

인증·권한

@SubscriptionMapping
@PreAuthorize("isAuthenticated()")
public Flux<Notification> notifications() {
    return notificationSink.asFlux()
        .filter(n -> n.getUserId().equals(getCurrentUserId()));
}

여기서 정말 중요한 시험 함정 — Subscription도 인증 필수. SETUP 시 인증·각 메시지 검증. 6편에서 자세히.

Pub/Sub 통합 — Redis

@Service
public class RedisPostPubSub {
    
    @Autowired
    private ReactiveRedisTemplate<String, Post> redisTemplate;
    
    public void publish(Post post) {
        redisTemplate.convertAndSend("posts", post).subscribe();
    }
    
    public Flux<Post> subscribe() {
        return redisTemplate.listenToChannel("posts")
            .map(ReactiveSubscription.Message::getMessage);
    }
}

@Controller
public class PostController {
    
    @Autowired
    private RedisPostPubSub pubSub;
    
    @SubscriptionMapping
    public Flux<Post> postCreated() {
        return pubSub.subscribe();
    }
    
    @MutationMapping
    public Post createPost(@Argument CreatePostInput input) {
        Post post = postService.create(input);
        pubSub.publish(post);
        return post;
    }
}

여러 GraphQL 서버 인스턴스 → Redis Pub/Sub → 모든 인스턴스의 구독자에게 전달.

Pub/Sub 통합 — Kafka

@KafkaListener(topics = "post-events")
public void onPost(PostEvent event) {
    sink.tryEmitNext(event.toPost());
}

@MutationMapping
public Post createPost(@Argument CreatePostInput input) {
    Post post = postService.create(input);
    kafkaTemplate.send("post-events", PostEvent.from(post));
    return post;
}

영속·확장성·다중 구독자.

여기서 시험 함정이 하나 있어요. Subscription의 Pub/Sub 백엔드 = 단일 인스턴스 메모리 X. Redis·Kafka 같은 외부 시스템. 여러 서버 환경 필수.

종료 처리

@SubscriptionMapping
public Flux<Post> postCreated() {
    return sink.asFlux()
        .doOnCancel(() -> log.info("Subscription cancelled"))
        .doOnTerminate(() -> log.info("Subscription terminated"));
}

클라이언트 연결 끊김·취소 = 자동 정리.

Filter·Transform

@SubscriptionMapping
public Flux<Post> postCreated(@Argument String category) {
    return sink.asFlux()
        .filter(post -> post.getCategory().equals(category))
        .map(this::sanitize)
        .take(Duration.ofMinutes(30));
}

filter·map·take 등 Reactor 연산자 자유롭게.

채팅 — 실전 예시

type Subscription {
  messageReceived(roomId: ID!): Message!
}

type Mutation {
  sendMessage(roomId: ID!, text: String!): Message!
}
@Controller
public class ChatController {
    
    private final Map<String, Sinks.Many<Message>> roomSinks = new ConcurrentHashMap<>();
    
    @SubscriptionMapping
    public Flux<Message> messageReceived(@Argument String roomId) {
        return getOrCreateSink(roomId).asFlux();
    }
    
    @MutationMapping
    public Message sendMessage(
        @Argument String roomId,
        @Argument String text,
        @AuthenticationPrincipal UserDetails user
    ) {
        Message msg = new Message(user.getUsername(), text, Instant.now());
        getOrCreateSink(roomId).tryEmitNext(msg);
        return msg;
    }
    
    private Sinks.Many<Message> getOrCreateSink(String roomId) {
        return roomSinks.computeIfAbsent(roomId,
            k -> Sinks.many().multicast().onBackpressureBuffer());
    }
}

방별 구독·발행. 다중 방 채팅.

모니터링

@SubscriptionMapping
public Flux<Notification> notifications() {
    return sink.asFlux()
        .doOnSubscribe(s -> activeSubs.incrementAndGet())
        .doFinally(s -> activeSubs.decrementAndGet());
}

@Bean
public MeterBinder activeSubsMetric() {
    return registry -> Gauge.builder("graphql.subscriptions.active", activeSubs::get)
        .register(registry);
}

활성 구독 수·발행 비율 모니터링.

성능 고려

Subscription 동시 수:
  - WebSocket 연결 = OS 한계 (수만)
  - 메모리 사용 = 구독당 ~수 KB
  - Backpressure = 버퍼 크기 신중

확장:
  - 단일 서버 한계 시 → 수평 확장 + Pub/Sub
  - Sticky session 또는 Pub/Sub

여기서 정말 중요한 시험 함정 — 수만 동시 구독 = WebFlux Virtual Thread 권장. Platform Thread 한계.

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 3편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • Subscription = 실시간 양방향 통신
  • Schema — type Subscription { ... }
  • Spring — @SubscriptionMapping + Flux 반환
  • 자동 — WebSocket·직렬화·연결 관리
  • Sinks — Multicast (일반)·Replay (과거 포함)·Unicast (1:1)
  • Backpressure — onBackpressureBuffer·directBestEffort
  • WebSocket = 양방향 (ws://)
  • graphql-ws 프로토콜 = 표준 (subscriptions-transport-ws deprecated)
  • SSE 대안 = 단방향 (HTTP 친화·방화벽)
  • 인자 + filter — 특정 사용자·방·카테고리만
  • 인증 필수@PreAuthorize·@AuthenticationPrincipal
  • Pub/Sub 통합 — Redis (간단·휘발) / Kafka (영속·확장)
  • 다중 서버 환경 = Pub/Sub 필수
  • 메모리만 X
  • 종료 처리 — doOnCancel·doOnTerminate
  • Filter·Transform — Reactor 연산자 자유
  • 채팅 패턴Map<roomId, Sinks> + 방별 구독·발행
  • 활성 구독 메트릭 — doOnSubscribe·doFinally
  • 수만 동시 = Virtual Thread 권장
  • 확장 — 수평 + Pub/Sub

시리즈 다른 편

공식 문서: Spring GraphQL Subscriptions / graphql-ws Protocol 에서 더 깊이.

다음 글(4편)에서는 Spring for GraphQL — @SchemaMapping·자동 매핑·Argument·BatchMapping까지 풀어 갑니다.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!