RocketMQ主从读写分离机制
一般来说,选择主从备份实现高可用的架构中,都会具备读写分离机制,比如 MySql 读写分离,客户端可以向主从服务器读取数据,但客户写数据只能通过主服务器。
RocketMQ 的读写分离机制又跟上述描写的不太一致,RocketMQ 有属于自己的一套读写分离逻辑,它会判断主服务器的消息堆积量来决定消费者是否向从服务器拉取消息消费。
决定消费者是否向从服务器拉取消息消费的值存在 GetMessageResult 类中:
org.apache.rocketmq.store.GetMessageResult:
private boolean suggestPullingFromSlave = false;
其默认值为 false,即默认消费者不会消费从服务器,以下逻辑可以改变该值:
org.apache.rocketmq.store.DefaultMessageStore#getMessage:
long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory);
其中 maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量,TOTAL_PHYSICAL_MEMORY_SIZE 表示当前系统物理内存,accessMessageInMemoryMaxRatio 的默认值为 40,以上逻辑即可算出当前消息堆积量是否大于物理内存的 40 %,如果大于则将 suggestPullingFromSlave 设置为 true。
接下来该参数值会在消息拉取逻辑里面产生作用:
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:
if (getMessageResult.isSuggestPullingFromSlave()) { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { case ASYNC_MASTER: case SYNC_MASTER: break; case SLAVE: if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } break; } if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // consume too slow ,redirect to another machine if (getMessageResult.isSuggestPullingFromSlave()) { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } // consume ok else { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); } } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); }
如果发现主服务器的消息堆积超过了物理内存的 40%,则会设置 suggestWhichBrokerId 为从服务器 broker ID。
这里还会有个 slaveReadEnable 值来决定是否可以从从服务器拉取消息:
- 如果 slaveReadEnable=true,并且堆积量已经超过物理内存 40%时,则建议从从服务器拉取消息,否则还是从主服务器拉取消息;
- 如果 slaveReadEnable=false,则消息者只能从主服务器中拉取消息。
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#updatePullFromWhichNode:
public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) { AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); if (null == suggest) { this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId)); } else { suggest.set(brokerId); } }
当消费者收到拉取响应回来的数据后,会将下次建议拉取的 brokerID 缓存起来。下次拉取消息就会从 pullFromWhichNodeTable 中取出拉取 brokerId。
关注公众号回复关键字「后端」免费领取后端开发大礼包!
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
10月份Github上热门的开源项目
10 月份 GitHub 上最热门的开源项目排行已经出炉啦,在本月的名单中,有开源书籍、有开源课程等,下面就是本月上榜的10个开源项目: 1. Anime4K https://github.com/bloc97/Anime4K Star 9714 Anime4K 可以将 1080P 的动画转换为 4K 分辨率,Anime4K 的速度比 Waifu2x 快 300 倍,根据介绍提升画质的时间仅仅需要 3ms!当放大动画视频时,Anime4K 可以减少模糊程度,并在 HLSL / GLSL / Java 执行环境中运行。 2. iptv https://github.com/iptv-org/iptv Star 8282 该项目收集了来自世界各地的 8000 多个公共 IPTV 频道。你需要将 https://iptv-org.github.io/iptv/index.m3u 这个链接粘贴到支持M3U播放列表的任何播放器。 3. v https://github.com/vlang/v Star 133314 V 是一个集合了 Go 的简单和 Rust 的安全特性的新语言。是一门快速,安全...
- 下一篇
RocketMQ主从如何同步消息消费进度?
前面我也跟大家讲述了 RocketMQ 读写分离的规则,但是你可能会问,主从服务器之间的消费进度是如何保持同步的?下面我来给大家解答一下。 如果消费者消费模式不同,也会有不同的保存方式,消费者端的消息消费进度保存到 OffsetStore 中,他有两个实现类: org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore // 本地消费进度保存实现 org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore // 远程消费进度保存实现 其中,如果是广播模式消费,消息的消费进度是保存到本地,如果是集群消费模式,消息的消费进度则是保存到 Broker,但无论是保存到本地,还是保存到 Broker,消费者都会在本地留一份缓存,我们暂且看看集群消费模式下,消息消费进度的缓存是如何保存的: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset: public ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Red5直播服务器,属于Java语言的直播服务器
- Hadoop3单机部署,实现最简伪集群
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Linux系统CentOS6、CentOS7手动修改IP地址