以下是 Redis Streams 的一些基本操作:
1. 创建 Stream:
XADD key [ID] field value [field value ...]
XADD mystream * sensor:1 temperature 25.5
在这个示例中,mystream 是 Stream 的键名,* 表示使用自动生成的时间戳作为消息的 ID,sensor:1 是消息体中的字段,temperature 是字段的值。
2. 读取 Stream 中的消息:
XREAD COUNT count STREAMS key start
XREAD COUNT 10 STREAMS mystream 0
这个命令用于读取指定 Stream 的消息。COUNT 参数指定要返回的消息数量,0 表示读取当前所有可用的消息。
3. 消费者组:
Streams 支持消费者组(Consumer Groups)的概念,多个消费者可以共同消费一个 Stream。消费者组能够自动追踪消息的消费状态,确保每个消息只被消费一次。
- 创建消费者组:
XGROUP CREATE key groupname $
XGROUP CREATE mystream mygroup $
- 消费者组读取消息:
XREADGROUP GROUP groupname consumername COUNT count STREAMS key start
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream 0
4. 确认消息的消费:
XACK key groupname ID [ID ...]
XACK mystream mygroup 1607440347892-0 1607440347892-1
XACK 命令用于向 Redis 服务器确认一个或多个消息已经被成功消费。
5. 其他命令:
- XTRIM: 用于修剪 Stream,删除过期的消息。
- XPENDING: 用于获取待处理消息的信息。
- XDEL: 用于删除指定的消息。
Streams 提供了强大的消息流处理功能,适用于实时数据流的处理场景,如日志处理、事件驱动架构等。通过使用消费者组,可以实现多个消费者共同处理消息,确保高可用性和负载均衡。
转载请注明出处:http://www.zyzy.cn/article/detail/14272/Redis