您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
Redis Stream的消费者组介绍
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
Redis Stream的消费者组介绍
自猿其说Tech
2021-09-01
IP归属:未知
44520浏览
Sql
MySQL
Stream 是 Redis 5.0 引入的一种新数据类型,它以一种抽象的方式来构建日志结构的数据。本文主要介绍Redis Streams的消费者组相关的信息。 ### 1 什么是消费者组 在某些问题中,我们想要做的是从同一流中向许多客户端提供不同的消息子集。一个明显有用的例子是处理缓慢的消息:让 N 个不同的客户端接收流的不同部分来加快消息的处理。例如:如果有三个消费者 A1、A2、A3 和一个包含消息 1、2、3、4、5、6、7 的流,那么我们想要达到的是像下面这样分配消息。 ```bash 1 -> A1 2 -> A2 3 -> A3 4 -> A1 5 -> A2 6 -> A3 7 -> A1 ``` 为了实现这一点,Redis 使用了一个叫做消费者组的概念。从实现的角度来看,Redis 消费者组与 Kafka (TM) 消费者组没有任何关系,只是在功能上是相似的:允许一组客户端合作消费同一消息流的不同部分。 我们可以将一个消费者组简单理解为一个从流中获取数据的特殊的消费者。它从流中获取数据,然后再服务于多个消费者,同时提供了如下的保证: 1. 每条消息都提供给不同的消费者,不会将相同的消息传递给多个消费者。 1. 在消费者组中,消费者通过客户端的名称(区分大小写的字符串)进行区分,当断开连接重新连通后,消费者客户端还是提供相同的名字,会被当做同一个消费者。这意味着在消费者组中由客户端提供唯一标识符。 1. 每个消费者组都有未被消费的第一个ID的概念,这样当消费者请求新消息时,它可以只提供以前未传递的消息。 1. 消费消息需要使用特定命令进行显式确认。Redis 将该确认解释为:此消息已正确处理,可以从消费者组中移除。 1. 消费者组跟踪所有当前挂起的消息,即已传递给消费者组的某个消费者但尚未确认为已处理的消息。由于此功能,当访问流的消息历史记录时,每个使用者将只看到传递给它的消息。 下面一起来看下消费者组相关的命令。 ### 2 消费者组命令 XGROUP ``` XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername] ``` 使用XGROUP可以: - 创建与流关联的新消费者组。 - 设置要传递的下一条消息。 - 销毁一个消费者组。 - 往消费者组中添加指定的消费者。 - 从消费者组中移除指定的消费者。 #### 2.1 创建消费者组 ```bash > XGROUP CREATE mystream mygroup $ OK ``` 在创建命令中我们必须指定一个 ID,在示例中是$。这是必需的,因为消费者组必须知道在第一个消费者连接时接下来要提供哪条消息。$表示从现在开始到达流中的新消息才会提供给组中的消费者。我们也可以指定一个有效ID,会提供给消费者大于指定ID的消息。 XGROUP CREATE还支持自动创建流,如果流不存在,使用可选的MKSTREAM子命令作为最后一个参数可以自动创建对应的流: ```bash > XGROUP CREATE mystream1 mygroup $ MKSTREAM OK ``` #### 2.2 设置要传递的下一条消息 使用这种命令,可以修改消费者组要获取的下一个ID,而无需再次删除和创建使用者组。 ```bash > XGROUP SETID mystream mygroup $ OK ``` #### 2.3 销毁一个消费者组 可以使用以下形式完全销毁一个消费者组: ```bash > XGROUP DESTROY mystream1 mygroup 1 ``` 即使存在活动的消费者和待处理消息,消费者组也将被销毁,因此请确保仅在真正需要时才调用此命令。 #### 2.4 消费者的添加与移除 ```bash # 往消费者组中添加指定的消费者。 > XGROUP CREATECONSUMER mystream mygroup zhangsan 1 # 从消费者组中移除指定的消费者。返回消费者在被删除之前所拥有的待处理消息数量。 > XGROUP DELCONSUMER mystream mygroup zhangsan 0 ``` ### 3 从流中读取数据 XREADGROUP XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …] 在从流中读取之前,让我们将一些消息放入其中: ```bash > XADD mystream * message a 1628343346807-0 > XADD mystream * message b 1628343351619-0 > XADD mystream * message c 1628343360812-0 > XADD mystream * message d 1628343369925-0 > XADD mystream * message e 1628343373721-0 > XADD mystream * message f 1628343379103-0 ``` 使用如下命令通过消费者组读取流中的数据。 ```bash > XREADGROUP GROUP mygroup zhang COUNT 1 STREAMS mystream > mystream 1628343346807-0 message a ``` XREADGROUP命令是XREAD命令的特殊版本,支持消费者组。从语法的角度来看,这两个命令几乎是相同的,但是XREADGROUP需要一个特殊和强制的选项:GROUP<group-name> <consumer-name>。 #### 3.1 GROUP<group-name> <consumer-name> group-name是关联到流的消费者组的名称。consumer-name是客户端用于在消费者组内标识自己的字符串。对应消费者不存在时会自动创建,不同的消费者应该选择不同的消费者名称。 使用XREADGROUP时在STREAMS选项中指定的ID可以是以下两种之一: 1. 特殊ID>,意味着消费者希望只接收从未发送给任何其他消费者的消息。这意思是说,请给我新的消息。 1. 任意其他的ID,即0或任意其他有效ID或不完整的ID(只有毫秒时间部分),将返回发送命令的消费者的待处理条目信息。所以,基本上如果ID不是>,该命令将返回消费者的待处理条目信息(已发送给它,但尚未确认的条目)。 对于第2点的测试结果如下: ```bash > XREADGROUP GROUP mygroup zhang COUNT 1 STREAMS mystream 0 mystream 1628343346807-0 message a ``` 我们可以创建多个消费者来消费这个流中的消息。 ```bash > XREADGROUP GROUP mygroup li COUNT 1 STREAMS mystream > mystream 1628343351619-0 message b > XREADGROUP GROUP mygroup wang COUNT 2 STREAMS mystream > mystream 1628343360812-0 message c 1628343369925-0 message d ``` 就像XREAD一样,XREADGROUP命令也可以以阻塞的方式使用。在这方面两者没有区别。具体请参考<a href="https://developer.jdcloud.com/article/2127" target="_blank">Redis Stream介绍(一)</a> ### 4 消费确认命令 XACK XACK key group ID [ID …] 当消费者将消息正确处理后,需要调用确认命令来确认消费。只有当调用确认命令后,才会将该消息从待处理的返回中移除。 ```bash > XREADGROUP GROUP mygroup li COUNT 1 STREAMS mystream 0 mystream 1628343351619-0 message b > XACK mystream mygroup 1628343351619-0 1 > XREADGROUP GROUP mygroup li COUNT 1 STREAMS mystream 0 mystream ``` ### 5 总结 以上就是关于Redis Streams的消费者组相关的介绍和使用命令,对于是否使用消费者组: 1. 如果你有一个流和多个客户端,并且你希望所有的客户端都获取到完整的信息,那么你不需要使用消费者组。 1. 如果你有一个流和多个客户端,并且你希望在你的客户端上对流进行分区或共享,能获得一个流消息的子集,那么你需要使用消费者组。 Redis 流中的消费者组在某些方面可能类似于基于 Kafka (TM) 分区的消费者组,但是请注意,实际上它们是不同的。Redis 流的分区只是概念上的,消息都放在一个 Redis 键中,不同客户端获取的消息内容取决于谁准备好处理新消息,而不是客户端在处理哪个分区。例如,上述示例中如果消费者 li 在某个时刻永久失败,Redis 将继续为zhang 和 wang 提供所有到达的新消息,跟之前相比现在相当于只有两个分区,而Kafka 的分区不会自动更改。类似地,如果给定的消费者处理消息的速度比其他消费者快得多,则该消费者将接收更多的消息,而Kafka中消费者只能获取对应分区的消息。 在Redis中,如果你真的想将同一个流中的消息分割成多个Redis实例,你必须使用多个key和一些分片系统,比如Redis Cluster或其他一些特定于应用程序的分片系统。单个 Redis 流不会自动分区到多个实例。 ------------ ###### 自猿其说Tech-JDL京东物流技术发展部 ###### 作者:网规技术部 管碧强
原创文章,需联系作者,授权转载
上一篇:搜索中常见数据结构与算法探究(一)
下一篇:iPaaS在云原生的思考和探索
相关文章
【技术干货】企业级扫描平台EOS关于JS扫描落地与实践!
开发也要防沉迷--IDEA插件教程
如何保证MySQL和Redis的数据一致性?10分钟带你搞定!
自猿其说Tech
文章数
426
阅读量
2149964
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2149964
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号