logstash将Kafka中的日志数据订阅到HDFS
前言:通常情况下,我们将Kafka的日志数据通过logstash订阅输出到ES,然后用Kibana来做可视化分析,这就是我们通常用的ELK日志分析模式。但是基于ELK的日志分析,通常比较常用的是实时分析,日志存个十天半个月都会删掉。那么在一些情况下,我需要将日志数据也存一份到我HDFS,积累到比较久的时间做半年、一年甚至更长时间的大数据分析。下面就来说如何最简单的通过logstash将kafka中的数据订阅一份到hdfs。
一:安装logstash(下载tar包安装也行,我直接yum装了)
#yum install logstash-2.1.1
二:从github上克隆代码
#git clone https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git #ls logstash-output-webhdfs-discontinued
三:安装logstash-output-webhdfs插件
#cd logstash-output-webhdfs-discontinued logstash的bin目录下有个plugin,使用plugin来安装插件 #/opt/logstash/bin/plugin install logstash-output-webhdfs
四:配置logstash
#vim /etc/logstash/conf.d/logstash.conf input { kafka { zk_connect => '10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181' #kafka的zk集群地址 group_id => 'hdfs' #消费者组,不要和ELK上的消费者一样 topic_id => 'apiAppWebCms-topic' #topic consumer_id => 'logstash-consumer-10.10.8.8' #消费者id,自定义,我写本机ip。 consumer_threads => 1 queue_size => 200 codec => 'json' } } output { #如果你一个topic中会有好几种日志,可以提取出来分开存储在hdfs上。 if [type] == "apiNginxLog" { webhdfs { workers => 2 host => "10.10.8.1" #hdfs的namenode地址 port => 50070 #webhdfs端口 user => "hdfs" #hdfs运行的用户啊,以这个用户的权限去写hdfs。 path => "/data/logstash/apiNginxLog-%{+YYYY}-%{+MM}-%{+dd}/logstash-%{+HH}.log #按天建目录,按小时建log文件。 flush_size => 500 # compression => "snappy" #压缩格式,可以不压缩 idle_flush_time => 10 retry_interval => 0.5 } } if [type] == "apiAppLog" { webhdfs { workers => 2 host => "10.64.8.1" port => 50070 user => "hdfs" path => "/data/logstash/api/apiAppLog-%{+YYYY}-%{+MM}-%{+dd}.log" flush_size => 500 # compression => "snappy" idle_flush_time => 10 retry_interval => 0.5 } } stdout { codec => rubydebug } }
五:启动logstash
#/etc/init.d/logstash start
已经可以成功写入了。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Lync就地升级Skype for Business报错处理一例
Lync Server 2013就地升级Skype for Business Server 2015,So easy ,So do. 一切都按部就班进行,一路过关斩妖,安装必须的IIS KB补丁、SQL RtcLocal/LYNCLocal with sp1、删除原来的Lync Server 2013相关的组件和数据库,再安装Skype for Business Server 2015组件和数据库,一切看起来就是那么顺利。 规划不到位(安装系统和Lync,系统盘空间规划的比较小),必报错,今天讲一例报错,先看一下就地升级报错信息: 中文报错: 错误: 命令执行失败: Install-CsDatabase 无法找到用于存储数据库文件的合适驱动器。这通常是由于磁盘空间不足;通常,在尝试创建数据库之前,您至少应有 32 GB 的可用空间。但是,此命令失败可能有其他原因。有关详细信息,请参阅 http://go.microsoft.com/fwlink/?LinkId=511023 英文报错: Install-CsDatabase was unable to find suitable dri...
- 下一篇
iPerf 测试网速的小教训
最近使用iPerf有个小教训,简单记载引以为戒。 上周,豆子公司升级WAN宽带从20M到1G。光纤,SPF,交换机端口,VLAN, iBGP的设定等等都严格按照ISP的要求,最后升级完成以后,豆子做iPerf做了个网速测试。 在我看来,iperf的测试应该很简单,在一端机器上运行 ipferf.exe -s 监听,另外一端运行iperf.exe -c IPaddress,就行了。 测试的结果如下所示。豆子一看,尼玛,带宽怎么才100M?鉴于我们的ISP不太靠谱的历史记录,我的第一个反应就是ISP又在坑爹了。于是很气愤的联系老板和对方的客户经理,%¥##!#¥@, 一通牢骚之后,对方表示会安排工程师测试速度。 过了2天,测试结果出来了,网速没有问题。皮球又踢回给我。豆子挠挠头皮,这到底是哪里出错了?经过研究,最后发现,网速真的没有问题,有问题的是我的测试方式!! iPerf可以进行多线程的并发测试,他默认的情况下只执行了一个线程,但是在带宽很大的情况下,完全达不到带宽的最大上限,这样导致iPerf给了一个错误的带宽值出来。当我同时执行几十个线程进行压力测试,真正的上限才显示出来。 比如说...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8编译安装MySQL8.0.19
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7