Kubernetes CRI 分析 - kubelet 创建 Pod 分析
kubelet CRI 创建 Pod 调用流程
本文以 kubelet dockershim 创建 Pod 调用流程为例做分析。 kubelet 通过调用 dockershim 来创建并启动容器,而 dockershim 则调用 Docker 来创建并启动容器,并调用 CNI 来构建 Pod 网络。
kubelet dockershim 创建 Pod 调用流程图
dockershim 属于 kubelet 内置 CRI shim,其余的 remote CRI shim 创建 Pod 调用流程其实与 dockershim 调用基本一致,只不过是调用了不同的容器引擎来操作容器,但一样由 CRI shim 调用 CNI 来构建 Pod 网络。
下面是详细的源码分析。
kubeGenericRuntimeManager 的 SyncPod 方法,调用 CRI 创建 Pod 的逻辑将在该方法里触发。
从该方法代码也可以看出,kubelet 创建一个 Pod 的逻辑为:
-
先创建并启动 Pod sandbox 容器,并构建好 Pod 网络。
-
创建并启动 ephemeral containers。
-
创建并启动 init containers。
-
最后创建并启动 normal containers(即普通业务容器)。
这里对调用 m.createPodSandbox 来创建 Pod sandbox 进行分析,用 m.startContainer 等调用分析可以参照该分析自动进行分析,调用流程几乎一致。
// pkg/kubelet/kuberuntime/kuberuntime_manager.go // SyncPod syncs the running pod into the desired pod by executing following steps: // // 1. Compute sandbox and container changes. // 2. Kill pod sandbox if necessary. // 3. Kill any containers that should not be running. // 4. Create sandbox if necessary. // 5. Create ephemeral containers. // 6. Create init containers. // 7. Create normal containers. func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { ... // Step 4: Create a sandbox for the pod if necessary. podSandboxID := podContainerChanges.SandboxID if podContainerChanges.CreateSandbox { var msg string var err error klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod)) createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod)) result.AddSyncResult(createSandboxResult) podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt) ... }
m.createPodSandbox
**m.createPodSandbox **方法主要是调用 m.runtimeService.RunPodSandbox。
runtimeService 即 RemoteRuntimeService,实现了 CRI shim 客户端-容器运行时接口 RuntimeService interface,持有与 CRI shim 容器运行时服务端通信的客户端。所以调用 m.runtimeService.RunPodSandbox,实际上等于调用了 CRI shim 服务端的 RunPodSandbox 方法,来进行 Pod sandbox 的创建。
// pkg/kubelet/kuberuntime/kuberuntime_sandbox.go // createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error). func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) { podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt) if err != nil { message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err) klog.Error(message) return "", message, err } // Create pod logs directory err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755) if err != nil { message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err) klog.Errorf(message) return "", message, err } runtimeHandler := "" if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil { runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName) if err != nil { message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err) return "", message, err } if runtimeHandler != "" { klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler) } } podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler) if err != nil { message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err) klog.Error(message) return "", message, err } return podSandBoxID, "", nil }
m.runtimeService.RunPodSandbox
m.runtimeService.RunPodSandbox 方法,会调用 r.runtimeClient.RunPodSandbox,即利用 CRI shim 客户端,调用 CRI shim 服务端来进行 Pod sandbox 的创建。
分析到这里,kubelet 中的 CRI 相关调用就分析完毕了,接下来将会进入到 CRI shim(以 kubelet 内置 CRI shim-dockershim 为例)里进行创建 Pod sandbox 的分析。
// pkg/kubelet/remote/remote_runtime.go // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) { // Use 2 times longer timeout for sandbox operation (4 mins by default) // TODO: Make the pod sandbox timeout configurable. ctx, cancel := getContextWithTimeout(r.timeout * 2) defer cancel() resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{ Config: config, RuntimeHandler: runtimeHandler, }) if err != nil { klog.Errorf("RunPodSandbox from runtime service failed: %v", err) return "", err } if resp.PodSandboxId == "" { errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata()) klog.Errorf("RunPodSandbox failed: %s", errorMessage) return "", errors.New(errorMessage) } return resp.PodSandboxId, nil }
r.runtimeClient.RunPodSandbox
接下来以 dockershim 为例,进入到 CRI shim 来进行创建 Pod sandbox 的分析。
前面 kubelet 调用 r.runtimeClient.RunPodSandbox,会进入到 dockershim 下面的 RunPodSandbox 方法。
创建 Pod sandbox 主要有 5 个步骤:
-
调用 docker,拉取 pod sandbox 的镜像。
-
调用 docker,创建 pod sandbox 容器。
-
创建 pod sandbox 的 Checkpoint。
-
调用 docker,启动 pod sandbox 容器。
-
调用 CNI,给 pod sandbox 构建网络。
// pkg/kubelet/dockershim/docker_sandbox.go // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. // For docker, PodSandbox is implemented by a container holding the network // namespace for the pod. // Note: docker doesn't use LogDirectory (yet). func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { config := r.GetConfig() // Step 1: Pull the image for the sandbox. image := defaultSandboxImage podSandboxImage := ds.podSandboxImage if len(podSandboxImage) != 0 { image = podSandboxImage } // NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly. // see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository // Only pull sandbox image when it's not present - v1.PullIfNotPresent. if err := ensureSandboxImageExists(ds.client, image); err != nil { return nil, err } // Step 2: Create the sandbox container. if r.GetRuntimeHandler() != "" && r.GetRuntimeHandler() != runtimeName { return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler()) } createConfig, err := ds.makeSandboxDockerConfig(config, image) if err != nil { return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err) } createResp, err := ds.client.CreateContainer(*createConfig) if err != nil { createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err) } if err != nil || createResp == nil { return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) } resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID} ds.setNetworkReady(createResp.ID, false) defer func(e *error) { // Set networking ready depending on the error return of // the parent function if *e == nil { ds.setNetworkReady(createResp.ID, true) } }(&err) // Step 3: Create Sandbox Checkpoint. if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { return nil, err } // Step 4: Start the sandbox container. // Assume kubelet's garbage collector would remove the sandbox later, if // startContainer failed. err = ds.client.StartContainer(createResp.ID) if err != nil { return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err) } // Rewrite resolv.conf file generated by docker. // NOTE: cluster dns settings aren't passed anymore to docker api in all cases, // not only for pods with host network: the resolver conf will be overwritten // after sandbox creation to override docker's behaviour. This resolv.conf // file is shared by all containers of the same pod, and needs to be modified // only once per pod. if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { containerInfo, err := ds.client.InspectContainer(createResp.ID) if err != nil { return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err) } if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil { return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err) } } // Do not invoke network plugins if in hostNetwork mode. if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE { return resp, nil } // Step 5: Setup networking for the sandbox. // All pod networking is setup by a CNI plugin discovered at startup time. // This plugin assigns the pod ip, sets up routes inside the sandbox, // creates interfaces etc. In theory, its jurisdiction ends with pod // sandbox networking, but it might insert iptables rules or open ports // on the host as well, to satisfy parts of the pod spec that aren't // recognized by the CNI standard yet. cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) networkOptions := make(map[string]string) if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { // Build DNS options. dnsOption, err := json.Marshal(dnsConfig) if err != nil { return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err) } networkOptions["dns"] = string(dnsOption) } err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions) if err != nil { errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)} // Ensure network resources are cleaned up even if the plugin // succeeded but an error happened between that success and here. err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) if err != nil { errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)) } err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod) if err != nil { errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err)) } return resp, utilerrors.NewAggregate(errList) } return resp, nil }
接下来以 ds.client.CreateContainer 调用为例,分析下dockershim是如何调用docker的。
ds.client.CreateContainer 主要是调用 d.client.ContainerCreate。
// pkg/kubelet/dockershim/libdocker/kube_docker_client.go func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) { ctx, cancel := d.getTimeoutContext() defer cancel() // we provide an explicit default shm size as to not depend on docker daemon. // TODO: evaluate exposing this as a knob in the API if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 { opts.HostConfig.ShmSize = defaultShmSize } createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name) if ctxErr := contextError(ctx); ctxErr != nil { return nil, ctxErr } if err != nil { return nil, err } return &createResp, nil }
ds.client.ContainerCreate 构建请求参数,向 Docker 指定的 url 发送 http 请求,创建 Pod sandbox 容器。
// vendor/github.com/docker/docker/client/container_create.go // ContainerCreate creates a new container based in the given configuration. // It can be associated with a name, but it's not mandatory. func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) { var response container.ContainerCreateCreatedBody if err := cli.NewVersionError("1.25", "stop timeout"); config != nil && config.StopTimeout != nil && err != nil { return response, err } // When using API 1.24 and under, the client is responsible for removing the container if hostConfig != nil && versions.LessThan(cli.ClientVersion(), "1.25") { hostConfig.AutoRemove = false } query := url.Values{} if containerName != "" { query.Set("name", containerName) } body := configWrapper{ Config: config, HostConfig: hostConfig, NetworkingConfig: networkingConfig, } serverResp, err := cli.post(ctx, "/containers/create", query, body, nil) defer ensureReaderClosed(serverResp) if err != nil { return response, err } err = json.NewDecoder(serverResp.body).Decode(&response) return response, err } // vendor/github.com/docker/docker/client/request.go // post sends an http request to the docker API using the method POST with a specific Go context. func (cli *Client) post(ctx context.Context, path string, query url.Values, obj interface{}, headers map[string][]string) (serverResponse, error) { body, headers, err := encodeBody(obj, headers) if err != nil { return serverResponse{}, err } return cli.sendRequest(ctx, "POST", path, query, body, headers) }
总结
CRI 架构图
在 CRI 之下,包括两种类型的容器运行时的实现:
-
kubelet 内置的 dockershim,实现了 Docker 容器引擎的支持以及 CNI 网络插件(包括 kubenet)的支持。dockershim 代码内置于 kubelet,被 kubelet 调用,让 dockershim 起独立的 server 来建立 CRI shim,向 kubelet 暴露 grpc server。
-
外部的容器运行时,用来支持 rkt、containerd 等容器引擎的外部容器运行时。
kubelet 调用 CRI 创建 Pod 流程分析
kubelet 创建一个 Pod 的逻辑为:
-
先创建并启动 pod sandbox 容器,并构建好 Pod 网络。
-
创建并启动 ephemeral containers。
-
创建并启动 init containers。
-
最后创建并启动 normal containers(即普通业务容器)。
下面以 kubelet dockershim 创建 Pod 调用流程为例做一下分析。
kubelet 通过调用 dockershim 来创建并启动容器,而 dockershim 则调用 Docker 来创建并启动容器,并调用 CNI 来构建 Pod 网络。
kubelet dockershim 创建 Pod 调用流程图示
dockershim 属于 kubelet 内置 CRI shim,其余 remote CRI shim 的创建 Pod 调用流程其实与 dockershim 调用基本一致,只不过是调用了不同的容器引擎来操作容器,但一样由 CRI shim 调用 CNI 来构建 Pod 网络。
关注“青云技术社区”公众号,后台回复关键字“云原生实战”,即可加入课程交流群。
作者
良凯尔 云原生爱好者 负责 Kubernetes 相关的研发工作
本文由博客一文多发平台 OpenWrite 发布!
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
v72.01 鸿蒙内核源码分析(Shell解析篇) | 应用窥伺内核的窗口 | 百篇博客分析OpenHarmony源码
子曰:“苟正其身矣,于从政乎何有?不能正其身,如正人何?” 《论语》:子路篇 百篇博客系列篇.本篇为: v72.xx 鸿蒙内核源码分析(Shell解析篇) | 应用窥视内核的窗口 进程管理相关篇为: v02.06 鸿蒙内核源码分析(进程管理) | 谁在管理内核资源 v24.03 鸿蒙内核源码分析(进程概念) | 如何更好的理解进程 v45.05 鸿蒙内核源码分析(Fork) | 一次调用 两次返回 v46.05 鸿蒙内核源码分析(特殊进程) | 老鼠生儿会打洞 v47.02 鸿蒙内核源码分析(进程回收) | 临终托孤的短命娃 v48.05 鸿蒙内核源码分析(信号生产) | 年过半百 活力十足 v49.03 鸿蒙内核源码分析(信号消费) | 谁让CPU连续四次换栈运行 v71.03 鸿蒙内核源码分析(Shell编辑) | 两个任务 三个阶段 v72.01 鸿蒙内核源码分析(Shell解析) | 应用窥伺内核的窗口 系列篇从内核视角用一句话概括shell的底层实现为:两个任务,三个阶段。其本质是独立进程,因而划到进程管理模块。每次创建shell进程都会再创建两个任务。 客户端任务(Shel...
- 下一篇
从 JavaScript 调用本地函数 | 用 WasmEdge 运行JavaScript 程序
WasmEdge 让 JavaScript 可以在共享库调用本地函数。 在前三篇文章中,我解释了为什么以及如何在 WebAssembly 沙箱中运行 JavaScript 程序。同时,还讨论了如何使用 Rust 为 WasmEdge 创建自定义 JavaScript AP。 用 WasmEdge 在 WebAssembly 中运行 JavaScript 将 JavaScript 嵌入到 Rust 中,并在 WebAssembly 中运行 用 Rust 创建高性能 JavaScript API,并在 WebAssembly 中运行 但是,为了完全访问底层系统的操作系统和硬件功能,我们有时需要为基于 C 的本机函数创建 JavaScript API。 也就是说,当 JavaScript 程序调用预定义的函数时,WasmEdge 会将其传递给 OS 上的原生共享库执行。 本文中,我们将向你展示如何做到这一点。我们将创建以下两个组件。 一个定制的 WasmEdge runtime,允许 WebAssembly 函数调用外部原生函数。 一个定制的 QuickJS 解释器,用于解析 JavaScr...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8编译安装MySQL8.0.19
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker安装Oracle12C,快速搭建Oracle学习环境