如果要实现一个消息队列,在Redis 5.0之前,我们可能会想到LIST。在前面我们也提到,通过LPUSH,LPOP,RPUSH,RPOP操作,可以把List当成队列或者栈来使用。
1、使用LIST
使用List实现的一个最简单的消息队列模型如下:
消息队列由Redis的List充当,Provider使用LPUSH命令往消息队列中推送消息,Consumer使用RPOP从消息队列中拉取消息。
当然,可以支持多个消费者:
List中的元素一旦出队列之后,就再也找不到了这会导致消息一旦消费失败,就会导致消息丢失。为了保证消息的可靠性,我们可以引入多一个备份消息列表。每当执行POP的时候,顺便把消息写入到备份消息队列中,等待消费者真正的处理完消息之后,再从备份的消息队列中删除掉消息:
当然,也可以引入ACK机制,在消息消费完毕之后,再让消息从队列中POP出来,但这样会导致消息不能多个消费者并行消费,必须等到上一个消息处理完,并且发送了ACK之后,才会从List中取出,才能得继续读取下一条消息。
但是,使用List作为消息队列会存在以下问题:
- ID由客户端自己生成,需要客户端另外准备一个唯一ID生成器组件;
- 不支持消息分组:虽然可以支持多个消费者同时消费同一个消息队列,但是list这个结构不支持一条消息被多个消费者重复消费;
- 消息可靠性无法保证,消息可能会丢失,导致未被消费到,虽然可以通过备份消息队列处理。
为此,在Redis 5.0中,引入了Stream数据类型,非常适合用作消息队列。
为了进一步提高CPU效率,我们可以使用阻塞式的API,如
BRPOP
、BRPOPLPUSH
,这样可以挂起线程,让线程进入等待,避免在队列还没有元素的时候反复的进行网络请求,减少系统资源消耗。
2、使用Stream
Stream,抽象日志类型,存储起来的数据,不会立刻删除掉,而是可以传入一个偏移量进行反复读取。
Stream支持一下特性:
- 自动生成ID,ID顺序增长,保证有序性;
- 支持消息的分组消费,这个特性借鉴了kafka;
- 支持消息的ACK机制,支持重复读取消息,不像List中的消息,POP出来之后,就再也找不到了。
使用Stream实现消息队列的关键命令如下:
- XADD:添加日志消息
- XREAD:读取日志消息
- XGROUP:创建分组
- XREADGROUP:按分组读取消息
- XPENDING:检查待处理消息列表,即每个消费组内消费者已读取,但是尚未得到确认的消息
- XACK:用于消费完之后,发送ACK消息给Stream,这个时候消息将从XPENDING中移除
- XCLAIM:如果某一个客户端挂了,可以使用此命令,让其他Consumer主动接管它的pending msg
具体命令使用,参考Part I中的相关内容。
使用Stream实现的消息队列,我们在讲Stream数据类型的时候已经讲过了,我们再来回顾一下这张图:
- 生产者通过XADD命令往Stream中添加消息:
1 | XADD articles * title redis author arthinking |
-
默认的,会为每条消息生成一个唯一ID;
-
通过XGROUP CREATE创建Group分组;
-
消费者通过XREADGROUP命令消费分组中的消息,一条消息只会被同一个消费分组下的一个消费者消费,不同消费分组可以消费相同的消息
-
如果XREADGROUP命令没有指定NOACK选项,那么默认的会把每个消费分组中被消费者取出的消息放入待处理消息列表中;
-
消费者消费完消息后,执行XACK命令,把消息从待处理消息列表中删除。
使用消息队列需要注意什么
一般的,我们在使用消息队列的时候,业务实现尽量不要依赖消息的顺序,业务本身做好幂等,最后,要考虑消息可靠性,我们是否需要确保消息不能丢失,如果不能丢失,那么就要考虑装满的消息队列中间件了。
3、可以使用Stream代替消息队列中间件吗?
使用Redis的Stream做消息队列的优势是,部署简单,不需要依赖其他第三方组件。
需要注意的是,由于Redis持久化机制会导致丢数据的问题,Stream也可能丢消息。如果需要更加强大的消息队列,比如,金融业务场景,不允许丢失消息的场景,那么就得用上专业的消息队列组件了。
而Redis的Stream则更适用于发短信、消息推送等对可靠性要求不高的业务场景。