Redis 공식문서 번역
Redis Streams 소개
Redis 스트림은 추가 전용 로그처럼 작동하는 데이터 구조이지만, 일반적인 추가 전용 로그의 한계를 극복하기 위한 여러 작업을 구현합니다. 여기에는 O(1) 시간의 랜덤 액세스와 컨슈머 그룹과 같은 복잡한 소비 전략이 포함됩니다.
스트림을 사용하여 실시간으로 이벤트를 기록하고 동시에 배포할 수 있습니다.
Redis 스트림 사용 사례
Redis 스트림의 사용 사례에는 다음이 포함됩니다:
- 이벤트 소싱 (예: 사용자 행동, 클릭 등 추적)
- 센서 모니터링 (예: 현장 장치의 판독값)
- 알림 (예: 각 사용자의 알림 기록을 별도의 스트림에 저장)
Redis는 각 스트림 항목에 대해 고유한 ID를 생성합니다.
이러한 ID를 사용하여 나중에 관련 항목을 검색하거나 스트림의 모든 후속 항목을 읽고 처리할 수 있습니다. 이러한 ID는 시간과 관련이 있으므로, 여기에 표시된 ID는 다를 수 있으며 자신의 Redis 인스턴스에서 보는 ID와 다를 것입니다.
Redis 스트림은 여러 트리밍 전략(스트림이 무한정 증가하는 것을 방지하기 위해)과 여러 소비 전략을 지원합니다 (XREAD
, XREADGROUP
, XRANGE
참조).
기본 명령어
XADD
- 스트림에 새 항목을 추가합니다.XREAD
- 지정된 위치에서 시작하여 시간순으로 하나 이상의 항목을 읽습니다.XRANGE
- 제공된 두 항목 ID 사이의 범위의 항목을 반환합니다.XLEN
- 스트림의 길이를 반환합니다.
전체 스트림 명령어 목록 (opens in a new tab)을 참조하세요.
예제
체크포인트를 통과하는 레이서들의 정보를 스트림에 추가하는 예제입니다. 각 레이서의 이름, 속도, 위치, 위치 ID를 포함합니다:
Redis CLI
> XADD race:france * rider Castilla speed 30.2 position 1 location_id 1
"1692632086370-0"
> XADD race:france * rider Norem speed 28.8 position 3 location_id 1
"1692632094485-0"
> XADD race:france * rider Prickett speed 29.7 position 2 location_id 1
"1692632102976-0"
Python
import redis
r = redis.Redis(decode_responses=True)
# 스트림에 항목 추가
res1 = r.xadd(
"race:france",
{"rider": "Castilla", "speed": 30.2, "position": 1, "location_id": 1},
)
print(res1) # >>> 1692629576966-0
res2 = r.xadd(
"race:france",
{"rider": "Norem", "speed": 28.8, "position": 3, "location_id": 1},
)
print(res2) # >>> 1692629594113-0
res3 = r.xadd(
"race:france",
{"rider": "Prickett", "speed": 29.7, "position": 2, "location_id": 1},
)
print(res3) # >>> 1692629613374-0
# 범위로 항목 읽기
res4 = r.xrange("race:france", "1691765278160-0", "+", 2)
print(res4)
# >>> [
# ('1692629576966-0',
# {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}
# ),
# ('1692629594113-0',
# {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}
# )
# ]
# 스트림 읽기
res5 = r.xread(streams={"race:france": 0}, count=100, block=300)
print(res5)
# >>> [
# ['race:france',
# [('1692629576966-0',
# {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}
# ),
# ('1692629594113-0',
# {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}
# ),
# ('1692629613374-0',
# {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}
# )]
# ]
# ]
# 스트림 길이 확인
res7 = r.xlen("race:france")
print(res7) # >>> 4
Node.js
import { createClient } from 'redis';
const client = await createClient();
await client.connect();
// 스트림에 항목 추가
const res1 = await client.xAdd('race:france', '*', {
'rider': 'Castilla',
'speed': '30.2',
'position': '1',
'location_id': '1'
});
console.log(res1); // >>> 1700073067968-0
const res2 = await client.xAdd('race:france', '*', {
'rider': 'Norem',
'speed': '28.8',
'position': '3',
'location_id': '1'
});
console.log(res2); // >>> 1692629594113-0
// 범위로 항목 읽기
const res4 = await client.xRange('race:france', '1691765278160-0', '+', {
COUNT: 2
});
console.log(res4);
// >>> [
// { id: '1692629576966-0', message: { rider: 'Castilla', speed: '30.2', position: '1', location_id: '1' } },
// { id: '1692629594113-0', message: { rider: 'Norem', speed: '28.8', position: '3', location_id: '1' } }
// ]
// 스트림 읽기
const res5 = await client.xRead(
{ key: 'race:france', id: '0-0' },
{ COUNT: 100, BLOCK: 300 }
);
console.log(res5);
// 스트림 길이 확인
const res7 = await client.xLen('race:france');
console.log(res7); // >>> 4
엔트리 ID
스트림 엔트리 ID는 두 부분으로 구성됩니다:
- Unix 타임스탬프 (밀리초)
- 시퀀스 번호
예: 1692629576966-0
ID는 자동으로 생성되거나 명시적으로 지정할 수 있습니다:
# 자동 ID 생성 (*)
r.xadd("mystream", {"field": "value"})
# 명시적 ID 지정
r.xadd("mystream", {"field": "value"}, id="1000-0")
스트림에서 데이터 가져오기
XRANGE와 XREVRANGE로 범위 쿼리
# 전체 스트림 읽기
messages = r.xrange("mystream", "-", "+")
# 특정 범위 읽기
messages = r.xrange("mystream", "1692629576966", "1692629576967")
# 역순으로 읽기
messages = r.xrevrange("mystream", "+", "-", count=10)
XREAD로 새 항목 수신
# 블로킹 모드로 새 메시지 대기
messages = r.xread({"mystream": "$"}, block=0)
# 특정 ID 이후의 메시지 읽기
messages = r.xread({"mystream": "1692629576966-0"}, count=10)
컨슈머 그룹
Redis 스트림은 컨슈머 그룹을 지원하여 여러 컨슈머가 스트림을 효율적으로 소비할 수 있도록 합니다. 컨슈머 그룹은 다음과 같은 특징을 가집니다:
- 각 메시지는 그룹 내 하나의 컨슈머에게만 전달됨
- 메시지 확인(ACK) 메커니즘
- 보류 중인 메시지 추적
- 메시지 재할당 가능
컨슈머 그룹 생성
# 컨슈머 그룹 생성
res18 = r.xgroup_create("race:france", "france_riders", "$")
print(res18) # >>> True
# 스트림이 없는 경우 스트림도 함께 생성
res19 = r.xgroup_create("race:italy", "italy_riders", "$", mkstream=True)
print(res19) # >>> True
컨슈머 그룹으로 읽기
# 새로운 메시지만 읽기 (>)
res20 = r.xreadgroup(
streams={"race:italy": ">"},
consumername="Alice",
groupname="italy_riders",
count=1,
)
print(res20)
# >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]]
# 보류 중인 메시지 확인
res25 = r.xpending("race:italy", "italy_riders")
print(res25)
# >>> {
# 'pending': 2, 'min': '1692629925789-0', 'max': '1692629925790-0',
# 'consumers': [{'name': 'Bob', 'pending': 2}]
# }
# 메시지 확인 (ACK)
res22 = r.xack("race:italy", "italy_riders", "1692629925771-0")
print(res22) # >>> 1
스트림 트리밍
스트림이 무한정 증가하는 것을 방지하기 위해 트리밍 옵션을 사용할 수 있습니다:
# maxlen 옵션으로 항목 추가
r.xadd("race:italy", {"rider": "Jones"}, maxlen=2)
r.xadd("race:italy", {"rider": "Wood"}, maxlen=2)
r.xadd("race:italy", {"rider": "Henshaw"}, maxlen=2)
# 수동으로 트리밍
res37 = r.xtrim("race:italy", maxlen=10, approximate=False)
print(res37) # >>> 0
스트림에서 단일 항목 제거
스트림은 ID만으로 스트림 중간에서 항목을 제거하는 특별한 명령도 가지고 있습니다. 일반적으로 추가 전용 데이터 구조에서는 이상한 기능처럼 보일 수 있지만, 예를 들어 개인 정보 보호 규정과 관련된 애플리케이션에는 실제로 유용합니다. 이 명령은 XDEL
이라고 하며 스트림 이름과 삭제할 ID를 받습니다:
# 스트림에서 특정 항목 삭제
res40 = r.xdel("race:italy", "1692631018238-0")
print(res40) # >>> 1
그러나 현재 구현에서는 매크로 노드가 완전히 비어 있을 때까지 메모리가 실제로 회수되지 않으므로 이 기능을 남용해서는 안 됩니다.
영구 장애로부터 복구
컨슈머가 장애를 일으켜 메시지를 처리할 수 없게 되면, 다른 컨슈머가 해당 메시지를 인계받을 수 있습니다:
# XCLAIM을 사용하여 메시지 소유권 변경
# min-idle-time(밀리초) 이상 유휴 상태인 메시지만 클레임
messages = r.xclaim("mystream", "mygroup", "consumer2",
min_idle_time=60000,
message_ids=["1692629925789-0"])
자동 클레임
Redis 6.2부터는 XAUTOCLAIM
명령을 사용하여 보류 중인 메시지를 자동으로 클레임할 수 있습니다:
# 1초 이상 유휴 상태인 메시지를 자동으로 클레임
result = r.xautoclaim("mystream", "mygroup", "consumer2",
min_idle_time=1000,
start_id="0-0",
count=10)
스트림 관찰성
스트림의 상태를 모니터링하기 위한 다양한 정보 명령들:
# 스트림 정보
res31 = r.xinfo_stream("race:italy")
print(res31)
# >>> {
# 'length': 5, 'radix-tree-keys': 1, 'radix-tree-nodes': 2,
# 'last-generated-id': '1692629926436-0', 'groups': 1,
# 'first-entry': ('1692629925771-0', {'rider': 'Castilla'}),
# 'last-entry': ('1692629926436-0', {'rider': 'Norem'})
# }
# 그룹 정보
res32 = r.xinfo_groups("race:italy")
print(res32)
# >>> [
# {
# 'name': 'italy_riders', 'consumers': 2, 'pending': 2,
# 'last-delivered-id': '1692629925790-0'
# }
# ]
# 컨슈머 정보
res33 = r.xinfo_consumers("race:italy", "italy_riders")
print(res33)
# >>> [
# {'name': 'Alice', 'pending': 2, 'idle': 199332},
# {'name': 'Bob', 'pending': 0, 'idle': 489170}
# ]
캡핑된 스트림
스트림의 크기를 제한하는 두 가지 방법이 있습니다:
MAXLEN 옵션
정확한 최대 길이를 지정합니다:
# 정확히 1000개의 항목만 유지
r.xadd("mystream", {"field": "value"}, maxlen=1000)
# 대략적인 트리밍 (성능 최적화)
r.xadd("mystream", {"field": "value"}, maxlen=1000, approximate=True)
MINID 옵션
특정 ID보다 오래된 항목을 제거합니다:
# 특정 ID보다 오래된 항목 제거
r.xtrim("mystream", minid="1692629576966-0")
스트림 API의 특수 ID
Redis 스트림은 몇 가지 특수 ID를 지원합니다:
-
: 가장 작은 ID (스트림의 첫 번째 항목)+
: 가장 큰 ID (스트림의 마지막 항목)$
: 스트림의 마지막 ID (XREAD에서만 사용)>
: 아직 다른 컨슈머에게 전달되지 않은 메시지 (XREADGROUP에서만 사용)*
: 자동 ID 생성 (XADD에서만 사용)
지속성, 복제 및 메시지 안전성
Redis 스트림은 Redis의 일반적인 지속성 및 복제 메커니즘을 따릅니다:
- AOF (Append Only File): 모든 쓰기 작업을 로그에 기록
- RDB (Redis Database): 주기적인 스냅샷
- 복제: 마스터-슬레이브 복제를 통한 고가용성
메시지 안전성을 위한 권장사항:
- AOF를
everysec
또는always
모드로 설정 - 복제를 사용하여 데이터 손실 위험 최소화
- 중요한 메시지의 경우
WAIT
명령을 사용하여 복제 확인
길이가 0인 스트림
스트림은 모든 항목이 삭제되어도 메타데이터(컨슈머 그룹 정보 등)를 유지합니다:
# 빈 스트림도 존재할 수 있음
r.xlen("empty_stream") # >>> 0
# 하지만 여전히 그룹 정보는 유지됨
r.xinfo_groups("empty_stream")
Kafka와의 차이점
Redis 스트림과 Apache Kafka의 주요 차이점:
- 파티셔닝: Redis 스트림은 파티션을 지원하지 않음
- 스케일: Redis는 단일 노드 기반, Kafka는 분산 시스템
- 메시지 크기: Redis는 작은 메시지에 최적화
- 복잡성: Redis가 훨씬 간단하고 설정이 쉬움
- 성능: 작은 메시지의 경우 Redis가 더 빠름
요약
Redis 스트림은 다음과 같은 강력한 기능을 제공합니다:
- 추가 전용 로그 구조: 시간순으로 정렬된 데이터 저장
- 고유한 ID 생성: 각 항목에 대해 타임스탬프 기반 ID 자동 생성
- 범위 쿼리: 시간 범위로 데이터 조회 가능
- 컨슈머 그룹: 여러 컨슈머 간의 작업 부하 분산
- 보류 메시지 관리: 처리되지 않은 메시지 추적 및 재할당
- 트리밍 옵션: 스트림 크기 제한으로 메모리 관리
- 장애 복구: XCLAIM과 XAUTOCLAIM을 통한 메시지 재할당
- 실시간 처리: XREAD의 블로킹 모드로 실시간 이벤트 처리
이러한 기능들은 Redis 스트림을 실시간 데이터 처리, 이벤트 소싱, 메시지 큐잉, 로그 수집, IoT 데이터 처리 등 다양한 사용 사례에 적합하게 만듭니다.