您现在的位置是:首页 > 文章详情

日志收集系统chukwa协议分析

日期:2013-11-07点击:779

chukwa是apache下的一个开源项目。主要是用于日志收集分析,结合hadoop进行有效的日志处理和计算。下面这张图片描述的比较形象。

chunkwa

在这个架构中可以看出主要由agent,collector,hadoop,这样几块组成。而我现在要关注的就是agentcollector的交互部分。原来的日志收集系统是直接使用chukwa的一整套服务搭建起来的。但是这样子的系统有一个比较明显的缺点就是不能达到实时处理的要求。正常的情况下,日志处理的延时在5到10分钟左右。因此,需要在此基础上修改这样的架构。分析后发现,chukwa的agent还是比较好用的,拥有很多的配置方法,方便对服务器的调控。另外因为是线上服务器,也不可以随便换用agent。因为决定自己实现collector端,收集agent发过来的数据。因此就需要研究chunkwa传输的时候是怎么编码的。下面这张图片是我分析后得到的结果:

chunk

每一次,agent会发送一个package过来,这个package包括很多个chunk。其中开始的4个字节表示的int值表示这个package有多少个chunk。上面这张图片表示的就是一个chunk的各个字段组成。因为这是一个流的形式字符,需要根据java中的相应编码方式使用python进行解码。但是我查看了一下,python中没有直接支持的包,不过有一个struct扩展模块,通过对该扩展模块进行简单包装,就可以实现读取Int型,Long型,UTF字符等。代码如下:

# -*-coding: utf-8 -*- from struct import * class BinaryStream: def __init__(self,base_stream): self.base_stream = base_stream self.offset = 0 def readBytes(self,length): string, = unpack_from(str(length) + 's',self.base_stream[self.offset:]) self.offset += calcsize(str(length) + 's') self.offset +=1 return string def readUTF(self): length, = unpack_from('!H',self.base_stream[self.offset:]) self.offset +=calcsize('!H') string, = unpack_from(str(length) + 's',self.base_stream[self.offset:]) self.offset +=calcsize(str(length) + 's') return string def readLong(self): num, = unpack_from('!Q',self.base_stream[self.offset:]) self.offset +=calcsize('!Q') return num def readInt(self): num, = unpack_from('!I',self.base_stream[self.offset:]) self.offset += calcsize('!I') return num 

使用这个方法就可以很简单的把收集到的package包中的chunk解码,并且提取其中的日志内容。通过这个方法基本上可以使得延时缩小到10秒以内。

原文链接:https://yq.aliyun.com/articles/449587
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章