纯干货 | Dolphinscheduler Master模块源码剖析
此前我们曾用万字长文解释了Apache DolphinScheduler的Worker模块源码,今天,我们再来一起看看Master模块源码的原理。
Master Slot计算
核心代码逻辑: org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify
public void notify(Map<String, MasterHeartBeat> masterNodeInfo) { List<Server> serverList = masterNodeInfo.values().stream() // TODO 这里其实就是过滤掉buzy的master节点 .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY)) .map(this::convertHeartBeatToServer).collect(Collectors.toList()); // TODO 同步master节点 syncMasterNodes(serverList); }
计算 totalSlot和currentSlot
private void syncMasterNodes(List<Server> masterNodes) { slotLock.lock(); try { this.masterPriorityQueue.clear(); // TODO 这里会把所有的master节点都放入到masterPriorityQueue中,比如说 [192.168.220.1:12345,192.168.220.2:12345] this.masterPriorityQueue.putAll(masterNodes); // TODO 就是获取到本地ip的在队列中的位置 int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress()); // TODO 所有节点数量 int tempTotalSlot = masterNodes.size(); // TODO 正常情况下不会小于0 if (tempCurrentSlot < 0) { totalSlot = 0; currentSlot = 0; log.warn("Current master is not in active master list"); } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) { // TODO 这里其实就是记录的是比如说一共有两个slot,我的slot是0或者1 totalSlot = tempTotalSlot; currentSlot = tempCurrentSlot; log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot); } } finally { slotLock.unlock(); } }
this.masterPriorityQueue.putAll(masterNodes); 会计算索引
public void putAll(Collection<Server> serverList) { for (Server server : serverList) { this.queue.put(server); } // TODO 这里更新了hostIndexMap,存放的是 <host:port> -> 索引 refreshMasterList(); } private void refreshMasterList() { hostIndexMap.clear(); Iterator<Server> iterator = queue.iterator(); int index = 0; while (iterator.hasNext()) { Server server = iterator.next(); String addr = NetUtils.getAddr(server.getHost(), server.getPort()); hostIndexMap.put(addr, index); index += 1; } }
Master消费Command生成流程实例
command最终的获取逻辑:
比如说两个Master节点 : masterCount=2 thisMasterSlot=0 master1 masterCount=2 thisMasterSlot=1 master2 command中的数据如下 : 1 master2 2 master1 3 master2 4 master1 select * from t_ds_command where id % #{masterCount} = #{thisMasterSlot} order by process_instance_priority, id asc limit #{limit}
有没有感到疑惑,就是如果一个master更新到的最新的,一个没有更新到,怎么办?
比如说,master1节点是这样的 1 master2 2 master1 3 master2 4 master1 比如说,master2节点是这样的,是不是发现master2节点都是他的,都可以拉取消费?那就导致重复消费,比如说1这个command 1 master1 2 master1 3 master1 4 master1
org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand
@Transactional public [@Nullable](https://my.oschina.net/u/2896689) ProcessInstance handleCommand(String host, Command command) throws CronParseException, CodeGenerateException { // TODO 创建流程实例 ProcessInstance processInstance = constructProcessInstance(command, host); // cannot construct process instance, return null if (processInstance == null) { log.error("scan command, command parameter is error: {}", command); commandService.moveToErrorCommand(command, "process instance is null"); return null; } processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); processInstance.setTestFlag(command.getTestFlag()); // if the processDefinition is serial ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); // TODO 是否是串行执行 if (processDefinition.getExecutionType().typeIsSerial()) { saveSerialProcess(processInstance, processDefinition); if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) { setSubProcessParam(processInstance); triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId()); deleteCommandWithCheck(command.getId()); // todo: this is a bad design to return null here, whether trigger the task return null; } } else { // TODO 并行执行 processInstanceDao.upsertProcessInstance(processInstance); } // TODO 这里其实还会向triggerRelation表中插入一条数据,是流程实例和triggerCode的关系 triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId()); // TODO 设置子流程参数 setSubProcessParam(processInstance); // TODO 删除command deleteCommandWithCheck(command.getId()); return processInstance; }
注意:这个方法是加@Transactional的,所以说创建流程实例和删除Command是在一个事物里面的,如果不同的Master消费到同一个Command。肯定会有一个删除Command失败,这时会抛出一个异常,这样就会让数据库进行回滚。
工作流启动流程
DAG切分 & 任务提交
Master事件状态流转
图连接 : Master事件状态流转
TaskEventService组件中的TaskEventDispatchThread(线程)和TaskEventHandlerThread(线程)解析
其实就是Master自己状态(DISPATCH)和Worker汇报上来的状态(RUNNING、UPDATE_PID、RESULT)都会放入到eventQueue,TaskEventDispatchThread(线程)会阻塞的方式进行获取,然后放入到对应的TaskExecuteRunnable中(注意 : 不执行的),只有通过TaskEventHandlerThread(线程)才会使用TaskExecuteThreadPool线程进行TaskExecuteRunnable的提交。
转载自Journey 原文链接:https://segmentfault.com/a/1190000044992842
本文由 白鲸开源科技 提供发布支持!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
高效定位 Go 应用问题:Go 可观测性功能深度解析
作者:古琦 背景 自 2024 年 6 月 26 日,阿里云 ARMS 团队正式推出面向 Go 应用的可观测性监控功能以来,我们与程序语言及编译器团队携手并进,持续深耕技术优化与功能拓展。这一创新性的解决方案旨在为开发者提供更为全面、深入且高效的应用性能监控体验,助力企业在数字化转型中实现卓越的系统稳定性与性能表现。 从商业化版本的首次亮相至今,我们已历经五次重大版本迭代及若干次精细化的小版本更新。相较于初始版本,系统性能实现了翻倍提升,同时在功能层面亦展现出前所未有的丰富性与灵活性。新增特性包括但不限于智能化应用诊断、高度可定制的扩展能力、灵活的应用开关机制、接口全量采样以及代码热点分析等模块。这些功能的引入不仅显著提升了系统的实用性,也赢得了广大用户的广泛认可与积极反馈。而基于编译时插桩(Compile-time Instrumentation)的技术路径,更被实践证明是 Go 语言应用监控领域的一次突破性创举,堪称当前最优解。 为进一步赋能用户在复杂场景下快速定位与解决问题,我们结合近期发布的一系列全新功能,精心梳理了一套从接入到问题发现、再到问题排查与精准定位的最佳实践指南。 ...
- 下一篇
业务复杂度治理方法论--十年系统设计经验总结
作者:京东物流 尹昊喆 一、复杂度综述 1、什么是复杂度 软件设计的核心在于降低复杂性。 --《软件设计的哲学》 业界对于复杂度并没有统一的定义,斯坦福教授John Ousterhout从认知负担和工作量方面给出了一个复杂度量公式 子模块的复杂度cp乘以该模块对应的开发时间权重值tp,累加后得到系统的整体复杂度C 这里的子模块复杂度cp是一个经验值 需要注意:如果一个子系统特别复杂,但是很少使用及修改,也不会对整体复杂度造成太大影响。例:spring框架内部代码较为复杂,但由于几乎不需要我们去变动,所以对系统的整体复杂度影响并不大 2、复杂度分类 本文主要面向业务复杂度的治理 3、业务复杂度高的影响 (1)研发成本高。需要花费更多的时间去理解、维护代码;同样的需求,可能需要要修改更多的工程和类 (2)稳定性差。过高的业务复杂度,会导致系统难以理解甚至理解出现错漏,改动代码后极易出现“按下葫芦起了瓢”的问题 二、业务系统复杂度高的常见原因 1、业务系统模块多,关系复杂,互相依赖 比如一个电商业务,会包含商品、订单、采购、库存、财务等多个系统,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS关闭SELinux安全模块
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装