您现在的位置是:首页 > 文章详情

logstash将Kafka中的日志数据订阅到HDFS

日期:2016-07-06点击:682

前言:通常情况下,我们将Kafka的日志数据通过logstash订阅输出到ES,然后用Kibana来做可视化分析,这就是我们通常用的ELK日志分析模式。但是基于ELK的日志分析,通常比较常用的是实时分析,日志存个十天半个月都会删掉。那么在一些情况下,我需要将日志数据也存一份到我HDFS,积累到比较久的时间做半年、一年甚至更长时间的大数据分析。下面就来说如何最简单的通过logstash将kafka中的数据订阅一份到hdfs。

wKiom1d8ekGgX_HBAABasyVQt-E025.png-wh_50

一:安装logstash(下载tar包安装也行,我直接yum装了)

#yum install logstash-2.1.1


二:从github上克隆代码

#git clone  https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git #ls logstash-output-webhdfs-discontinued


三:安装logstash-output-webhdfs插件

#cd logstash-output-webhdfs-discontinued logstash的bin目录下有个plugin,使用plugin来安装插件 #/opt/logstash/bin/plugin install logstash-output-webhdfs

wKiom1d8b9HzWVqnAAAN1y8C7dQ438.png-wh_50


四:配置logstash

#vim /etc/logstash/conf.d/logstash.conf input {   kafka {     zk_connect => '10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181'   #kafka的zk集群地址     group_id => 'hdfs'                     #消费者组,不要和ELK上的消费者一样     topic_id => 'apiAppWebCms-topic'       #topic      consumer_id => 'logstash-consumer-10.10.8.8'   #消费者id,自定义,我写本机ip。     consumer_threads => 1     queue_size => 200     codec => 'json'   } } output {             #如果你一个topic中会有好几种日志,可以提取出来分开存储在hdfs上。 if [type] == "apiNginxLog" {     webhdfs {            workers => 2            host => "10.10.8.1"        #hdfs的namenode地址                port => 50070              #webhdfs端口            user => "hdfs"             #hdfs运行的用户啊,以这个用户的权限去写hdfs。            path => "/data/logstash/apiNginxLog-%{+YYYY}-%{+MM}-%{+dd}/logstash-%{+HH}.log               #按天建目录,按小时建log文件。            flush_size => 500 #       compression => "snappy"             #压缩格式,可以不压缩         idle_flush_time => 10         retry_interval => 0.5        }    } if [type] == "apiAppLog" {     webhdfs {         workers => 2         host => "10.64.8.1"         port => 50070         user => "hdfs"         path => "/data/logstash/api/apiAppLog-%{+YYYY}-%{+MM}-%{+dd}.log"         flush_size => 500 #        compression => "snappy"         idle_flush_time => 10         retry_interval => 0.5        }    }   stdout { codec => rubydebug } }

 

五:启动logstash

#/etc/init.d/logstash start


已经可以成功写入了。

wKiom1d8d97DWrGPAABNBo8GGGY014.png

wKioL1d8eFTSO0dwAACYhM0Li-E358.png-wh_50



原文链接:https://blog.51cto.com/heqin/1796776
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章