日志收集系统chukwa协议分析
chukwa是apache下的一个开源项目。主要是用于日志收集分析,结合hadoop进行有效的日志处理和计算。下面这张图片描述的比较形象。
在这个架构中可以看出主要由agent
,collector
,hadoop
,这样几块组成。而我现在要关注的就是agent
和collector
的交互部分。原来的日志收集系统是直接使用chukwa的一整套服务搭建起来的。但是这样子的系统有一个比较明显的缺点就是不能达到实时处理的要求。正常的情况下,日志处理的延时在5到10分钟左右。因此,需要在此基础上修改这样的架构。分析后发现,chukwa的agent还是比较好用的,拥有很多的配置方法,方便对服务器的调控。另外因为是线上服务器,也不可以随便换用agent。因为决定自己实现collector端,收集agent发过来的数据。因此就需要研究chunkwa传输的时候是怎么编码的。下面这张图片是我分析后得到的结果:
每一次,agent会发送一个package过来,这个package包括很多个chunk。其中开始的4个字节表示的int值表示这个package有多少个chunk。上面这张图片表示的就是一个chunk的各个字段组成。因为这是一个流的形式字符,需要根据java中的相应编码方式使用python进行解码。但是我查看了一下,python中没有直接支持的包,不过有一个struct扩展模块,通过对该扩展模块进行简单包装,就可以实现读取Int型,Long型,UTF字符等。代码如下:
# -*-coding: utf-8 -*- from struct import * class BinaryStream: def __init__(self,base_stream): self.base_stream = base_stream self.offset = 0 def readBytes(self,length): string, = unpack_from(str(length) + 's',self.base_stream[self.offset:]) self.offset += calcsize(str(length) + 's') self.offset +=1 return string def readUTF(self): length, = unpack_from('!H',self.base_stream[self.offset:]) self.offset +=calcsize('!H') string, = unpack_from(str(length) + 's',self.base_stream[self.offset:]) self.offset +=calcsize(str(length) + 's') return string def readLong(self): num, = unpack_from('!Q',self.base_stream[self.offset:]) self.offset +=calcsize('!Q') return num def readInt(self): num, = unpack_from('!I',self.base_stream[self.offset:]) self.offset += calcsize('!I') return num
使用这个方法就可以很简单的把收集到的package包中的chunk解码,并且提取其中的日志内容。通过这个方法基本上可以使得延时缩小到10秒以内。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
用Kibana+Logstash+Elasticsearch快速搭建实时日志查询、收集与分析系统
安装环境 先看看都需要安装什么软件包 ruby 运行Kibana 必须, rubygems 安装ruby扩展必须 bundler 功能类似于yum JDK 运行java程序必须 redis 用来处理日志队列 logstash 收集、过滤日志 ElasticSearch 全文搜索服务(logstash集成了一个) kibana 页面展示 192.168.18.240 logstash index,kibana,JDK 192.168.18.241 logstash agent,JDK 192.168.18.242 redis 192.168.18.243 ElasticSearch,JDK 先安装redis (192.168.18.242) # wget http://redis.googlecode.com/files/redis-2.6.12.tar.gz # tar zxvf redis-2.6.12.tar.gz # mv redis-2.6.12 redis # cd redis # make -j24 # make install # vi /root/soft/redis/...
- 下一篇
跟我一起云计算(1)——storm
概述 最近要做一个实时分析的项目,所以需要深入一下storm。 为什么storm 综合下来,有以下几点: 1. 生逢其时 MapReduce 计算模型打开了分布式计算的另一扇大门,极大的降低了实现分布式计算的门槛。有了MapReduce架构的支持,开发者只需要把注意力集中在如何使用 MapReduce的语义来解决具体的业务逻辑,而不用头疼诸如容错,可扩展性,可靠性等一系列硬骨头。一时间,人们拿着MapReduce这把榔头去敲 各种各样的钉子,自然而然的也试图用MapReduce计算模型来解决流处理想要解决的问题。各种失败的尝试之后,人们意识到,改良MapReduce并 不能使之适应于流处理的场景,必须发展出全新的架构来完成这一任务(MapReduce不适合做流处理的原因Yahoo!在其S4的介绍论文里面有比较详 细的阐述,而UCBerkeley的SparkStreaming项目现在正在尝试挑战这一结论,感兴趣的同志请自行查看)。另一方面,人们对传统的 CEP解决方案心存疑虑,认为其非分布式的架构可扩展性不够,无法scaleout来满足海量的数据处理要求。这时候,Yahoo!的S4以及 ...
相关文章
文章评论
共有0条评论来说两句吧...