Redis Pub/Sub & Streams — 실시간과 영속의 차이

2026-05-02AWS SAA-C03 스터디

Redis 핵심 정리 시리즈 6편. Pub/Sub와 Streams를 라디오 방송·사내 게시판 비유로 풀어가며 — SUBSCRIBE·PUBLISH 기본기, 패턴 구독, Streams 자료구조와 XADD·XREAD·XRANGE, Consumer Group으로 작업 분담, XACK·XPENDING·XAUTOCLAIM, 메시지 직렬화, 스트림 트림, Pub/Sub vs Streams 선택 가이드까지 처음 보는 사람도 따라올 수 있게 친절하게 풀어쓴 6편.

📚 Redis 핵심 정리 · 6편 / 14편 — 실시간과 영속의 차이

이 글은 Redis 핵심 정리 시리즈의 여섯 번째 편입니다. 5편에서 캐싱 패턴을 풀었다면, 6편은 Redis의 메시지 전달 기능 두 가지 — Pub/Sub와 Streams입니다. 이름은 비슷한 듯한데 동작은 정반대 영역에 있어요. 한쪽은 "지금 듣고 있지 않으면 끝", 다른 쪽은 "스크롤해서 과거까지 다시 볼 수 있는" 자료구조입니다.

이 6편에서는 라디오 방송 비유(Pub/Sub)사내 게시판 비유(Streams) 두 갈래로 풀어 가요. 둘의 차이를 머리에 박아 두면 "어느 자리에 어느 도구를 써야 할지" 결정이 빨라집니다. Consumer Group으로 작업을 분담하는 방법, ACK 누락 함정까지 한 번에 정리해 둡니다.

왜 메시지 전달 기능이 처음엔 헷갈릴까요

이유는 네 가지예요.

첫째, Pub/Sub와 Streams가 둘 다 "메시지를 보낸다"고만 들리면 헷갈립니다. Pub/Sub는 메시지가 흘러가 버리고, Streams는 자료구조처럼 쌓이는데 — 이 차이가 처음엔 직관에 안 옵니다.

둘째, Consumer Group이 등장하면 머리가 한 번 더 꼬여요. "여러 명이 같은 채널을 보면 모두 받는다 vs 그룹에선 한 명만 받는다" — 이 미묘한 동작 차이가 자리잡기까지 시간이 걸립니다.

셋째, $·>·0 같은 메시지 ID 기호가 한꺼번에 나옵니다. XREAD에서 $는 "지금부터 새 메시지", 0은 "처음부터"를 뜻하는데 — 두 가지가 같이 나오면 어느 쪽이 어느 쪽인지 헷갈려요.

넷째, ACK 개념이 처음 보면 어색해요. "메시지를 처리했다는 신호를 따로 보낸다"는 게 일반 함수 호출과 다른 흐름이라 자리잡는 데 시간이 걸립니다.

해결법은 한 가지예요. 두 가지 비유를 머리에 박는 것 — Pub/Sub은 라디오 방송(지금 켜둔 라디오만 들음, 안 듣고 지나가면 끝), Streams는 사내 게시판(글이 쌓이고 스크롤해 과거까지 다시 봄)으로 잡으면 갑자기 명확해집니다. 이 글은 그 비유를 따라 처음부터 풀어 갑니다.

Pub/Sub — 라디오 방송 비유

Redis Pub/Sub(Publish/Subscribe)는 발행자와 구독자를 채널로 연결하는 메시지 브로커 패턴이에요. 발행자(Publisher)는 채널에 메시지를 보내고, 구독자(Subscriber)는 그 채널을 구독해 메시지를 받습니다.

회사 비유로 — 사내 라디오 방송이에요. 방송국(Publisher)이 마이크에 대고 말하면, 그 채널을 켜 둔 직원들(Subscriber)에게만 전파됩니다. 라디오를 안 켜 두고 있던 직원이나, 잠깐 자리를 비운 직원에게는 전파되지 않아요. 한 번 흘러간 방송은 다시 들을 수 없습니다.

발행자(Publisher) → [채널: "news:sports"] → 구독자1
                                           → 구독자2
                                           → 구독자3

여기서 시험 함정이 하나 있어요. Pub/Sub는 "fire and forget" 방식이에요. 메시지가 영속 저장되지 않고, 구독자가 없거나 오프라인이면 그 메시지는 그대로 사라집니다. 그래서 놓쳐도 되는 메시지(실시간 알림, 캐시 무효화 신호)에는 적합하지만, 절대 놓치면 안 되는 메시지(주문, 결제)에는 부적합해요.

Pub/Sub 기본 명령어

구독(Subscribe)

# 특정 채널 구독
SUBSCRIBE channel1 channel2

# 패턴으로 채널 구독 (와일드카드 지원)
PSUBSCRIBE news:*          # news:로 시작하는 모든 채널
PSUBSCRIBE user:*.update   # user:XXX.update 패턴

# 구독 취소
UNSUBSCRIBE channel1
PUNSUBSCRIBE news:*

발행(Publish)

# 채널에 메시지 발행
PUBLISH channel1 "Hello, World!"
# 반환값: 메시지를 수신한 구독자 수 (정수)

PUBLISH news:sports "Team A wins!"
# (integer) 3  → 3명의 구독자가 수신

채널 정보 조회

# 활성 채널 목록 조회 (최소 1명 이상 구독)
PUBSUB CHANNELS
PUBSUB CHANNELS news:*   # 패턴으로 필터링

# 채널의 구독자 수 조회
PUBSUB NUMSUB channel1 channel2

# 패턴 구독자 수 조회
PUBSUB NUMPAT

PUBLISH의 반환값(구독자 수)이 한 가지 힌트가 돼요. 0이 반환됐다면 그 메시지는 아무도 받지 못한 채 사라진 것이라는 뜻입니다. 자세한 명령어 사양은 Redis 공식 Pub/Sub 문서에서 확인할 수 있어요.

TypeScript에서 Pub/Sub 구현

회사 비유로 — Pub/Sub는 방송 송신용 마이크와 수신용 라디오를 분리해야 합니다. 같은 라디오로 방송도 하고 청취도 하면 작동이 꼬여요. Redis 클라이언트도 같은 이유로 발행용·구독용을 분리합니다.

import { createClient } from 'redis';

// Pub/Sub는 별도의 클라이언트 연결 필요
// 일반 명령어 실행과 Pub/Sub는 같은 연결에서 혼용 불가
const publisher = createClient({ ... });
const subscriber = createClient({ ... });

await publisher.connect();
await subscriber.connect();

구독자 구현

// 구독자 설정
async function setupSubscriber() {
    // 채널 구독 및 메시지 핸들러 등록
    await subscriber.subscribe('notifications', (message, channel) => {
        console.log(`Channel: ${channel}, Message: ${message}`);
        const data = JSON.parse(message);
        handleNotification(data);
    });
    
    // 패턴 구독
    await subscriber.pSubscribe('user:*:updates', (message, channel) => {
        const userId = channel.split(':')[1];
        console.log(`User ${userId} update: ${message}`);
    });
}

// 구독 해제
async function cleanup() {
    await subscriber.unsubscribe('notifications');
    await subscriber.pUnsubscribe('user:*:updates');
    await subscriber.disconnect();
}

발행자 구현

// 메시지 발행
async function publishNotification(userId: string, type: string, data: object) {
    const message = JSON.stringify({
        type,
        userId,
        data,
        timestamp: Date.now(),
    });
    
    // 특정 사용자 채널에 발행
    const subscriberCount = await publisher.publish(`user:${userId}:updates`, message);
    console.log(`Message sent to ${subscriberCount} subscribers`);
}

// 브로드캐스트
async function broadcastAnnouncement(announcement: string) {
    await publisher.publish('global:announcements', JSON.stringify({
        text: announcement,
        timestamp: Date.now(),
    }));
}

Pub/Sub의 한계 — 두 가지 결정적 약점

메시지 저장 없음

Pub/Sub는 메시지를 영속 저장하지 않아요. 구독자가 오프라인이거나 처리 속도보다 빠르게 메시지가 들어오면 그 메시지는 사라집니다. 실시간 알림처럼 "놓쳐도 되는" 메시지에는 적합하지만, 중요한 이벤트 처리에는 부적합해요.

구독자 A: 온라인 → 메시지 수신 ✓
구독자 B: 오프라인 → 메시지 손실 ✗
구독자 C: 처리 지연 → 메시지 손실 ✗

소비자 그룹 없음

Pub/Sub는 소비자 그룹을 지원하지 않아요. 동일한 채널을 여러 서버가 구독하면 모든 서버가 동일한 메시지를 받습니다. 작업을 여러 서버에 분담(load balancing)하고 싶으면 Pub/Sub로는 불가능 — Streams로 가야 해요.

기능Pub/SubStreams
메시지 저장XO
메시지 재전송XO
소비자 그룹XO
처리 확인(ACK)XO
속도매우 빠름빠름
사용 사례실시간 알림이벤트 처리

여기까지 따라오셨다면 한 가지 의문이 들 거예요. "그러면 Pub/Sub로는 뭘 하라는 거지?" — 정답은 놓쳐도 되는, 짧은 수명의, 실시간 신호 영역입니다. 캐시 무효화 신호, 실시간 채팅 브로드캐스트, 라이브 대시보드 업데이트 — 이런 자리에 어울려요.

Redis Streams — 사내 게시판 비유

Redis Streams는 Redis 5.0에 추가된 메시지 스트림 자료구조예요. Kafka·RabbitMQ 같은 메시지 큐와 비슷한 기능을 제공하면서 Redis의 단순성을 유지합니다.

회사 비유로 — Streams는 사내 게시판이에요. 글(메시지)이 게시판에 영속적으로 쌓이고, 직원들이 스크롤해 과거 글까지 다시 읽을 수 있습니다. 게시판에 글을 올리는 사람, 게시판을 읽고 처리하는 사람이 따로 있고 — 처리 중인 글에는 "내가 처리 중"이라고 라벨을 붙여 둘 수 있어요.

각 메시지는 고유 ID(타임스탬프-시퀀스) 와 함께 영속 저장되며, 소비자 그룹(Consumer Group)을 통해 메시지를 분산 처리할 수 있습니다. 미처리 메시지를 추적하고 재처리할 수 있어 중요한 이벤트 처리에 적합해요.

자세한 자료구조 사양은 Redis 공식 Streams 문서에서 확인할 수 있어요.

Streams 기본 명령어

메시지 추가 (XADD)

# XADD: 스트림에 메시지 추가
# * 는 Redis가 자동으로 ID 생성 (타임스탬프-시퀀스 형식)
XADD stream:orders * item "laptop" price "1200" userId "123"
# "1699900000000-0" → 타임스탬프-시퀀스 형식의 ID

# 명시적 ID 지정 (직접 관리)
XADD stream:orders 1699900000000-1 item "phone" price "800"

# $ 기호: 현재 시간 이후의 메시지만 읽겠다는 의미
# (XREAD에서 사용, 과거 메시지 무시)

# 스트림 최대 크기 제한
XADD stream:orders MAXLEN 1000 * item "tablet" price "500"
# 메시지가 1000개를 초과하면 가장 오래된 것 삭제

# ~ 옵션: 대략적인 크기 제한 (더 효율적)
XADD stream:orders MAXLEN ~ 1000 * item "monitor" price "400"

여기서 시험 함정이 하나 있어요. ***은 ID 자동 생성, $는 "현재 이후"를 의미**합니다. 둘 다 별표·달러 기호라 헷갈리는데 — 사용 위치가 달라요. *은 XADD에서, $는 XREAD에서 씁니다.

메시지 읽기 (XREAD)

# XREAD: 스트림 메시지 읽기
XREAD COUNT 10 STREAMS stream:orders 0
# 0: 처음부터 읽기
# COUNT 10: 최대 10개

# $ 로 새 메시지만 읽기 (구독처럼 동작)
XREAD COUNT 10 STREAMS stream:orders $

# BLOCK: 새 메시지를 기다리는 블로킹 읽기 (밀리초)
XREAD BLOCK 0 STREAMS stream:orders $
# BLOCK 0: 메시지가 올 때까지 무한 대기
# BLOCK 5000: 5초 대기 후 타임아웃

# 여러 스트림 동시 읽기
XREAD COUNT 10 STREAMS stream:orders stream:payments 0 0

범위 조회 (XRANGE, XREVRANGE)

# XRANGE: 특정 범위의 메시지 조회
XRANGE stream:orders - +          # 전체 조회
XRANGE stream:orders - + COUNT 10 # 처음 10개
XRANGE stream:orders 1699900000000-0 +  # 특정 ID 이후

# XREVRANGE: 역순 조회
XREVRANGE stream:orders + - COUNT 10   # 최신 10개

# 스트림 길이 확인
XLEN stream:orders

회사 비유로 — XRANGE게시판을 위에서 아래로 스크롤하면서 읽는 거고, XREVRANGE아래에서 위로 스크롤하는 거예요. XLEN 은 게시판에 쌓인 글 개수를 한 번에 보여 줍니다.

TypeScript에서 Streams 구현

메시지 발행

// 주문 이벤트 발행
async function publishOrderEvent(orderId: string, event: object): Promise<string> {
    const messageId = await client.xAdd('stream:orders', '*', {
        orderId,
        event: JSON.stringify(event),
        timestamp: Date.now().toString(),
    });
    
    console.log(`Published order event: ${messageId}`);
    return messageId;
}

// 스트림 크기 제한과 함께 발행
async function publishWithLimit(streamKey: string, data: Record<string, string>): Promise<string> {
    return client.xAdd(streamKey, '*', data, {
        TRIM: {
            strategy: 'MAXLEN',
            strategyModifier: '~',  // 근사값 허용 (효율적)
            threshold: 1000,
        }
    });
}

메시지 소비 (폴링 방식)

// 폴링 방식으로 새 메시지 처리
async function consumeStream(streamKey: string) {
    let lastId = '$';  // $ = 현재 시간 이후의 새 메시지만
    
    while (true) {
        // BLOCK: 새 메시지를 기다림 (블로킹 읽기)
        const results = await client.xRead(
            [{ key: streamKey, id: lastId }],
            { BLOCK: 0, COUNT: 10 }  // BLOCK 0: 무한 대기
        );
        
        if (results && results.length > 0) {
            for (const stream of results) {
                for (const message of stream.messages) {
                    await processMessage(message);
                    lastId = message.id;  // 마지막 처리 ID 갱신
                }
            }
        }
    }
}

async function processMessage(message: any) {
    const { id, message: data } = message;
    console.log(`Processing message ${id}:`, data);
    // 실제 처리 로직
}

재시작 후 미처리 메시지 처리

// 애플리케이션 재시작 후 처리되지 않은 메시지부터 처리
async function consumeFromCheckpoint(streamKey: string, lastProcessedId: string) {
    let cursor = lastProcessedId || '0';  // 0 = 처음부터
    
    while (true) {
        const results = await client.xRead(
            [{ key: streamKey, id: cursor }],
            { COUNT: 10 }
        );
        
        if (!results || results.length === 0 || results[0].messages.length === 0) {
            // 모든 과거 메시지 처리 완료, 새 메시지 대기
            const newResults = await client.xRead(
                [{ key: streamKey, id: cursor }],
                { BLOCK: 5000, COUNT: 10 }  // 5초 대기
            );
            if (newResults) {
                // 새 메시지 처리
            }
            continue;
        }
        
        for (const stream of results) {
            for (const message of stream.messages) {
                await processMessage(message);
                cursor = message.id;
                // checkpoint 저장 (DB 또는 Redis)
                await saveCheckpoint(streamKey, cursor);
            }
        }
    }
}

여기까지가 단일 소비자 패턴이에요. 하지만 실무에서는 여러 서버가 같은 스트림을 분담해서 처리해야 하는 경우가 훨씬 많습니다. 그때 등장하는 게 Consumer Group이에요.

Consumer Group — 게시판 글을 팀이 분담

Consumer Group의 필요성

단순한 XREAD로 여러 서버가 같은 스트림을 읽으면 모든 서버가 같은 메시지를 중복 처리해요. 그건 부담이 늘기만 하지 분산이 아니에요.

Consumer Group을 쓰면 각 메시지는 그룹 내 하나의 소비자에게만 전달됩니다(로드 밸런싱). 메시지 처리 확인(ACK), 미처리 메시지 추적, 실패 메시지 재처리가 모두 가능해요.

회사 비유로 — 게시판에 올라온 글들을 "주문 처리팀" 이라는 팀 하나에 묶어 두면, 팀원 3명이 글을 분담해서 가져가요. 같은 글을 두 명이 동시에 가져가는 일은 없습니다.

스트림: stream:orders
           ↓
소비자 그룹: "order-processors"
  소비자1: 메시지 A, B, C 처리
  소비자2: 메시지 D, E, F 처리
  소비자3: 메시지 G, H, I 처리

Consumer Group 명령어

# 소비자 그룹 생성
XGROUP CREATE stream:orders order-processors $ MKSTREAM
# $ : 현재 시간 이후의 새 메시지부터 처리
# 0 : 스트림의 처음부터 처리 (미처리 메시지 포함)
# MKSTREAM: 스트림이 없으면 생성

# 소비자 그룹 읽기 (특정 소비자로)
XREADGROUP GROUP order-processors consumer1 COUNT 10 STREAMS stream:orders >
# >: 아직 다른 소비자에게 전달되지 않은 새 메시지

# 메시지 처리 완료 확인 (ACK)
XACK stream:orders order-processors "1699900000000-0"

# 미처리(Pending) 메시지 조회
XPENDING stream:orders order-processors - + 10
# 처리 중이지만 ACK하지 않은 메시지 목록

# 미처리 메시지 재할당 (다른 소비자에게)
XCLAIM stream:orders order-processors consumer2 60000 "1699900000000-0"
# 60000ms(1분) 이상 처리 중인 메시지를 consumer2에게 재할당

# 소비자 그룹 정보 조회
XINFO GROUPS stream:orders
XINFO CONSUMERS stream:orders order-processors

여기서 정말 중요한 시험 함정 — >는 "아직 누구에게도 전달되지 않은 새 메시지" 라는 뜻입니다. XREADGROUP에서만 쓰는 특수 기호예요. XREAD에서는 $를 썼는데 — Consumer Group을 쓰면 >로 갈아탑니다. 이 기호 차이를 외워 두는 게 시험에서 자주 나와요.

TypeScript Consumer Group 구현

// 소비자 그룹 설정
async function setupConsumerGroup(streamKey: string, groupName: string) {
    try {
        await client.xGroupCreate(streamKey, groupName, '$', {
            MKSTREAM: true
        });
        console.log(`Consumer group '${groupName}' created`);
    } catch (err: any) {
        if (err.message.includes('BUSYGROUP')) {
            console.log(`Consumer group '${groupName}' already exists`);
        } else {
            throw err;
        }
    }
}

// 소비자로 메시지 읽기
async function consumeAsGroupMember(
    streamKey: string,
    groupName: string,
    consumerName: string
) {
    while (true) {
        // > : 새로운 미전달 메시지 읽기
        const results = await client.xReadGroup(
            groupName,
            consumerName,
            [{ key: streamKey, id: '>' }],
            { BLOCK: 5000, COUNT: 10 }
        );
        
        if (results && results.length > 0) {
            for (const stream of results) {
                for (const message of stream.messages) {
                    try {
                        await processMessage(message);
                        // 처리 완료 ACK
                        await client.xAck(streamKey, groupName, message.id);
                    } catch (error) {
                        console.error(`Failed to process message ${message.id}:`, error);
                        // ACK 하지 않으면 pending 목록에 남아 재처리 가능
                    }
                }
            }
        }
    }
}

// 오래된 미처리 메시지 복구
async function recoverStalePendingMessages(
    streamKey: string,
    groupName: string,
    consumerName: string,
    maxIdleMs: number = 60000  // 1분
) {
    // XAUTOCLAIM: 일정 시간 처리되지 않은 메시지 자동 재할당
    const result = await client.xAutoClaim(
        streamKey,
        groupName,
        consumerName,
        maxIdleMs,
        '0'  // 처음부터
    );
    
    for (const message of result.messages) {
        try {
            await processMessage(message);
            await client.xAck(streamKey, groupName, message.id);
        } catch (error) {
            console.error(`Failed to reprocess message ${message.id}:`, error);
        }
    }
}

회사 비유로 — XAUTOCLAIM팀원이 글을 가져갔는데 처리하다가 자리를 비운 경우, 1분이 지나면 다른 팀원이 그 글을 자동으로 가져가는 동작이에요. 작업자가 갑자기 죽어도 메시지가 영원히 묶이지 않습니다.

실용 예시 — 주문 처리 시스템

전체 그림을 한 번에 보면 다음과 같아요. 이커머스 마켓플레이스 예시에서 주문이 들어오면 — 재고·결제·이메일 서비스가 같은 스트림을 그룹으로 분담해 처리합니다.

주문 API → XADD stream:orders → 소비자 그룹 "order-processors"
                                   ├── 재고 확인 서비스 (consumer1)
                                   ├── 결제 처리 서비스 (consumer2)
                                   └── 이메일 서비스 (consumer3)
// 주문 이벤트 발행
async function placeOrder(order: Order): Promise<string> {
    const orderId = generateId();
    
    // 주문 데이터 저장
    await client.hSet(`orders#${orderId}`, {
        id: orderId,
        userId: order.userId,
        items: JSON.stringify(order.items),
        total: order.total.toString(),
        status: 'pending',
    });
    
    // 스트림에 이벤트 발행
    const eventId = await client.xAdd('stream:orders', '*', {
        orderId,
        userId: order.userId,
        total: order.total.toString(),
        event: 'ORDER_PLACED',
    });
    
    return orderId;
}

// 재고 확인 서비스
async function startInventoryService() {
    await setupConsumerGroup('stream:orders', 'inventory-service');
    
    while (true) {
        const results = await client.xReadGroup(
            'inventory-service',
            'inventory-worker-1',
            [{ key: 'stream:orders', id: '>' }],
            { BLOCK: 5000, COUNT: 1 }
        );
        
        if (results) {
            for (const { messages } of results) {
                for (const message of messages) {
                    const { orderId } = message.message;
                    
                    // 재고 확인 처리
                    const inStock = await checkInventory(orderId);
                    
                    // 결과를 다음 스트림으로 전달
                    await client.xAdd('stream:inventory-results', '*', {
                        orderId,
                        status: inStock ? 'available' : 'out-of-stock',
                    });
                    
                    // ACK
                    await client.xAck('stream:orders', 'inventory-service', message.id);
                }
            }
        }
    }
}

Pub/Sub vs Streams 선택 가이드

비교표

특성Pub/SubStreams
메시지 저장X (사라짐)O (영구 저장)
재처리 가능XO
소비자 그룹X (모두 수신)O (하나만 수신)
ACK 메커니즘XO
메시지 순서채널별 순서 보장O (스트림 내 순서)
메모리 사용적음더 많음
복잡도낮음높음
지연 시간매우 낮음낮음

Pub/Sub가 적합한 경우

  • 실시간 알림 (새 메시지, 좋아요 알림)
  • 채팅 메시지 브로드캐스트
  • 캐시 무효화 신호 전파
  • 실시간 대시보드 업데이트
  • 메시지 손실이 허용되는 경우

Streams가 적합한 경우

  • 주문 처리, 결제 처리 등 중요 이벤트
  • 이벤트 소싱 (모든 상태 변경 기록)
  • 여러 서버 인스턴스 간 작업 분산
  • 메시지 재처리가 필요한 경우
  • Kafka를 사용하기엔 과한 소규모 시스템

회사 비유로 한 번 더 — 라디오로 충분하면 Pub/Sub, 게시판이 필요하면 Streams로 가세요. 작은 시스템에서 영속이 필요하면 Streams가 Kafka를 띄우는 부담을 덜어 줍니다.

메시지 직렬화 — 문자열만 저장

Streams는 문자열만 저장

// 잘못된 방법: 객체를 직접 저장할 수 없음
await client.xAdd('stream:events', '*', {
    data: { userId: 1, action: 'login' }  // 에러!
});

// 올바른 방법: 모든 필드를 문자열로
await client.xAdd('stream:events', '*', {
    userId: '1',
    action: 'login',
    data: JSON.stringify({ ip: '192.168.1.1', userAgent: 'Chrome' }),
    timestamp: Date.now().toString(),
});

복잡한 데이터 직렬화

// 헬퍼 함수로 직렬화/역직렬화 추상화
function serializeForStream(data: object): Record<string, string> {
    const result: Record<string, string> = {};
    for (const [key, value] of Object.entries(data)) {
        result[key] = typeof value === 'string' ? value : JSON.stringify(value);
    }
    return result;
}

function deserializeFromStream(message: Record<string, string>): object {
    const result: Record<string, any> = {};
    for (const [key, value] of Object.entries(message)) {
        try {
            result[key] = JSON.parse(value);
        } catch {
            result[key] = value;  // 파싱 실패 시 원본 문자열 반환
        }
    }
    return result;
}

스트림 유지 관리

스트림 크기 관리

# 스트림 길이 확인
XLEN stream:orders

# 오래된 메시지 수동 삭제
XTRIM stream:orders MAXLEN 1000    # 1000개 유지
XTRIM stream:orders MAXLEN ~ 1000  # 근사값으로 트림 (효율적)

# 특정 ID 범위 삭제
XDEL stream:orders "1699900000000-0" "1699900000001-0"

여기서 시험 함정이 하나 있어요. 스트림은 영속 자료구조라 자동으로 줄지 않습니다. 트래픽이 많으면 메모리가 무한정 늘어요. 그래서 MAXLEN ~ N 옵션을 XADD 시점에 박아 두는 게 표준 패턴입니다(~는 근사값 허용으로 더 효율적).

스트림 상태 모니터링

# 스트림 전체 정보
XINFO STREAM stream:orders

# 소비자 그룹 정보
XINFO GROUPS stream:orders

# 소비자 목록
XINFO CONSUMERS stream:orders order-processors

# 미처리 메시지 상세 조회
XPENDING stream:orders order-processors - + 10

XPENDING이 자주 쓰여요. 펜딩 큐가 계속 쌓이고 있다면 어떤 소비자가 ACK를 누락하고 있는지 알 수 있는 첫 단서입니다.

운영에서 자주 보는 함정 5가지

1. Pub/Sub 연결 재사용 불가

// 잘못된 방법: 일반 클라이언트로 Pub/Sub 사용
const client = createClient({ ... });
await client.subscribe('channel1', handler);
// 이 후 client.get('key') 호출 불가! 에러 발생

// 올바른 방법: Pub/Sub 전용 클라이언트 분리
const subscriber = client.duplicate();
await subscriber.connect();
await subscriber.subscribe('channel1', handler);

// 일반 명령은 원래 client 사용
const value = await client.get('key');

2. XREAD vs XREADGROUP 혼용 금지

// 소비자 그룹 설정 후 XREAD 사용 시 메시지 중복 처리 가능
// 반드시 XREADGROUP만 사용

// 잘못된 방법
const results = await client.xRead([{ key: 'stream:orders', id: lastId }]);

// 올바른 방법 (소비자 그룹 사용 시)
const results = await client.xReadGroup('group1', 'consumer1', 
    [{ key: 'stream:orders', id: '>' }]);

3. ACK 누락

// 잘못된 방법: ACK 없이 메시지 처리
async function badConsumer() {
    const results = await client.xReadGroup('group1', 'consumer1',
        [{ key: 'stream', id: '>' }]);
    // 처리 후 xAck 누락 → 메시지가 pending에 계속 남음
    for (const { messages } of results) {
        for (const msg of messages) {
            await process(msg);
            // xAck 호출 없음!
        }
    }
}

// 올바른 방법: 처리 후 반드시 ACK
async function goodConsumer() {
    const results = await client.xReadGroup('group1', 'consumer1',
        [{ key: 'stream', id: '>' }]);
    for (const { messages } of results) {
        for (const msg of messages) {
            await process(msg);
            await client.xAck('stream', 'group1', msg.id);  // 반드시 ACK!
        }
    }
}

4. 스트림 무한 성장

// 잘못된 방법: 크기 제한 없이 계속 추가
await client.xAdd('stream:logs', '*', logData);

// 올바른 방법: MAXLEN으로 자동 트림
await client.xAdd('stream:logs', '*', logData, {
    TRIM: {
        strategy: 'MAXLEN',
        strategyModifier: '~',
        threshold: 10000,  // 약 10000개 유지
    }
});

5. $ 기호와 0의 혼동

# $ : 현재 시점 이후의 새 메시지만 읽겠다
# → 과거 메시지는 무시
XREAD BLOCK 0 STREAMS mystream $

# 0 : 스트림의 처음부터 모든 메시지 읽겠다
# → 과거 메시지 포함
XREAD COUNT 10 STREAMS mystream 0

회사 비유로 — $"내가 출근한 다음에 올라온 글만 보겠다", 0"게시판 첫 글부터 다 보겠다" 의 차이예요. Consumer Group을 만들 때도 같은 차이가 적용되니 외워 두면 좋습니다.

시험 직전 한 번 더 — 자주 헷갈리는 함정 모음

여기까지가 Redis 6편(Pub/Sub & Streams)의 핵심입니다. 시험 직전·실무 실수 방지를 위한 압축 노트로 마무리할게요.

  • Pub/Sub = 라디오 방송 — 지금 듣는 사람만 받음, 안 들으면 끝
  • Streams = 사내 게시판 — 영속 저장, 스크롤 가능, 재처리 가능
  • Pub/Sub는 fire and forget — 메시지 영속 저장 X, ACK X, 소비자 그룹 X
  • PUBLISH 반환값 = 수신한 구독자 수, 0이면 그 메시지는 사라진 것
  • Pub/Sub 클라이언트는 별도 분리 필수 — 같은 클라이언트에서 일반 명령 못 씀
  • 적합 자리 — Pub/Sub: 실시간 알림·채팅·캐시 무효화, Streams: 주문·결제·이벤트 소싱
  • Streams 메시지 ID = 타임스탬프-시퀀스 (예: 1699900000000-0)
  • XADD의 *** = ID 자동 생성, XREAD$** = 현재 이후 새 메시지
  • XREAD는 단일 소비자, XREADGROUP 은 Consumer Group 전용
  • Consumer Group의 > = 아직 누구에게도 전달되지 않은 새 메시지
  • XACK 누락 = pending에 영원히 남음 — 처리 후 반드시 ACK
  • XAUTOCLAIM — N초 이상 처리 안 된 메시지 자동 재할당 (자리 비운 작업자 복구)
  • XPENDING으로 펜딩 큐 모니터링 — ACK 누락 디버깅의 첫 단서
  • 스트림 영속이라 자동으로 줄지 않음MAXLEN ~ N을 XADD 시점에 박아두기
  • ~ 근사값 옵션 — 정확한 트림보다 더 효율적
  • Streams는 문자열만 저장 — 객체는 JSON.stringify 명시 처리
  • XGROUP CREATE$ = 새 메시지부터, 0 = 처음부터 (재처리)
  • Pub/Sub 한계 — 메시지 손실 가능, 소비자 그룹 없음, ACK 없음
  • Streams 한계 — Pub/Sub보다 메모리·복잡도 높음
  • 자주 헷갈리는 비교 — *** vs $ vs >** = ID 자동생성 / 현재 이후 / 미전달 새 메시지
  • 자주 헷갈리는 비교 — Pub/Sub vs Streams = 라디오 vs 게시판, 휘발 vs 영속

시리즈 다른 편

같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.

※ 이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.

답글 남기기

error: Content is protected !!