k8s与监控--从telegraf改造谈golang多协程精确控制
从telegraf改造谈golang多协程精确控制
前言
telegraf是infuxdb公司开源出来的一个基于插件机制的收集metrics的 项目。整个架构和elastic公司的日志收集系统极其类似,具备良好的扩展性。与现在流行的各种exporter+promethues监控方案相比:
- 大致具备良好的可扩展性。很容易增加自己的处理逻辑,在input,output,process,filter等环境定制自己专属的插件。
- 统一了各种exporter,减少了部署各种exporter的工作量和维护成本。
目前telegraf改造工作基本上是两大部分:
- 增加了一些telegraf不支持的插件,比如虚拟化(kvm,vmware等),数据库(oracle),k8s和openstack等input插件。
- telegraf是基于配置文件的,所以会有两个问题,很难做分布式和无停机动态调度input任务。所以我们的工作就是将获取配置接口化,所有的配置文件来源于统一配置中心。然后就是改造无停机动态调度input。
在改造改造无停机动态调度input就涉及到golang多协程精确控制的问题。
一些golang常用并发手段
sync包下WaitGroup
具体事例:
var wg sync.WaitGroup wg.Add(len(a.Config.Outputs)) for _, o := range a.Config.Outputs { go func(output *models.RunningOutput) { defer wg.Done() err := output.Write() if err != nil { log.Printf("E! Error writing to output [%s]: %s\n", output.Name, err.Error()) } }(o) } wg.Wait()
WaitGroup内部维护了一个counter,当counter数值为0时,表明添加的任务都已经完成。
总共有三个方法:
func (wg *WaitGroup) Add(delta int)
添加任务,delta参数表示添加任务的数量。
func (wg *WaitGroup) Done()
任务执行完成,调用Done方法,一般使用姿势都是defer wg.Done(),此时counter中会减一。
func (wg *WaitGroup) Wait()
通过使用sync.WaitGroup,可以阻塞主线程,直到相应数量的子线程结束。
chan struct{},控制协程退出
启动协程的时候,传递一个shutdown chan struct{},需要关闭该协程的时候,直接close(shutdown)。struct{}在golang中是一个消耗接近0的对象。
具体事例:
// gatherer runs the inputs that have been configured with their own // reporting interval. func (a *Agent) gatherer( shutdown chan struct{}, kill chan struct{}, input *models.RunningInput, interval time.Duration, metricC chan telegraf.Metric, ) { defer panicRecover(input) GatherTime := selfstat.RegisterTiming("gather", "gather_time_ns", map[string]string{"input": input.Config.Name}, ) acc := NewAccumulator(input, metricC) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) ticker := time.NewTicker(interval) defer ticker.Stop() for { internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) start := time.Now() gatherWithTimeout(shutdown, kill, input, acc, interval) elapsed := time.Since(start) GatherTime.Incr(elapsed.Nanoseconds()) select { case <-shutdown: return case <-kill: return case <-ticker.C: continue } } }
借助chan 实现指定数量的协程或动态调整协程数量
当然这里必须是每个协程是幂等,也就是所有协程做的是同样的工作。
首先创建 一个 pool:= make(chan chan struct{}, maxWorkers),maxWorkers为目标协程数量。
然后启动协程:
for i := 0; i < s.workers; i++ { go func() { wQuit := make(chan struct{}) s.pool <- wQuit s.sFlowWorker(wQuit) }() }
关闭协程:
func (s *SFlow) sFlowWorker(wQuit chan struct{}) { LOOP: for { select { case <-wQuit: break LOOP case msg, ok = <-sFlowUDPCh: if !ok { break LOOP } } // 此处执行任务操作 }
动态调整:
for n = 0; n < 10; n++ { if len(s.pool) > s.workers { wQuit := <-s.pool close(wQuit) } }
多协程精确控制
在改造telegraf过程中,要想动态调整input,每个input都是唯一的,分属不同类型插件。就必须实现精准控制指定的协程的启停。
这个时候实现思路就是:实现一个kills map[string]chan struct{},k为每个任务的唯一ID。添加任务时候,传递一个chan struct{},这个时候关闭指定ID的chan struct{},就能控制指定的协程。
// DelInput add input func (a *Agent) DelInput(inputs []*models.RunningInput) error { a.storeMutex.Lock() defer a.storeMutex.Unlock() for _, v := range inputs { if _, ok := a.kills[v.Config.ID]; !ok { return fmt.Errorf("input: %s,未找到,无法删除", v.Config.ID) } } for _, input := range inputs { if kill, ok := a.kills[input.Config.ID]; ok { delete(a.kills, input.Config.ID) close(kill) } } return nil }
添加任务:
// AddInput add input func (a *Agent) AddInput(shutdown chan struct{}, inputs []*models.RunningInput) error { a.storeMutex.Lock() defer a.storeMutex.Unlock() for _, v := range inputs { if _, ok := a.kills[v.Config.ID]; ok { return fmt.Errorf("input: %s,已经存在无法新增", v.Config.ID) } } for _, input := range inputs { interval := a.Config.Agent.Interval.Duration // overwrite global interval if this plugin has it's own. if input.Config.Interval != 0 { interval = input.Config.Interval } if input.Config.ID == "" { continue } a.wg.Add(1) kill := make(chan struct{}) a.kills[input.Config.ID] = kill go func(in *models.RunningInput, interv time.Duration) { defer a.wg.Done() a.gatherer(shutdown, kill, in, interv, a.metricC) }(input, interval) } return nil }
总结
简单介绍了一下telegraf项目。后续的优化和改造工作还在继续。主要是分布式telegraf的调度算法。毕竟集中化所有exporter以后,telegraf的负载能力受单机能力限制,而且也不符合高可用的使用目标。
本文转自中文社区-k8s与监控--从telegraf改造谈golang多协程精确控制
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
k8s安装traefik配置使用ingress
简介 traefik 是一个前端负载均衡器,对于微服务 架构尤其是 kubernetes 等编排工具具有良好的支持;同 nginx 等相比,traefik 能够自动感知后端容器变化,从而实现自动服务发现。 traefik部署在k8s上分为daemonset和deployment两种方式各有优缺点: daemonset 能确定有哪些node在运行traefik,所以可以确定的知道后端ip,但是不能方便的伸缩 deployment 可以更方便的伸缩,但是不能确定有哪些node在运行traefik所以不能确定的知道后端ip 一般部署两种不同类型的traefik: 面向内部(internal)服务的traefik,建议可以使用deployment的方式 面向外部(external)服务的traefik,建议可以使用daemonset的方式 建议使用traffic-type标签 traffic-type: external traffic-type: internal traefik相应地使用labelSelector traffic-type=internal traffic-type=ext...
- 下一篇
k8s集群配置使用coredns代替kube-dns
简介 CoreDNS是一个Go语言实现的链式插件DNS服务端,是CNCF成员,是一个 高性能、易扩展的DNS服务端。可以很方便的部署在k8s集群中,用来代替kube-dns。 使用kubeadm初始化时指定 安装方法与《centos7使用kubeadm安装k8s集群》基本一致 只需要简单修改kubeadm-master.config配置文件 apiVersion: kubeadm.k8s.io/v1alpha1 kind: MasterConfiguration kubernetesVersion: v1.9.0 imageRepository: registry.cn-shanghai.aliyuncs.com/gcr-k8s etcd: image: registry.cn-shanghai.aliyuncs.com/gcr-k8s/etcd-amd64:3.1.10 api: advertiseAddress: 11.11.11.111 networking: podSubnet: 10.244.0.0/16 featureGates: CoreDNS: true 复制代码 单独...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Linux系统CentOS6、CentOS7手动修改IP地址