bin-log-distributor消费数据丢失问题解决记录
bin-log-distributor项目简介
bin-log-distributor是凯京科技开源的Mysql数据库数据变动实时监听分发中间件,详情见码云开源地址, github开源地址
背景
线上反馈有bin-log-distributor客户端偶尔有丢失数据的情况。
验证方法
在装mysql的服务器上,使用多个线程并发循环对bin-log-distributor监控的数据表进行增、删、改。
预期
bin-log-distributor客户端出现数据丢失。
定位
分析客户端收到的数据,发现不仅有数据丢失,还有数据重复消费。
将客户端日志打印更仔细,发现以下两点问题:
- 从redis队列peek数据后,remove抛出如下异常:
- Redis队列数据没消费完其他线程异常进行了消费:
分析如下消费redis数据方法源码:
private void doRunWithLock() {
RLock rLock = redissonClient.getLock(dataKeyLock);
EventBaseDTO dto;
try {
RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
// 尝试加锁,最多等待20秒,上锁以后30秒自动解锁
boolean lockRes = rLock.tryLock(20 * 1000, 3 * retryInterval,TimeUnit.MILLISECONDS);
//拿到锁且 队列不为空 进入
if (lockRes && !queue.isEmpty()) {
//让这个线程把队列里的全部处理完吧
rLock.lock();
//DATA_KEY_IN_PROCESS.add(dataKey);
while ((dto = queue.peek()) != null) {
//处理完毕,把数据从队列摘除
boolean handleRes = doHandleWithLock(dto, 0);
if (handleRes) {
queue.remove();
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.severe("接收处理数据失败:" + e.toString());
} finally {
rLock.forceUnlock();
rLock.delete();
//DATA_KEY_IN_PROCESS.remove(dataKey);
}
}
猜测是redis RLock并没有效,于是在rLock.tryLock() 和 rLock.delete() 方法后面分别打印出 acquireLock 和 releaseLock的日志,再次执行测试,得到如下日志:
发现 pool-2-thread-1线程成功获取锁后,pool-2-thread-2获取锁失败,并立刻将pool-2-thread-1 持有的锁release了。结合源码发现pool-2-thread-2获取锁失败后,执行了finally代码块中的rLock.forceUnlock()方法,查阅资料发现forceUnlock()方法可以直接释放相同key的其他线程持有的锁。
至此,定位到问题为:每次收到数据起一个线程执行doRunWithLock()方法,当数据库操作较频繁时,一个线程获取到的redis锁会被另一个线程release掉,导致redis锁内部本来应该是单例执行的消费redis队列数据变成了多个线程同时执行,从而出现多个线程消费队列数据冲突,引发数据丢失和重复消费的问题。
解决
确保每个线程获取到的Redis锁只能由当前线程release(),更改后的代码如下:
private void doRunWithLock() {
RLock rLock = redissonClient.getLock(dataKeyLock);
EventBaseDTO dto;
boolean lockRes = false;
try {
// 尝试加锁,最多等待50ms(防止过多线程等待)
// 上锁以后6个小时自动解锁(防止redis队列太长,当前拿到锁的线程处理时间过长)
lockRes = rLock.tryLock(50, 6 * 3600 * 1000, TimeUnit.MILLISECONDS);
if (!lockRes) {
return;
}
DATA_KEY_IN_PROCESS.add(dataKey);
//拿到锁之后再获取队列
RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
if (!queue.isExists() || queue.isEmpty()) {
return;
}
//拿到锁且 队列不为空 进入
while ((dto = queue.peek()) != null) {
//处理完毕,把数据从队列摘除
boolean handleRes = doHandleWithLock(dto, 0);
if (handleRes) {
queue.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
log.severe("接收处理数据失败:" + e.toString());
} finally {
//forceUnlock是可以释放别的线程拿到的锁的,需要判断是否是当前线程持有的锁
if (lockRes) {
rLock.forceUnlock();
rLock.delete();
DATA_KEY_IN_PROCESS.remove(dataKey);
}
}
}
回归
更改后执行继续执行相同的测试用例,得到如下结果日志:
pool-2-thread-2在获取到redis锁之后一直持续消费redis队列中的数据,其他线程尝试获取redis锁失败后,不再强制release该锁,客户端消费条数、类型结果正确。
完成
修改测试用例,使用更多线程、执行更多数据库操作,分析消费结果,结果数据条数正确,未出现数据丢失、重复消费等问题。
确认结果回归结果正确,提交代码。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
2019年4月值得一读的9本技术书籍(机器学习、云计算、Android等书籍)!
导语:当你困惑时,书给你启迪当你迷惘时,书给你指路。当你落寞时,书给你勇气。当你悲伤时,书给你欢愉。当你狂躁时,书给你冷静。当你成功时,书给你导航。我们整理出2019年4月值得一读的9本技术书籍,其中涉及机器学习、人工智能、Android等方面。这个春天就让书陪你度过吧。 1、《从机器学习到深度学习:基于scikit-learn与TensorFlow的高效开发实战》 书籍介绍: 《从机器学习到深度学习:基于scikit-learn与TensorFlow的高效开发实战》是一本场景式的机器学习实践书,笔者努力做到“授人以渔,而非授人以鱼”。理论方面从人工智能(AI)与机器学习(ML)的基本要素讲起,逐步展开有监督学习、无监督学习、强化学习这三大类模型的应用场景与算法原理;实践方面通过金融预测、医疗诊断概率模型、月球登陆器、图像识别、写诗机器人、中国象棋博弈等案例启发读者将机器学习应用在各行各业里,其中后三个案例使用了深度学习技术。 2、《给产品经理讲技术》 书籍介绍: 本书专为非技术背景的互联网行业从业者和想了解互联网技术的人员量身定制,分门别类地整理了Web前端技术、客户端技术、开发技术...
-
下一篇
现代IM系统中的消息系统架构 - 架构篇
前言 IM全称是『Instant Messaging』,中文名是即时通讯。在这个高度信息化的移动互联网时代,生活中IM类产品已经成为必备品,比较有名的如钉钉、微信、QQ等以IM为核心功能的产品。当然目前微信已经成长为一个生态型产品,但其核心功能还是IM。还有一些非以IM系统为核心的应用,最典型的如一些在线游戏、社交应用,IM也是其重要的功能模块。可以说,IM系统已经是任何一个带有社交属性的应用需要具备的基础功能,网络上对于这类系统的设计与实现的讨论也越来越多。 IM系统在互联网初期即存在,其基础技术架构在这十几年的发展中更新迭代多次,从早期的CS、P2P架构,到现在后台已经演变为一个复杂的分布式系统,涉及移动端、网络通信、协议、安全、存储和搜索等技术的方方面面。IM系统中最核心的部分是消息系统,消息系统中最核心的功能是消息的同步、存储
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- MySQL数据库在高并发下的优化方案
- SpringBoot2配置默认Tomcat设置,开启更多高级功能