리액티브 레디스 — Pub/Sub·WebSocket·실시간 통신

2026-05-03확률과 통계 마스터 노트

리액티브 레디스 마스터 노트 시리즈 6편. Redis Pub/Sub의 발행-구독 메커니즘, fire-and-forget 특성과 메시지 보장 한계, ReactiveRedisTemplate의 listenTo·convertAndSend 사용법, WebSocket과 결합한 실시간 채팅 구현, 트렌딩 서비스 패턴(ZSet 점수 갱신 + 주기 발행), Redis Streams가 메시지 영속화로 Pub/Sub 한계 푸는 방식까지.

이 글은 리액티브 레디스 마스터 노트 시리즈의 여섯 번째 편입니다. 1~5편이 키-값 처리였다면, 이번엔 실시간 통신 — Redis Pub/Sub과 WebSocket 결합.

채팅·알림·트렌딩 서비스가 모두 Pub/Sub의 영역. 다만 Pub/Sub은 fire-and-forget — 구독 안 한 시점 메시지는 영원히 사라집니다. 이 한계와 Redis Streams가 어떻게 푸는지가 이번 편의 핵심.

처음 Pub/Sub이 어렵게 느껴지는 이유

처음 이 단원이 어렵게 느껴지는 이유는 두 가지예요. 첫째, Kafka·RabbitMQ와 뭐가 다른지 막연합니다. 모두 메시징인데? 둘째, WebSocket과 결합 시 흐름이 복잡합니다. 클라이언트 → 서버 → Redis → 다른 서버 → 클라이언트.

해결법은 한 가지예요. "Redis Pub/Sub = 휘발성 방송" 한 줄. 라디오 방송처럼 — 켜둔 사람만 듣고, 나중에 켜면 못 들음. 이 그림이 잡히면 Streams의 등장이 자연스럽습니다.

Redis Pub/Sub — 휘발성 방송

Publisher → PUBLISH channel:notifications "new message"
                ↓
Redis → 모든 SUBSCRIBE channel:notifications 클라이언트에 즉시 전달
                ↓
Subscriber A, B, C 받음

핵심:

  • Fire-and-forget — 발행 후 보존 X
  • At-most-once — 구독 안 한 시점 메시지 손실
  • 다중 구독 — N 구독자에게 동시 전달

여기서 정말 중요한 시험 함정 — Pub/Sub은 메시지 큐 X. 메시지 영속화 안 됨. 구독자 다운 → 그 시간 메시지 영원히 손실. 보장이 필요하면 Kafka 또는 Redis Streams.

Pub/Sub vs Kafka vs Streams

구분 Pub/Sub Kafka Redis Streams
보존 X 길게 길게
보장 At-most-once At-least-once At-least-once
처리량 높음 매우 높음 높음
컨슈머 그룹 X O O
메시지 ID X offset timestamp-seq
사용처 휘발 알림·실시간 영속 이벤트 가벼운 이벤트 큐

ReactiveRedisTemplate Pub/Sub

구독

@Service
public class NotificationListener {

    @Autowired
    ReactiveRedisTemplate<String, String> redisTemplate;

    @PostConstruct
    public void listen() {
        redisTemplate.listenToChannel("notifications")
            .map(ReactiveSubscription.Message::getMessage)
            .doOnNext(msg -> log.info("Received: {}", msg))
            .subscribe();
    }
}

발행

@Service
public class NotificationPublisher {

    @Autowired
    ReactiveRedisTemplate<String, String> redisTemplate;

    public Mono<Long> publish(String channel, String message) {
        return redisTemplate.convertAndSend(channel, message);
    }
}

convertAndSend 반환 = 받은 구독자 수.

Pattern Subscribe

// 패턴 구독 — channel:user:* 모두
redisTemplate.listenToPattern("channel:user:*")
    .doOnNext(msg -> {
        String channel = msg.getChannel();   // 실제 채널명
        String body = msg.getMessage();
        // ...
    })
    .subscribe();
> PSUBSCRIBE channel:user:*
> PUBLISH channel:user:1 "hello"   # 위 구독자가 받음
> PUBLISH channel:user:2 "hi"      # 위 구독자가 받음

여기서 시험 함정이 하나 있어요. 패턴 구독은 부하 ↑. 모든 PUBLISH가 패턴 매칭 검사. 와일드카드 너무 광범위 X.

WebSocket 채팅 구현

큰 그림

[Client A] ←WebSocket→ [Server 1] ←Redis Pub/Sub→ [Server 2] ←WebSocket→ [Client B]

여러 서버 인스턴스에 걸쳐 채팅 전달. 각 서버는 자기 클라이언트에만 직접 WebSocket, 다른 서버 클라이언트는 Pub/Sub으로 전달받아.

Spring WebFlux WebSocket

@Component
public class ChatHandler implements WebSocketHandler {

    @Autowired
    ReactiveRedisTemplate<String, String> redisTemplate;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String roomId = extractRoomId(session);

        // 1. 입력: 클라이언트 → Redis Pub
        Mono<Void> input = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .flatMap(msg -> redisTemplate.convertAndSend("chat:" + roomId, msg))
            .then();

        // 2. 출력: Redis Sub → 클라이언트
        Mono<Void> output = session.send(
            redisTemplate.listenToChannel("chat:" + roomId)
                .map(ReactiveSubscription.Message::getMessage)
                .map(session::textMessage)
        );

        return Mono.zip(input, output).then();
    }
}

핵심 — Mono.zip으로 입력·출력 동시. 한쪽 끊기면 모두 종료.

트렌딩 서비스 패턴

실시간 인기 게시물:

// 1. 사용자 액션마다 ZSet 점수 ↑
public Mono<Double> incrementTrending(String postId) {
    return redisTemplate.opsForZSet()
        .incrementScore("trending:posts", postId, 1.0);
}

// 2. 주기적으로 상위 10개 발행 (스케줄러)
@Scheduled(fixedDelay = 5000)
public void publishTrending() {
    redisTemplate.opsForZSet()
        .reverseRangeWithScores("trending:posts", Range.closed(0L, 9L))
        .collectList()
        .map(this::serializeRanking)
        .flatMap(ranking ->
            redisTemplate.convertAndSend("trending:updates", ranking))
        .subscribe();
}

// 3. WebSocket 클라이언트가 Pub/Sub 받아 즉시 갱신

여기서 정말 중요한 시험 함정 — ZSet + Pub/Sub 결합 = Redis 실시간 시스템 표준. 점수는 영속(ZSet), 알림은 휘발(Pub/Sub). 둘 보완.

시간 윈도우 트렌딩

// 시간 = 점수 (오래된 항목 자연스럽게 밀려남)
opsForZSet().add("trending:" + windowKey(), postId, currentTime);

// 윈도우 윈도우 밖 제거
opsForZSet().removeRangeByScore(
    "trending:" + windowKey(),
    Range.leftUnbounded(Range.Bound.exclusive((double)(currentTime - WINDOW_MS)))
);

ZSet 자체로 시간 기반 만료.

Redis Streams — 영속 메시지 큐

Pub/Sub의 한계를 푸는 도구. Kafka 같은 메시지 보존.

명령

# 메시지 추가
> XADD events:user-1 * action login user_id 1
"1234567890-0"   # 메시지 ID (timestamp-seq)

# 조회
> XRANGE events:user-1 - +

# 컨슈머 그룹 생성
> XGROUP CREATE events:user-1 mygroup $

# 그룹으로 읽기
> XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS events:user-1 >

# ack
> XACK events:user-1 mygroup 1234567890-0

Reactive Streams 사용

ReactiveStreamOperations<String, String, String> ops = redisTemplate.opsForStream();

// 발행
ops.add("events", Map.of("type", "login", "user_id", "1")).subscribe();

// 구독 (계속 대기)
StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(5));

ops.read(Consumer.from("mygroup", "consumer-1"),
        options,
        StreamOffset.create("events", ReadOffset.lastConsumed()))
    .doOnNext(record -> {
        // 처리
        ops.acknowledge("events", "mygroup", record.getId()).subscribe();
    })
    .subscribe();

Pub/Sub vs Streams 선택

상황 선택
휘발 알림 (sound.ring) Pub/Sub
채팅 메시지 (영속) Streams
이벤트 로그 Streams
실시간 대시보드 Pub/Sub

여기서 정말 중요한 시험 함정 — 메시지 보장 필요 = Streams. Pub/Sub은 빠르지만 손실 가능. 무엇을 잃어도 되나? 잃으면 안 되나? 결정 기준.

WebFlux + Streams 채팅

public Mono<Void> handle(WebSocketSession session) {
    String roomId = extractRoomId(session);
    String streamKey = "chat:" + roomId;

    Mono<Void> input = session.receive()
        .flatMap(msg -> redisTemplate.opsForStream()
            .add(streamKey, Map.of("body", msg.getPayloadAsText())))
        .then();

    Mono<Void> output = session.send(
        redisTemplate.opsForStream()
            .read(StreamOffset.create(streamKey, ReadOffset.from("0")))
            .map(record -> record.getValue().get("body"))
            .map(session::textMessage)
    );

    return Mono.zip(input, output).then();
}

채팅 이력 영속 + 늦게 들어와도 모든 메시지 받음.

그룹 알림 vs 1:1

// 그룹 알림 (모두 받음) — Pub/Sub
publish("group:1:notifications", message);

// 1:1 (대상자만) — Streams + 컨슈머 그룹
streams.add("user:" + targetUserId + ":notifications", message);

Redis Sentinel·Cluster에서 Pub/Sub

여기서 시험 함정이 하나 있어요. Cluster 환경 Pub/Sub는 노드 사이 메시지 전달. 모든 노드에 broadcast → 메시지 양 ↑. Pattern subscribe는 더 비싸. Cluster 큰 환경엔 Streams 또는 외부 메시지 큐.

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 6편의 핵심입니다. 시험 직전 또는 실무에서 헷갈릴 때 다시 펼쳐 볼 수 있게 압축 노트로 마무리할게요.

  • Pub/Sub = 휘발성 방송, fire-and-forget
  • 메시지 보존 X, 구독 안 한 시점 손실
  • At-most-once 보장
  • 다중 구독 자연스러움
  • Pub/Sub vs Kafka vs Streams — 보존·보장·컨슈머 그룹 차이
  • ReactiveRedisTemplate — listenToChannel / convertAndSend
  • Pattern Subscribe (PSUBSCRIBE) — 와일드카드, 부하 ↑
  • WebSocket + Redis Pub/Sub = 여러 서버 사이 실시간 전달
  • WebFlux 핸들러 — Mono.zip(input, output)
  • 한쪽 끊기면 둘 다 종료
  • 트렌딩 서비스 = ZSet 점수 + Pub/Sub 발행
  • 점수 영속·알림 휘발
  • Redis Streams = 영속 메시지 큐
  • XADD·XRANGE·XGROUP·XREADGROUP·XACK
  • ID = timestamp-seq
  • 컨슈머 그룹 (Kafka 비슷)
  • Streams 사용 = 메시지 보장 필요
  • Pub/Sub = 휘발 OK / Streams = 잃으면 안 되는 것
  • 채팅(영속) = Streams / 알림(휘발) = Pub/Sub
  • Cluster Pub/Sub은 노드 broadcast → 부하 ↑

시리즈 다른 편

공식 문서: Redis Pub/Sub / Redis Streams 에서 더 깊이.

다음 글(7편, 마지막)에서는 고급 주제 — Transaction(MULTI/EXEC), Persistence(RDB·AOF), GeoSpatial, ACL 인증까지 시리즈 마무리.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!