图解kubernetes调度器抢占流程与算法设计
抢占调度是分布式调度中一种常见的设计,其核心目标是当不能为高优先级的任务分配资源的时候,会通过抢占低优先级的任务来进行高优先级的调度,本文主要学习k8s的抢占调度以及里面的一些有趣的算法
1. 抢占调度设计
1.1 抢占原理
抢占调度原理其实很简单就是通过高优先级的pod抢占低优先级的pod资源,从而满足高优先pod的调度
1.2 中断预算
在kubernetes中为了保证服务尽可能的高可用,设计PDB(PodDisruptionBudget)其核心目标就是在保证对应pod在指定的数量,主要是为了保证服务的可用性,在进行抢占的过程中,应尽可能遵守该设计,尽量不去抢占有PDB的资源,避免因为抢占导致服务的不可用
1.3 优先级反转
优先级反转是信号量里面的一种机制即因为低优先级任务的运行阻塞高优先级的任务运行
在k8s中抢占调度是通过高优先级抢占低优先级pod,如果高优先级pod依赖低优先级pod, 则会因为依赖问题,导致优先级失效,所以应该尽可能减少高优先级pod对低优先级的pod的依赖, 后面进行筛选源码分析时可以看到
1.4 抢占选择算法
抢占选择算法是指的通过抢占部分节点后,如何从被抢占的node数组中筛选出一个node节点,目前k8s中主要实现了5个算法
1.4.1 最少违反PDB
即最少违反PDB规则
1.4.2 最高优先级最小优先
比较所有node的最高优先级的pod,找到优先级最低的node
1.4.3 优先级总和最低优先
计算每个node上面的被抢占的pod优先级之和,选择优先级和最低的节点
1.4.4 最少抢占数量优先
计算需要抢占的节点数量最少的节点优先
1.4.5 最近更新节点优先
比较每个node中被驱逐的pod中最早启动的pod的启动时间,最近启动的pod的节点,会被选择
2. 源码设计
2.1 抢占核心流程
抢占的流程主要是通过Preempt来实现,其针对预选失败的节点来进行驱逐某些低优先级的pod来满足高优先级pod
func (g *genericScheduler) Preempt(pluginContext *framework.PluginContext, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { // 只允许预选失败的pod进行重试 fitError, ok := scheduleErr.(*FitError) if !ok || fitError == nil { return nil, nil, nil, nil } // 是否允许抢占其他提议的pod if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) { klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) return nil, nil, nil, nil } // 获取当前集群中的所有node allNodes := g.cache.ListNodes() if len(allNodes) == 0 { return nil, nil, nil, ErrNoNodesAvailable } // 初步筛选潜在的可以进行抢占操作的node potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError) if len(potentialNodes) == 0 { klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name) // In this case, we should clean-up any existing nominated node name of the pod. return nil, nil, []*v1.Pod{pod}, nil } // 获取所有pdb pdbs, err := g.pdbLister.List(labels.Everything()) if err != nil { return nil, nil, nil, err } // 针对之前初步筛选的node尝试进行抢占和预选操作,返回结果中包含所有可以通过抢占低优先级pod完成pod调度的node节点与抢占的pod nodeToVictims, err := g.selectNodesForPreemption(pluginContext, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs) if err != nil { return nil, nil, nil, err } // 调用extenders进行再一轮的筛选 nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims) if err != nil { return nil, nil, nil, err } // 从筛选结果中选择最适合抢占的node candidateNode := pickOneNodeForPreemption(nodeToVictims) if candidateNode == nil { return nil, nil, nil, nil } // 如果candidateNode不为nil,则找到一个最优的执行抢占操作的node, 返回低优先的提议的pod // 还有抢占的pod和当前节点 nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name) if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok { return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil } return nil, nil, nil, fmt.Errorf( "preemption failed: the target node %s has been deleted from scheduler cache", candidateNode.Name) }
2.2 抢占条件检测
如果发现需要执行抢占的pod有提名的node并且对应node上面存在比自己优先级低的pod正在进行删除, 则不允许进行抢占
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool { if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever { klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever) return false } nomNodeName := pod.Status.NominatedNodeName if len(nomNodeName) > 0 { if nodeInfo, found := nodeNameToInfo[nomNodeName]; found { podPriority := util.GetPodPriority(pod) for _, p := range nodeInfo.Pods() { if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority { // 正在终止的优先级低于当前pod的pod就不会进行抢占 return false } } } } return true }
2.3 筛选潜在节点
每个node在预选阶段都会进行一个标记,标记当前node执行预选失败的原因,筛选潜在节点主要是根据对应的错误来进行筛选,如果不是不可解决的预选错误,则该node节点就可以参与接下来的抢占阶段
func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node { potentialNodes := []*v1.Node{} // 根据预选阶段的错误原因,如果不存在无法解决的错误,则这些node可能在接下来的抢占流程中被使用 for _, node := range nodes { if fitErr.FilteredNodesStatuses[node.Name].Code() == framework.UnschedulableAndUnresolvable { continue } failedPredicates, _ := fitErr.FailedPredicates[node.Name] if !unresolvablePredicateExists(failedPredicates) { // 如果我们发现并不是不可解决的调度错误的时候,就讲这个节点加入到这里 // 可能通过后续的调整会让这些node重新满足 klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name) potentialNodes = append(potentialNodes, node) } } return potentialNodes }
不可通过调整的预选失败原因
var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{ predicates.ErrNodeSelectorNotMatch: {}, predicates.ErrPodAffinityRulesNotMatch: {}, predicates.ErrPodNotMatchHostName: {}, predicates.ErrTaintsTolerationsNotMatch: {}, predicates.ErrNodeLabelPresenceViolated: {}, // 省略大部分,感兴趣的可以自己关注下 }
2.4 并行筛选节点
筛选抢占节点主要是并行对之前筛选潜在node进行尝试,通过驱逐低优先级pod满足高优先级pod调度,最终会筛选一批可以通过抢占来满足pod调度需要的节点, 其核心实现时通过selectVictimsOnNode来进行检测,继续往下看
func (g *genericScheduler) selectNodesForPreemption( pluginContext *framework.PluginContext, pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, potentialNodes []*v1.Node, fitPredicates map[string]predicates.FitPredicate, metadataProducer predicates.PredicateMetadataProducer, queue internalqueue.SchedulingQueue, pdbs []*policy.PodDisruptionBudget, ) (map[*v1.Node]*schedulerapi.Victims, error) { nodeToVictims := map[*v1.Node]*schedulerapi.Victims{} var resultLock sync.Mutex // We can use the same metadata producer for all nodes. meta := metadataProducer(pod, nodeNameToInfo) checkNode := func(i int) { nodeName := potentialNodes[i].Name var metaCopy predicates.PredicateMetadata if meta != nil { metaCopy = meta.ShallowCopy() } pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs) if fits { resultLock.Lock() victims := schedulerapi.Victims{ Pods: pods, NumPDBViolations: numPDBViolations, } nodeToVictims[potentialNodes[i]] = &victims resultLock.Unlock() } } workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode) return nodeToVictims, nil }
2.5 单点筛选流程
selectVictimsOnNode即单点筛选流程是针对单个node来指向具体的驱逐抢占决策的流程, 其核心流程如下
2.5.1 优先级筛选
优先级筛选首先会对当前node上面的所有节点进行优先级排序,移除所有比当前pod低的pod
potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod} nodeInfoCopy := nodeInfo.Clone() removePod := func(rp *v1.Pod) { nodeInfoCopy.RemovePod(rp) if meta != nil { meta.RemovePod(rp, nodeInfoCopy.Node()) } } addPod := func(ap *v1.Pod) { nodeInfoCopy.AddPod(ap) if meta != nil { meta.AddPod(ap, nodeInfoCopy) } } podPriority := util.GetPodPriority(pod) for _, p := range nodeInfoCopy.Pods() { if util.GetPodPriority(p) < podPriority { // 移除所有优先级比自己低的pod potentialVictims.Items = append(potentialVictims.Items, p) removePod(p) } }
2.5.2 预选判断
对移除所有优先级比自己的pod之后,会尝试进行预选流程,如果发现预选流程失败,则当前node即使通过移除所有比自己优先级低的pod也不能满足调度需求,则就进行下一个node判断
if fits, _, _, err := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits { if err != nil { klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } return nil, 0, false }
2.5.3 PDB分组与分组算法
PDB分组就是对当前节点上筛选出来的低优先级pod按照是否有PDB匹配来进行分组,分为违反PDB和未违反PDB的两组
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
分组算法其实也不难,只需要遍历所有的pdb和pod就可以得到最终的分组
func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) { for _, obj := range pods { pod := obj.(*v1.Pod) pdbForPodIsViolated := false // A pod with no labels will not match any PDB. So, no need to check. if len(pod.Labels) != 0 { for _, pdb := range pdbs { if pdb.Namespace != pod.Namespace { continue } selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector) if err != nil { continue } // A PDB with a nil or empty selector matches nothing. if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { continue } // We have found a matching PDB. if pdb.Status.PodDisruptionsAllowed <= 0 { pdbForPodIsViolated = true break } } } if pdbForPodIsViolated { violatingPods = append(violatingPods, pod) } else { nonViolatingPods = append(nonViolatingPods, pod) } } return violatingPods, nonViolatingPods }
2.5.4 违反PDB计数与最少驱逐汇总
会分别对违反PDB和不违反的pod集合来进行reprievePod检测,如果加入当前pod后,不能满足预选筛选流程,则该pod则必须被进行移除加入到victims中, 同时如果是违反PDB的pod则需要进行违反pdb计数numViolatingVictim
reprievePod := func(p *v1.Pod) bool { // 我们首先将pod加入到meta中 addPod(p) fits, _, _, _ := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false) // if !fits { // 如果我们加入了pod然后导致了预选不成功,则这个pod必须给移除 removePod(p) victims = append(victims, p) // 添加到我们需要移除的列表里面 klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name) } return fits } for _, p := range violatingVictims { if !reprievePod(p) { numViolatingVictim++ } } // Now we try to reprieve non-violating victims. for _, p := range nonViolatingVictims { // 尝试移除未违反pdb的pod reprievePod(p) } return victims, numViolatingVictim, true
2.6 筛选最优抢占
最优筛选主要是通过pickOneNodeForPreemption实现,其中筛选数据存储结构主要是通过重用minNodes1和minNodes2两段内存来进行实现,这两个node数组分别配有两个计数器lenNodes1和lenNodes2, 针对具有相同优先级、相同数量的node,每增加一个会进行一次计数器累加, 核心算法流程如下
2.6.1 最少违反PDB
最少违反PDB是根据前面统计的违反PDB的计数统计,找到最少违反的node,如果是单个node则直接返回筛选结束
minNumPDBViolatingPods := math.MaxInt32 var minNodes1 []*v1.Node lenNodes1 := 0 for node, victims := range nodesToVictims { if len(victims.Pods) == 0 { // 如果发现一个noed不需要任何抢占,则返回它 return node } numPDBViolatingPods := victims.NumPDBViolations if numPDBViolatingPods < minNumPDBViolatingPods { // 如果小于最小pdb数量, 如果数量发生变化,就重置 minNumPDBViolatingPods = numPDBViolatingPods minNodes1 = nil lenNodes1 = 0 } if numPDBViolatingPods == minNumPDBViolatingPods { // 多个相同的node会进行追加,并累加计数器 minNodes1 = append(minNodes1, node) lenNodes1++ } } if lenNodes1 == 1 { return minNodes1[0] }
2.6.2 最高优先级最小优先
最高优先级最小优先是指通过对比多个node的最高优先级的pod,优先级最低的那个node被选中,如果多个则进行下一个算法
minHighestPriority := int32(math.MaxInt32) var minNodes2 = make([]*v1.Node, lenNodes1) lenNodes2 := 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] victims := nodesToVictims[node] // highestPodPriority is the highest priority among the victims on this node. // 返回优先级最高的pod highestPodPriority := util.GetPodPriority(victims.Pods[0]) if highestPodPriority < minHighestPriority { // 重置状态 minHighestPriority = highestPodPriority lenNodes2 = 0 } if highestPodPriority == minHighestPriority { // 如果优先级相等则加入进去 minNodes2[lenNodes2] = node lenNodes2++ } } if lenNodes2 == 1 { return minNodes2[0] }
2.6.3 优先级总和最低优先
统计每个node上的所有被抢占的pod的优先级的总和,然后在多个node之间进行比较,优先级总和最低的节点被选中
minSumPriorities := int64(math.MaxInt64) lenNodes1 = 0 for i := 0; i < lenNodes2; i++ { var sumPriorities int64 node := minNodes2[i] // 统计所有优先级 for _, pod := range nodesToVictims[node].Pods { sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1) } if sumPriorities < minSumPriorities { minSumPriorities = sumPriorities lenNodes1 = 0 } if sumPriorities == minSumPriorities { minNodes1[lenNodes1] = node lenNodes1++ } } // 最少优先级的node if lenNodes1 == 1 { return minNodes1[0] }
2.6.4 最少抢占数量优先
最少抢占数量优先即统计每个node被抢占的节点数量,数量最少得被选中
minNumPods := math.MaxInt32 lenNodes2 = 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] numPods := len(nodesToVictims[node].Pods) if numPods < minNumPods { minNumPods = numPods lenNodes2 = 0 } if numPods == minNumPods { minNodes2[lenNodes2] = node lenNodes2++ } } // 最少节点数量 if lenNodes2 == 1 { return minNodes2[0] }
2.6.5 最近更新节点优先
该算法会筛选每个node驱逐的pod中优先级最高的pod的最早更新时间(其实就是说这个pod早就被创建了),然后在多个node之间进行比较,如果谁上面的时间越新(即这个node上的pod可能是最近被调度上去的),则就选中这个节点
latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]]) if latestStartTime == nil { // If the earliest start time of all pods on the 1st node is nil, just return it, // which is not expected to happen. // 如果第一个节点上所有pod的最早开始时间为零,那么返回它 klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0]) return minNodes2[0] } nodeToReturn := minNodes2[0] for i := 1; i < lenNodes2; i++ { node := minNodes2[i] // Get earliest start time of all pods on the current node. // 获取当前node最早启动时间 earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node]) if earliestStartTimeOnNode == nil { klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node) continue } if earliestStartTimeOnNode.After(latestStartTime.Time) { latestStartTime = earliestStartTimeOnNode nodeToReturn = node } } return nodeToReturn
阅读总结
因为是纯的算法流程,并没有复杂的数据结构,大家看看就好,调度器的设计可能就看到这了,后面把之前的都串起来,算是一个总结,如果有兴趣我就再看看 SchedulerExtender和framework的设计, 其实学习scheduler调度器部分只是因为自己对分布式调度这块比较好奇,而且自己有运维开发的经验,这对pod调度类似场景并不陌生,看起来总的来说相对容易一点,而且我只分析了核心的数据结构和算法,还有几个阶段,为了避免陷入对kubenretes一些复杂逻辑的处理,我都尽量简化逻辑,就是希望即时不去看k8s scheduler的代码,也能有所收获 > 微信号:baxiaoshi2020 > 关注公告号阅读更多源码分析文章 > 更多文章关注 www.sreguide.com > 本文由博客一文多发平台 OpenWrite 发布
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
函数组合的 N 种模式
随着以函数即服务(Function as a Service)为代表的无服务器计算(Serverless)的广泛使用,很多用户遇到了涉及多个函数的场景,需要组合多个函数来共同完成一个业务目标,这正是微服务“分而治之,合而用之”的精髓所在。本文以阿里云函数计算为例,试图全面介绍函数组合的常见模式和使用场景,希望有助于选择合适的解决方案。 虽然本文主要介绍的是函数组合,但是基本思想也可用于服务组合。 函数同步调用函数 在这种模式里,函数直接调用 InvokeFunction 同步 API 执行一个或者多个函数,等待被调用函数返回结果,然后继续执行。这是一个有些争议的模式,不使用同步调用通常有以下原因: 从费用的角度:由于函数计算按照函数实际执行时间收费,调用者在等待被调用函数返回前也会产生一定费用。 执行时长限制:由于函数最长执行10分钟,这就
- 下一篇
Win 7退役、User-Agent将消失 | Git不用克隆整个仓库、GCC迁移到Git
回顾一周社区热门资讯 第【五十七】期:20200111-20200117 点击相应标题,跳转阅读全文。 微软又走怀旧风,Windows Terminal 将换上复古 CRT 外观 Firefox 打包太复杂,OpenBSD 稳定版将不再接收其更新 由于 cbindgen 和 Rust 的依赖性,Firefox 太复杂而无法在稳定分支上打包,并且这样做将需要测试所有 Rust 使用者,因此 OpenBSD 6.6 稳定版的分支不会收到来自 www/mozilla-firefox 的更新。 Debian 将支持从 F2FS 根文件系统进行系统引导 尽管 F2FS 早已问世,并且得到了越来越多的采用,尤其是在 Android 移动设备上,但默认情况下,大多数 Linux 发行版都不允许默认从 F2FS 文件系统进行引导。 2020年,GCC 项目代码终于从 SVN 完全切换至 Git 未来,GCC 团队将会通过使用 Git 的工作流程来开发 GCC,就像 LLVM 当初也是从 SVN 转换到 Git 一样。 2020 年值得关注的 5 个 Kubernetes 趋势 联盟、Kubernete...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8编译安装MySQL8.0.19
- CentOS8安装Docker,最新的服务器搭配容器使用