Redis Stream学习笔记
什么是Redis Stream
Redis Stream是一个基于时间序列的数据结构,用于存储和处理多个流(stream)的消息(message)。每个消息都有一个唯一的ID和一个时间戳,可以根据不同的条件进行读取和处理。
如何创建Redis Stream
创建Redis Stream可以使用XADD
命令,语法如下:
Copy CodeXADD stream-name ID field1 value1 [field2 value2 ...]
其中,stream-name
为流名,ID
为消息ID,field1
和value1
为消息的键值对。例如:
Copy CodeXADD mystream * name john age 30
这条命令会往流mystream
中添加一条消息,消息ID为自动生成的一个ID,键为name
,值为john
,键为age
,值为30
。
如何读取Redis Stream
读取Redis Stream可以使用XREAD
命令,语法如下:
Copy CodeXREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
其中,COUNT
表示最多读取的消息数量,BLOCK
表示阻塞等待的时间,key
为流名,ID
表示消息ID。例如:
Copy CodeXREAD COUNT 10 BLOCK 10000 STREAMS mystream 0
这条命令会从mystream
中读取最多10条消息,如果没有消息则阻塞10秒钟。
Redis Stream实例
下面是一个简单的Redis Stream实例,实现了一个简单的聊天室功能。
Copy Codeimport redis
class Chatroom:
def __init__(self):
self.redis = redis.Redis()
def write_message(self, user, message):
# 写入消息
self.redis.xadd('chatroom', {'user': user, 'message': message})
def read_messages(self, last_id=0):
# 读取消息
messages = self.redis.xread({'chatroom': last_id})
if messages:
stream, message = messages[0]
for msg in message:
yield msg['user'], msg['message'], msg['id']
使用上述代码可以创建一个名为chatroom
的流,并且可以进行消息的写入和读取。例如:
Copy Codechatroom = Chatroom()
chatroom.write_message('Alice', 'Hello!')
chatroom.write_message('Bob', 'Hi!')
for user, message, id in chatroom.read_messages():
print(f'{user}: {message} ({id})')
这段代码会输出以下结果:
Copy CodeAlice: Hello! (1635457366383-0)
Bob: Hi! (1635457366383-1)