Canal binlog 日志 Dump 流程分析
点击上方“中间件兴趣圈”,选择“设为星标”
Canal 的 dump 支持串行和并行模式两种模式,本篇重点梳理 dump 的核心流程,以便对 dump 过程有一个充分的了解,更好的理解 Canal 的实现原理与细节,下一篇中将重点关注Canal是如何引入并行模式来提高dump的性能,即并行编程相关的技巧。
从前面的文章我们得知 Canal binlog 日志解析的基本流程如下图所示:
解析来重点梳理一下 dump 命令的发送逻辑,特别是日志的处理流程,一些基本的日志格式。
1、 dump 流程分析
在 Canal 中 dump 方法声明如下:
String binlogfilename
binlog 文件名称,例如 mysql-bin.000038。Long binlogPosition
在文件中的偏移量。SinkFunction func
每解析出一条binlog日志的处理函数。
接下来我们直奔主题,一起来看一下 MysqlConnection 关于 dump 的实现流程。
Step1:在发送dump之前先设置相关的参数。
set wait_timeout=9999999
连接空闲超时时间,默认为8消息,用于 Canal Slave 的等待超时时间远大于默认值。set net_write_timeout=1800
网络写请求超时时间,针对正在进行数据读写的连接,该值默认为 60s。set net_read_timeout=1800
网络读请求超时时间,针对正在进行数据读写的连接,该值默认为 30s。set names 'binary'
设置服务端返回结果时不做编码转化,直接按照数据库的二进制编码进行发送,由客户端自己根据需求进行编码转化。set @master_binlog_checksum= @@global.binlog_checksum
设置master_binlog_checksum,因为在mysql5.6之后为binlog引入了checksum机制,例如crc32,canal作为mysql slave,需要与服务端相关参数保持一致。set @slave_uuid=uuid()
canal相对与mysql数据库服务而言就是一个从服务器,这个指令用于设置server_id,使用uuid,避免server_id重复。SET @master_heartbeat_period=15
设置客户端与服务端心跳发送间隔,默认为15s。MysqlConnection#dump
Step2:从主库查询binlog checksum,具体向主库发送 select @@global.binlog_checksum 语句。MysqlConnection#dump
Step3:向MySQL Master 注册从节点,告知客户端的host、port、用户名与密码、serverId,具体实现是发送命令CODE为 0x15。MysqlConnection#dump
Step4:向 MySQL Master 发送 dump 请求,MySQL是基于请求与应答模式,发送请求命令后,就会向网络通道中写入响应请求。(在这里大家不妨先大概思考一下如何读取 dump 命令的返回值,这部分虽然涉及到网络相关的知识,我在这边会稍微简单提一下)。MysqlConnection#dump
Step5:构建 DirectLogFetcher对象,实现基于 socket 的日志拉取服务,并构建 LogDecoder 对象,用于解析 binlog 日志。MysqlConnection#dump
Step6:使用 while 循环反复拉取消息,通过通过 LogDecoder 对二进制流进行解析,提取一条完整的binlog事件,交给 SinkFunction 去处理,并且如果开启了半同步机制,则需要向master发送ACK。既然是while循环,该方法的退出条件还是值得我们关注的:fetch.fetch()方法返回 false
SinkFunction 的 sink 方法 false,SinkFunction的详细处理流程将在下文介绍,这里先告知返回false的情况是 binlog 日志解析线程已停止运行。
上面粗略的介绍了 dump 命令的几个核心关键步骤,要想详细掌握其实现细节,我们必须继续深入探讨如下几个问题:
DirectLogFetcher 内部工作机制
LogDecoder binlog 日志解析
发送Dump底层网实现思路
2、DirectLogFetcher 内部工作机制
2.1 DirectLogFetcher 类图
LogBuffer
日志buffer,主要定义如下属性:byte[] buffer
缓存区中数据容器。int origin
当前buffer中的读指针int limit
当前buffer的最大可读可写指针int position
当前buffer的写指针。int semival
是否需要发送ACK(用于半同步)。
LogBuffer封装了字节相关的操作,不仅定义了上面的属性,也定义了字节读取相关众多API,其截图如下:在这里插入图片描述 LogFetcher binlog日志抓取抽象类,定义了如下关键属性与抽象方法。
int DEFAULT_INITIAL_CAPACITY
LogBuffer中的初始容量,默认为8K。float DEFAULT_GROWTH_FACTOR
容量增长因子,默认为 2.0。int BIN_LOG_HEADER_SIZE
binlog日志条目 header 的长度,固定为4字节。float factor
增长因子。public abstract boolean fetch()
抓取binlog日志。public abstract void close()
关闭 Fetch。DirectLogFetcher Canal LogFetcher模式实现类,其核心属性如下:
SocketChannel channel
网络通道,用于发送dump请求的网络通道。boolean issemi = false
是否开启半同步。
2.2 fetch流程详解
接下来我们重点剖析 DirectLogFetcher 的 fetch 方法,来探究其实现原理。
在研究DirectLogFetcher的fetch方法之前,我们先重点跟踪一下其内部网络读写方法fetch0方法,该方法是具体与网络读写相关的实现。
在详细介绍该方法之前先来介绍一下其参数的含义:
int off
从通道中读取到的内容放入到buffer中的起始位置int len
期望从通道中读取的字节长度。
该方法的实现关键点如下:
首先先确保接收缓存区有足够的剩余空间,如果空间不足,则进行扩容。
然后从通道中读取指定长度的字节。
接下来我们来重点看一下DirectLogFetcher的fetch的实现流程。
Step1:尝试从网络通道中读取4个字节(即读取协议的头部),如果通道中还没有可读取内容,返回false,造成的效果是一次 dump 请求结束。
Step2:从上文读到的4个字节分别读出该网络包的总长度以及当前包的序号,从这里可以看成MySQL协议头为4字节,前3个字节为网络包的总长度,第4个字节为包的序列号。再取出数据包的长度后,继续向通道中读取netlen个字节,即读取一个完整的数据包到buffer中。
Step3:继续从数据包中读取一个字节,判断该包的状态码,是否是一个成功的响应,如果是错误的响应,会向外抛出一次,Canal 会记录dump命令执行错误的次数。
Step4:如果一个包的长度为允许的最大包长度,则继续读取,这个主要是根据MySQL协议做的处理,即读取到一个数据包,然后返回true,表示拉取到一条日志,然后通过LogDecoder解码,然后传入到sink方法中,进行日志的后续处理。
Step5:这一步的目的,就是将buffer中的当前指针指向数据的开始位置。这样一次 fetch就结束了。
从上面的流程来看,DirectLogFetcher#fetch 方法结束后,就将进入到LogDecoder中。经过一次DirectLogFetcher#fetch方法后,即取回一条binlog日志,即二进制流,接下来就根据binlog协议对其解析。本文暂不深入该方法,如果大家想深入数据库中间件方面,可以作为一个很好的示例,面向MySQL通信协议进行编程。
3、SinkFunction
通过 LogDecoder从中解析一个事件后,会调用SinkFunction的sink方法,如果该方法返回 false,一次dump请求将介绍,接下来我们看一下其sink方法。
该方法的实现比较简单,这里不打算继续深入,我们重点来看一下 Canal.Entry 的结构:
这个结构是基于Canal做架构设计,解决顺序消费、数据不丢失一个重要参考依据,没解析一条事务,最终放入到环形缓存区,环形缓存区尽量以一个事务提交到Sink组件,其代码如下:
这里主要有如下几个关键点:
首先需要调用EventSink组件将解析出来的数据传入EventSink。
EventSink组件处理成功后,会提交解析位点。
原创不易,如果对你有所帮助请你为本文点个【在看】吧,这将是我写作更多优质文章的最强动力。
欢迎加入我的知识星球,一起交流源码,探讨架构,揭秘亿级订单的架构设计与实践经验,打造高质量的技术交流圈,为广大星友提供高质量问答服务,长按如下二维码加入。
本文分享自微信公众号 - 中间件兴趣圈(dingwpmz_zjj)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
HBase/TiDB都在用的数据结构:LSM Tree,不得了解一下?
LSM Tree(Log-structured merge-tree)广泛应用在HBase,TiDB等诸多数据库和存储引擎上,我们先来看一下它的一些应用: 参考 资料【4 】 这么牛X的名单,你不想了解下LSM Tree吗?装X之前,我们先来了解一些基本概念。 设计数据存储系统可能需要考虑的一些问题有:ACID,RUM(Read,Write,Memory)。 ACID ACID 相信小伙伴都被面试官问过,我想简单讨论的一点是:如何 持久化数据 才能保证数据写入的 事务性 和 读写性能? 事务性可简单理解为:1.数据必须持久化。2.一次数据的写入返回给用户 写入成功就一定成功,失败就一定失败。 读写性能可简单理解为:一次读 或 一次写 需要的IO次数,因为访问速率:CPU>>内存>>SSD/磁盘。 对于单机存储,最简单的方式当然是:写一条就持久化一条,读一条就遍历一遍所有数据,然后返回。当然没人这么干,在内存中我们都还知道用个HashMap呢。 拿Mysql InnoDB举例子: 读性能体现在数据的索引在磁盘上主要用B+树来保证。 写性能体现在运用 WAL机制 来...
- 下一篇
构造、析构期间被调虚函数发生的惨案,长教训!
最近有个问题出现长达一个月,经过两次修改未能解决,大致场景如下: 一个多态对象Children被注册回调(m_observer对象位于基类Base中),正好在析构函数里面回调,导致crash。 class Base { // ... protected: std::shared_ptr<Observer> m_observer; } class Children: public Base { Children(): Base() { // Register函数,接口有锁保护,避免回调时竞争访问cb句柄 m_observer->Register(std::bind(&Children::callback, this)); } virtual void callback() {}; }; 第一次修改是通过在基类的base里面对observable对象取消回调订阅,来避免回调时对象不存在。 class Base { virtual ~Base() { m_observer->Register(nullptr); // 取消回调 ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS8编译安装MySQL8.0.19
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Hadoop3单机部署,实现最简伪集群