Redis 핵심 정리 시리즈 6편. Pub/Sub와 Streams를 라디오 방송·사내 게시판 비유로 풀어가며 — SUBSCRIBE·PUBLISH 기본기, 패턴 구독, Streams 자료구조와 XADD·XREAD·XRANGE, Consumer Group으로 작업 분담, XACK·XPENDING·XAUTOCLAIM, 메시지 직렬화, 스트림 트림, Pub/Sub vs Streams 선택 가이드까지 처음 보는 사람도 따라올 수 있게 친절하게 풀어쓴 6편.
이 글은 Redis 핵심 정리 시리즈의 여섯 번째 편입니다. 5편에서 캐싱 패턴을 풀었다면, 6편은 Redis의 메시지 전달 기능 두 가지 — Pub/Sub와 Streams입니다. 이름은 비슷한 듯한데 동작은 정반대 영역에 있어요. 한쪽은 "지금 듣고 있지 않으면 끝", 다른 쪽은 "스크롤해서 과거까지 다시 볼 수 있는" 자료구조입니다.
이 6편에서는 라디오 방송 비유(Pub/Sub) 와 사내 게시판 비유(Streams) 두 갈래로 풀어 가요. 둘의 차이를 머리에 박아 두면 "어느 자리에 어느 도구를 써야 할지" 결정이 빨라집니다. Consumer Group으로 작업을 분담하는 방법, ACK 누락 함정까지 한 번에 정리해 둡니다.
왜 메시지 전달 기능이 처음엔 헷갈릴까요
이유는 네 가지예요.
첫째, Pub/Sub와 Streams가 둘 다 "메시지를 보낸다"고만 들리면 헷갈립니다. Pub/Sub는 메시지가 흘러가 버리고, Streams는 자료구조처럼 쌓이는데 — 이 차이가 처음엔 직관에 안 옵니다.
둘째, Consumer Group이 등장하면 머리가 한 번 더 꼬여요. "여러 명이 같은 채널을 보면 모두 받는다 vs 그룹에선 한 명만 받는다" — 이 미묘한 동작 차이가 자리잡기까지 시간이 걸립니다.
셋째, $·>·0 같은 메시지 ID 기호가 한꺼번에 나옵니다. XREAD에서 $는 "지금부터 새 메시지", 0은 "처음부터"를 뜻하는데 — 두 가지가 같이 나오면 어느 쪽이 어느 쪽인지 헷갈려요.
넷째, ACK 개념이 처음 보면 어색해요. "메시지를 처리했다는 신호를 따로 보낸다"는 게 일반 함수 호출과 다른 흐름이라 자리잡는 데 시간이 걸립니다.
해결법은 한 가지예요. 두 가지 비유를 머리에 박는 것 — Pub/Sub은 라디오 방송(지금 켜둔 라디오만 들음, 안 듣고 지나가면 끝), Streams는 사내 게시판(글이 쌓이고 스크롤해 과거까지 다시 봄)으로 잡으면 갑자기 명확해집니다. 이 글은 그 비유를 따라 처음부터 풀어 갑니다.
Pub/Sub — 라디오 방송 비유
Redis Pub/Sub(Publish/Subscribe)는 발행자와 구독자를 채널로 연결하는 메시지 브로커 패턴이에요. 발행자(Publisher)는 채널에 메시지를 보내고, 구독자(Subscriber)는 그 채널을 구독해 메시지를 받습니다.
회사 비유로 — 사내 라디오 방송이에요. 방송국(Publisher)이 마이크에 대고 말하면, 그 채널을 켜 둔 직원들(Subscriber)에게만 전파됩니다. 라디오를 안 켜 두고 있던 직원이나, 잠깐 자리를 비운 직원에게는 전파되지 않아요. 한 번 흘러간 방송은 다시 들을 수 없습니다.
발행자(Publisher) → [채널: "news:sports"] → 구독자1
→ 구독자2
→ 구독자3
여기서 시험 함정이 하나 있어요. Pub/Sub는 "fire and forget" 방식이에요. 메시지가 영속 저장되지 않고, 구독자가 없거나 오프라인이면 그 메시지는 그대로 사라집니다. 그래서 놓쳐도 되는 메시지(실시간 알림, 캐시 무효화 신호)에는 적합하지만, 절대 놓치면 안 되는 메시지(주문, 결제)에는 부적합해요.
Pub/Sub 기본 명령어
구독(Subscribe)
# 특정 채널 구독
SUBSCRIBE channel1 channel2
# 패턴으로 채널 구독 (와일드카드 지원)
PSUBSCRIBE news:* # news:로 시작하는 모든 채널
PSUBSCRIBE user:*.update # user:XXX.update 패턴
# 구독 취소
UNSUBSCRIBE channel1
PUNSUBSCRIBE news:*
발행(Publish)
# 채널에 메시지 발행
PUBLISH channel1 "Hello, World!"
# 반환값: 메시지를 수신한 구독자 수 (정수)
PUBLISH news:sports "Team A wins!"
# (integer) 3 → 3명의 구독자가 수신
채널 정보 조회
# 활성 채널 목록 조회 (최소 1명 이상 구독)
PUBSUB CHANNELS
PUBSUB CHANNELS news:* # 패턴으로 필터링
# 채널의 구독자 수 조회
PUBSUB NUMSUB channel1 channel2
# 패턴 구독자 수 조회
PUBSUB NUMPAT
PUBLISH의 반환값(구독자 수)이 한 가지 힌트가 돼요. 0이 반환됐다면 그 메시지는 아무도 받지 못한 채 사라진 것이라는 뜻입니다. 자세한 명령어 사양은 Redis 공식 Pub/Sub 문서에서 확인할 수 있어요.
TypeScript에서 Pub/Sub 구현
회사 비유로 — Pub/Sub는 방송 송신용 마이크와 수신용 라디오를 분리해야 합니다. 같은 라디오로 방송도 하고 청취도 하면 작동이 꼬여요. Redis 클라이언트도 같은 이유로 발행용·구독용을 분리합니다.
import { createClient } from 'redis';
// Pub/Sub는 별도의 클라이언트 연결 필요
// 일반 명령어 실행과 Pub/Sub는 같은 연결에서 혼용 불가
const publisher = createClient({ ... });
const subscriber = createClient({ ... });
await publisher.connect();
await subscriber.connect();
구독자 구현
// 구독자 설정
async function setupSubscriber() {
// 채널 구독 및 메시지 핸들러 등록
await subscriber.subscribe('notifications', (message, channel) => {
console.log(`Channel: ${channel}, Message: ${message}`);
const data = JSON.parse(message);
handleNotification(data);
});
// 패턴 구독
await subscriber.pSubscribe('user:*:updates', (message, channel) => {
const userId = channel.split(':')[1];
console.log(`User ${userId} update: ${message}`);
});
}
// 구독 해제
async function cleanup() {
await subscriber.unsubscribe('notifications');
await subscriber.pUnsubscribe('user:*:updates');
await subscriber.disconnect();
}
발행자 구현
// 메시지 발행
async function publishNotification(userId: string, type: string, data: object) {
const message = JSON.stringify({
type,
userId,
data,
timestamp: Date.now(),
});
// 특정 사용자 채널에 발행
const subscriberCount = await publisher.publish(`user:${userId}:updates`, message);
console.log(`Message sent to ${subscriberCount} subscribers`);
}
// 브로드캐스트
async function broadcastAnnouncement(announcement: string) {
await publisher.publish('global:announcements', JSON.stringify({
text: announcement,
timestamp: Date.now(),
}));
}
Pub/Sub의 한계 — 두 가지 결정적 약점
메시지 저장 없음
Pub/Sub는 메시지를 영속 저장하지 않아요. 구독자가 오프라인이거나 처리 속도보다 빠르게 메시지가 들어오면 그 메시지는 사라집니다. 실시간 알림처럼 "놓쳐도 되는" 메시지에는 적합하지만, 중요한 이벤트 처리에는 부적합해요.
구독자 A: 온라인 → 메시지 수신 ✓
구독자 B: 오프라인 → 메시지 손실 ✗
구독자 C: 처리 지연 → 메시지 손실 ✗
소비자 그룹 없음
Pub/Sub는 소비자 그룹을 지원하지 않아요. 동일한 채널을 여러 서버가 구독하면 모든 서버가 동일한 메시지를 받습니다. 작업을 여러 서버에 분담(load balancing)하고 싶으면 Pub/Sub로는 불가능 — Streams로 가야 해요.
| 기능 | Pub/Sub | Streams |
|---|---|---|
| 메시지 저장 | X | O |
| 메시지 재전송 | X | O |
| 소비자 그룹 | X | O |
| 처리 확인(ACK) | X | O |
| 속도 | 매우 빠름 | 빠름 |
| 사용 사례 | 실시간 알림 | 이벤트 처리 |
여기까지 따라오셨다면 한 가지 의문이 들 거예요. "그러면 Pub/Sub로는 뭘 하라는 거지?" — 정답은 놓쳐도 되는, 짧은 수명의, 실시간 신호 영역입니다. 캐시 무효화 신호, 실시간 채팅 브로드캐스트, 라이브 대시보드 업데이트 — 이런 자리에 어울려요.
Redis Streams — 사내 게시판 비유
Redis Streams는 Redis 5.0에 추가된 메시지 스트림 자료구조예요. Kafka·RabbitMQ 같은 메시지 큐와 비슷한 기능을 제공하면서 Redis의 단순성을 유지합니다.
회사 비유로 — Streams는 사내 게시판이에요. 글(메시지)이 게시판에 영속적으로 쌓이고, 직원들이 스크롤해 과거 글까지 다시 읽을 수 있습니다. 게시판에 글을 올리는 사람, 게시판을 읽고 처리하는 사람이 따로 있고 — 처리 중인 글에는 "내가 처리 중"이라고 라벨을 붙여 둘 수 있어요.
각 메시지는 고유 ID(타임스탬프-시퀀스) 와 함께 영속 저장되며, 소비자 그룹(Consumer Group)을 통해 메시지를 분산 처리할 수 있습니다. 미처리 메시지를 추적하고 재처리할 수 있어 중요한 이벤트 처리에 적합해요.
자세한 자료구조 사양은 Redis 공식 Streams 문서에서 확인할 수 있어요.
Streams 기본 명령어
메시지 추가 (XADD)
# XADD: 스트림에 메시지 추가
# * 는 Redis가 자동으로 ID 생성 (타임스탬프-시퀀스 형식)
XADD stream:orders * item "laptop" price "1200" userId "123"
# "1699900000000-0" → 타임스탬프-시퀀스 형식의 ID
# 명시적 ID 지정 (직접 관리)
XADD stream:orders 1699900000000-1 item "phone" price "800"
# $ 기호: 현재 시간 이후의 메시지만 읽겠다는 의미
# (XREAD에서 사용, 과거 메시지 무시)
# 스트림 최대 크기 제한
XADD stream:orders MAXLEN 1000 * item "tablet" price "500"
# 메시지가 1000개를 초과하면 가장 오래된 것 삭제
# ~ 옵션: 대략적인 크기 제한 (더 효율적)
XADD stream:orders MAXLEN ~ 1000 * item "monitor" price "400"
여기서 시험 함정이 하나 있어요. ***은 ID 자동 생성, $는 "현재 이후"를 의미**합니다. 둘 다 별표·달러 기호라 헷갈리는데 — 사용 위치가 달라요. *은 XADD에서, $는 XREAD에서 씁니다.
메시지 읽기 (XREAD)
# XREAD: 스트림 메시지 읽기
XREAD COUNT 10 STREAMS stream:orders 0
# 0: 처음부터 읽기
# COUNT 10: 최대 10개
# $ 로 새 메시지만 읽기 (구독처럼 동작)
XREAD COUNT 10 STREAMS stream:orders $
# BLOCK: 새 메시지를 기다리는 블로킹 읽기 (밀리초)
XREAD BLOCK 0 STREAMS stream:orders $
# BLOCK 0: 메시지가 올 때까지 무한 대기
# BLOCK 5000: 5초 대기 후 타임아웃
# 여러 스트림 동시 읽기
XREAD COUNT 10 STREAMS stream:orders stream:payments 0 0
범위 조회 (XRANGE, XREVRANGE)
# XRANGE: 특정 범위의 메시지 조회
XRANGE stream:orders - + # 전체 조회
XRANGE stream:orders - + COUNT 10 # 처음 10개
XRANGE stream:orders 1699900000000-0 + # 특정 ID 이후
# XREVRANGE: 역순 조회
XREVRANGE stream:orders + - COUNT 10 # 최신 10개
# 스트림 길이 확인
XLEN stream:orders
회사 비유로 — XRANGE 는 게시판을 위에서 아래로 스크롤하면서 읽는 거고, XREVRANGE는 아래에서 위로 스크롤하는 거예요. XLEN 은 게시판에 쌓인 글 개수를 한 번에 보여 줍니다.
TypeScript에서 Streams 구현
메시지 발행
// 주문 이벤트 발행
async function publishOrderEvent(orderId: string, event: object): Promise<string> {
const messageId = await client.xAdd('stream:orders', '*', {
orderId,
event: JSON.stringify(event),
timestamp: Date.now().toString(),
});
console.log(`Published order event: ${messageId}`);
return messageId;
}
// 스트림 크기 제한과 함께 발행
async function publishWithLimit(streamKey: string, data: Record<string, string>): Promise<string> {
return client.xAdd(streamKey, '*', data, {
TRIM: {
strategy: 'MAXLEN',
strategyModifier: '~', // 근사값 허용 (효율적)
threshold: 1000,
}
});
}
메시지 소비 (폴링 방식)
// 폴링 방식으로 새 메시지 처리
async function consumeStream(streamKey: string) {
let lastId = '$'; // $ = 현재 시간 이후의 새 메시지만
while (true) {
// BLOCK: 새 메시지를 기다림 (블로킹 읽기)
const results = await client.xRead(
[{ key: streamKey, id: lastId }],
{ BLOCK: 0, COUNT: 10 } // BLOCK 0: 무한 대기
);
if (results && results.length > 0) {
for (const stream of results) {
for (const message of stream.messages) {
await processMessage(message);
lastId = message.id; // 마지막 처리 ID 갱신
}
}
}
}
}
async function processMessage(message: any) {
const { id, message: data } = message;
console.log(`Processing message ${id}:`, data);
// 실제 처리 로직
}
재시작 후 미처리 메시지 처리
// 애플리케이션 재시작 후 처리되지 않은 메시지부터 처리
async function consumeFromCheckpoint(streamKey: string, lastProcessedId: string) {
let cursor = lastProcessedId || '0'; // 0 = 처음부터
while (true) {
const results = await client.xRead(
[{ key: streamKey, id: cursor }],
{ COUNT: 10 }
);
if (!results || results.length === 0 || results[0].messages.length === 0) {
// 모든 과거 메시지 처리 완료, 새 메시지 대기
const newResults = await client.xRead(
[{ key: streamKey, id: cursor }],
{ BLOCK: 5000, COUNT: 10 } // 5초 대기
);
if (newResults) {
// 새 메시지 처리
}
continue;
}
for (const stream of results) {
for (const message of stream.messages) {
await processMessage(message);
cursor = message.id;
// checkpoint 저장 (DB 또는 Redis)
await saveCheckpoint(streamKey, cursor);
}
}
}
}
여기까지가 단일 소비자 패턴이에요. 하지만 실무에서는 여러 서버가 같은 스트림을 분담해서 처리해야 하는 경우가 훨씬 많습니다. 그때 등장하는 게 Consumer Group이에요.
Consumer Group — 게시판 글을 팀이 분담
Consumer Group의 필요성
단순한 XREAD로 여러 서버가 같은 스트림을 읽으면 모든 서버가 같은 메시지를 중복 처리해요. 그건 부담이 늘기만 하지 분산이 아니에요.
Consumer Group을 쓰면 각 메시지는 그룹 내 하나의 소비자에게만 전달됩니다(로드 밸런싱). 메시지 처리 확인(ACK), 미처리 메시지 추적, 실패 메시지 재처리가 모두 가능해요.
회사 비유로 — 게시판에 올라온 글들을 "주문 처리팀" 이라는 팀 하나에 묶어 두면, 팀원 3명이 글을 분담해서 가져가요. 같은 글을 두 명이 동시에 가져가는 일은 없습니다.
스트림: stream:orders
↓
소비자 그룹: "order-processors"
소비자1: 메시지 A, B, C 처리
소비자2: 메시지 D, E, F 처리
소비자3: 메시지 G, H, I 처리
Consumer Group 명령어
# 소비자 그룹 생성
XGROUP CREATE stream:orders order-processors $ MKSTREAM
# $ : 현재 시간 이후의 새 메시지부터 처리
# 0 : 스트림의 처음부터 처리 (미처리 메시지 포함)
# MKSTREAM: 스트림이 없으면 생성
# 소비자 그룹 읽기 (특정 소비자로)
XREADGROUP GROUP order-processors consumer1 COUNT 10 STREAMS stream:orders >
# >: 아직 다른 소비자에게 전달되지 않은 새 메시지
# 메시지 처리 완료 확인 (ACK)
XACK stream:orders order-processors "1699900000000-0"
# 미처리(Pending) 메시지 조회
XPENDING stream:orders order-processors - + 10
# 처리 중이지만 ACK하지 않은 메시지 목록
# 미처리 메시지 재할당 (다른 소비자에게)
XCLAIM stream:orders order-processors consumer2 60000 "1699900000000-0"
# 60000ms(1분) 이상 처리 중인 메시지를 consumer2에게 재할당
# 소비자 그룹 정보 조회
XINFO GROUPS stream:orders
XINFO CONSUMERS stream:orders order-processors
여기서 정말 중요한 시험 함정 — >는 "아직 누구에게도 전달되지 않은 새 메시지" 라는 뜻입니다. XREADGROUP에서만 쓰는 특수 기호예요. XREAD에서는 $를 썼는데 — Consumer Group을 쓰면 >로 갈아탑니다. 이 기호 차이를 외워 두는 게 시험에서 자주 나와요.
TypeScript Consumer Group 구현
// 소비자 그룹 설정
async function setupConsumerGroup(streamKey: string, groupName: string) {
try {
await client.xGroupCreate(streamKey, groupName, '$', {
MKSTREAM: true
});
console.log(`Consumer group '${groupName}' created`);
} catch (err: any) {
if (err.message.includes('BUSYGROUP')) {
console.log(`Consumer group '${groupName}' already exists`);
} else {
throw err;
}
}
}
// 소비자로 메시지 읽기
async function consumeAsGroupMember(
streamKey: string,
groupName: string,
consumerName: string
) {
while (true) {
// > : 새로운 미전달 메시지 읽기
const results = await client.xReadGroup(
groupName,
consumerName,
[{ key: streamKey, id: '>' }],
{ BLOCK: 5000, COUNT: 10 }
);
if (results && results.length > 0) {
for (const stream of results) {
for (const message of stream.messages) {
try {
await processMessage(message);
// 처리 완료 ACK
await client.xAck(streamKey, groupName, message.id);
} catch (error) {
console.error(`Failed to process message ${message.id}:`, error);
// ACK 하지 않으면 pending 목록에 남아 재처리 가능
}
}
}
}
}
}
// 오래된 미처리 메시지 복구
async function recoverStalePendingMessages(
streamKey: string,
groupName: string,
consumerName: string,
maxIdleMs: number = 60000 // 1분
) {
// XAUTOCLAIM: 일정 시간 처리되지 않은 메시지 자동 재할당
const result = await client.xAutoClaim(
streamKey,
groupName,
consumerName,
maxIdleMs,
'0' // 처음부터
);
for (const message of result.messages) {
try {
await processMessage(message);
await client.xAck(streamKey, groupName, message.id);
} catch (error) {
console.error(`Failed to reprocess message ${message.id}:`, error);
}
}
}
회사 비유로 — XAUTOCLAIM은 팀원이 글을 가져갔는데 처리하다가 자리를 비운 경우, 1분이 지나면 다른 팀원이 그 글을 자동으로 가져가는 동작이에요. 작업자가 갑자기 죽어도 메시지가 영원히 묶이지 않습니다.
실용 예시 — 주문 처리 시스템
전체 그림을 한 번에 보면 다음과 같아요. 이커머스 마켓플레이스 예시에서 주문이 들어오면 — 재고·결제·이메일 서비스가 같은 스트림을 그룹으로 분담해 처리합니다.
주문 API → XADD stream:orders → 소비자 그룹 "order-processors"
├── 재고 확인 서비스 (consumer1)
├── 결제 처리 서비스 (consumer2)
└── 이메일 서비스 (consumer3)
// 주문 이벤트 발행
async function placeOrder(order: Order): Promise<string> {
const orderId = generateId();
// 주문 데이터 저장
await client.hSet(`orders#${orderId}`, {
id: orderId,
userId: order.userId,
items: JSON.stringify(order.items),
total: order.total.toString(),
status: 'pending',
});
// 스트림에 이벤트 발행
const eventId = await client.xAdd('stream:orders', '*', {
orderId,
userId: order.userId,
total: order.total.toString(),
event: 'ORDER_PLACED',
});
return orderId;
}
// 재고 확인 서비스
async function startInventoryService() {
await setupConsumerGroup('stream:orders', 'inventory-service');
while (true) {
const results = await client.xReadGroup(
'inventory-service',
'inventory-worker-1',
[{ key: 'stream:orders', id: '>' }],
{ BLOCK: 5000, COUNT: 1 }
);
if (results) {
for (const { messages } of results) {
for (const message of messages) {
const { orderId } = message.message;
// 재고 확인 처리
const inStock = await checkInventory(orderId);
// 결과를 다음 스트림으로 전달
await client.xAdd('stream:inventory-results', '*', {
orderId,
status: inStock ? 'available' : 'out-of-stock',
});
// ACK
await client.xAck('stream:orders', 'inventory-service', message.id);
}
}
}
}
}
Pub/Sub vs Streams 선택 가이드
비교표
| 특성 | Pub/Sub | Streams |
|---|---|---|
| 메시지 저장 | X (사라짐) | O (영구 저장) |
| 재처리 가능 | X | O |
| 소비자 그룹 | X (모두 수신) | O (하나만 수신) |
| ACK 메커니즘 | X | O |
| 메시지 순서 | 채널별 순서 보장 | O (스트림 내 순서) |
| 메모리 사용 | 적음 | 더 많음 |
| 복잡도 | 낮음 | 높음 |
| 지연 시간 | 매우 낮음 | 낮음 |
Pub/Sub가 적합한 경우
- 실시간 알림 (새 메시지, 좋아요 알림)
- 채팅 메시지 브로드캐스트
- 캐시 무효화 신호 전파
- 실시간 대시보드 업데이트
- 메시지 손실이 허용되는 경우
Streams가 적합한 경우
- 주문 처리, 결제 처리 등 중요 이벤트
- 이벤트 소싱 (모든 상태 변경 기록)
- 여러 서버 인스턴스 간 작업 분산
- 메시지 재처리가 필요한 경우
- Kafka를 사용하기엔 과한 소규모 시스템
회사 비유로 한 번 더 — 라디오로 충분하면 Pub/Sub, 게시판이 필요하면 Streams로 가세요. 작은 시스템에서 영속이 필요하면 Streams가 Kafka를 띄우는 부담을 덜어 줍니다.
메시지 직렬화 — 문자열만 저장
Streams는 문자열만 저장
// 잘못된 방법: 객체를 직접 저장할 수 없음
await client.xAdd('stream:events', '*', {
data: { userId: 1, action: 'login' } // 에러!
});
// 올바른 방법: 모든 필드를 문자열로
await client.xAdd('stream:events', '*', {
userId: '1',
action: 'login',
data: JSON.stringify({ ip: '192.168.1.1', userAgent: 'Chrome' }),
timestamp: Date.now().toString(),
});
복잡한 데이터 직렬화
// 헬퍼 함수로 직렬화/역직렬화 추상화
function serializeForStream(data: object): Record<string, string> {
const result: Record<string, string> = {};
for (const [key, value] of Object.entries(data)) {
result[key] = typeof value === 'string' ? value : JSON.stringify(value);
}
return result;
}
function deserializeFromStream(message: Record<string, string>): object {
const result: Record<string, any> = {};
for (const [key, value] of Object.entries(message)) {
try {
result[key] = JSON.parse(value);
} catch {
result[key] = value; // 파싱 실패 시 원본 문자열 반환
}
}
return result;
}
스트림 유지 관리
스트림 크기 관리
# 스트림 길이 확인
XLEN stream:orders
# 오래된 메시지 수동 삭제
XTRIM stream:orders MAXLEN 1000 # 1000개 유지
XTRIM stream:orders MAXLEN ~ 1000 # 근사값으로 트림 (효율적)
# 특정 ID 범위 삭제
XDEL stream:orders "1699900000000-0" "1699900000001-0"
여기서 시험 함정이 하나 있어요. 스트림은 영속 자료구조라 자동으로 줄지 않습니다. 트래픽이 많으면 메모리가 무한정 늘어요. 그래서 MAXLEN ~ N 옵션을 XADD 시점에 박아 두는 게 표준 패턴입니다(~는 근사값 허용으로 더 효율적).
스트림 상태 모니터링
# 스트림 전체 정보
XINFO STREAM stream:orders
# 소비자 그룹 정보
XINFO GROUPS stream:orders
# 소비자 목록
XINFO CONSUMERS stream:orders order-processors
# 미처리 메시지 상세 조회
XPENDING stream:orders order-processors - + 10
XPENDING이 자주 쓰여요. 펜딩 큐가 계속 쌓이고 있다면 어떤 소비자가 ACK를 누락하고 있는지 알 수 있는 첫 단서입니다.
운영에서 자주 보는 함정 5가지
1. Pub/Sub 연결 재사용 불가
// 잘못된 방법: 일반 클라이언트로 Pub/Sub 사용
const client = createClient({ ... });
await client.subscribe('channel1', handler);
// 이 후 client.get('key') 호출 불가! 에러 발생
// 올바른 방법: Pub/Sub 전용 클라이언트 분리
const subscriber = client.duplicate();
await subscriber.connect();
await subscriber.subscribe('channel1', handler);
// 일반 명령은 원래 client 사용
const value = await client.get('key');
2. XREAD vs XREADGROUP 혼용 금지
// 소비자 그룹 설정 후 XREAD 사용 시 메시지 중복 처리 가능
// 반드시 XREADGROUP만 사용
// 잘못된 방법
const results = await client.xRead([{ key: 'stream:orders', id: lastId }]);
// 올바른 방법 (소비자 그룹 사용 시)
const results = await client.xReadGroup('group1', 'consumer1',
[{ key: 'stream:orders', id: '>' }]);
3. ACK 누락
// 잘못된 방법: ACK 없이 메시지 처리
async function badConsumer() {
const results = await client.xReadGroup('group1', 'consumer1',
[{ key: 'stream', id: '>' }]);
// 처리 후 xAck 누락 → 메시지가 pending에 계속 남음
for (const { messages } of results) {
for (const msg of messages) {
await process(msg);
// xAck 호출 없음!
}
}
}
// 올바른 방법: 처리 후 반드시 ACK
async function goodConsumer() {
const results = await client.xReadGroup('group1', 'consumer1',
[{ key: 'stream', id: '>' }]);
for (const { messages } of results) {
for (const msg of messages) {
await process(msg);
await client.xAck('stream', 'group1', msg.id); // 반드시 ACK!
}
}
}
4. 스트림 무한 성장
// 잘못된 방법: 크기 제한 없이 계속 추가
await client.xAdd('stream:logs', '*', logData);
// 올바른 방법: MAXLEN으로 자동 트림
await client.xAdd('stream:logs', '*', logData, {
TRIM: {
strategy: 'MAXLEN',
strategyModifier: '~',
threshold: 10000, // 약 10000개 유지
}
});
5. $ 기호와 0의 혼동
# $ : 현재 시점 이후의 새 메시지만 읽겠다
# → 과거 메시지는 무시
XREAD BLOCK 0 STREAMS mystream $
# 0 : 스트림의 처음부터 모든 메시지 읽겠다
# → 과거 메시지 포함
XREAD COUNT 10 STREAMS mystream 0
회사 비유로 — $는 "내가 출근한 다음에 올라온 글만 보겠다", 0은 "게시판 첫 글부터 다 보겠다" 의 차이예요. Consumer Group을 만들 때도 같은 차이가 적용되니 외워 두면 좋습니다.
시험 직전 한 번 더 — 자주 헷갈리는 함정 모음
여기까지가 Redis 6편(Pub/Sub & Streams)의 핵심입니다. 시험 직전·실무 실수 방지를 위한 압축 노트로 마무리할게요.
- Pub/Sub = 라디오 방송 — 지금 듣는 사람만 받음, 안 들으면 끝
- Streams = 사내 게시판 — 영속 저장, 스크롤 가능, 재처리 가능
- Pub/Sub는 fire and forget — 메시지 영속 저장 X, ACK X, 소비자 그룹 X
PUBLISH반환값 = 수신한 구독자 수, 0이면 그 메시지는 사라진 것- Pub/Sub 클라이언트는 별도 분리 필수 — 같은 클라이언트에서 일반 명령 못 씀
- 적합 자리 — Pub/Sub: 실시간 알림·채팅·캐시 무효화, Streams: 주문·결제·이벤트 소싱
- Streams 메시지 ID = 타임스탬프-시퀀스 (예:
1699900000000-0) XADD의 ***= ID 자동 생성,XREAD의$** = 현재 이후 새 메시지XREAD는 단일 소비자,XREADGROUP은 Consumer Group 전용- Consumer Group의
>= 아직 누구에게도 전달되지 않은 새 메시지 XACK누락 = pending에 영원히 남음 — 처리 후 반드시 ACKXAUTOCLAIM— N초 이상 처리 안 된 메시지 자동 재할당 (자리 비운 작업자 복구)XPENDING으로 펜딩 큐 모니터링 — ACK 누락 디버깅의 첫 단서- 스트림 영속이라 자동으로 줄지 않음 —
MAXLEN ~ N을 XADD 시점에 박아두기 ~근사값 옵션 — 정확한 트림보다 더 효율적- Streams는 문자열만 저장 — 객체는
JSON.stringify명시 처리 XGROUP CREATE시$= 새 메시지부터,0= 처음부터 (재처리)- Pub/Sub 한계 — 메시지 손실 가능, 소비자 그룹 없음, ACK 없음
- Streams 한계 — Pub/Sub보다 메모리·복잡도 높음
- 자주 헷갈리는 비교 — **
*vs$vs>** = ID 자동생성 / 현재 이후 / 미전달 새 메시지 - 자주 헷갈리는 비교 — Pub/Sub vs Streams = 라디오 vs 게시판, 휘발 vs 영속
시리즈 다른 편
같은 시리즈의 다른 글들도 같은 친절 톤으로 묶어 정리되어 있어요.