ELK+kafka集成
1、因为本项目采用的log4j2,所以在log4j2中直接配置 <Kafkaname="Kafka"topic="XX_log"> <PatternLayoutpattern="%d{yyyy-MM-ddHH:mm:ss}||%p||%c{1}||XX_web||%m%n"/> <Propertyname="bootstrap.servers">127.0.0.1:9092</Property> <Propertyname="timeout.ms">500</Property> </Kafka> PatternLayout 中格式采用了||将内容连接起来目的为了logstash进行切分,其中增加timeout.ms属性为了保证日志系统挂掉的情况不会对业务系统产生较大影响,当然kafka可以采用集群的方式,bootstrap.servers多个地址用“,”分隔。XX_web代表当前业务平台。 2、搭建kafka集群这里就不多介绍了官网很全, zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 3、创建logstash动态模板 { "template":"*", "settings":{ "index.refresh_interval":"5s", "number_of_replicas":"0", "number_of_shards":"3" }, "mappings":{ "_default_":{ "_all":{ "enabled":false }, "dynamic_templates":[ { "message_field":{ "match":"message", "match_mapping_type":"string", "mapping":{ "type":"string", "index":"analyzed" } } }, { "string_fields":{ "match":"*", "match_mapping_type":"string", "mapping":{ "type":"string", "index":"not_analyzed" } } } ], "properties":{ "dateTime":{ "type":"date", "format":"yyy-MM-ddHH:mm:ss" }, "@version":{ "type":"integer", "index":"not_analyzed" }, "context":{ "type":"string", "index":"analyzed" }, "level":{ "type":"string", "index":"not_analyzed" }, "class":{ "type":"string", "index":"not_analyzed" }, "server":{ "type":"string", "index":"not_analyzed" } } } } } 4、配置logstash input{ kafka{ zk_connect=>"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183" group_id=>"logstash" topic_id=>"XX_log" reset_beginning=>false consumer_threads=>5 decorate_events=>true } } filter{ mutate{ split=>["message","||"] add_field=>{ "dateTime"=>"%{[message][0]}" } add_field=>{ "level"=>"%{[message][1]}" } add_field=>{ "class"=>"%{[message][2]}" } add_field=>{ "server"=>"%{[message][3]}" } add_field=>{ "context"=>"%{[message][4]}" } remove_field=>["message"] } date{ match=>["logdate","yyyy-MM-ddHH:mm:ss"] } } output{ elasticsearch{ hosts=>["127.0.0.1:9200"] index=>"XX_log-%{+YYYY-MM}" codec=>"json" manage_template=>true template_overwrite=>true flush_size=>50000 idle_flush_time=>10 workers=>2 template=>"E:\logstash\template\template_log.json" } } 按照年月将日志保存进ES索引中index => "XX_log-%{+YYYY-MM}",logstash从kafka集群中读取日志信息。 5、搭建ZK集群,这里就不多介绍了,网上资料比较多 6、搭建ES集群,ES集群比较简单,设置的参数不要太多就可以使用。 7、配置kibana server.port:5601#服务端口 #Thehosttobindtheserverto. server.host:"115.28.240.113" elasticsearch.url:http://127.0.0.1:9200ES地址-集群 kibana.index:"kibana" 8、版本 JKD 1.7 ES-2.4, logstash 2.4, kafka-2.10,kibana-4.6.4