리액티브 레디스 마스터 노트 시리즈 6편. Redis Pub/Sub의 발행-구독 메커니즘, fire-and-forget 특성과 메시지 보장 한계, ReactiveRedisTemplate의 listenTo·convertAndSend 사용법, WebSocket과 결합한 실시간 채팅 구현, 트렌딩 서비스 패턴(ZSet 점수 갱신 + 주기 발행), Redis Streams가 메시지 영속화로 Pub/Sub 한계 푸는 방식까지.
이 글은 리액티브 레디스 마스터 노트 시리즈의 여섯 번째 편입니다. 1~5편이 키-값 처리였다면, 이번엔 실시간 통신 — Redis Pub/Sub과 WebSocket 결합.
채팅·알림·트렌딩 서비스가 모두 Pub/Sub의 영역. 다만 Pub/Sub은 fire-and-forget — 구독 안 한 시점 메시지는 영원히 사라집니다. 이 한계와 Redis Streams가 어떻게 푸는지가 이번 편의 핵심.
처음 Pub/Sub이 어렵게 느껴지는 이유
처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, Kafka·RabbitMQ와 뭐가 다른지 막연합니다. 모두 메시징인데? 둘째, WebSocket과 결합 시 흐름이 복잡합니다. 클라이언트 → 서버 → Redis → 다른 서버 → 클라이언트.
해결법은 한 가지예요. "Redis Pub/Sub = 휘발성 방송" 한 줄. 라디오 방송처럼 — 켜둔 사람만 듣고, 나중에 켜면 못 들음. 이 그림이 잡히면 Streams의 등장이 자연스럽습니다.
Redis Pub/Sub — 휘발성 방송
Publisher → PUBLISH channel:notifications "new message"
↓
Redis → 모든 SUBSCRIBE channel:notifications 클라이언트에 즉시 전달
↓
Subscriber A, B, C 받음
핵심:
- Fire-and-forget — 발행 후 보존 X
- At-most-once — 구독 안 한 시점 메시지 손실
- 다중 구독 — N 구독자에게 동시 전달
여기서 정말 중요한 시험 함정 — Pub/Sub은 메시지 큐 X. 메시지 영속화 안 됨. 구독자 다운 → 그 시간 메시지 영원히 손실. 보장이 필요하면 Kafka 또는 Redis Streams.
Pub/Sub vs Kafka vs Streams
| 구분 | Pub/Sub | Kafka | Redis Streams |
|---|---|---|---|
| 보존 | X | 길게 | 길게 |
| 보장 | At-most-once | At-least-once | At-least-once |
| 처리량 | 높음 | 매우 높음 | 높음 |
| 컨슈머 그룹 | X | O | O |
| 메시지 ID | X | offset | timestamp-seq |
| 사용처 | 휘발 알림·실시간 | 영속 이벤트 | 가벼운 이벤트 큐 |
ReactiveRedisTemplate Pub/Sub
구독
@Service
public class NotificationListener {
@Autowired
ReactiveRedisTemplate<String, String> redisTemplate;
@PostConstruct
public void listen() {
redisTemplate.listenToChannel("notifications")
.map(ReactiveSubscription.Message::getMessage)
.doOnNext(msg -> log.info("Received: {}", msg))
.subscribe();
}
}
발행
@Service
public class NotificationPublisher {
@Autowired
ReactiveRedisTemplate<String, String> redisTemplate;
public Mono<Long> publish(String channel, String message) {
return redisTemplate.convertAndSend(channel, message);
}
}
convertAndSend 반환 = 받은 구독자 수.
Pattern Subscribe
// 패턴 구독 — channel:user:* 모두
redisTemplate.listenToPattern("channel:user:*")
.doOnNext(msg -> {
String channel = msg.getChannel(); // 실제 채널명
String body = msg.getMessage();
// ...
})
.subscribe();
> PSUBSCRIBE channel:user:*
> PUBLISH channel:user:1 "hello" # 위 구독자가 받음
> PUBLISH channel:user:2 "hi" # 위 구독자가 받음
여기서 시험 함정이 하나 있어요. 패턴 구독은 부하 ↑. 모든 PUBLISH가 패턴 매칭 검사. 와일드카드 너무 광범위 X.
WebSocket 채팅 구현
큰 그림
[Client A] ←WebSocket→ [Server 1] ←Redis Pub/Sub→ [Server 2] ←WebSocket→ [Client B]
여러 서버 인스턴스에 걸쳐 채팅 전달. 각 서버는 자기 클라이언트에만 직접 WebSocket, 다른 서버 클라이언트는 Pub/Sub으로 전달받아.
Spring WebFlux WebSocket
@Component
public class ChatHandler implements WebSocketHandler {
@Autowired
ReactiveRedisTemplate<String, String> redisTemplate;
@Override
public Mono<Void> handle(WebSocketSession session) {
String roomId = extractRoomId(session);
// 1. 입력: 클라이언트 → Redis Pub
Mono<Void> input = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(msg -> redisTemplate.convertAndSend("chat:" + roomId, msg))
.then();
// 2. 출력: Redis Sub → 클라이언트
Mono<Void> output = session.send(
redisTemplate.listenToChannel("chat:" + roomId)
.map(ReactiveSubscription.Message::getMessage)
.map(session::textMessage)
);
return Mono.zip(input, output).then();
}
}
핵심 — Mono.zip으로 입력·출력 동시. 한쪽 끊기면 모두 종료.
트렌딩 서비스 패턴
실시간 인기 게시물:
// 1. 사용자 액션마다 ZSet 점수 ↑
public Mono<Double> incrementTrending(String postId) {
return redisTemplate.opsForZSet()
.incrementScore("trending:posts", postId, 1.0);
}
// 2. 주기적으로 상위 10개 발행 (스케줄러)
@Scheduled(fixedDelay = 5000)
public void publishTrending() {
redisTemplate.opsForZSet()
.reverseRangeWithScores("trending:posts", Range.closed(0L, 9L))
.collectList()
.map(this::serializeRanking)
.flatMap(ranking ->
redisTemplate.convertAndSend("trending:updates", ranking))
.subscribe();
}
// 3. WebSocket 클라이언트가 Pub/Sub 받아 즉시 갱신
여기서 정말 중요한 시험 함정 — ZSet + Pub/Sub 결합 = Redis 실시간 시스템 표준. 점수는 영속(ZSet), 알림은 휘발(Pub/Sub). 둘 보완.
시간 윈도우 트렌딩
// 시간 = 점수 (오래된 항목 자연스럽게 밀려남)
opsForZSet().add("trending:" + windowKey(), postId, currentTime);
// 윈도우 윈도우 밖 제거
opsForZSet().removeRangeByScore(
"trending:" + windowKey(),
Range.leftUnbounded(Range.Bound.exclusive((double)(currentTime - WINDOW_MS)))
);
ZSet 자체로 시간 기반 만료.
Redis Streams — 영속 메시지 큐
Pub/Sub의 한계를 푸는 도구. Kafka 같은 메시지 보존.
명령
# 메시지 추가
> XADD events:user-1 * action login user_id 1
"1234567890-0" # 메시지 ID (timestamp-seq)
# 조회
> XRANGE events:user-1 - +
# 컨슈머 그룹 생성
> XGROUP CREATE events:user-1 mygroup $
# 그룹으로 읽기
> XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS events:user-1 >
# ack
> XACK events:user-1 mygroup 1234567890-0
Reactive Streams 사용
ReactiveStreamOperations<String, String, String> ops = redisTemplate.opsForStream();
// 발행
ops.add("events", Map.of("type", "login", "user_id", "1")).subscribe();
// 구독 (계속 대기)
StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(5));
ops.read(Consumer.from("mygroup", "consumer-1"),
options,
StreamOffset.create("events", ReadOffset.lastConsumed()))
.doOnNext(record -> {
// 처리
ops.acknowledge("events", "mygroup", record.getId()).subscribe();
})
.subscribe();
Pub/Sub vs Streams 선택
| 상황 | 선택 |
|---|---|
| 휘발 알림 (sound.ring) | Pub/Sub |
| 채팅 메시지 (영속) | Streams |
| 이벤트 로그 | Streams |
| 실시간 대시보드 | Pub/Sub |
여기서 정말 중요한 시험 함정 — 메시지 보장 필요 = Streams. Pub/Sub은 빠르지만 손실 가능. 무엇을 잃어도 되나? 잃으면 안 되나? 결정 기준.
WebFlux + Streams 채팅
public Mono<Void> handle(WebSocketSession session) {
String roomId = extractRoomId(session);
String streamKey = "chat:" + roomId;
Mono<Void> input = session.receive()
.flatMap(msg -> redisTemplate.opsForStream()
.add(streamKey, Map.of("body", msg.getPayloadAsText())))
.then();
Mono<Void> output = session.send(
redisTemplate.opsForStream()
.read(StreamOffset.create(streamKey, ReadOffset.from("0")))
.map(record -> record.getValue().get("body"))
.map(session::textMessage)
);
return Mono.zip(input, output).then();
}
채팅 이력 영속 + 늦게 들어와도 모든 메시지 받음.
그룹 알림 vs 1:1
// 그룹 알림 (모두 받음) — Pub/Sub
publish("group:1:notifications", message);
// 1:1 (대상자만) — Streams + 컨슈머 그룹
streams.add("user:" + targetUserId + ":notifications", message);
Redis Sentinel·Cluster에서 Pub/Sub
여기서 시험 함정이 하나 있어요. Cluster 환경 Pub/Sub는 노드 사이 메시지 전달. 모든 노드에 broadcast → 메시지 양 ↑. Pattern subscribe는 더 비싸. Cluster 큰 환경엔 Streams 또는 외부 메시지 큐.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 6편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.
- Pub/Sub = 휘발성 방송, fire-and-forget
- 메시지 보존 X, 구독 안 한 시점 손실
- At-most-once 보장
- 다중 구독 자연스러움
- Pub/Sub vs Kafka vs Streams — 보존·보장·컨슈머 그룹 차이
- ReactiveRedisTemplate —
listenToChannel/convertAndSend - Pattern Subscribe (
PSUBSCRIBE) — 와일드카드, 부하 ↑ - WebSocket + Redis Pub/Sub = 여러 서버 사이 실시간 전달
- WebFlux 핸들러 —
Mono.zip(input, output) - 한쪽 끊기면 둘 다 종료
- 트렌딩 서비스 = ZSet 점수 + Pub/Sub 발행
- 점수 영속·알림 휘발
- Redis Streams = 영속 메시지 큐
- XADD·XRANGE·XGROUP·XREADGROUP·XACK
- ID = timestamp-seq
- 컨슈머 그룹 (Kafka 비슷)
- Streams 사용 = 메시지 보장 필요
- Pub/Sub = 휘발 OK / Streams = 잃으면 안 되는 것
- 채팅(영속) = Streams / 알림(휘발) = Pub/Sub
- Cluster Pub/Sub은 노드 broadcast → 부하 ↑
시리즈 다른 편
- 1편 — Redis 기본·Spring Boot 연동
- 2편 — Template·Redisson·Serializer
- 3편 — 자료구조 5종
- 4편 — WebFlux 캐싱
- 5편 — 성능
- 6편 — Pub/Sub·WebSocket (현재 글)
- 7편 — 고급 (Transaction·Persistence·GeoSpatial·ACL)
공식 문서: Redis Pub/Sub / Redis Streams 에서 더 깊이.
다음 글(7편, 마지막)에서는 고급 주제 — Transaction(MULTI/EXEC), Persistence(RDB·AOF), GeoSpatial, ACL 인증까지 시리즈 마무리.