Reactive GraphQL 마스터 노트 시리즈 3편. Subscription이 실시간 양방향 통신 도구인 이유, WebSocket vs SSE 전송 선택, Spring GraphQL의 Flux 반환, Sinks로 메시지 발행, Pub/Sub 패턴 통합(Redis·Kafka), graphql-ws 프로토콜, 채팅·라이브 알림 구현까지.
이 글은 Reactive GraphQL 마스터 노트 시리즈의 세 번째 편입니다. 2편(Query/Mutation)에서 단방향이었다면, 이번엔 양방향 — Subscription.
채팅·라이브 알림·실시간 시세에. WebSocket 위 Reactive Stream. Flux 반환만으로 자동 처리.
처음 Subscription이 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, WebSocket 설정이 막연합니다. 둘째, Pub/Sub 패턴이 어떻게 통합되는지 안 보입니다.
해결법은 한 가지예요. "Subscription = Flux 반환" 한 줄. Spring GraphQL이 자동 처리. WebSocket·이벤트 발행·구독 모두 추상화.
Subscription Schema
type Subscription {
postCreated: Post!
userMessageReceived(userId: ID!): Message!
stockPriceChanged(symbol: String!): Stock!
}
Subscription 타입에 정의. 인자 가능.
Spring GraphQL Subscription
@Controller
public class PostController {
private final Sinks.Many<Post> sink = Sinks.many().multicast().onBackpressureBuffer();
@SubscriptionMapping
public Flux<Post> postCreated() {
return sink.asFlux();
}
@MutationMapping
public Post createPost(@Argument CreatePostInput input) {
Post post = postService.create(input);
sink.tryEmitNext(post); # 모든 구독자에게 발행
return post;
}
}
핵심:
@SubscriptionMapping— 구독 핸들러- Flux 반환 — 자동 스트리밍
- Sinks — 발행 채널
여기서 정말 중요한 시험 함정 — Subscription = Flux 반환만. 나머지 자동. WebSocket·메시지 직렬화·연결 관리 모두 Spring GraphQL이.
Sinks 종류
// Multicast — 여러 구독자 동시 (기본)
Sinks.Many<Post> sink = Sinks.many().multicast().onBackpressureBuffer();
// Replay — 새 구독자에게 과거 데이터도
Sinks.Many<Post> sink = Sinks.many().replay().limit(100);
// Unicast — 한 구독자만
Sinks.Many<Post> sink = Sinks.many().unicast().onBackpressureBuffer();
여기서 시험 함정이 하나 있어요. Multicast = 일반 권장. 채팅·알림 등. Unicast는 1:1.
Backpressure
Sinks.many().multicast().onBackpressureBuffer() # 큐 버퍼
Sinks.many().multicast().onBackpressureBuffer(1000) # 한도 1000
Sinks.many().multicast().directBestEffort() # 손실 가능
구독자 처리 못 따라가면? 버퍼 vs 손실.
WebSocket 활성
spring:
graphql:
websocket:
path: /graphql
자동 활성. ws://localhost:8080/graphql 접속.
# graphql-ws 클라이언트 (TypeScript)
import { createClient } from 'graphql-ws';
const client = createClient({
url: 'ws://localhost:8080/graphql',
});
client.subscribe(
{ query: 'subscription { postCreated { id title } }' },
{ next: data => console.log(data) }
);
graphql-ws 프로토콜
표준 GraphQL over WebSocket. 4 메시지:
ConnectionInit — 연결 시작·인증
ConnectionAck — 서버 인증 완료
Subscribe — 구독 시작
Next — 데이터 스트림
Complete — 구독 종료
여기서 시험 함정이 하나 있어요. subscriptions-transport-ws (옛) vs graphql-ws (신). 구버전은 deprecated. 새 프로젝트 = graphql-ws.
SSE — 단방향 (대안)
spring:
graphql:
http:
path: /graphql
sse:
path: /graphql/sse
Server-Sent Events. 단방향 (서버 → 클라이언트). WebSocket보다 단순.
const eventSource = new EventSource('/graphql/sse');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
};
여기서 시험 함정이 하나 있어요. SSE = HTTP 기반·단방향. WebSocket보다 친화적 (방화벽). 다만 단방향.
인자 + Subscription
type Subscription {
userMessageReceived(userId: ID!): Message!
}
@SubscriptionMapping
public Flux<Message> userMessageReceived(@Argument String userId) {
return messageSink.asFlux()
.filter(msg -> msg.getUserId().equals(userId));
}
특정 사용자 메시지만 필터.
인증·권한
@SubscriptionMapping
@PreAuthorize("isAuthenticated()")
public Flux<Notification> notifications() {
return notificationSink.asFlux()
.filter(n -> n.getUserId().equals(getCurrentUserId()));
}
여기서 정말 중요한 시험 함정 — Subscription도 인증 필수. SETUP 시 인증·각 메시지 검증. 6편에서 자세히.
Pub/Sub 통합 — Redis
@Service
public class RedisPostPubSub {
@Autowired
private ReactiveRedisTemplate<String, Post> redisTemplate;
public void publish(Post post) {
redisTemplate.convertAndSend("posts", post).subscribe();
}
public Flux<Post> subscribe() {
return redisTemplate.listenToChannel("posts")
.map(ReactiveSubscription.Message::getMessage);
}
}
@Controller
public class PostController {
@Autowired
private RedisPostPubSub pubSub;
@SubscriptionMapping
public Flux<Post> postCreated() {
return pubSub.subscribe();
}
@MutationMapping
public Post createPost(@Argument CreatePostInput input) {
Post post = postService.create(input);
pubSub.publish(post);
return post;
}
}
여러 GraphQL 서버 인스턴스 → Redis Pub/Sub → 모든 인스턴스의 구독자에게 전달.
Pub/Sub 통합 — Kafka
@KafkaListener(topics = "post-events")
public void onPost(PostEvent event) {
sink.tryEmitNext(event.toPost());
}
@MutationMapping
public Post createPost(@Argument CreatePostInput input) {
Post post = postService.create(input);
kafkaTemplate.send("post-events", PostEvent.from(post));
return post;
}
영속·확장성·다중 구독자.
여기서 시험 함정이 하나 있어요. Subscription의 Pub/Sub 백엔드 = 단일 인스턴스 메모리 X. Redis·Kafka 같은 외부 시스템. 여러 서버 환경 필수.
종료 처리
@SubscriptionMapping
public Flux<Post> postCreated() {
return sink.asFlux()
.doOnCancel(() -> log.info("Subscription cancelled"))
.doOnTerminate(() -> log.info("Subscription terminated"));
}
클라이언트 연결 끊김·취소 = 자동 정리.
Filter·Transform
@SubscriptionMapping
public Flux<Post> postCreated(@Argument String category) {
return sink.asFlux()
.filter(post -> post.getCategory().equals(category))
.map(this::sanitize)
.take(Duration.ofMinutes(30));
}
filter·map·take 등 Reactor 연산자 자유롭게.
채팅 — 실전 예시
type Subscription {
messageReceived(roomId: ID!): Message!
}
type Mutation {
sendMessage(roomId: ID!, text: String!): Message!
}
@Controller
public class ChatController {
private final Map<String, Sinks.Many<Message>> roomSinks = new ConcurrentHashMap<>();
@SubscriptionMapping
public Flux<Message> messageReceived(@Argument String roomId) {
return getOrCreateSink(roomId).asFlux();
}
@MutationMapping
public Message sendMessage(
@Argument String roomId,
@Argument String text,
@AuthenticationPrincipal UserDetails user
) {
Message msg = new Message(user.getUsername(), text, Instant.now());
getOrCreateSink(roomId).tryEmitNext(msg);
return msg;
}
private Sinks.Many<Message> getOrCreateSink(String roomId) {
return roomSinks.computeIfAbsent(roomId,
k -> Sinks.many().multicast().onBackpressureBuffer());
}
}
방별 구독·발행. 다중 방 채팅.
모니터링
@SubscriptionMapping
public Flux<Notification> notifications() {
return sink.asFlux()
.doOnSubscribe(s -> activeSubs.incrementAndGet())
.doFinally(s -> activeSubs.decrementAndGet());
}
@Bean
public MeterBinder activeSubsMetric() {
return registry -> Gauge.builder("graphql.subscriptions.active", activeSubs::get)
.register(registry);
}
활성 구독 수·발행 비율 모니터링.
성능 고려
Subscription 동시 수:
- WebSocket 연결 = OS 한계 (수만)
- 메모리 사용 = 구독당 ~수 KB
- Backpressure = 버퍼 크기 신중
확장:
- 단일 서버 한계 시 → 수평 확장 + Pub/Sub
- Sticky session 또는 Pub/Sub
여기서 정말 중요한 시험 함정 — 수만 동시 구독 = WebFlux Virtual Thread 권장. Platform Thread 한계.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 3편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Subscription = 실시간 양방향 통신
- Schema —
type Subscription { ... } - Spring —
@SubscriptionMapping+ Flux 반환 - 자동 — WebSocket·직렬화·연결 관리
- Sinks — Multicast (일반)·Replay (과거 포함)·Unicast (1:1)
- Backpressure —
onBackpressureBuffer·directBestEffort - WebSocket = 양방향 (
ws://) - graphql-ws 프로토콜 = 표준 (subscriptions-transport-ws deprecated)
- SSE 대안 = 단방향 (HTTP 친화·방화벽)
- 인자 + filter — 특정 사용자·방·카테고리만
- 인증 필수 —
@PreAuthorize·@AuthenticationPrincipal - Pub/Sub 통합 — Redis (간단·휘발) / Kafka (영속·확장)
- 다중 서버 환경 = Pub/Sub 필수
- 메모리만 X
- 종료 처리 —
doOnCancel·doOnTerminate - Filter·Transform — Reactor 연산자 자유
- 채팅 패턴 —
Map<roomId, Sinks>+ 방별 구독·발행 - 활성 구독 메트릭 —
doOnSubscribe·doFinally - 수만 동시 = Virtual Thread 권장
- 확장 — 수평 + Pub/Sub
시리즈 다른 편
- 1편 — 기본 개념·Schema
- 2편 — Query·Mutation·Variables
- 3편 — Subscription·실시간 구독 (현재 글)
- 4편 — Spring for GraphQL
- 5편 — Reactive GraphQL
- 6편 — Security·Testing
- 7편 — 고급 (DataLoader·Federation·운영)
공식 문서: Spring GraphQL Subscriptions / graphql-ws Protocol 에서 더 깊이.
다음 글(4편)에서는 Spring for GraphQL — @SchemaMapping·자동 매핑·Argument·BatchMapping까지 풀어 갑니다.