图解kubernetes调度器SchedulerExtender扩展
在kubernetes的scheduler调度器的设计中为用户预留了两种扩展机制SchdulerExtender与Framework,本文主要浅谈一下SchdulerExtender的实现, 因为还有一篇Framework, 所以本文的k8s代码切到1.18版本
1. 设计思路
1.1 实现机制
SchdulerExtender是kubernets外部扩展方式,用户可以根据需求独立构建调度服务,实现对应的远程调用接口(目前是http), scheduler在调度的对应阶段会根据用户定义的资源和接口来进行远程调用,对应的service根据自己的资源数据和scheduler传递过来的中间调度结果来进行决策
1.2 服务插拔
extender只需要实现对应插件的接口,并编写yaml文件来进行注册对应的服务接口,就可以实现scheduler的扩展,不需要修改任何调度器的代码,即可实现调度插件的插拔
1.3 资源存储
因为是独立的服务,extender可以实现自定义资源的存储与获取,甚至可以不依赖于etcd使用第三方的存储来进行资源的存储,主要是用于kubernetes中不支持的那些资源的调度扩展
2. SchedulerExtender
2.1 接口与实现
2.1.1 接口声明
Scheduler主要用于扩展
type SchedulerExtender interface { // Name returns a unique name that identifies the extender. Name() string //预选阶段, 进行筛选 Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) // 优选阶段,参与优选评分 Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) // extender对pod指向绑定操作 Bind(binding *v1.Binding) error // 扩展是否支持bind IsBinder() bool // 是否对对应的pod的资源感兴趣 IsInterested(pod *v1.Pod) bool // 抢占阶段 ProcessPreemption( pod *v1.Pod, nodeToVictims map[*v1.Node]*extenderv1.Victims, nodeInfos listers.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error) // 是否支持抢占 SupportsPreemption() bool // IsIgnorable returns true indicates scheduling should not fail when this extender // is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well. IsIgnorable() bool }
2.1.2 默认实现
// HTTPExtender implements the algorithm.SchedulerExtender interface. type HTTPExtender struct { extenderURL string preemptVerb string filterVerb string prioritizeVerb string bindVerb string weight int64 // 对应的权重 client *http.Client // 负责http接口通过 nodeCacheCapable bool // 是否传递node元数据 managedResources sets.String // 当前extender管理的资源 ignorable bool }
extender的默认是海鲜是同过 HTTPExtender实现,即基于http协议通过json来进行数据传递,其核心数据结构如下
2.2 关键实现机制
2.2.1 远程通信接口
其实通信很简单,通过http协议json序列化方式来进行远程post的提交,并序列化返回的结果
// Helper function to send messages to the extender func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error { // 序列化 out, err := json.Marshal(args) if err != nil { return err } // 拼接url url := strings.TrimRight(h.extenderURL, "/") + "/" + action req, err := http.NewRequest("POST", url, bytes.NewReader(out)) if err != nil { return err } // 设置http header req.Header.Set("Content-Type", "application/json") // 发送数据接收结果 resp, err := h.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode) } // 序列化返回结果 return json.NewDecoder(resp.Body).Decode(result) }
2.2.2 node cache
nodeCacheCapable是声明extender的一个参数,即对应的extender是否会缓存node的数据,如果缓存数据,则只需要传递node的名字,而不会进行所有元数据的传递,可以减少通信的数据包大小
if h.nodeCacheCapable { nodeNameSlice := make([]string, 0, len(nodes)) for _, node := range nodes { // 只会传递node的名字 nodeNameSlice = append(nodeNameSlice, node.Name) } nodeNames = &nodeNameSlice } else { nodeList = &v1.NodeList{} for _, node := range nodes { // 传递node所有元数据 nodeList.Items = append(nodeList.Items, *node) } } // 构建传递的数据 args = &extenderv1.ExtenderArgs{ Pod: pod, Nodes: nodeList, NodeNames: nodeNames, }
2.2.3 managedResources
在进行extender的调用的时候,会进行检测extenders会否对对应的pod的container的资源感兴趣,如果感兴趣,则进行调用,否则则会进行跳过
func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool { if h.managedResources.Len() == 0 { return true } // pod的容器 if h.hasManagedResources(pod.Spec.Containers) { return true } // pod的初始化容器 if h.hasManagedResources(pod.Spec.InitContainers) { return true } return false } func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool { for i := range containers { container := &containers[i] // 检查container的requests里面是否有感兴趣的资源 for resourceName := range container.Resources.Requests { if h.managedResources.Has(string(resourceName)) { return true } } // 检查container的limits里面是否有感兴趣的资源 for resourceName := range container.Resources.Limits { if h.managedResources.Has(string(resourceName)) { return true } } } return false }
2.3 过滤接口Filter
Filter主要是用于在预选阶段完成后调用extender进行二次过滤
2.3.1 循环串行调用
在findNodesThatPassExtenders中会遍历所有的extender来确定是否关心对应的资源,如果关心就会调用Filter接口来进行远程调用,并将筛选结果传递给下一个extender,逐步缩小筛选集合,注意这个阶段的插件调用是串行,因为每个插件都以上个插件的结果来继续筛选
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { for _, extender := range g.extenders { if len(filtered) == 0 { break } // 判断对应的extender是否关心pod中容器的资源 if !extender.IsInterested(pod) { continue } // 进行远程过程的调用 filteredList, failedMap, err := extender.Filter(pod, filtered) if err != nil { if extender.IsIgnorable() { klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", extender, err) continue } return nil, err } // 通过结果 for failedNodeName, failedMsg := range failedMap { if _, found := statuses[failedNodeName]; !found { statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg) } else { statuses[failedNodeName].AppendReason(failedMsg) } } // 传递给下一个extender之前的FIlter结果 filtered = filteredList } return filtered, nil }
2.3.2 远程过滤接口
func (h *HTTPExtender) Filter( pod *v1.Pod, nodes []*v1.Node, ) ([]*v1.Node, extenderv1.FailedNodesMap, error) { var ( result extenderv1.ExtenderFilterResult nodeList *v1.NodeList nodeNames *[]string nodeResult []*v1.Node args *extenderv1.ExtenderArgs ) fromNodeName := make(map[string]*v1.Node) for _, n := range nodes { fromNodeName[n.Name] = n } if h.filterVerb == "" { return nodes, extenderv1.FailedNodesMap{}, nil } // 根据nodeCacheCapable来进行参数的传递 if h.nodeCacheCapable { nodeNameSlice := make([]string, 0, len(nodes)) for _, node := range nodes { nodeNameSlice = append(nodeNameSlice, node.Name) } nodeNames = &nodeNameSlice } else { nodeList = &v1.NodeList{} for _, node := range nodes { nodeList.Items = append(nodeList.Items, *node) } } args = &extenderv1.ExtenderArgs{ Pod: pod, Nodes: nodeList, NodeNames: nodeNames, } // 调用对应service的filter接口 if err := h.send(h.filterVerb, args, &result); err != nil { return nil, nil, err } if result.Error != "" { return nil, nil, fmt.Errorf(result.Error) } // 根据nodeCacheCapable和结果来进行结果数据的组合 if h.nodeCacheCapable && result.NodeNames != nil { nodeResult = make([]*v1.Node, len(*result.NodeNames)) for i, nodeName := range *result.NodeNames { if n, ok := fromNodeName[nodeName]; ok { nodeResult[i] = n } else { return nil, nil, fmt.Errorf( "extender %q claims a filtered node %q which is not found in the input node list", h.extenderURL, nodeName) } } } else if result.Nodes != nil { nodeResult = make([]*v1.Node, len(result.Nodes.Items)) for i := range result.Nodes.Items { nodeResult[i] = &result.Nodes.Items[i] } } return nodeResult, result.FailedNodes, nil }
2.4 优先级接口Prioritize
2.4.1 并行优先级统计
优先级阶段调用extender插件是并行的,通过并行的调用extender获取主机结果,然后再串行的汇总结果,计算算法为:主机得分=得分*当前extender的优先级
var mu sync.Mutex var wg sync.WaitGroup combinedScores := make(map[string]int64, len(nodes)) for i := range g.extenders { if !g.extenders[i].IsInterested(pod) { continue } wg.Add(1) // 并行调用 extender go func(extIndex int) { metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Inc() defer func() { metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec() wg.Done() }() prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes) if err != nil { // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities return } mu.Lock() // 串行进行结果的汇总 for i := range *prioritizedList { host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score if klog.V(10) { klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score) } // 主机的结果=得分*当前extender的优先级 combinedScores[host] += score * weight } mu.Unlock() }(i) } // wait for all go routines to finish wg.Wait()
2.4.2 合并优先级结果
结果汇总的得分,在当前版本中的计算:主机得分=主机得分*(100/10),
for i := range result { // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore, therefore we need to scale the score returned by extenders to the score range used by the scheduler. result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority) }
2.4.3 优先级接口调用
优先级调用接口跟Filter流程上都是一样的,只需要拼接传递数据,然后返回结果即可,不同的是返回结果中会返回当前extender的优先级,以用于后续计算
func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) { var ( result extenderv1.HostPriorityList nodeList *v1.NodeList nodeNames *[]string args *extenderv1.ExtenderArgs ) if h.prioritizeVerb == "" { result := extenderv1.HostPriorityList{} for _, node := range nodes { result = append(result, extenderv1.HostPriority{Host: node.Name, Score: 0}) } return &result, 0, nil } // 根据node cache来进行传递参数的构建 if h.nodeCacheCapable { nodeNameSlice := make([]string, 0, len(nodes)) for _, node := range nodes { nodeNameSlice = append(nodeNameSlice, node.Name) } nodeNames = &nodeNameSlice } else { nodeList = &v1.NodeList{} for _, node := range nodes { nodeList.Items = append(nodeList.Items, *node) } } args = &extenderv1.ExtenderArgs{ Pod: pod, Nodes: nodeList, NodeNames: nodeNames, } if err := h.send(h.prioritizeVerb, args, &result); err != nil { return nil, 0, err } // 返回结果 return &result, h.weight, nil }
2.5 绑定阶段
绑定阶段其实就只需要把当前结果传递给对应的插件,即可
func (h *HTTPExtender) Bind(binding *v1.Binding) error { var result extenderv1.ExtenderBindingResult if !h.IsBinder() { // This shouldn't happen as this extender wouldn't have become a Binder. return fmt.Errorf("Unexpected empty bindVerb in extender") } req := &extenderv1.ExtenderBindingArgs{ PodName: binding.Name, PodNamespace: binding.Namespace, PodUID: binding.UID, Node: binding.Target.Name, } if err := h.send(h.bindVerb, &req, &result); err != nil { return err } if result.Error != "" { return fmt.Errorf(result.Error) } return nil }
新年回来第一次更新,文章内容相对简单一点,今天就到这里了,谢谢大佬们观看,希望对大佬们有用,扩展机制的后续总结会在分析完framework之后,希望大佬们能帮转发下,谢谢大家 > 微信号:baxiaoshi2020 > 关注公告号阅读更多源码分析文章 > 更多文章关注 www.sreguide.com > 本文由博客一文多发平台 OpenWrite 发布
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
阿里云数据分析最佳实践:二维数据可视化 + 设备数据下发
概述 物联网数据分析,又称Link Analytics,是阿里云为物联网开发者提供的设备智能分析服务,全链路覆盖了设备数据生成、管理(存储)、清洗、分析及可视化等环节。有效降低数据分析门槛,助力物联网开发工作。这里分别演示通过二维数据可视化功能展示设备位置 + 通过数据分析实现定时下发数据到设备。 Step By Step 1、创建产品,导入物模型,参考链接 物模型json内容 { "schema": "https://iotx-tsl.oss-ap-southeast-1.aliyuncs.com/schema.json", "profile": { "productKey": "a1kVHWEOsM2" }, "properties": [ { "identifier": "GeoLocation", "name": "地理位置", "accessMode": "rw", "required": true, "dataType": { "type": "struct", "specs": [ { "identifier": "Longitude", "name": "经度", "d...
- 下一篇
Spring Cloud Greenwich SR5 发布
Spring Cloud Greenwich Service Release 5 (SR5) 发布了,可通过 Maven Central 获取。发布主页显示,此版本包含错误修复、较小的功能增强以及文档更新。 这是 Greenwich Release Train 的最终计划发行版本(更多信息请参见此处)。官方建议尽快升级到 Hoxton,Greenwich.SR5 基于 Spring Boot 2.1.12。 Spring Cloud Commons 为了支持来自配置客户端中 YAML 的合并列表,此版本进行了更改,以便将每个PropertySource分别添加到环境中,而不是添加到组合中。 作为 Greenwich.SR5 的一部分,以下模块已更新: MODULE VERSION ISSUES Spring Cloud Build 2.1.10.RELEASE Spring Cloud Sleuth 2.1.7.RELEASE (issues) Spring Cloud Consul 2.1.5.RELEASE (issues) Spring Cloud Gateway 2.1.5....
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装