Redis Stream学习笔记

什么是Redis Stream

Redis Stream是一个基于时间序列的数据结构,用于存储和处理多个流(stream)的消息(message)。每个消息都有一个唯一的ID和一个时间戳,可以根据不同的条件进行读取和处理。

如何创建Redis Stream

创建Redis Stream可以使用XADD命令,语法如下:

Copy Code
XADD stream-name ID field1 value1 [field2 value2 ...]

其中,stream-name为流名,ID为消息ID,field1value1为消息的键值对。例如:

Copy Code
XADD mystream * name john age 30

这条命令会往流mystream中添加一条消息,消息ID为自动生成的一个ID,键为name,值为john,键为age,值为30

如何读取Redis Stream

读取Redis Stream可以使用XREAD命令,语法如下:

Copy Code
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

其中,COUNT表示最多读取的消息数量,BLOCK表示阻塞等待的时间,key为流名,ID表示消息ID。例如:

Copy Code
XREAD COUNT 10 BLOCK 10000 STREAMS mystream 0

这条命令会从mystream中读取最多10条消息,如果没有消息则阻塞10秒钟。

Redis Stream实例

下面是一个简单的Redis Stream实例,实现了一个简单的聊天室功能。

Copy Code
import redis class Chatroom: def __init__(self): self.redis = redis.Redis() def write_message(self, user, message): # 写入消息 self.redis.xadd('chatroom', {'user': user, 'message': message}) def read_messages(self, last_id=0): # 读取消息 messages = self.redis.xread({'chatroom': last_id}) if messages: stream, message = messages[0] for msg in message: yield msg['user'], msg['message'], msg['id']

使用上述代码可以创建一个名为chatroom的流,并且可以进行消息的写入和读取。例如:

Copy Code
chatroom = Chatroom() chatroom.write_message('Alice', 'Hello!') chatroom.write_message('Bob', 'Hi!') for user, message, id in chatroom.read_messages(): print(f'{user}: {message} ({id})')

这段代码会输出以下结果:

Copy Code
Alice: Hello! (1635457366383-0) Bob: Hi! (1635457366383-1)