基于环状队列和迭代器实现分布式任务RR分配策略
背景
分布式任务分配
在很多运维场景下,我们都会执行一些长时间的任务,比如装机、部署环境、打包镜像等长时间任务, 而通常我们的任务节点数量通常是有限的(排除基于k8s的hpa、或者knative等自动伸缩场景)。
那么当我们有一个任务如何根据当前的worker和corrdinator和任务来进行合理的分配,分配其实也比较复杂,往复杂里面做,可以根据当前系统的负载、每个任务的执行资源消耗、当前集群的任务数量等, 这里我们就搞一个最简单的,基于任务和当前worker的RR算法
系统架构
在worker和任务队列之间,添加一层协调调度层Coordinator, 由它来根据当前集群任务的状态来进行任务的分配,同时感知当前集群worker和task的状态,协调整个集群任务的执行、终止等操作
单机实现
整体设计
members: 表示当前集群中所有的worker tasks: 就是当前的任务 Coordinator: 就是我们的协调者, 负责根据members和tasks进行任务的分配 result: 就是分配的结果
CircularIterator
CircularIterator就是我们的环状对立迭代器, 拥有两个方法, 一个是add添加member, 一个Next返回基于rr的下一个member
// CircularIterator 环状迭代器 type CircularIterator struct { list []interface{} // 保存所有的成员变量 next int } // Next 返回下一个元素 func (c *CircularIterator) Next() interface{} { item := c.list[c.next] c.next = (c.next + 1) % len(c.list) return item } // Add 添加任务 func (c *CircularIterator) Add(v interface{}) bool { for _, item := range c.list { if v == item { return false } } c.list = append(c.list, v) return true }
Member&Task
Member就是负责执行任务的worker, 有一个AddTask方法和Execute方法负责任务的执行和添加任务 Task标识一个任务
// Member 任务组成员 type Member struct { id int tasks []*Task } // ID 返回当前memberID func (m *Member) ID() int { return m.id } // AddTask 为member添加任务 func (m *Member) AddTask(t *Task) bool { for _, task := range m.tasks { if task == t { return false } } m.tasks = append(m.tasks, t) return true } // Execute 执行任务 func (m *Member) Execute() { for _, task := range m.tasks { fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute()) } } // Task 任务 type Task struct { name string } // Execute 执行task返回结果 func (t *Task) Execute() string { return "Task " + t.name + " run success" }
Coordinator
Coordinator是协调器,负责根据 Member和task进行集群任务的协调调度
// Task 任务 type Task struct { name string } // Execute 执行task返回结果 func (t *Task) Execute() string { return "Task " + t.name + " run success" } // Coordinator 协调者 type Coordinator struct { members []*Member tasks []*Task } // TaskAssignments 为member分配任务 func (c *Coordinator) TaskAssignments() map[int]*Member { taskAssignments := make(map[int]*Member) // 构建迭代器 memberIt := c.getMemberIterator() for _, task := range c.tasks { member := memberIt.Next().(*Member) _, err := taskAssignments[member.ID()] if err == false { taskAssignments[member.ID()] = member } member.AddTask(task) } return taskAssignments } func (c *Coordinator) getMemberIterator() *CircularIterator { // 通过当前成员, 构造成员队列 members := make([]interface{}, len(c.members)) for index, member := range c.members { members[index] = member } return NewCircularIterftor(members) } // AddMember 添加member组成员 func (c *Coordinator) AddMember(m *Member) bool { for _, member := range c.members { if member == m { return false } } c.members = append(c.members, m) return true } // AddTask 添加任务 func (c *Coordinator) AddTask(t *Task) bool { for _, task := range c.tasks { if task == t { return false } } c.tasks = append(c.tasks, t) return true }
测试
我们首先创建一堆member和task, 然后调用coordinator进行任务分配,执行任务结果
coordinator := NewCoordinator() for i := 0; i < 10; i++ { m := &Member{id: i} coordinator.AddMember(m) } for i := 0; i < 30; i++ { t := &Task{name: fmt.Sprintf("task %d", i)} coordinator.AddTask(t) } result := coordinator.TaskAssignments() for _, member := range result { member.Execute() }
结果
可以看到每个worker均匀的得到任务分配
Member 6 run task Task task 6 run success Member 6 run task Task task 16 run success Member 6 run task Task task 26 run success Member 8 run task Task task 8 run success Member 8 run task Task task 18 run success Member 8 run task Task task 28 run success Member 0 run task Task task 0 run success Member 0 run task Task task 10 run success Member 0 run task Task task 20 run success Member 3 run task Task task 3 run success Member 3 run task Task task 13 run success Member 3 run task Task task 23 run success Member 4 run task Task task 4 run success Member 4 run task Task task 14 run success Member 4 run task Task task 24 run success Member 7 run task Task task 7 run success Member 7 run task Task task 17 run success Member 7 run task Task task 27 run success Member 9 run task Task task 9 run success Member 9 run task Task task 19 run success Member 9 run task Task task 29 run success Member 1 run task Task task 1 run success Member 1 run task Task task 11 run success Member 1 run task Task task 21 run success Member 2 run task Task task 2 run success Member 2 run task Task task 12 run success Member 2 run task Task task 22 run success Member 5 run task Task task 5 run success Member 5 run task Task task 15 run success Member 5 run task Task task 25 run success
完整代码
package main import "fmt" // CircularIterator 环状迭代器 type CircularIterator struct { list []interface{} next int } // Next 返回下一个元素 func (c *CircularIterator) Next() interface{} { item := c.list[c.next] c.next = (c.next + 1) % len(c.list) return item } // Add 添加任务 func (c *CircularIterator) Add(v interface{}) bool { for _, item := range c.list { if v == item { return false } } c.list = append(c.list, v) return true } // Member 任务组成员 type Member struct { id int tasks []*Task } // ID 返回当前memberID func (m *Member) ID() int { return m.id } // AddTask 为member添加任务 func (m *Member) AddTask(t *Task) bool { for _, task := range m.tasks { if task == t { return false } } m.tasks = append(m.tasks, t) return true } // Execute 执行任务 func (m *Member) Execute() { for _, task := range m.tasks { fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute()) } } // Task 任务 type Task struct { name string } // Execute 执行task返回结果 func (t *Task) Execute() string { return "Task " + t.name + " run success" } // Coordinator 协调者 type Coordinator struct { members []*Member tasks []*Task } // TaskAssignments 为member分配任务 func (c *Coordinator) TaskAssignments() map[int]*Member { taskAssignments := make(map[int]*Member) // 构建迭代器 memberIt := c.getMemberIterator() for _, task := range c.tasks { member := memberIt.Next().(*Member) _, err := taskAssignments[member.ID()] if err == false { taskAssignments[member.ID()] = member } member.AddTask(task) } return taskAssignments } func (c *Coordinator) getMemberIterator() *CircularIterator { // 通过当前成员, 构造成员队列 members := make([]interface{}, len(c.members)) for index, member := range c.members { members[index] = member } return NewCircularIterftor(members) } // AddMember 添加member组成员 func (c *Coordinator) AddMember(m *Member) bool { for _, member := range c.members { if member == m { return false } } c.members = append(c.members, m) return true } // AddTask 添加任务 func (c *Coordinator) AddTask(t *Task) bool { for _, task := range c.tasks { if task == t { return false } } c.tasks = append(c.tasks, t) return true } // NewCircularIterftor 返回迭代器 func NewCircularIterftor(list []interface{}) *CircularIterator { iterator := CircularIterator{} for _, item := range list { iterator.Add(item) } return &iterator } // NewCoordinator 返回协调器 func NewCoordinator() *Coordinator { c := Coordinator{} return &c } func main() { coordinator := NewCoordinator() for i := 0; i < 10; i++ { m := &Member{id: i} coordinator.AddMember(m) } for i := 0; i < 30; i++ { t := &Task{name: fmt.Sprintf("task %d", i)} coordinator.AddTask(t) } result := coordinator.TaskAssignments() for _, member := range result { member.Execute() } }
总结
任务协调是一个非常复杂的事情, 内部的任务平台,虽然实现了基于任务的组合和app化,但是任务调度分配着一块,仍然没有去做,只是简单的根据树形任务去简单的做一些分支任务的执行,未来有时间再做吧,要继续研究下一个模块了
这个调度思想来源于kafka connect的DistributedHerder里面的WorkerCoordinator,感兴趣的可以看看,未完待续
更多文章可以访问http://www.sreguide.com/

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
centos7系统源码编译安装PHP7.3.5版本详细步骤
新增系统用户组和用户: [root@localhost ~]# groupadd webg [root@localhost ~]# useradd -g webg webu 下载 PHP7.3.5 [root@localhost ~]# mkdir devdir [root@localhost ~]# cd devdir/ [root@localhost devdir]# wget https://www.php.net/distributions/php-7.3.5.tar.gz -bash: wget: 未找到命令 [root@localhost devdir]# rpm -qa|grep wget [root@localhost devdir]# yum -y install wget [root@localhost devdir]# rpm -qa|grep wget wget-1.14-18.el7.x86_64 下载PHP7 解压 编译 wget -chttps://downloads.php.net/~cmb/php-7.3.4.tar.gz 编译参数...
- 下一篇
大话文本识别经典模型:CRNN
在前一篇文章中(详见本博客文章:大话文本检测经典模型 CTPN),介绍了文字识别在现实生活中的广泛应用,以及文字识别的简单流程: 其中“文本检测”、“文本识别”是其中两个关键环节,“文本检测”已经在前一篇文章中介绍了详细的介绍,本文主要介绍“文本识别”的经典模型CRNN及其原理。 在介绍CRNN之前,先来梳理一下要实现“文本识别”的模型,需要具备哪些要素: (1)首先是要读取输入的图像,提取图像特征,因此,需要有个卷积层用于读取图像和提取特征。具体原理可详见本公众号的文章:白话卷积神经网络(CNN); (2)由于文本序列是不定长的,因此在模型中需要引入RNN(循环神经网络),一般是使用双向LSTM来处理不定长序列预测的问题。具体原理可详见本公众号的文章:白话循环神经网络(RNN); (3)为了提升模型的适用性,最好不要要求对输入字符进行分割,直接可进行端到端的训练,这样可减少大量的分割标注工作,这时就要引入CTC模型(Connectionist temporal classification, 联接时间分类),来解决样本的分割对齐的问题。 (4)最后根据一定的规则,对模型输出结果进行...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS8编译安装MySQL8.0.19
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS8安装Docker,最新的服务器搭配容器使用
- Linux系统CentOS6、CentOS7手动修改IP地址
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库