您现在的位置是:首页 > 文章详情

ELK+kafka集成

日期:2018-11-29点击:475

1、因为本项目采用的log4j2,所以在log4j2中直接配置 

<Kafka name="Kafka" topic="XX_log">     <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss}||%p||%c{1}||XX_web||%m%n"/>       <Property name="bootstrap.servers">127.0.0.1:9092</Property>       <Property name="timeout.ms">500</Property>   </Kafka>

PatternLayout 中格式采用了||将内容连接起来目的为了logstash进行切分,其中增加timeout.ms属性为了保证日志系统挂掉的情况不会对业务系统产生较大影响,当然kafka可以采用集群的方式,bootstrap.servers多个地址用“,”分隔。XX_web代表当前业务平台。 


2、搭建kafka集群这里就不多介绍了官网很全, 

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183


3、创建logstash动态模板 

{       "template": "*",       "settings": {           "index.refresh_interval": "5s",           "number_of_replicas": "0",           "number_of_shards": "3"       },       "mappings": {           "_default_": {               "_all": {                   "enabled": false               },               "dynamic_templates": [                   {                       "message_field": {                           "match": "message",                           "match_mapping_type": "string",                           "mapping": {                               "type": "string",                               "index": "analyzed"                           }                       }                   },                   {                       "string_fields": {                           "match": "*",                           "match_mapping_type": "string",                           "mapping": {                               "type": "string",                               "index": "not_analyzed"                           }                       }                   }               ],               "properties": {                   "dateTime": {                       "type": "date",                       "format": "yyy-MM-dd HH:mm:ss"                   },                   "@version": {                       "type": "integer",                       "index": "not_analyzed"                   },                   "context": {                       "type": "string",                       "index": "analyzed"                   },                   "level": {                       "type": "string",                       "index": "not_analyzed"                   },                   "class": {                       "type": "string",                       "index": "not_analyzed"                   },                   "server": {                       "type": "string",                       "index": "not_analyzed"                   }               }           }       }   }


4、配置logstash 

input{          kafka {                   zk_connect =>"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"                   group_id =>"logstash"                   topic_id =>"XX_log"                   reset_beginning => false                   consumer_threads => 5                   decorate_events => true          }   }   filter {      mutate{           split=>["message","||"]           add_field => {                "dateTime" => "%{[message][0]}"           }           add_field => {                 "level" => "%{[message][1]}"           }           add_field => {                  "class" => "%{[message][2]}"           }           add_field => {                   "server" => "%{[message][3]}"            }           add_field => {                   "context" => "%{[message][4]}"            }            remove_field => ["message"]       }       date {           match => ["logdate", "yyyy-MM-dd HH:mm:ss"]       }   }   output{         elasticsearch {               hosts => ["127.0.0.1:9200"]               index => "XX_log-%{+YYYY-MM}"               codec => "json"               manage_template => true               template_overwrite => true               flush_size => 50000               idle_flush_time => 10               workers => 2               template => "E:\logstash\template\template_log.json"         }   }


按照年月将日志保存进ES索引中index => "XX_log-%{+YYYY-MM}",logstash从kafka集群中读取日志信息。 


5、搭建ZK集群,这里就不多介绍了,网上资料比较多


6、搭建ES集群,ES集群比较简单,设置的参数不要太多就可以使用。


7、配置kibana 

server.port: 5601 # 服务端口   # The host to bind the server to.   server.host: "115.28.240.113"   elasticsearch.url: http://127.0.0.1:9200   ES地址-集群   kibana.index: "kibana"


8、版本 JKD 1.7  ES-2.4, logstash 2.4, kafka-2.10,kibana-4.6.4


原文链接:https://blog.roncoo.com/article/129152
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章