源码分析ElasticJob任务运行事件监听器
在任务执行的前后,ElasticJob可以提供扩展,其主要类图如下:
ElastciJobListener:elasticJob任务执行事件监听器,提供如下两个方法:
-
void beforeJobExecuted(final ShardingContexts shardingContexts);
在任务执行之前调用,shardingContexts为分片上下文信息。
-
void afterJobExecuted(final ShardingContexts shardingContexts)
在任务执行之后调用,shardingContexts为分片上下文信息。
上述回调函数是分片级的,也就是说默认情况下,同一个任务的多个分片都会执行beforeJobExecuted、afterJobExecuted方法,如果某些情况同一个任务只需在最后一个分片执行之前执行,最后一个分片执行完成后才执行,又该如何实现呢。AbstractDistributeOnceElasticJobListener粉墨登场。
AbstractDistributeOnceElasticJobListener:在分布式作业中只执行一次的监听器。
- private final long startedTimeoutMilliseconds:分片等待beforeJobExecuted方法执行的超时时间,单位为毫秒。
- private final Object startedWait = new Object():分片等待beforeJobExecuted的监视器。
- private final long completedTimeoutMilliseconds:分片等待afterJobExecuted方法执行的超时时间,单位为毫秒。
- private final Object completedWait = new Object():分片等待afterJobExecuted的监视器。
- private GuaranteeService guaranteeService:保证分布式任务全部开始和结束状态的服务。
- private TimeService timeService = new TimeService():时间服务器,主要用来获取当前服务器的系统时间。
- public final void beforeJobExecuted(final ShardingContexts shardingContexts):分片任务执行之前调用,该方法是一个模板方法,最后一个分片成功启动后调用doBeforeJobExecutedAtLastStarted方法,该方法为抽象方法,由具体子类实现,如果有其他分片未执行完成,该方法会阻塞等待,或最后启动的分片执行完doBeforeJobExecutedAtLastStarted方法。
- public final void afterJobExecuted(final ShardingContexts shardingContexts):分片任务执行之后调用,该方法是一个模板方法,实现当最后一个分片成功执行完成后调用doAfterJobExecutedAtLastCompleted方法,该方法为抽象方法,由具体子类实现,如果有其他分片未执行完成,该方法会阻塞等待,或最后启动的分片执行完doAfterJobExecutedAtLastCompleted方法。
- public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts):分布式环境中最后一个作业分片执行前的执行的方法。
- public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts):分布式环境中最后一个作业分片执行完成后的执行方法。
- public void notifyWaitingTaskStart():通知分片节点上的任务开始之前(唤醒由于还有其他分片未启动造成自身等待阻塞)。
- public void notifyWaitingTaskComplete():通知分片节点任务执行完成(唤醒由于存在其他分片任务未执行完成时阻塞)。
接下来重点分析AbstractDistributeOnceElasticJobListener实现原理(分布式环境中,监听器只在一个节点上执行的实现逻辑)
重点分析beforeJobExecuted方法实现原理,afterJobExecuted方法类似。
AbstractDistributeOnceElasticJobListener#beforeJobExecuted
public final void beforeJobExecuted(final ShardingContexts shardingContexts) { guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet()); // @1 if (guaranteeService.isAllStarted()) { // @2 doBeforeJobExecutedAtLastStarted(shardingContexts); guaranteeService.clearAllStartedInfo(); return; } long before = timeService.getCurrentMillis(); // @3 try { synchronized (startedWait) { startedWait.wait(startedTimeoutMilliseconds); } } catch (final InterruptedException ex) { Thread.interrupted(); } if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) { // @4 guaranteeService.clearAllStartedInfo(); handleTimeout(startedTimeoutMilliseconds); } }
代码@1:使用GuaranteeService注册分片开始。
代码@2:判断该任务所有的分片是否都已经注册启动,如果都注册启动,则调用doBeforeJobExecutedAtLastStarted()方法。
代码@3:获取服务器当前时间。
代码@4:利用startWait.wait(startedTimeoutMilliseconds)带超时时间的等待,这里如何唤醒呢?
代码@5:判断唤醒是超时唤醒还是正常唤醒,如果是超时唤醒,清除所有的分片注册启动信息,处理超时异常。
上述流程简单明了,上面有两个问题需要进一步探究,如何注册分片启动信息与如何被唤醒。
1、任务节点注册分配给当前节点的任务分片
/** * 根据分片项注册任务开始运行. * * @param shardingItems 待注册的分片项 */ public void registerStart(final Collection<Integer> shardingItems) { for (int each : shardingItems) { jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each)); } }
创建持久节点:${namespace}/jobname/guarantee/started/{item}。
2、当最后一个节点注册启动执行doBeforeJobExecutedAtLastStarted方法后,如果唤醒其他节点以便进入到任务执行阶段
if (guaranteeService.isAllStarted()) { doBeforeJobExecutedAtLastStarted(shardingContexts); guaranteeService.clearAllStartedInfo(); return; }
也就是回调函数执行完毕后,会删除任务所有的分片。温馨提示:业务实现子类实现doBeforeJobExecutedAtLastStarted方法时最好不要抛出异常,不然各节点的唤醒操作只能是等待超时后被唤醒。
GuaranteeService#clearAllStartedInfo
/** * 清理所有任务启动信息. */ public void clearAllStartedInfo() { jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT); }
直接删除${namespace}/jobname/guarantee/started根节点。基于ZK的开发模式,触发一次删除操作,肯定会有事件监听器来监听该节点的删除事件,从而触发其他节点的唤醒操作,果不奇然,ElastciJob提供GuaranteeListenerManager事件监听来监听${namespace}/jobname/guarantee/started节点的删除事件。
GuaranteeListenerManager$StartedNodeRemovedJobListener
class StartedNodeRemovedJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (Type.NODE_REMOVED == eventType && guaranteeNode.isStartedRootNode(path)) { for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(); } } } } }
每个Job实例在监听到${namespace}/jobname/guarantee/started节点被删除后,会执行AbstractDistributeOnceElasticJobListener的notifyWaitingTaskStart方法唤醒被阻塞的线程,是线程进入到任务执行阶段。
同理,任务执行后监听方法afterJobExecuted的执行流程实现原理一样,在这里就不在重复讲解了。
原文发布时间为:2018-12-15
本文作者:丁威,《RocketMQ技术内幕》作者。
本文来自中间件兴趣圈,了解相关信息可以关注中间件兴趣圈。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Go中使用Seed得到重复随机数的问题
重复的随机数 废话不多说,首先我们来看使用seed的一个很神奇的现象。 func main() { for i := 0; i < 5; i++ { rand.Seed(time.Now().Unix()) fmt.Println(rand.Intn(100)) } } // 结果如下 // 90 // 90 // 90 // 90 // 90 可能不熟悉seed用法的看到这里会很疑惑,我不是都用了seed吗?为何我随机出来的数字都是一样的?不应该每次都不一样吗? 可能会有人说是你数据的样本空间太小了,OK,我们加大样本空间到10w再试试。 func main() { for i := 0; i < 5; i++ { rand.Seed(time.Now().Unix()) fmt.Println(rand.Intn(100000)) } } // 结果如下 // 84077 // 84077 // 84077 // 84077 // 84077 你会发现结果仍然是一样的。简单的推理一下我们就能知道,在上面那种情况,每次都取到相同的随机数跟我们所取的样本空间大小是无关的。那...
- 下一篇
微信公众号发送提醒消息
公众号端配置 1.模板消息需要提前申请,入口在添加功能插件中,通过审批之后在功能-模板消息。需要注意的是行业决定了模板可以选择的范围,行业可以改但是需要时间。 2.进入微信公众平台在设置菜单中找到公众号设置,进入后设置网页授权对应的域名 获取公众号openid过程 注意:*.2,3步为网页之间的跳转,需要先走微信认证服务器由微信服务器返回至网页,且不能携带参数,网页端通过参数中有没有code来判断是否完成了认证请求。*.第4步中解密code携带的用户信息需要再服务端完成,需要访问微信服务器,携带的参数有公众号appid,开发者密码,在“微信公众平台-开发-基本配置”中获取。 具体授权,解密链接地址及参数请参考官方文档 发送公众号模板消息 通过开发者密码和公众号appid获取token,可把token缓存起来避免频繁访问 按照模板参数发送请求。 这一环没啥可说的可参考官方文档中“发送模板消息”这一块 碰到的问题 问题: 授权页面不能带端口,但是实际项目在有端口的项目上解决方法: 在80端口工程下增加一个跳转页面跳转至其它端口下,页面代码如下 <script type="text/j...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装