kubernetes的rolling update机制解析


commit: d577db99873cbf04b8e17b78f17ec8f3a27eca30 Date: Fri Apr 10 23:45:36 2015 -0700

##0. 命令行和依赖的基础知识


Perform a rolling update of the given ReplicationController. Replaces the specified controller with new controller, updating one pod at a time to use the new PodTemplate. The new-controller.json must specify the same namespace as the existing controller and overwrite at least one (common) label in its replicaSelector. kubectl rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC 


// Update pods of frontend-v1 using new controller data in frontend-v2.json. $ kubectl rolling-update frontend-v1 -f frontend-v2.json // Update pods of frontend-v1 using JSON data passed into stdin. $ cat frontend-v2.json | kubectl rolling-update frontend-v1 -f - 





 "spec":{ "replicas":1, "selector":{ "name":"redis", "role":"master" }, 


 "labels": { "name": "redis" }, 




func main() { runtime.GOMAXPROCS(runtime.NumCPU()) cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr) if err := cmd.Execute(); err != nil { os.Exit(1) } } 



 cmds.AddCommand(NewCmdRollingUpdate(f, out)) 


func NewCmdRollingUpdate(f *cmdutil.Factory, out io.Writer) *cobra.Command { cmd := &cobra.Command{ Use: "rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC", // rollingupdate is deprecated. Aliases: []string{"rollingupdate"}, Short: "Perform a rolling update of the given ReplicationController.", Long: rollingUpdate_long, Example: rollingUpdate_example, Run: func(cmd *cobra.Command, args []string) { err := RunRollingUpdate(f, out, cmd, args) cmdutil.CheckErr(err) }, } } 


func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string) error { ... mapper, typer := f.Object() // TODO: use resource.Builder instead obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()). NamespaceParam(cmdNamespace).RequireNamespace(). FilenameParam(filename). Do(). Object() if err != nil { return err } newRc, ok := obj.(*api.ReplicationController) if !ok { return cmdutil.UsageError(cmd, "%s does not specify a valid ReplicationController", filename) } 


 if oldName == newName { return cmdutil.UsageError(cmd, "%s cannot have the same name as the existing ReplicationController %s", filename, oldName) } var hasLabel bool for key, oldValue := range oldRc.Spec.Selector { if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue { hasLabel = true break } } if !hasLabel { return cmdutil.UsageError(cmd, "%s must specify a matching key with non-equal value in Selector for %s", filename, oldName) } 


 updater := kubectl.NewRollingUpdater(newRc.Namespace, client) // fetch rc oldRc, err := client.ReplicationControllers(newRc.Namespace).Get(oldName) if err != nil { return err } ... err = updater.Update(out, oldRc, newRc, period, interval, timeout) if err != nil { return err } 

在做rolling update的时候,有两个条件限制:第一是新的rc的名字需要和旧的不一样;第二是新旧rc的selector中至少有一个相同key的标签,其值不一样。其中namespace是k8s用来做多租户资源隔离的,可以先忽略不计。

##3. 数据结构和实现


// RollingUpdater provides methods for updating replicated pods in a predictable, // fault-tolerant way. type RollingUpdater struct { // Client interface for creating and updating controllers c client.Interface // Namespace for resources ns string } 

可以看到,RollingUpdater里面持有一个k8s的client(client.Interface),用来向api server发送请求;另一个字段ns是所操作资源所在的namespace。

func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error { oldName := oldRc.ObjectMeta.Name newName := newRc.ObjectMeta.Name retry := &RetryParams{interval, timeout} waitForReplicas := &RetryParams{interval, timeout} if newRc.Spec.Replicas <= 0 { return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec) } desired := newRc.Spec.Replicas sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID) // look for existing newRc, incase this update was previously started but interrupted rc, existing, err := r.getExistingNewRc(sourceId, newName) if existing { fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName) if err != nil { return err } replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation] desired, err = strconv.Atoi(replicas) if err != nil { return fmt.Errorf("Unable to parse annotation for %s: %s=%s", newName, desiredReplicasAnnotation, replicas) } newRc = rc } else { fmt.Fprintf(out, "Creating %s\n", newName) if newRc.ObjectMeta.Annotations == nil { newRc.ObjectMeta.Annotations = map[string]string{} } newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired) newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId newRc.Spec.Replicas = 0 newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc) if err != nil { return err } } // +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 { newRc.Spec.Replicas += 1 oldRc.Spec.Replicas -= 1 fmt.Printf("At beginning of loop: %s replicas: %d, %s replicas: %d\n", oldName, oldRc.Spec.Replicas, newName, newRc.Spec.Replicas) fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n", oldName, oldRc.Spec.Replicas, newName, newRc.Spec.Replicas) newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas) if err != nil { return err 
} time.Sleep(updatePeriod) oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas) if err != nil { return err } fmt.Printf("At end of loop: %s replicas: %d, %s replicas: %d\n", oldName, oldRc.Spec.Replicas, newName, newRc.Spec.Replicas) } // delete remaining replicas on oldRc if oldRc.Spec.Replicas != 0 { fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n", oldName, oldRc.Spec.Replicas, 0) oldRc.Spec.Replicas = 0 oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas) // oldRc, err = r.resizeAndWait(oldRc, interval, timeout) if err != nil { return err } } // add remaining replicas on newRc if newRc.Spec.Replicas != desired { fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n", newName, newRc.Spec.Replicas, desired) newRc.Spec.Replicas = desired newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas) if err != nil { return err } } // Clean up annotations if newRc, err = r.c.ReplicationControllers(r.ns).Get(newName); err != nil { return err } delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation) delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation) newRc, err = r.updateAndWait(newRc, interval, timeout) if err != nil { return err } // delete old rc fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName) return r.c.ReplicationControllers(r.ns).Delete(oldName) } 


  1. 如果新的rc还没有被创建,就先创建(初始replicas设为0,期望的replicas数和来源rc的标识记录在annotation里);如果已经创建了(例如在上次的rolling update中创建了但中途超时或被中断),就沿用已有的rc继续更新,并从annotation中读出期望的replicas数
  2. 用一个循环,每轮把新的rc的replicas加1、旧的rc的replicas减1,直到新的rc达到期望值或旧的rc降为0;循环结束后再把旧的rc剩余的replicas缩到0、把新的rc补到期望值。主要调用的函数是resizeAndWait和updateAndWait
  3. 最后清理新rc上的annotation,并删除旧的rc

##4. 底层调用

接上一节的resizeAndWait,代码在/pkg/kubectl/resize.go,这里的具体代码就不贴了。其余的所有调用都发生在/pkg/client这个目录下,这是一个http/json的client,主要功能就是向api-server发送请求。整体来说,上面的wait的实现都是比较土的:先发一个update请求过去,之后轮询调用get来检测状态是否符合最终需要的状态。

##5. 总结


update-period:新rc增加一个pod后,等待这个period,然后从旧rc缩减一个pod; poll-interval:这个参数名来源于linux上的poll调用,就是每过一个poll-interval,向服务端发起请求,直到这个请求成功或者报失败; timeout:总操作的超时时间

rolling update主要是客户端这边实现的,分析完了,但还是有一些未知的问题,例如:

  1. api-server, cadvisor, kubelet, proxy, etcd这些服务端组件是怎么交互的?怎么保证在服务一直可用的情况下增减pod?
  2. 是否有可能在pod增减的时候插入自己的一些代码或者过程?因为我们目前的架构中没有使用k8s的proxy,需要自己去调用负载均衡的系统给这些pod导流量
  3. 对于具体的pod,我们怎么去做内部程序的健康检查?在业务不可用的情况下向k8s系统发送消息,干掉这个pod,在别的机器上创建新的来替代。
