redis 消息队列方案
观察角度:消息有序,重复消息处理,消息可靠性保证
pub/sub 发布订阅机制
list集合
消息有序:lpush和rpop可以保证消息顺序的被消费
重复消息处理:list没有为消息提供唯一标识,需要生产者提供唯一标识,消费程序自行判断消息是否重复
消息可靠性保证:讲的是消息因为网络,停电,宕机等突发因素丢失,list没有为消息保存副本,这样消费者如果正在消费时有突发因素,那么就会发生消息丢失。
所以list做消息队列时,如果是对消息丢失不是很敏感的场景可以使用。
stream
优点:支持持久化,支持消费组形式的消息读取
Streams 同样能够满足消息队列的三大需求。而且,它还支持消费组形式的消息读取
Stream基本结构
Redis Stream像是一个仅追加内容的消息链表,把所有加入的消息都一个一个串起来,每个消息都有一个唯一的 ID 和内容,它还从 Kafka 借鉴了另一种概念:消费者组(Consumer Group),这让Redis Stream变得更加复杂。
Redis Stream的结构如下:

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 XADD 指令追加消息时自动创建。
-
Consumer Group: 消费者组,消费者组记录了Starem的状态**,使用 XGROUP CREATE 命令手动创建,在同一个Stream内消费者组名称唯一。一个消费组可以有多个消费者(Consumer)同时进行组内消费,所有消费者共享Stream内的所有信息,但同一条消息只会有一个消费者消费到,不同的消费者会消费Stream中不同的消息,这样就可以应用在分布式的场景中来保证消息消费的唯一性。 -
last_delivered_id: 游标,用来记录某个消费者组在Stream上的消费位置信息**,每个消费组会有个游标,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。创建消费者组时需要指定从Stream的哪一个消息ID(哪个位置)开始消费,该位置之前的数据会被忽略,同时还用来初始化 last_delivered_id 这个变量。这个last_delivered_id一般来说就是最新消费的消息ID。 -
pending_ids: 消费者内部的状态变量,作用是维护消费者的未确认的消息ID。pending_ids记录了当前已经被客户端读取,但是还没有 ack (Acknowledge character:确认字符)的消息。 目的是为了保证客户端至少消费了消息一次,而不会在网络传输的中途丢失而没有对消息进行处理。如果客户端没有 ack,那么这个变量里面的消息ID 就会越来越多,一旦某个消息被ack,它就会对应开始减少。 这个变量也被 Redis 官方称为 PEL (Pending Entries List)。
插入消息
往名称为 mqstream 的消息队列中插入一条消息,消息的键是 repo,值是 5。*号代表自动创建全局唯一的id,也可以手动生成,但必须是增长且唯一的。
XADD mqstream * repo 5
"1599203861727-0"
读取消息
使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
-
count :数量 -
milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式 -
key :队列名 -
id :消息 ID
创建消费者组 (XGROUP CREATE)
使用 XGROUP CREATE 创建消费者组,语法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
-
key :队列名称,如果不存在就创建 -
groupname :组名。 -
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0
从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $
消费组命令
消费者组消费消息(XREADGROUP GROUP)
使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
-
group :消费组名 -
consumer :消费者名。 -
count : 读取数量。 -
milliseconds : 阻塞毫秒数。 -
key : 队列名。 -
ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

本文由 mdnice 多平台发布