您现在的位置是:首页 > 文章详情

图解kubernetes容器探活机制核心实现状态管理

日期:2020-02-12点击:244

k8s为实现容器探活worker的管理构建了一个Manager组件,该组件负责底层探活worker的管理,并且缓存当前的容器的状态,并对外同步容器的当前状态,今天我们就来分析下其部分核心组件

1. 核心原理实现

image.png Manager缓存的状态主要是会被kubelet、状态组件消费,并且在Pod同步状态的时候,会通过当前Manager里面的探测状态来更新Pod的容器的就绪与启动状态的更新,让我们一起看看Manager自身的一些关键实现吧

2. 探活结果管理

即prober/results/results_manager组件,其主要作用是:存储探测结果和通知探测结果

2.1 核心数据结构

cache负责容器的探测结果的保存,updates则负责对外更新状态的订阅,其通过新的结果和cache中的状态进行对比,从而决定是否对外通知

// Manager implementation. type manager struct { // 保护cache sync.RWMutex // 容器ID->探测结果 cache map[kubecontainer.ContainerID]Result // 更新管道 updates chan Update } 

2.2 更新缓存通知事件

更新缓存的时候回通过对比前后状态来进行是否发布变更事件,从而通知到外部订阅容器变更的kubelet核心流程

 func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *v1.Pod) { // 修改内部状态 if m.setInternal(id, result) { // 同步更新事件 m.updates <- Update{id, result, pod.UID} } } 

内部状态修改与判断是否进行同步实现

// 如果之前的缓存不存在,或者前后状态不一致则会返回true触发更新 func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool { m.Lock() defer m.Unlock() prev, exists := m.cache[id] if !exists || prev != result { m.cache[id] = result return true } return false } 

2.3 对外更新管道

 func (m *manager) Updates() <-chan Update { return m.updates } 

3.探测管理器 

探测管理器是指的prober/prober)manager的Manager组件,其负责当前kubelet上面探活组件的管理,并且进行探测状态结果的缓存与同步,并且内部还通过statusManager来进行apiserver状态的同步

3.1 容器探测Key

每个探测Key包含要探测的目标信息:pod的ID、容器名、探测类型

type probeKey struct { podUID types.UID containerName string probeType probeType } 

3.2 核心数据结构

statusManager组件在后续章节里面会进行详细分析,说下livenessManager该组件即探活的结果,所以当一个容器探测失败,则会由kubelet本地先进行处理,而readlinessManager和startupManager则需要通过statusManager同步给apiserver进行同步

type manager struct { //探测Key与worker映射 workers map[probeKey]*worker // 读写锁 workerLock sync.RWMutex //statusManager缓存为探测提供pod IP和容器id。 statusManager status.Manager // 存储readiness探测结果 readinessManager results.Manager // 存储liveness探测结果 livenessManager results.Manager // 存储startup探测结果 startupManager results.Manager // 执行探测操作 prober *prober } 

3.3 同步startup探测结果

func (m *manager) updateStartup() { // 从管道获取数据进行同步 update := <-m.startupManager.Updates() started := update.Result == results.Success m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started) } 

3.4 同步readiness探测结果

func (m *manager) updateReadiness() { update := <-m.readinessManager.Updates() ready := update.Result == results.Success m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) } 

3.5 启动同步探测结果后台任务

func (m *manager) Start() { // Start syncing readiness. go wait.Forever(m.updateReadiness, 0) // Start syncing startup. go wait.Forever(m.updateStartup, 0) } 

3.6 添加Pod探测

添加 Pod的时候会遍历Pod的所有容器,并根据探测类型来进行对应探测worker的构建

func (m *manager) AddPod(pod *v1.Pod) { m.workerLock.Lock() defer m.workerLock.Unlock() key := probeKey{podUID: pod.UID} for _, c := range pod.Spec.Containers { key.containerName = c.Name // 针对startupProbe的探测任务的构建 if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) { key.probeType = startup if _, ok := m.workers[key]; ok { klog.Errorf("Startup probe already exists! %v - %v", format.Pod(pod), c.Name) return } // 构建新的worker w := newWorker(m, startup, pod, c) m.workers[key] = w go w.run() } // 针对ReadinessProbe的探测任务的构建 if c.ReadinessProbe != nil { key.probeType = readiness if _, ok := m.workers[key]; ok { klog.Errorf("Readiness probe already exists! %v - %v", format.Pod(pod), c.Name) return } w := newWorker(m, readiness, pod, c) m.workers[key] = w go w.run() } // 针对LivenessProbe的探测任务的构建 if c.LivenessProbe != nil { key.probeType = liveness if _, ok := m.workers[key]; ok { klog.Errorf("Liveness probe already exists! %v - %v", format.Pod(pod), c.Name) return } w := newWorker(m, liveness, pod, c) m.workers[key] = w go w.run() } } } 

3.7 更新Pod状态

更新Pod状态主要是根据当前Manager里面缓存的之前的状态信息来更新Pod里面对应容器的状态,这些状态是Pod里面容器最新的探测状态,获取这些状态则是检测当前的容器是否已经就绪和启动,为后续更新流程做基础数据

3.7.1 容器状态更新

 for i, c := range podStatus.ContainerStatuses { var ready bool // 检测容器状态 if c.State.Running == nil { ready = false } else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok { // 检测readinessMnager里面的状态,如果是成功则就是已经就绪 ready = result == results.Success } else { // 检查是否有尚未运行的探测器。只要存在则认为就绪 _, exists := m.getWorker(podUID, c.Name, readiness) ready = !exists } podStatus.ContainerStatuses[i].Ready = ready var started bool if c.State.Running == nil { started = false } else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) { // 容器正在运行,如果StartupProbe功能被禁用,则假定它已启动 started = true } else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok { // 如果startupManager里面的状态是成功的则认为是已经启动的 started = result == results.Success } else { // 检查是否有尚未运行的探测器。 _, exists := m.getWorker(podUID, c.Name, startup) started = !exists } podStatus.ContainerStatuses[i].Started = &started } 

3.7.2 初始化容器状态更新

针对初始化容器主要容器已经终止并且退出的状态码为0,则认为初始化容器已经就绪

 for i, c := range podStatus.InitContainerStatuses { var ready bool if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 { // 容器状态 ready = true } podStatus.InitContainerStatuses[i].Ready = ready } 

3.8 存活状态通知

存活状态通知主要是在kubelet的核心流程循环中进行的,如果检测到容器的状态失败,会立刻进行对应pod的容器状态的同步,从而决定下一步的操作是做什么

 case update := <-kl.livenessManager.Updates(): // 如果探测状态失败 if update.Result == proberesults.Failure { // 省略代码 handler.HandlePodSyncs([]*v1.Pod{pod}) } 

探活整体的设计大概就是这样,接下来会分期其statusManager组件,即将将探测的状态与apiserver的同步的实现, k8s源码阅读电子书地址: https://www.yuque.com/baxiaoshi/tyado3

> 微信号:baxiaoshi2020 > 关注公告号阅读更多源码分析文章 21天大棚 > 更多文章关注 www.sreguide.com > 本文由博客一文多发平台 OpenWrite 发布

原文链接:https://my.oschina.net/u/4131034/blog/3165379
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章