k8s与日志--journalbeat源码解读
前言
对于日志系统的重要性不言而喻,参照沪江的一 篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用:
- 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案
- 服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态
- 数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程
日志+大数据+AI的确有很多想象空间。
而对于收集系统,流行的技术stack有之前的elk,到现在的efk。logstash换成了filebeat。当然日志收集agent,也有flume和fluentd,尤其fluentd属于cncf组织的产品,在k8s中有着广泛的应用。但是fluentd是ruby写的,不利于深入源码了解。当然今天我们重点讲的是另外一个agent--journalbeat。望文生义,隶属于efk stack 中beats系列中的一员,专门用于收集journald日志。
journalbeat源码解读
journald日志简介
长久以来 syslog 是每一个 Unix 系统中的重要部件。在漫长的历史中在各种 Linux 发行版中都有不同的实现去完成类似的工作,它们采取的是逻辑相近,并使用基本相同的文件格式。但是 syslog 也存在诸多的问题,随着新设备的出现以及对安全的重视,这些缺点越发显得突出,例如日志消息内容无法验证、数据格式松散、日志检索低效、有限的元数据保存、无法记录二进制数据等。
Journald是针对以上需求的解决方案。受udev事件启发,Journal 条目与环境组块相似。一个键值域,按照换行符分开,使用大写的变量名。除了支持ASCII 格式的字符串外,还能够支持二进制数据,如 ATA SMART 健康信息、SCSI 数据。应用程序和服务可以通过将项目域传递给systemd journald服务来生成项目。该服务可以为项目增加一定数量的元数据。这些受信任域的值由 Journal 服务来决定且无法由客户端来伪造。在Journald中,可以把日志数据导出,在异地读取,并不受处理器架构的影响。这对嵌入式设备是很有用的功能,方便维护人员分析设备运行状况。
大致总结就是
- journald日志是新的linux系统的具备的
- journald区别于传统的文件存储方式,是二进制存储。需要用journalctl查看。
docker对于journald的支持
The journald logging driver sends container logs to the systemd journal. Log entries can be retrieved using the journalctl command, through use of the journal API, or using the docker logs command.
即docker除了json等日志格式,已经增加了journald驱动。
目前本司使用场景
我们的k8s集群,所有的docker输出的日志格式都采用journald,这样主机centos系统日志和docker的日志都用journalbeat来收集。
journalbeat实现关键
journalbeat整个实现过程,基本上两点:
- 与其他社区贡献的beats系列,比如packetbeat,mysqlbeat类似,遵循了beats的框架和约定,journalbeat实现了run和stop等方法即可,然后作为一个客户端,将收集到的数据,publish到beats中。
- 读取journald日志,采用了coreos开源的go-systemd库中sdjournal部分。其实sdjournal是一个利用cgo 对于journald日志c接口的封装。
源码解读
程序入口:
package main import ( "log" "github.com/elastic/beats/libbeat/beat" "github.com/mheese/journalbeat/beater" ) func main() { err := beat.Run("journalbeat", "", beater.New) if err != nil { log.Fatal(err) } }
整个journalbeat共实现了3个方法即可。run,stop,和new。
run和stop顾名思义,就是beats控制journalbeat的运行和停止。
而new:
需要按照
// Creator initializes and configures a new Beater instance used to execute // the beat its run-loop. type Creator func(*Beat, *common.Config) (Beater, error)
实现Creator方法,返回的Beater实例,交由beats控制。
具体实现:
// New creates beater func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config := config.DefaultConfig var err error if err = cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("Error reading config file: %v", err) } jb := &Journalbeat{ config: config, done: make(chan struct{}), cursorChan: make(chan string), pending: make(chan *eventReference), completed: make(chan *eventReference, config.PendingQueue.CompletedQueueSize), } if err = jb.initJournal(); err != nil { logp.Err("Failed to connect to the Systemd Journal: %v", err) return nil, err } jb.client = b.Publisher.Connect() return jb, nil }
一般的beats中,都会有一些共同属性。例如下面的done和client属性。
// Journalbeat is the main Journalbeat struct type Journalbeat struct { done chan struct{} config config.Config client publisher.Client journal *sdjournal.Journal cursorChan chan string pending, completed chan *eventReference wg sync.WaitGroup }
done是一个控制整个beater启停的信号量。
而client 是与beats平台通信的client。注意在初始化的时候,
jb.client = b.Publisher.Connect()
建立链接。
然后在收集到数据,发送的时候,也是通过该client
select { case <-jb.done: return nil default: // we need to clone to avoid races since map is a pointer... jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed) }
注意上边的发送姿势和对于刚才提到的done信号量使用。
其他方法都是业务相关不再详细解读了。
journalbeat如何保证发送失败的日志重新发送
关于这点,个人感觉是最优雅的部分
所有发送失败的日志是会在程序结束之前以json格式保存到文件,完成持久化。
// on exit fully consume both queues and flush to disk the pending queue defer func() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for evRef := range jb.pending { pending[evRef.cursor] = evRef.body } }() go func() { defer wg.Done() for evRef := range jb.completed { completed[evRef.cursor] = evRef.body } }() wg.Wait() logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed))) if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil { logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err) } }()
程序启动以后首先会读取之前持久化的发送失败的日志,重新发送
// load the previously saved queue of unsent events and try to publish them if any if err := jb.publishPending(); err != nil { logp.Warn("could not read the pending queue: %s", err) }
client publish收集到的日志到beats,设置了publisher.Guaranteed模式,成功和失败都有反馈
jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)
其中publisher.Signal(&eventSignal{ref, jb.completed})类似于一个回调,凡是成功的都会写成功的ref到jb.completed中。方便客户端控制。
维护了两个chan,一个存放客户端发送的日志,一个存放服务端接受成功的日志,精确对比,可获取发送失败的日志,进入重发动作
journalbeat struct中有下面两个属性
pending, completed chan *eventReference
每次客户端发送一条日志,都会写到pending。
case publishedChan <- jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed): if published := <-publishedChan; published { jb.pending <- ref // save cursor if jb.config.WriteCursorState { jb.cursorChan <- rawEvent.Cursor } } }
publisher.Signal(&eventSignal{ref, jb.completed}),回调会将成功的写到completed。
整个程序同时会启动一个
go jb.managePendingQueueLoop()
协程,专门用来定时重发失败日志。
// managePendingQueueLoop runs the loop which manages the set of events waiting to be acked func (jb *Journalbeat) managePendingQueueLoop() { jb.wg.Add(1) defer jb.wg.Done() pending := map[string]common.MapStr{} completed := map[string]common.MapStr{} // diff returns the difference between this map and the other. diff := func(this, other map[string]common.MapStr) map[string]common.MapStr { result := map[string]common.MapStr{} for k, v := range this { if _, ok := other[k]; !ok { result[k] = v } } return result } // flush saves the map[string]common.MapStr to the JSON file on disk flush := func(source map[string]common.MapStr, dest string) error { tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest))) if err != nil { return err } if err = json.NewEncoder(tempFile).Encode(source); err != nil { _ = tempFile.Close() return err } _ = tempFile.Close() return os.Rename(tempFile.Name(), dest) } // on exit fully consume both queues and flush to disk the pending queue defer func() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for evRef := range jb.pending { pending[evRef.cursor] = evRef.body } }() go func() { defer wg.Done() for evRef := range jb.completed { completed[evRef.cursor] = evRef.body } }() wg.Wait() logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed))) if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil { logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err) } }() // flush the pending queue to disk periodically tick := time.Tick(jb.config.PendingQueue.FlushPeriod) for { select { case <-jb.done: return case p, ok := <-jb.pending: if ok { pending[p.cursor] = p.body } case c, ok := <-jb.completed: if ok { completed[c.cursor] = c.body } case <-tick: result := diff(pending, completed) if err := flush(result, jb.config.PendingQueue.File); err != nil { logp.Err("error writing %s: %s", jb.config.PendingQueue.File, err) } pending = result completed = map[string]common.MapStr{} } } }
总结
当然还有一些其他的细节,不再一一讲述了。比如定时写Cursor的功能和日志格式转换等。具体的大家可以看源码。主要是讲了我认为其优雅的部分和为beats编写beater的要点。
本文转自SegmentFault-k8s与日志--journalbeat源码解读
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
centos7纯手动安装kubernetes-v1.11版本
简介 本文章主要介绍如何通过使用官方提供 的二进制包安装配置k8s集群 实验环境说明 实验架构 lab1: master 11.11.11.111 lab2: node 11.11.11.112 lab3: node 11.11.11.113 复制代码 实验使用的Vagrantfile # -*- mode: ruby -*- # vi: set ft=ruby : ENV["LC_ALL"] = "en_US.UTF-8" Vagrant.configure("2") do |config| (1..3).each do |i| config.vm.define "lab#{i}" do |node| node.vm.box = "centos-7.4-docker-17" node.ssh.insert_key = false node.vm.hostname = "lab#{i}" node.vm.network "private_network", ip: "11.11.11.11#{i}" node.vm.provision "shell", inline: "echo he...
- 下一篇
超适合小项目的 K8S 部署策略
Kubernetes 的稳健性、可靠性使它成为现阶段最流行的云 原生技术之一,但也有不少用户反映, Kubernetes 技术学习起来十分复杂,只适用于大集群且成本较高。这篇文章将打破你的观念,教你在小型项目中部署 Kubernetes 集群。 选择 K8S 部署小型集群的三大理由 理由一:花费时间少 在部署小型集群之前,你需要思考以下这些问题: 应该如何部署应用程序?(仅仅 rsync 到服务器?) 依赖关系是怎么样的?(如果利用 python 或 ruby,你必须在服务器上安装它们!) 手动运行命令?(如果以 nohup 的方式在后台运行二进制文件这可能不是最好的选择,但去配置路由服务,是否还需要学习 systemd?) 如何通过不同域名或 HTTP 路径运行多个应用程序?(你可能需要设置 haproxy 或 Nginx!) 当更新应用程序后应该如何推出新变化?(停止服务、部署代码、重启服务?如何避免停机?) 如果搞砸了部署怎么办?有什么方法可以回滚? 应用程序是否需要使用其他服务?又该如何配置这些服务?(如:redis) 以上这些问题很有可能在你部署小型集群时出现,但 Kuber...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6