源码分析ElasticJob分片机制(带分片机制流程图)
本文将重点分析 ElasticJob 的分片机制:
ElasticJob分片工作机制:
- ElasticJob在启动时,首先会启动是否需要重新分片的监听器。
代码见:ListenerManager#startAllListeners {...; shardingListenerManager.start();...}。 - 任务执行之前需要获取分片信息,如果需要重新分片,主服务器执行分片算法,其他从服务器等待直到分片完成。
代码见:AbstractElasticJobExecutor#execute {...; jobFacade.getShardingContexts();...;}
1、分片管理监听器详解
ElasticJob的事件监听管理器实现类为:AbstractListenerManager。
其类图为:
-
JobNodeStorage jobNodeStorage:Job node操作API。
其核心方法:- public abstract void start():启动监听管理器,由子类具体实现。
- protected void addDataListener(TreeCacheListener listener):增加事件监听器。
ElasticJob的选主监听管理器、分片监听器管理器、故障转移监听管理器等都是 AbstractListenerManager 的子类。 分片相关的监听管理器类图如图所示:
- ShardingListenerManager:分片监听管理器。
- ShardingTotalCountChangedJobListener:监听总分片数量事件管理器,是TreeCacheListener(curator的事件监听器)子类。
- ListenServersChangedJobListener:任务job服务器数量(运行时实例)发生变化后的事件监听器。
1.1 源码分析ShardingTotalCountChangedJobListener监听器
class ShardingTotalCountChangedJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) { int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount(); if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) { shardingService.setReshardingFlag(); JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount); } } } }
job配置的分片总节点数发生变化监听器(ElasticJob允许通过Web界面修改每个任务配置的分片总数量)。
job的配置信息存储在${namespace}/jobname/config节点上,存储内容为json格式的配置信息。
如果${namespace}/jobname/config节点的内容发生变化,zk会触发该节点的节点数据变化事件,如果zk中存储的分片节点数量与内存中的分片数量不相同的话,调用ShardingService设置需要重新分片标记(创建${namespace}/jobname/leader/sharding/necessary持久节点)并更新内存中的分片节点总数。
1.2 源码分析ListenServersChangedJobListener 监听器
class ListenServersChangedJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) { shardingService.setReshardingFlag(); } } private boolean isInstanceChange(final Type eventType, final String path) { return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType; } private boolean isServerChange(final String path) { return serverNode.isServerPath(path); } }
分片节点(实例数)发生变化事件监听器,当新的分片节点加入或原的分片实例宕机后,需要进行重新分片。
当${namespace}/jobname/servers或${namespace}/jobname/instances路径下的节点数量是否发生变化,如果检测到发生变化,设置需要重新分片标识。
2、具体分片逻辑
上面详细分析了分片监听管理器,其职责就是监听特定的 ZK 目录,当发生变化后判断是否需要设置重新分片的标记,如果设置了需要重新分片标记后,在什么时候触发重新分片呢?
每个调度任务在执行之前,首先需要获取分片信息(分片上下文环境),然后根据分片信息从服务器拉取不同的数据,进行任务处理,其源码入口为:AbstractElasticJobExecutor#execute。
jobFacade.getShardingContexts()方法。
具体实现方法代码为:LiteJobFacade#getShardingContexts。
public ShardingContexts getShardingContexts() { boolean isFailover = configService.load(true).isFailover(); // @1 if (isFailover) { List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems(); if (!failoverShardingItems.isEmpty()) { return executionContextService.getJobShardingContext(failoverShardingItems); } } shardingService.shardingIfNecessary(); // @2 List<Integer> shardingItems = shardingService.getLocalShardingItems(); // @3 if (isFailover) { shardingItems.removeAll(failoverService.getLocalTakeOffItems()); } shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); // @4 return executionContextService.getJobShardingContext(shardingItems); // @5 }
代码@1:是否启动故障转移,本篇重点关注ElasticJob的分片机制,故障转移在下篇文章中详细介绍,本文假定不开启故障转移功能。
代码@2:如果有必要,则执行分片,如果不存在分片信息(第一次分片)或需要重新分片,则执行分片算法,接下来详细分析分片的实现逻辑。
代码@3:获取本地的分片信息。遍历所有分片信息${namespace}/jobname/sharding/{分片item}下所有instance节点,判断其值jobinstanceId是否与当前的jobInstanceId相等,相等则认为是本节点的分片信息。
代码@4:移除本地禁用分片,本地禁用分片的存储目录为${namespace}/jobname
/sharding/{分片item}/disable。
代码@5:返回当前节点的分片上下文环境,这个主要是根据配置信息(分片参数)与当前的分片实例,构建ShardingContexts对象。
2.1 shardingService.shardingIfNecessary 详解【分片逻辑】
/** * 如果需要分片且当前节点为主节点, 则作业分片. * * <p> * 如果当前无可用节点则不分片. * </p> */ public void shardingIfNecessary() { List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances(); // @1 if (!isNeedSharding() || availableJobInstances.isEmpty()) { // @2 return; } if (!leaderService.isLeaderUntilBlock()) { // @3 blockUntilShardingCompleted(); //@4 return; } waitingOtherJobCompleted(); // @5 LiteJobConfiguration liteJobConfig = configService.load(false); int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(); // @5 log.debug("Job '{}' sharding begin.", jobName); jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, ""); // @6 resetShardingInfo(shardingTotalCount); // @7 JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass()); // @8 jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))); // @9 log.debug("Job '{}' sharding complete.", jobName); }
代码@1:获取当前可用实例,首先获取${namespace}/jobname/instances目录下的所有子节点,并且判断该实例节点的IP所在服务器是否可用,${namespace}/jobname/servers/ip节点存储的值如果不是DISABLE,则认为该节点可用。
代码@2:如果不需要重新分片(${namespace}/jobname/leader/sharding
/necessary节点不存在)或当前不存在可用实例,则返回。
代码@3,判断是否是主节点,如果当前正在进行主节点选举,则阻塞直到选主完成,阻塞这里使用的代码如下:
while (!hasLeader() && serverService.hasAvailableServers()) { // 如果不存在主节点摈弃有可用的实例,则Thread.sleep()一下,触发一次选主。 log.info("Leader is electing, waiting for {} ms", 100); BlockUtils.waitingShortTime(); if (!JobRegistry.getInstance().isShutdown(jobName) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { electLeader(); } } return isLeader();
代码@4:如果当前节点不是主节点,则等待分片结束。分片是否结束的判断依据是${namespace}/jobname/leader/sharding/necessary节点存在或${namespace}/jobname/leader/sharding/processing节点存在(表示正在执行分片操作),如果分片未结束,使用Thread.sleep方法阻塞100毫米后再试。
代码@5:能进入到这里,说明该节点是主节点。主节点在执行分片之前,首先等待该批任务全部执行完毕,判断是否有其他任务在运行的方法是判断是否存在${namespace}/jobname/sharding/{分片item}/running,如果存在,则使用Thread.sleep(100),然后再判断。
代码@6:创建临时节点${namespace}/jobname/leader/sharding/processing节点,表示分片正在执行。
代码@7:重置分片信息。先删除${namespace}/jobname/sharding/{分片item}/instance节点,然后创建${namespace}/jobname/sharding/{分片item}节点(如有必要)。然后根据当前配置的分片总数量,如果当前${namespace}/jobname/sharding子节点数大于配置的分片节点数,则删除多余的节点(从大到小删除)。
代码@8:获取配置的分片算法类,常用的分片算法为平均分片算法(AverageAllocationJobShardingStrategy)。
代码@9:在一个事务内创建 相应的分片实例信息${namespace}/jobname/{分片item}/instance,节点存放的内容为JobInstance实例的ID。
在ZK中执行事务操作:JobNodeStorage#executeInTransaction
/** * 在事务中执行操作. * * @param callback 执行操作的回调 */ public void executeInTransaction(final TransactionExecutionCallback callback) { try { CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and(); // @1 callback.execute(curatorTransactionFinal); // @2 curatorTransactionFinal.commit(); //@3 //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } }
代码@1,使用CuratorFrameworkFactory的inTransaction()方法,级联调用check(),最后通过and()方法返回CuratorTransactionFinal实例,由该实例执行事务中的所有更新节点命令。然后执行commit()命令统一提交(该方法可以保证要么全部成功,要么全部失败)。
代码@2,通过回调PersistShardingInfoTransactionExecutionCallback方法执行具体的逻辑。
代码@3,提交事务。
代码见ShardingService$PersistShardingInfoTransactionExecutionCallback
class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback { private final Map<JobInstance, List<Integer>> shardingResults; @Override public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception { for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) { for (int shardingItem : entry.getValue()) { curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and(); // @1 } } curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and(); // @2 curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and(); // @3 } }
代码@1:所谓的分片,主要是创建${namespace}/jobname/sharding/{分片item}/instance,节点内容为JobInstance ID。
代码@2:删除${namespace}/jobname/leader/sharding/necessary节点。
代码@3:删除${namespace}/jobname/leader/sharding/processing节点,表示分片结束。
下面以一张分片流程图来结束本节的讲述:
原文发布时间为:2018-12-02
本文作者:丁威,《RocketMQ技术内幕》作者。
本文来自中间件兴趣圈,了解相关信息可以关注中间件兴趣圈。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
IntelliJ IDEA 安装和配置
一、安装 下载地址:http://www.jetbrains.com/idea 历史版本下载:https://www.jetbrains.com/idea/download/previous.html 下载后直接点击安装即可,没有什么特别需要说明的直接使用默认的即可。 激活说明:有经济条件的还是支持一下直接购买吧,激活码获取地址:http://idea.lanyus.com;也可以到淘宝上去买一个激活码。 二、设置 2.1 编码设置 file->Settings->Editor->File Encodings Global Encoding:UTF-8 Projectt Encoding:UTF-8 Default encoding for properties files:UTF-8 勾选上Transparent native-to-ascii conversion 2.2 代码样式 file-->settings-->Editor-->Font font----consolas size 14 将show only monospaced font...
- 下一篇
如何编写高质量的 JS 函数(3) --函数式编程[理论篇]
本文首发于 vivo互联网技术 微信公众号 链接:https://mp.weixin.qq.com/s/EWSqZuujHIRyx8Eb2SSidQ 作者:杨昆 【编写高质量函数系列】中, 《如何编写高质量的 JS 函数(1) -- 敲山震虎篇》介绍了函数的执行机制,此篇将会从函数的命名、注释和鲁棒性方面,阐述如何通过 JavaScript 编写高质量的函数。 《如何编写高质量的 JS 函数(2)-- 命名/注释/鲁棒篇》从函数的命名、注释和鲁棒性方面,阐述如何通过 JavaScript编写高质量的函数。 【 前 言 】 这是编写高质量函数系列文章的函数式编程篇。我们来说一说,如何运用函数式编程来提高你的函数质量。函数式编程篇分为两篇,分别是理论篇和实战篇。此篇文章属于理论篇,在本文中,我将通过背景加提问的方式,对函数式编程的本质、目的、来龙去脉等方面进行一次清晰的阐述。 写作逻辑通过对计算机和编程语言发展史的阐述,找到函数式编程的时代背景。通过对与函数式编程强相关的人物介绍,来探寻和感受函数式编程的那些不为人知的本质。 下面列一个简要目录: 一、背景介绍 计算机和编程语言的发展史 二...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Mario游戏-低调大师作品
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8