Logstash读取Kafka数据写入HDFS详解
强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃
通常日志数据除了要入ES提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇ELK的文章介绍过利用logstash将kafka的数据写入到elasticsearch集群,这篇文章将会介绍如何通过logstash将数据写入HDFS
本文所有演示均基于logstash 6.6.2版本
数据收集
logstash默认不支持数据直接写入HDFS,官方推荐的output插件是webhdfs
,webhdfs使用HDFS提供的API将数据写入HDFS集群
插件安装
插件安装比较简单,直接使用内置命令即可
# cd /home/opt/tools/logstash-6.6.2 # ./bin/logstash-plugin install logstash-output-webhdfs
配置hosts
HDFS集群内通过主机名进行通信所以logstash所在的主机需要配置hadoop集群的hosts信息
# cat /etc/hosts 192.168.107.154 master01 192.168.107.155 slave01 192.168.107.156 slave02 192.168.107.157 slave03
如果不配置host信息,可能会报下边的错
[WARN ][logstash.outputs.webhdfs ] Failed to flush outgoing items
logstash配置
kafka里边的源日志格式可以参考这片文章:ELK日志系统之使用Rsyslog快速方便的收集Nginx日志
logstash的配置如下:
# cat config/indexer_rsyslog_nginx.conf input { kafka { bootstrap_servers => "10.82.9.202:9092,10.82.9.203:9092,10.82.9.204:9092" topics => ["rsyslog_nginx"] codec => "json" } } filter { date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } } output { webhdfs { host => "master01" port => 50070 user => "hadmin" path => "/logs/nginx/%{index.date}/%{index.hour}.log" codec => "json" } stdout { codec => rubydebug } }
logstash配置文件分为三部分:input、filter、output
input指定源在哪里,我们是从kafka取数据,这里就写kafka集群的配置信息,配置解释:
- bootstrap_servers:指定kafka集群的地址
- topics:需要读取的topic名字
- codec:指定下数据的格式,我们写入的时候直接是json格式的,这里也配置json方便后续处理
filter可以对input输入的内容进行过滤或处理,例如格式化,添加字段,删除字段等等
- 这里我们主要是为了解决生成HDFS文件时因时区不对差8小时导致的文件名不对的问题,后边有详细解释
output指定处理过的日志输出到哪里,可以是ES或者是HDFS等等,可以同时配置多个,webhdfs主要配置解释:
- host:为hadoop集群namenode节点名称
- user:为启动hdfs的用户名,不然没有权限写入数据
- path:指定存储到HDFS上的文件路径,这里我们每日创建目录,并按小时存放文件
- stdout:打开主要是方便调试,启动logstash时会在控制台打印详细的日志信息并格式化方便查找问题,正式环境建议关闭
webhdfs还有一些其他的参数例如compression
,flush_size
,standby_host
,standby_port
等可查看官方文档了解详细用法
启动logstash
# bin/logstash -f config/indexer_rsyslog_nginx.conf
因为logstash配置中开了stdout
输出,所以能在控制台看到格式化的数据,如下:
{ "server_addr" => "172.18.90.17", "http_user_agent" => "Mozilla/5.0 (iPhone; CPU iPhone OS 10_2 like Mac OS X) AppleWebKit/602.3.12 (KHTML, like Gecko) Mobile/14C92 Safari/601.1 wechatdevtools/1.02.1902010 MicroMessenger/6.7.3 Language/zh_CN webview/ token/e7b92168159736c30401a55589317d8c", "remote_addr" => "172.18.101.0", "status" => 200, "http_referer" => "https://ops-coffee.cn/wx02935bb29080a7b4/devtools/page-frame.html", "upstream_response_time" => "0.056", "host" => "ops-coffee.cn", "request_uri" => "/api/community/v2/news/list", "request_time" => 0.059, "upstream_status" => "200", "@version" => "1", "http_x_forwarded_for" => "192.168.106.100", "time_local" => 2019-03-18T11:03:45.000Z, "body_bytes_sent" => 12431, "@timestamp" => 2019-03-18T11:03:45.984Z, "index.date" => "20190318", "index.hour" => "19", "request_method" => "POST", "upstream_addr" => "127.0.0.1:8181" }
查看hdfs发现数据已经按照定义好的路径正常写入
$ hadoop fs -ls /logs/nginx/20190318/19.log -rw-r--r-- 3 hadmin supergroup 7776 2019-03-18 19:07 /logs/nginx/20190318/19.log
至此kafka到hdfs数据转储完成
遇到的坑
HDFS按小时生成文件名不对
logstash在处理数据时会自动生成一个字段@timestamp
,默认情况下这个字段存储的是logstash收到消息的时间,使用的是UTC时区,会跟国内的时间差8小时
我们output到ES或者HDFS时通常会使用类似于rsyslog-nginx-%{+YYYY.MM.dd}
这样的变量来动态的设置index或者文件名,方便后续的检索,这里的变量YYYY
使用的就是@timestamp
中的时间,因为时区的问题生成的index或者文件名就差8小时不是很准确,这个问题在ELK架构中因为全部都是用的UTC时间且最终kibana展示时会自动转换我们无需关心,但这里要生成文件就需要认真对待下了
这里采用的方案是解析日志中的时间字段time_local
,然后根据日志中的时间字段添加两个新字段index.date
和index.hour
来分别标识日期和小时,在output的时候使用这两个新加的字段做变量来生成文件
logstash filter配置如下:
filter { # 匹配原始日志中的time_local字段并设置为时间字段 # time_local字段为本地时间字段,没有8小时的时间差 date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } # 添加一个index.date字段,值设置为time_local的日期 ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } # 添加一个index.hour字段,值设置为time_local的小时 ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } }
output的path中配置如下
path => "/logs/nginx/%{index.date}/%{index.hour}.log"
HDFS记录多了时间和host字段
在没有指定codec的情况下,logstash会给每一条日志添加时间和host字段,例如:
源日志格式为
ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
经过logstash处理后多了时间和host字段
2019-03-19T06:28:07.510Z %{host} ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
如果不需要我们可以指定最终的format只取message,解决方法为在output中添加如下配置:
codec => line { format => "%{message}" }
同时output到ES和HDFS
在实际应用中我们需要同时将日志数据写入ES和HDFS,那么可以直接用下边的配置来处理
# cat config/indexer_rsyslog_nginx.conf input { kafka { bootstrap_servers => "localhost:9092" topics => ["rsyslog_nginx"] codec => "json" } } filter { date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } ruby { code => "event.set('index.date', event.get('@timestamp').time.localtime.strftime('%Y%m%d'))" } ruby { code => "event.set('index.hour', event.get('@timestamp').time.localtime.strftime('%H'))" } } output { elasticsearch { hosts => ["192.168.106.203:9200"] index => "rsyslog-nginx-%{+YYYY.MM.dd}" } webhdfs { host => "master01" port => 50070 user => "hadmin" path => "/logs/nginx/%{index.date}/%{index.hour}.log" codec => "json" } }
这里我使用logstash的date插件将日志中的"time_local"字段直接替换为了@timestamp,这样做有什么好处呢?
logstash默认生成的@timestamp字段记录的时间是logstash接收到消息的时间,这个时间可能与日志产生的时间不同,而我们往往需要关注的时间是日志产生的时间,且在ELK架构中Kibana日志输出的默认顺序就是按照@timestamp来排序的,所以往往我们需要将默认的@timestamp替换成日志产生的时间,替换方法就用到了date插件,date插件的用法如下
date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" }
match:匹配日志中的时间字段,这里为time_local
target:将match匹配到的时间戳存储到给定的字段中,默认不指定的话就存到@timestamp字段
另外还有参数可以配置:timezone
,locale
,tag_on_failure
等,具体可查看官方文档
如果你觉得文章不错,请点右下角【在看】。如果你觉得读的不尽兴,推荐阅读以下文章:
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
我的ImageIO.write ByteArrayOutputStream为什么这么慢?
问题来源: 1.系统生成二维码,需要不同的图片格式来适应客户端要求 2.图片通过接口模式给客户端,最终使用base64来传递 平常思考模式: 1.BufferedImage首先通过工具把数据生成出来。 2.我绝对不会把这个BufferedImage写磁盘,直接放内存ByteArrayOutputstream后转base64岂不是更快? 3.ImageIO.write正好有个write(BufferedImage img,String format,OutputStream output) 4.真的舒服,我就用它了! 实际情况: 1.Linux环境centos6.8 虚拟化环境 2.JRE1.8 3.接口工作流程:(1) 生成BufferedImage (2)BufferedImage通过ImageIO.write(BufferedImage,"png",ByteArrayOutputStream out) (3)将ByteArrayOutputStream转化为base64 (4) 接口返回 4.一个普通的链接,生成二维码并返回base6...
- 下一篇
全新重构,uni-app实现微信端性能翻倍
多次论证、数月研发,我们重写部分Vue底层、重构uni-app框架,实现了微信端性能翻倍及更多Vue语法支持。 背景 uni-app在初期借鉴了mpvue,实现了微信小程序端的快速兼容,感谢美团点评团队对于开源社区的贡献! 随着使用uni-app的开发者愈来愈多,业务复杂度不断增加,不少开发者抱怨uni-app支持的vue语法少,某些场景性能有问题(特别是页面存在复杂组件的情况),这些问题其实是由mpvue的实现机制导致的,我们以复杂组件的性能问题为例简要说明。 mpvue/wepy 诞生之初,微信小程序尚不支持自定义组件,无法进行组件化开发;mpvue/wepy 为解决这个问题,创造性的将用户编写的Vue组件,编译为WXML中的模板(template),这样变相实现了组件化开发能力,提高代码复用性,这在当时的技术条件下是很棒的技术方案。但如此方案,也导致Vue组件中的数据会被编译为Page中的数据,对组件进行数据更新也会基于路径映射调用Page.setData。特别是组件较多、数据量交大的页面中,每个组件的局部更新会引发页面级别的全局更新,产生极大的性能开销。 微信后来推出的自定义组...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Red5直播服务器,属于Java语言的直播服务器
- SpringBoot2配置默认Tomcat设置,开启更多高级功能