k8s与监控--改造telegraf的buffer实现
改造telegraf的buffer实现
前言
最近在使用telegraf的场景中,要求数据在程序意外终止的时候不丢失。按照telegraf最初的原始实现,在running_output内部维护了两个buffer,分别是metrics和failMetrics。这两个buffer是基于go中channel实现的。由于没 有持久化机制,在意外退出的时候,存在丢失数据的风险。所以这篇文章主要讲述之前telegraf保证数据安全的一些措施和我们对代码的一些优化。
telegraf关于数据安全的处理办法
关于两个buffer,定义在running_output.go的struct中。
// RunningOutput contains the output configuration type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig MetricBufferLimit int MetricBatchSize int MetricsFiltered selfstat.Stat MetricsWritten selfstat.Stat BufferSize selfstat.Stat BufferLimit selfstat.Stat WriteTime selfstat.Stat metrics *buffer.Buffer failMetrics *buffer.Buffer // Guards against concurrent calls to the Output as described in #3009 sync.Mutex }
这个两个buffer的大小提供了配置参数可以设置。
metrics: buffer.NewBuffer(batchSize), failMetrics: buffer.NewBuffer(bufferLimit),
顾名思义。metrics存放要发送到指定output的metric,而failMetrics存放发送失败的metric。当然失败的metrics会在telegraf重发机制下再次发送。
if ro.metrics.Len() == ro.MetricBatchSize { batch := ro.metrics.Batch(ro.MetricBatchSize) err := ro.write(batch) if err != nil { ro.failMetrics.Add(batch...) } }
在向metrics增加metrics的时候,做是否达到批量发送的数量,如果达到就调用发送方法。当然还有定时的解决方案,如果一直没有达到MetricBatchSize,也会在一定时间后发送数据。具体实现代码在agent.go中
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) semaphore := make(chan struct{}, 1) for { select { case <-shutdown: log.Println("I! Hang on, flushing any cached metrics before shutdown") // wait for outMetricC to get flushed before flushing outputs wg.Wait() a.flush() return nil case <-ticker.C: go func() { select { case semaphore <- struct{}{}: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() <-semaphore default: // skipping this flush because one is already happening log.Println("W! Skipping a scheduled flush because there is" + " already a flush ongoing.") } }()
在程序接受到停止信号后,程序会首先flush剩下的数据到output中,然后退出进程。这样可以保证一定的数据安全。
基于redis实现buffer的持久化
在持久化机制的选型中,优先实现redis。本身redis性能高,而且具备完善的持久化。
具体的实现架构如下:
具体代码:
package buffer import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer/memory" "github.com/influxdata/telegraf/internal/buffer/redis" ) const ( BufferTypeForMemory = "memory" BufferTypeForRedis = "redis" ) type Buffer interface { IsEmpty() bool Len() int Add(metrics ...telegraf.Metric) Batch(batchSize int) []telegraf.Metric } func NewBuffer(mod string, size int, key, addr string) Buffer { switch mod { case BufferTypeForRedis: return redis.NewBuffer(size, key, addr) default: return memory.NewBuffer(size) } }
然后分别内存和redis实现了Buffer接口。
其中NewBuffer相当于一个工厂方法。
当然在后期可以实现基于file和db等buffer实现,来满足不同的场景和要求。
redis实现buffer的要点
由于要满足先进先出的要求,选择了redis的list数据结构。redis中的list是一个字符串list,所以telegraf中metric数据接口要符合序列化的要求。比如属性需要可导出,即public。所以这点需要改动telegraf对于metric struct的定义。另外可以选择json或是msgpack等序列化方式。我们这边是采用的json序列化的方式。
结语
改造以后,可以根据自己的需求通过配置文件来决定使用channel或是redis来实现buffer。各有优劣,内存实现的话,性能高,受到的依赖少。而redis这种分布式存储,决定了数据安全,但是性能会有一定的损耗,毕竟有大量的序列化和反序列化以及网络传输,当然依赖也增加了,取决于redis的可靠性,建议redis集群部署。
本文转自中文社区-k8s与监控--改造telegraf的buffer实现
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
谈谈k8s1.12新特性--Mount propagation(挂载命名空间的传播)
Mount propagation 挂载传播允许将Container挂载的卷共享到同一Pod中的其他Container,甚至可以共 享到同一节点上的其他Pod。 一个卷的挂载传播由Container.volumeMounts中的mountPropagation字段控制。它的值是: None 此卷挂载不会接收到任何后续挂载到该卷或是挂载到该卷的子目录下的挂载。以类似的方式,在主机上不会显示Container创建的装载。这是默认模式。 此模式等同于Linux内核文档中所述的 private 传播。 HostToContainer 此卷挂载将会接收到任何后续挂载到该卷或是挂载到该卷的子目录下的挂载。 换句话说,如果主机在卷挂载中挂载任何内容,则Container将看到它挂载在那里。 类似地,如果任何具有 Bidirectional 挂载传播设置的Pod挂载到同一个卷中,那么具有HostToContainer挂载传播的Container将会看到它。 此模式等同于Linux内核文档中描述的rslave挂载传播。 Bidirectional 此卷挂载的行为与HostToContainer挂载相同。...
- 下一篇
k8s使用kube-router暴露集群中的pod和svc到外部
简介 使用kube-router把k8s集群中的pod ip和cluter i暴露集群 外部,实现集群外的节点直接访问k8s的pod和svc 环境说明 本实验在已经安装配置好k8s集群基础之上进行实验,k8s安装参考博客其他文章。 实验架构 lab1: master 11.11.11.111 lab2: node 11.11.11.112 lab3: node 11.11.11.113 lab4: external 11.11.11.114 复制代码 安装 # 本次实验重新创建了集群,使用之前测试其他网络插件的集群环境没有成功 # 可能是由于环境干扰,实验时需要注意 # 创建kube-router目录下载相关文件 mkdir kube-router && cd kube-router rm -f generic-kuberouter-all-features.yaml wget https://raw.githubusercontent.com/cloudnativelabs/kube-router/master/daemonset/generic-kuberouter...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8编译安装MySQL8.0.19
- CentOS8安装Docker,最新的服务器搭配容器使用