图解kubernetes中etcd增删改查的工业实现
kubernetes中基于etcd实现集中的数据存储,今天来学习下基于etcd如何实现数据读取一致性、更新一致性、事务的具体实现
1. 数据的存储与版本
1.1 数据存储的转换

1.2 资源版本revision

1.3 数据模型的映射

2. 查询接口一致性

// 省略非核心代码
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
// 获取key
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
// 检测当前版本,是否达到最小版本的
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
// 执行数据转换
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
// 解码数据
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
3. 创建接口实现![image.png]()
创建一个接口数据则会首先进行资源对象的检查,避免重复创建对象,此时会先通过资源对象的version字段来进行初步检查,然后在利用etcd的事务机制来保证资源创建的原子性操作
// 省略非核心代码
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
// 将数据编码
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
// 转换数据
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
startTime := time.Now()
// 事务操作
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key), // 如果之前不存在 这里是利用的etcd的ModRevision即修改版本为0, 寓意着对应的key不存在
).Then(
clientv3.OpPut(key, string(newData), opts...), // put修改数据
).Commit()
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
// 获取对应的Revision
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}
4. 删除接口的实现

// 省略非核心代码
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
startTime := time.Now()
// 获取当前的key的数据
getResp, err := s.client.KV.Get(ctx, key)
for {
// 获取当前的状态
origState, err := s.getState(getResp, key, v, false)
if err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 如果修改版本等于当前状态,就尝试删除
).Then(
clientv3.OpDelete(key), // 删除
).Else(
clientv3.OpGet(key), // 获取
).Commit()
if !txnResp.Succeeded {
// 获取最新的数据重试事务操作
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
continue
}
// 将最后一个版本的数据解码到out里面,然后返回
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
5. 更新接口的实现

// 省略非核心代码
func (s *store) GuaranteedUpdate(
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
// 获取当前key的最新数据
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return nil, err
}
return s.getState(getResp, key, v, ignoreNotFound)
}
// 获取当前数据
var origState *objState
var mustCheckData bool
if len(suggestion) == 1 && suggestion[0] != nil {
// 如果提供了建议的数据,则会使用,
origState, err = s.getStateFromObject(suggestion[0])
if err != nil {
return err
}
//但是需要检测数据
mustCheckData = true
} else {
// 尝试重新获取数据
origState, err = getCurrentState()
if err != nil {
return err
}
}
transformContext := authenticatedDataString(key)
for {
// 检查对象是否已经更新, 主要是通过检测uuid/revision来实现
if err := preconditions.Check(key, origState.obj); err != nil {
// If our data is already up to date, return the error
if !mustCheckData {
return err
}
// 如果检查数据一致性错误,则需要重新获取
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}
// 删除当前的版本数据revision
ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
// If our data is already up to date, return the error
if !mustCheckData {
return err
}
// It's possible we were working with stale data
// Actually fetch
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}
// 编码数据
data, err := runtime.Encode(s.codec, ret)
if err != nil {
return err
}
if !origState.stale && bytes.Equal(data, origState.data) {
// 如果我们发现我们当前的数据与获取到的数据一致,则会直接跳过
if mustCheckData {
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
if !bytes.Equal(data, origState.data) {
// original data changed, restart loop
continue
}
}
if !origState.stale {
// 直接返回数据
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
// 砖汉数据
newData, err := s.transformer.TransformToStorage(data, transformContext)
if err != nil {
return storage.NewInternalError(err.Error())
}
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
trace.Step("Transaction prepared")
startTime := time.Now()
// 事务更新数据
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
if err != nil {
return err
}
trace.Step("Transaction committed")
if !txnResp.Succeeded {
// 重新获取数据
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
return err
}
trace.Step("Retry value restored")
mustCheckData = false
continue
}
// 获取put响应
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
}
6. 未曾讲到的地方
transformer的实现和注册地方我并没有找到,只看到了几个覆盖资源类型的地方,还有list/watch接口,后续再继续学习,今天就先到这里,下次再见
> 微信号:baxiaoshi2020 > 关注公告号阅读更多源码分析文章
> 更多文章关注 www.sreguide.com > 本文由博客一文多发平台 OpenWrite 发布
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
平台型产品功能设计的需求抽象与升维适配|专访宜信郭建伟
摘要:平台型产品经理需要具备“对业务的结构化理解”,“先抽象提炼”再“反向适配具体业务需求”。 前言:宜信技术人物专访是宜信技术学院推出的系列性专题,我们邀请软件研发行业的优秀技术人,分享自己在软件研发领域的实践经验和前瞻性观点。 第五期专访我们邀请到宜信科技中心普惠金融需求管理部负责人郭建伟,以“平台型产品的功能与体验设计”为主题,围绕宜信的产品开发实践,分享平台型产品经理的工作经验和能力要求。 嘉宾 | 宜信郭建伟 记者 | 宜信成芳 本文为采访实录。原创内容,转载请留言获取授权。 记者:郭建伟老师您好,今天我们的采访将围绕“平台型产品的功能与体验设计”展开。业务模式的变化会对产品产生升级改造的需求。请您结合宜信的具体实践,介绍在业务模式发生颠覆性创新的情况下,平台型产品团队该如何处理跨多平台融合、多团队沟通的复杂问题,从而顺利推进和实现平台产品升级? 郭建伟:正好在宜信的这段工作经历中,参与过一次公司战略级的业务模式创新项目——火凤凰项目,将网贷业务中间人模式升级为直接借贷模式。有一些经验体会可以分享给大家: 首先,要明确自身在项目中的定位,目标清晰。 通常业务模式升级类项目都是...
-
下一篇
你一定想不到,实现一个Python+Selenium的自动化测试框架就这么简单!
首先你得知道什么是Selenium? Selenium是一个基于浏览器的自动化测试工具,它提供了一种跨平台、跨浏览器的端到端的web自动化解决方案。Selenium主要包括三部分:Selenium IDE、Selenium WebDriver 和Selenium Grid。 Selenium IDE:Firefox的一个扩展,它可以进行录制回放,并把录制的操作以多种语言(例如java、python等)的形式导出成测试用例。 Selenium WebDriver:提供Web自动化所需的API,主要用作浏览器控制、页面元素选择和调试。不同的浏览器需要不同的WebDriver。 Selenium Grid:提供了在不同机器的不同浏览器上运行selenium测试的能力。 下面我会使用思维导图目录结构介绍基础测试框架,编写测试用例进行功能测试用例,希望对您的学习有所帮助。 设计思路 框架采用python3 + selenium3 + PO + yaml + ddt + unittest等技术编写成基础测试框架,能适应日常测试工作需要。 使用Page Object模式将页面定位和业务操作分开,分离...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS关闭SELinux安全模块
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G


微信收款码
支付宝收款码