您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
Kafka Broker通信模型与顺序处理Producer消息的原理
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
Kafka Broker通信模型与顺序处理Producer消息的原理
自猿其说Tech
2021-10-14
IP归属:未知
39280浏览
计算机编程
本文首先简略介绍了Kafka与producer交互时的通信模型。然后基于通信模型,主要探讨Producer将消息发送给Broker后,Broker如何保证消息的顺序性处理的问题。文章介绍了broker与producer通信时的IO模型;然后基于这种IO模型,介绍了多producer情况下,broker是如何将各个producer产生的消息顺序地放入内存中的requsetQueue中;最后,通过结合源码的方式,介绍了broker端多线程处理的情况下,还能顺序处理每个producer产生的消息的原理。读者可以多多关注kafka broker的通信架构。 ### 1 铺垫 对于Broker集群中的某一个Broker,可能会有大量的Producer与其连接,形成的网络简图如下: <center>![](//img1.jcloudcs.com/developer.jdcloud.com/158f1679-8b6d-4ee7-a9a5-3480c5a21cc820211014134537.png) 图1 大量producer与同一个broker建立连接</center> Broker作为服务端,想要承受得起大量的Producer客户端连接,就像Tomcat、Redis等其他中间件一样,必须得使用多路复用器来处理各个客户端的连接,并且,由于连接上来的客户端数量巨大,对应的管理R/W的多路复用器(连接处理器)可能需要多个,因此形成的网络图大致如下所示: <center>![](//img1.jcloudcs.com/developer.jdcloud.com/cf04427c-b168-448a-ba58-40ca423508ac20211014134808.png) 图2 broker接收producer消息时的通信模型</center> Kafka并没有使用Netty,但是IO处理模型类似于Netty,图中listen类似于Netty中的BossGroup, 连接处理器类似于Netty中的WorkerGroup。 ### 2 Broker顺序处理Producer消息(一) Kafka能够保证的是同一个Producer发送的消息是有序的,但是不能保证多个Producer之间的消息顺序有序性(如果要达到多个Producer消息有序性,只能在Producer客户端做手脚,比如分布式锁)。很容易想到,broker要保证单个Producer消息的有序性,可以使用一个队列requestQueue来缓存各个连接处理器获得的消息,然后用Handler来处理requestQueue中的消息数据(响应客户端、持久化、主备同步等)如图3所示。 <center>![](//img1.jcloudcs.com/developer.jdcloud.com/6cff7604-0e8f-463b-bed6-0d408f87d53c20211014134850.png) 图3 broker通过一个全局requset队列缓存各个producer发来的消息</center> 图中的三角形、菱形、圆形、六边形分别代表多个Producer发送的消息,先后到达连接处理器,然后按照先后顺序放入request队列中,对于某个producer与broker建立的连接,只会被分配给一个连接处理器来监听、处理。因此对于同一个producer,产生的消息在连接处理器中是有序的(底层是操作系统的recv-q队列缓存),所以放入request队列的顺序也是有序的。 ### 3 Broker顺序处理Producer消息(二) 图3中,通过一个request队列保证了单个Producer生产的消息日志的顺序性。如果Handler是单线程运行的(只有一个Handler),那么Handler将request队列中的消息日志写入磁盘也能保证其顺序性。但是,kafka broker的Handler是多线程运行的,那么kafka是如何保证消息能顺序地写入磁盘呢? 首先想到的是对request队列加锁,一个Handler线程从request队列中获取一条/一批消息,并且处理完成之后再释放锁。但是这种方式的本质是:任一时刻,只有一个Handler在运行,退化成了单线程,且其他没有运行的线程白白占用内存空间,当锁释放后还会产生锁竞争,还不如单线程。 细心的小伙伴可能已经发现,在图3中,producer-1产生的六变形,在request队列中我画成了虚线。实际上,kafka将某一producer生产的消息放入requset队列时,是一条一条(**或者是一批一批,取决于producer消息的发送速度与broker处理消息的速度之间的差距**)放入的,处理完一条/一批,再放入下一条/一批。无论是一条还是一批,站在broker端的角度,都是从与当前producer产生的socket连接的recv-q接收队列中拉取一次数据,而拉取的这一条/一批数据,将作为一个不可分割的处理单元(不会被多个线程处理)。为描述方面,**后文简称这一条/一批数据为“处理单元”**。 为了能够解释清楚broker消费producer消息的顺序性,我们需要先看一下broker处理producer消息的整体逻辑。源代码和相应注释如图4所示。图中展示的循环逻辑,后文称为Main-Loop. <center>![](//img1.jcloudcs.com/developer.jdcloud.com/a1df1789-f843-469f-8b26-d8947efbb2af20211014134954.png) 图4 broker与producer交互时的整体逻辑</center> 在图4的整体逻辑中,保证顺序地处理每个producer产生的消息数据,主要涉及到三个方法:poll()、processCompletedReceives()、processCompletedSends()。下面分别介绍这三个方法。 #### 3.1 poll()方法 对于poll()方法,我们可以先看一下源码中对于该方法的部分注释,如图5: <center>![](//img1.jcloudcs.com/developer.jdcloud.com/3dcdeef8-056a-4a4a-8b84-e730682c9da020211014135038.png) 图5 poll()方法的注释</center> 注释大意是说,request队列中的数据可能会被多线程处理,为了保证对同一个producer对应的channel中的数据进行顺序性地处理,在每次执行Main-Loop中的poll()方法时,对同一个producer对应的channel中的数据只会被拉取一次,并且会作为一个“处理单元”(注释中的entry的key实际对应的是一个channel)放入completedReceives变量中。 由于poll()方法内部逻辑比较长,**下面介绍的源码只展示关于“顺序性处理producer消息”相关代码。** 在poll()方法中,为了保证顺序性处理消息,主要涉及到两个重要的变量: - private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; - private final List<NetworkReceive> completedReceives; ##### 3.1.1 stagedReceives变量相关逻辑 stagedReceives变量用来保存在一次Main-Loop过程中拉取到的不同producer对应的channel中的数据。如图6所示: <center>![](//img1.jcloudcs.com/developer.jdcloud.com/5871e8f1-6343-4131-bfd8-eb911bb62e2920211014135224.png) 图6 暂存数据到stagedReceives变量中</center> 从图6中可以看到,暂存当前channel中的数据到stagedReceives变量的条件是:stagedReceives变量中没有当前channel的数据。 ##### 3.1.2 completedReceives变量相关逻辑 completedReceives变量由stagedReceives中的数据做简单转换而来,具体代码如图7、图8所示。 <center>![](//img1.jcloudcs.com/developer.jdcloud.com/a3ce346e-c3ac-4c1d-9b1f-8cc73921e7c720211014135254.png) 图7 将stagedReceives中各个channel对应的数据加入到completedReceives(一)</center> <center>![](//img1.jcloudcs.com/developer.jdcloud.com/584d1826-7771-493b-840e-d360d0f3c6d920211014135318.png) 图8 将stagedReceives中各个channel对应的数据加入到completedReceives(二)</center> stagedReceives变量实际是一个 Map<KafkaChannel, Deque<NetworkReceive>>类型的变量,在图8中也看到,在把stagedReceives中某个channel中的数据加入到completedReceives时,取的是双端队列中头部的元素。给人一种感觉:对于同一个channel,stagedReceives变量似乎会为该channel保存多个“处理单元”数据,但是图6的逻辑中保证了:对于同一个channel,只会缓存一个“处理单元”的数据。所以猜想kafka作者设计stagedReceives变量为 Map<KafkaChannel, Deque<NetworkReceive>>结构,是为了扩展性等其他原因。无论如何,目前的逻辑是,在一次Main-Loop循环过程中,对于同一个channel中的数据,只会被拉取一个“处理单元”到completedReceives变量中! #### 3.2 processCompletedReceives()方法 processCompletedReceives()方法主要功能是将这一次Main-Loop过程中对各个channel拉取到的“处理单元”数据放入requestQueue队列中,并且移除对各个channel的OP_READ事件监听,源代码如图9、图10所示。 <center>![](//img1.jcloudcs.com/developer.jdcloud.com/1ad278d3-8e33-48ff-b8a3-a4c030e2b83820211014135652.png) 图9 processCompletedReceives()方法的整体逻辑</center> <center>![](//img1.jcloudcs.com/developer.jdcloud.com/0d5987b8-1b83-4b99-a85b-e4945af477f120211014135713.png) 图10 实际移除对当前socket连接的OP_READ事件监听</center> #### 3.3 processCompletedSends()方法 processCompletedSends()方法主要完成对各个channel的unmute操作,以便持续处理和各个producer连接的recv-q中的消息数据,如图11、图12所示。 <center>![](//img1.jcloudcs.com/developer.jdcloud.com/ff3bd48c-ce37-4314-8731-914b928d2e5420211014135804.png) 图11 processCompletedSend()方法整体逻辑</center> 需要说明的是,图11中的completedSends中的元素添加,是在3.2 中介绍的poll()方法中完成的,上文说过,在poll()方法中完成了读写操作,当broker对一个producer响应完成之后,就会将对应channel加入到completedSends集合中。 <center>![](//img1.jcloudcs.com/developer.jdcloud.com/95ae41ba-16d5-4c7c-9f20-96894b2ee4ea20211014135836.png) 图12 实际重新添加对当前socket连接的OP_READ事件监听</center> #### 3.4 一些补充信息和说明 ##### 3.4.1 stagedReceives变量和completedReceives变量清空操作 细心的读者可能已经发现,从3.2到3.4中没有介绍何时将stagedReceives变量和completedReceives变量清空,以便下一次循环Main-Loop时不重复处理数据。实际上,stagedReceives变量和completedReceives变量的清空是在poll()方法一开始的时候执行的清空操作。 ##### 3.4.2 多线程处理requestQueue源代码 前文一直提到Kafka处理request请求的Handler是多线程的,下面给出源码如图13所示。 <center>![](//img1.jcloudcs.com/developer.jdcloud.com/0550c797-46f1-4417-8987-db045167b5e320211014135920.png) 图13 启动多线程处理请求</center> 对Producer发送的消息的处理,是这些线程从requestQueue(前文中提到的“request队列”)中拉取处理的,具体我们可以进到KafkaRequsetHandler的run()方法中查看。源码如图14。 <center>![](//img1.jcloudcs.com/developer.jdcloud.com/3e542d47-266e-4c83-a0ce-44d22e6318c320211014135947.png) 图14 线程从requestQueue拉取消息进行处理</center> receiveRequest()方法的具体逻辑如图15。 <center>![](//img1.jcloudcs.com/developer.jdcloud.com/3c44824b-50a6-49c2-8d27-49970e41693520211014140017.png) 图15 receiveRequest()方法的具体逻辑</center> 关于更多的源码细节,读者可以自行下载kafka源码进行阅读。本文中的kafka源码来自kafka-2.2.0版本。 ### 4.关于Kafka频繁移除和添加OP_READ事件的思考 通过上文,kafka broker需要频繁移除和添加OP_READ事件的根本原因是图3中处理requestQueue中数据的Handler是多线程运行的,因此为了保证顺序性处理,必须保证对于同一个channel中的一个“处理单元”的数据只能在requestQueue中存在一份,处理完了在存入下一份,否则,多线程处理乱序,所以kafka采用了频繁移除和添加OP_READ的操作。 如果Handler使用单线程,即便是同一个producer的多个“处理单元”同时存在于requestQueue中,也不会乱序。那么为什么kafka的Handler要采用多线程呢?这不禁让人与其同Redis对比,Redis为什么使用单线程处理呢? 个人认为有如下几点原因: 1. kafka broker只是单纯地将消息append到对应的分区,并没有对消息做任何的计算,它使用多线程来处理这些消息唯一要保证的就是:对于同一个producer发送的消息的顺序性处理。使用多线程可以充分利用cpu,只不过付出的代价是频繁地移除和添加OP_READ事件。kafka经过权衡,还是使用多线程的效率更高(纯属猜想)。 1. 而Redis不一样,Redis的业务线程是要计算key、value的,比如加减乘除、求交集、求并集等操作。如果使用多线程,则势必要涉及到加锁,加锁不仅实现麻烦,而且并发量大时,反而会影响性能。 ### 5 总结 本文从broker和producer通信模型入手,探究了broker端保证各个producer消息顺序性的原理。看起来高大上的一个特性,其实现方式却依赖的是多路复用器中的一个小知识。实际上IO模型是大多数中间件中比较核心的东西,值得好好学习。 ------------ ###### 自猿其说Tech-JDL京东物流技术发展部 ###### 作者:客户服务技术部 伍泓全
原创文章,需联系作者,授权转载
上一篇:WebScoket简介与使用
下一篇:浅析@Transactional注解原理及使用
相关文章
Taro小程序跨端开发入门实战
Flutter For Web实践
配运基础数据缓存瘦身实践
自猿其说Tech
文章数
426
阅读量
2149963
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2149963
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号