您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
Canal Instance原理浅析
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
Canal Instance原理浅析
自猿其说Tech
2022-09-27
IP归属:未知
561浏览
计算机编程
目前服务+实时数据同步主要采用自建的Canal集群,在日常工作中包含了大量的维护工作及线上问题处理,基于此对整个Canal进行了梳理, 本文将结合实际线上生产问题讲解Canal,对Canal Instance核心原理进行解析,包含开源组件的实现和服务+对开源组件的定制逻辑。 ### 1 Canal简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 ![](//img1.jcloudcs.com/developer.jdcloud.com/08a4d865-2393-4102-9d73-e331229e45b220220927152754.png) 工作原理: - canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议; - MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ); - canal 解析 binary log 对象(原始为 byte 流)。 摘自: https://github.com/alibaba/canal/wiki ### 2 Canal架构及相关角色 ![](//img1.jcloudcs.com/developer.jdcloud.com/ba4d7cf2-6ffe-4dd3-b924-e1e4271494f720220927152817.png) 说明: - server代表一个canal运行实例,对应于一个jvm; - instance对应于一个数据队列 (1个server对应1..n个instance)。 instance模块: - eventParser (数据源接入,模拟slave协议和master进行交互,协议解析); - eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作); - eventStore (数据存储); - metaManager (增量订阅&消费信息管理器)。 摘自:https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B ### 3 Instance解析 Intsance 代表单个canal实例,每一个destination会独立一个Intsance实例,每一个实例会监听一个数据库的binlog,对binlog事件的处理包含 parse->sink->store 三个过程, 同时还会维护一些元数据管理。 注: 此部分流程解析基于Canal的默认实现,配置文件位于:https://github.com/alibaba/canal/blob/master/deployer/src/main/resources/spring/file-instance.xml #### 3.1 Intance类图 ![](//img1.jcloudcs.com/developer.jdcloud.com/1eabaec2-eb00-4a79-91ce-17808d927a6f20220927152853.png) 如类图所示: CanalInstanceGenerator 是创建 Instance的抽象接口,有两个实现 : - SpringCanalInstanceGenerator : 通过 destination 的配置信息中的spring xml配置路径,初始化一个 ClassPathXmlApplicationContext ; - PlainCanalInstanceGenerator: 这个就是实现了从admin管理端拉取相关的配置,设置的Instance配置项中,最终其实还是初始化了一个 Spring的ClassPathXmlApplicationContext。 CanalInstance 是 Instance 的抽象接口,有两个实现: - CanalInstanceWithSpring : 基于spring容器启动canal实例; - CanalInstanceWithManager: 从管理端拉取配置,初始化的canal实例,但是在梳理代码的时候,发现这个其实已经不再使用了。 AbstractCanalInstance 为 Instance 实现的核心逻辑的抽象类,大部分逻辑都在此抽象类中实现的。 注: 本文章主要基于 SpringCanalInstanceGenerator 和 CanalInstanceWithSpring 进分析。 #### 3.2 Instance启动流程 ![](//img1.jcloudcs.com/developer.jdcloud.com/052572de-0ade-4a6c-aa92-dbafddd7c52220220927152930.png) Instance启动的时候会根据依赖关系倒叙来启动相关组件,也就是说依赖关系为 parser模块->sink模块->store模块->元数据模块,启动的时候会倒过来启动相关的组件。 #### 3.3 Binlog事件处理流程 如图所示展示了Canal Instance 监听到MySQL Binlog事件之后,从Parser->Sink->Store的全部过程,包含了模块内部处理的一些逻辑和数据结构,下面会进行详细说明: ![](//img1.jcloudcs.com/developer.jdcloud.com/037b4284-ae92-4d99-9cdf-fbbcb5c957f220220927152954.png) ##### 3.3.1 Parser过程 类图: ![](//img1.jcloudcs.com/developer.jdcloud.com/32f8f277-bbae-43ed-81ca-67994fe4969720220927153013.png) 流程图: ![](//img1.jcloudcs.com/developer.jdcloud.com/fd411b21-bc0d-422c-85e4-1cf30337c5e520220927153022.png) 是否并行默认配置为true,所以在不特殊配置的情况下将走并行处理数据逻辑。 ###### 1)并行处理 处理类:com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor 处理过程: - 网络接收 (单线程); - 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息); - 事件深度解析 (多线程, DML事件数据的完整解析); - 投递到store (单线程)。 实现方式: 主要依赖于disruptor ringbuffer 进行实现,模式为单生产者,多消费者,消费者包括一下几个: - SimpleParserStage: 主要对binlog事件进行初步,主要解析header、表元数据等,并且此处会通过配置的黑白名单对表名进行过滤; - DmlParserStage: 此消费者为并行处理,默认并发度为: Runtime.getRuntime() .availableProcessors() * 60 / 100 ,此消费者会对事件进行深度解析,主要针对DML相关事件,解析出完整的变更的数据、DML语句等信息,此处在处理过程中也会调用黑白名单对表名进行过滤,如果未通过校验,则不需要对此事件进行深度解析,节省资源; - SinkStoreStage:将解析后的事件存储到事务缓存中。 ###### 2)事件事务缓存队列 处理类:com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer 具体实现: 事件事务缓存队列主要的作用为提供按事务刷新数据的机制,由于事件都是一条一条发送的,但是对于实际业务基本都是按事务来处理,所以队列会保证在进入下一个环节的时候的数据是一个完整的事务,详细如下: ![](//img1.jcloudcs.com/developer.jdcloud.com/4ef11f91-954c-44d7-8b01-f049f813cc3520220927153120.png) 如图所示,在遇到事务开始、事务结束、非DML事件、心跳等事件类型的时候回触发flush方法,将缓冲中的数据刷到sink模块进行后续处理codi。 ##### 3)构建最后的位点信息 在一批数据sink处理完成成功后,会获取本批数据中最后的位点信息并调用位点持久化模块进行持久化,构建位点信息的代码如下: ```java protected LogPosition buildLastTransactionPosition(List<CanalEntry.Entry> entries) { // 初始化一下 for (int i = entries.size() - 1; i > 0; i--) { CanalEntry.Entry entry = entries.get(i); if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {// 尽量记录一个事务做为position return buildLastPosition(entry); } } return null; } ``` 此处采用的倒叙的方式,遍历实体信息,当遇到事件类型为"事务结束"的时候会根据事务结束对应的实体信息构建位点信息,如果其他情况可能返回null,则不记录位点信息。 tip:此处代码有点小问题,可以找一下,后续在线上问题分析的时候也会分析 ##### 3.3.2 Sink和Store ![](//img1.jcloudcs.com/developer.jdcloud.com/92760da2-870a-4c1d-9c8b-482f5042e9df20220927153210.png) sink实现: com.alibaba.otter.canal.sink.entry.EntryEventSink 此处的这个Sink场景比价简单,主要是将接收到的数据转换为最终推送的存储的数据实体,此处有一个特殊逻辑,针对一些空事务,不是每次都会put到存储中,而是会语句一些逻辑对空事务进行过滤,详细代码如下: ```java // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常 if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) { lastEmptyTransactionCount.set(0L); lastEmptyTransactionTimestamp = currentTimestamp; return doSink(events); } ``` 空事务: 基于前面parser模块解析的过程,针对我们不关注的一些表的变更, canal不会解析具体的数据变更,只会发出事务的开始和结束,这种叫空事务。 store实现:com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer 此处使用的store只是基于内存buffer构建内存memory store。 ##### 3.3.3 位点持久化 此处使用的是: com.alibaba.otter.canal.parse.index.FailbackLogPositionManager。 其中: - primary 为 com.alibaba.otter.canal.parse.index.MemoryLogPositionManager (基于内存的保存位点); - second 为 com.alibaba.otter.canal.parse.index.MetaLogPositionManager (调用meta模块的配置保存位点)。 此manager在获取位点信息是会从meta和memory两个中获取。 在Canal服务端所有过程处理完之后会将位点保存到内存中,但是当Canal客户端消费完数据后,会将客户端信息和他当前消费的位点保存在文件中。 ### 4 线上问题分析 此部分主要讲一个服务+这边在使用的过程中遇到的一个问题。 #### 4.1 问题描述 此业务场景为,主数据一张表的下游数据使用方会监听此表的binlog信息来做数据同步,支持业务运行,但是在某一次业务对数据进行变更后下游数据使用方的数据长时间都未更新,查看Canal管理端记录的位点信息和数据主库当前实时的binlog位点信息,发现当前记录的位点为很久之前的位点信息,和数据实时的位点差异很大,并且在后台日志发现如下的日志信息: ``` ERROR LogAlarmHandler destination:database:table_XXX[java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:238) at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:262) at java.lang.Thread.run(Thread.java:745) java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.5.jar:?] at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:238) ~[canal.parse-1.1.5.jar:?] at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:262) ~[canal.parse-1.1.5.jar:?] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] ``` #### 4.2 问题分析 分两个方面来分析问题: 1. 为什么位点不更新,由于私有化部署的Canal是通过一些定制的,是否是定制的模块有问题, 还是canal原生就有bug。 2. 功能设计层面,这个更新频率非常低的一个基础数据的表为什么要使用监听binlog的方式进行数据同步,具体的业务场景是什么,这么设计是否合理,由于本文主要为介绍Canal,所以设计合理性的问题在其他文章进行讨论。 #### 4.3 定制的模块 为了方便问题分析,首先看一下对原生Canal模块的定制有哪些,此处主要替换了两个部分: - 位点存储模块:实现了接口 com.alibaba.otter.canal.parse.index.CanalLogPositionManager 的自定义实现,针对位点存储,主要逻辑为首先将位点信息存储的redis中,同时启动一个异步线程 每3秒会将redis中的位点信息同步到数据库中。 - store的模块:实现了接口 com.alibaba.otter.canal.store.CanalEventStore 的自定义实现,主要逻辑为直接将数据直接发送到JMQ中,topic为管理端配置的topic。 #### 4.4 位点不更新分析 基于以上叙述,针对位点不更新的问题,根据上文所属的Instance的处理流程,结合我们定制的部分进行分析,系统记录位点的位置为事务缓存队列flush之后,经过sink模块处理成功之后,此处的代码没有定制化,代码如下: ```java boolean successed = consumeTheEventAndProfilingIfNecessary(transaction); if (!running) { return; } if (!successed) { throw new CanalParseException("consume failed!"); } LogPosition position = buildLastTransactionPosition(transaction); if (position != null) { // 可能position为空 logger.error("记录位点信息:{}", JSON.toJSONString(position)); logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position); } ``` 从上面代码可以看出,不记录位点有两个逻辑: 1. consumeTheEventAndProfilingIfNecessary 方法返回false,其实这个方法核心就是调用sink模块,依赖于sink模块的返回结果,而sink模块的结果依赖于store模块的结果,我们自定义的store模块都会返回 true,所以不会触发此逻辑,后续再此处增加日志也验证了此猜想。 2. buildLastTransactionPosition 构建位点信息返回的位点信息为null,在上文我们描述过 buildLastTransactionPosition 方法的实现,同时遗留了一个问题。代码如下: ``` protected LogPosition buildLastTransactionPosition(List<CanalEntry.Entry> entries) { // 初始化一下 for (int i = entries.size() - 1; i > 0; i--) { CanalEntry.Entry entry = entries.get(i); if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {// 尽量记录一个事务做为position return buildLastPosition(entry); } } return null; } ``` 分析代码可以看出,此段代码上可以看出有个明显的问题,就是在遍历消费到的事件实体构建位点信息的时候,使用的是 > 而不是 >= ,所以其实是消费不到第一条数据的,我理解Canal开发人员在此处设计的时候认为此处需要处理的都是按事务一组一组过来的,也就是说最后一条数据基本都是事务结束的事件,所以倒序第一条就能处理到,可以做到尽快拿到想要的结果。 此时我们猜想是否为此处造成的问题,所以我们对此处增加日志,发现进入到此构建位点的方法的数据包含事务的结束信息的List的长度都是 1, 也就是数据都是一条一条过来的,不是包括事务开始、事务结束一组一组来的,所以造成数据永远处理不到,所以位点信息也就不会更新,日志信息如下: ![](//img1.jcloudcs.com/developer.jdcloud.com/7a6ca5c2-9c03-442e-b911-b6a126d5c25020220927154504.png) 至此其实可以确认一个修改方案,只需要把此处的 > 修改为 >= 即可。 另外针对这个问题在github issue上搜素也发现了类似的问题,并且有小伙伴提交了PR合并到master,但是在后续的过程中由被改回去了, 已经联系Canal的开发人员询问相关原因,如下: ![](//img1.jcloudcs.com/developer.jdcloud.com/72256bf1-6be6-49a5-b4fe-fecb5a1eea6a20220927153524.png) 另外此处涉及到另一个问题,也就是为什么此处消费到的数据都是一条一条的,而不是一组一组的,针对日志我们进行分析,发现在事务开始和事务结束之间有一个QUERY类型的事件信息,根据事务缓存队列的代码的分析 ,在不是DML事件的时候,会对数据进行队列进行flush,也就是说会触发sink和位点存储,是否DML判断代码如下: ```java private boolean isDml(EventType eventType) { return eventType == EventType.INSERT || eventType == EventType.UPDATE || eventType == EventType.DELETE; } ``` 可以看出QUERY类型是会触发flush,所以触发了数据不是按事务来触发的。 那这个QUERY类型的事件到底是什么,对源码进行分析, 其实就是执行的SQL语句,当数据库开启了 binlog_rows_query_log_events 参数的时候,binlog会记录触发的SQL语句,在Canal里面事务之前也就是会包含一个QUERY类型的事件信息,所以一个空事务就是 事务开头->QUERY->事务结束 。 至此另一种解决方案也就有了: 1. 修改isDml判断增加query类型的判断 。 2. 将 binlog_rows_query_log_events 数据库参数关闭。 最终我们选择了修改 buildLastTransactionPosition 代码逻辑 ,将 > 修改 >= 这种修改方案。 ### 5 附录 canal整体性能优化: https://github.com/alibaba/canal/issues/726 此issue里面有很多大家对性能优化的讨论,也解释了好多代码设计的原因。 ------------ ###### 自猿其说Tech-JDL京东物流技术与数据智能部 ###### 作者:史贤伟
原创文章,需联系作者,授权转载
上一篇:Pipeline流水线校验批量下单
下一篇:SLF4J门面日志框架源码探索
相关文章
Taro小程序跨端开发入门实战
Flutter For Web实践
配运基础数据缓存瘦身实践
自猿其说Tech
文章数
426
阅读量
2157088
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2157088
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号