一、Logstash 介绍
Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理。
![logstash.png wKioL1e3v2vDm9B-AAEPIwibWb0700.png]()
官网地址:https://www.elastic.co/products/logstash
工作流程
![449064-20150912115455559-525747352.png wKiom1e31O6hsGJUAADcUSxOIdM003.png]()
Logstash 工作的三个阶段:
input 数据输入端,可以接收来自任何地方的源数据。
Filter 数据中转层,主要进行格式处理,数据类型转换、数据过滤、字段添加,修改等,常用的过滤器如下。
output 是logstash工作的最后一个阶段,负责将数据输出到指定位置,兼容大多数应用,常用的有:
-
elasticsearch: 发送事件数据到 Elasticsearch,便于查询,分析,绘图。
-
file: 将事件数据写入到磁盘文件上。
-
mongodb:将事件数据发送至高性能NoSQL mongodb,便于永久存储,查询,分析,大数据分片。
-
redis:将数据发送至redis-server,常用于中间层暂时缓存。
-
graphite: 发送事件数据到graphite。http://graphite.wikidot.com/
-
statsd: 发送事件数据到 statsd。
二、安装logstash
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
vim
/etc/yum
.repos.d
/es
.repo
[kibana-4.5]
name=Kibana repository
for
4.5.x packages
baseurl=http:
//packages
.elastic.co
/kibana/4
.5
/centos
gpgcheck=1
gpgkey=http:
//packages
.elastic.co
/GPG-KEY-elasticsearch
enabled=1
[beats]
name=Elastic Beats Repository
baseurl=https:
//packages
.elastic.co
/beats/yum/el/
$basearch
enabled=1
gpgkey=https:
//packages
.elastic.co
/GPG-KEY-elasticsearch
gpgcheck=1
yum
install
logstash -y
|
三、使用命令行运行一个简单的logstash程序
|
1
2
3
4
5
6
7
8
9
10
|
logstash
/bin/logstash
-e
'input{stdin{}}output{stdout{codec=>rubydebug}}'
{
"message"
=>
"abc"
,
"@version"
=>
"1"
,
"@timestamp"
=>
"2016-08-20T03:33:00.769Z"
,
"host"
=>
"iZ23tzjZ"
}
|
四、配置语法讲解
logstash使用{ }来定义配置区域,区域内又可以包含其插件的区域配置。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
input{
stdin{ }
}
output{
stdout{
codec=>rubydebug
}
}
input{
stdin{ }
}
filter{
}
output{
stdout{
codec=>rubydebug
}
}
input{
stdin{ }
file
{
path => [
"/var/log/message"
]
type
=>
"system"
start_position =>
"beginning"
}
}
output{
stdout{
codec=>rubydebug
}
file
{
path =>
"/var/datalog/mysystem.log.gz"
gzip
=>
true
}
}
|
启动方式
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
/bin/logstash
-f
/etc/logstash/conf
.d
/nginx_logstash
.conf
/bin/logstash
-f
/etc/logstash/conf
.d
/nginx_logstash
.conf &
/etc/init
.d
/logstash
start
|
五、filebeat基本讲解
filebeat是基于原先 logstash-forwarder 的源码开发而来,无需JAVA环境,运行起来更轻便,无疑是业务服务器端的日志收集工具。
配 置
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
filebeat:
prospectors:
-
paths:
-
/var/log/messages
input_type: log
document_type: messages
-
paths:
-
/alidata/log/nginx/access/access
.log.json
input_type: log
document_type: nginxacclog
-
paths:
-
/alidata/www/storage/logs/laravel
.log
input_type: log
document_type: larlog
-
paths:
-
/alidata/www/500_error/500_error
.log
input_type: log
document_type: error500
-
paths:
-
/alidata/www/deposit/deposit
.log
input_type: log
document_type: deposit
-
paths:
-
/alidata/www/call_error
.log
input_type: log
document_type: call_error
-
paths:
-
/alidata/www/weixin_deposit
.log
input_type: log
document_type: weixin_deposit
-
paths:
-
/alidata/log/php/php-fpm
.log.slow
input_type: log
document_type: phpslowlog
multiline:
pattern:
'^[[:space:]]'
negate:
true
match: after
registry_file:
/var/lib/filebeat/registry
output:
redis:
host:
"10.122.52.129"
port: 6379
password:
"123456"
logstash:
hosts: [
"10.160.8.221:5044"
]
shipper:
name:
"host_2"
logging:
files:
rotateeverybytes: 10485760
|
filebeat主要配置就是这个配置文件了,设定好之后启动服务就会自动从源拉取数据发送到指定位置,当数据为普通行数据时,filebeat会自动为其添加字段信息,其中一项字段 @timestamp 为filebeat读取到这条数据的时间,默认格式为UTC时间,比中国大陆时间早8小时。
如果数据为json格式,而数据中已包含@timestamp字段,filebeat处理时会把@timestamp字段值替换为filebeat读取到该行数据的当前UTC时间。
六、实战运用
架构一
![1111.png wKiom1e7uC6D7Us_AABlKRVNxjw054.png]()
nginx 日志格式配置
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
log_format json
'{"@timestamp":"$time_iso8601",'
'"slbip":"$remote_addr",'
'"clientip":"$http_x_forwarded_for",'
'"serverip":"$server_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"domain":"$host",'
'"method":"$request_method",'
'"requesturi":"$request_uri",'
'"url":"$uri",'
'"appversion":"$HTTP_APP_VERSION",'
'"referer":"$http_referer",'
'"agent":"$http_user_agent",'
'"status":"$status"}'
;
|
filebeat 配置
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
filebeat:
prospectors:
-
paths:
-
/alidata/log/nginx/access/access
.log.json
input_type: log
document_type: nginxacclog
output:
logstash:
hosts: [
"10.160.8.221:5044"
]
|
logstash 配置 (此处logstash用于接收filebeat的数据,然后转存redis)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
input {
beats {
port => 5044
codec =>
"json"
}
}
filter {
if
[
type
] ==
"nginxacclog"
{
geoip {
source
=>
"clientip"
target =>
"geoip"
database =>
"/u01/elk/logstash/GeoLiteCity.dat"
add_field => [
"[geoip][coordinates]"
,
"%{[geoip][longitude]}"
]
add_field => [
"[geoip][coordinates]"
,
"%{[geoip][latitude]}"
]
}
mutate {
convert => [
"[geoip][coordinates]"
,
"float"
]
}
}
}
output{
if
[
type
] ==
"nginxacclog"
{
redis {
data_type =>
"list"
key =>
"nginxacclog"
host =>
"127.0.0.1"
port =>
"26379"
password =>
"123456"
db =>
"0"
}
}
if
[
type
] ==
"messages"
{
redis {
data_type =>
"list"
key =>
"messages"
host =>
"127.0.0.1"
port =>
"26379"
password =>
"123456"
db =>
"0"
}
}
}
|
logstash 配置 (此处logstash用于读取redis list中的数据,然后转存elasticsearch)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
input{
redis {
host =>
"10.10.1.2"
port =>
"26379"
db =>
"0"
key =>
"nginxacclog"
threads => 300
password =>
"123456"
data_type =>
"list"
codec =>
"json"
}
redis {
host =>
"10.10.1.2"
port =>
"26379"
db =>
"0"
key =>
"messages"
password =>
"123456"
threads => 50
data_type =>
"list"
codec =>
"json"
}
}
output {
if
[
type
] ==
"nginxacclog"
{
elasticsearch {
hosts => [
"127.0.0.1:9200"
]
index =>
"logstash-nginxacclog-%{+YYYY.MM.dd}"
manage_template =>
true
flush_size => 50000
idle_flush_time => 10
workers => 2
}
}
if
[
type
] ==
"messages"
{
elasticsearch {
hosts => [
"127.0.0.1:9200"
]
index =>
"logstash-messages-%{+YYYY.MM.dd}"
manage_template =>
true
flush_size => 50000
idle_flush_time => 30
workers => 1
}
}
}
|
关键指令解释:
threads 开启多少个线程读取redis数据,也就是从redis输入到logstash的速度,线程越多读取速度越快,但是根据接收节点的接收速度来设置,如果输入过快,接收速度不够,则会出现丢数据的情况,设置一个最佳的threads值需要和接收节点做反复测试才能得出。
flush_size 控制logstash向Elasticsearch批量发送数据,上面的配置表示,logstash会努力赞到50000条数据一次发送给Elasticsearch。
idle_flush_time 控制logstash多长时间向Elasticsearch发送一次数据,默认为1秒,根据以上配置,logstash积攒数据未到flush_size 10秒后也会向Elasticsearch发送一次数据。
workers 建议设置为1或2,如果机器性能不错可以设置为2. 不建议设置的更高。
架构二
![3333333.png wKioL1e70Lygdwk4AABaCnmdXAw932.png]()
filebeat 配置(从日志文件读取到的数据直接缓存至redis队列)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
filebeat:
prospectors:
-
paths:
-
/alidata/log/nginx/access/access
.log.json
input_type: log
document_type: nginxacclog
output:
redis:
host:
"10.160.8.221"
port: 26379
password:
"123456"
|
document_type 自定义日志类型,在logstash中可通过type判断做不同的处理。
logstash 配置 (此处logstash用于读取redis list中的数据,然后转存mongodb)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
input {
redis {
host =>
"10.160.8.221"
port =>
"26379"
key =>
"filebeat"
data_type =>
"list"
password =>
"123456"
threads => 50
}
redis {
host =>
"10.160.8.221"
port =>
"26379"
key =>
"mycat"
data_type =>
"list"
password =>
"123456"
threads => 50
type
=>
"mycat"
}
}
output {
if
[
type
] ==
"mycat"
{
mongodb{
collection =>
"mycat%{+yyyyMMdd}"
isodate =>
true
database =>
"logdb"
uri =>
"mongodb://log_user:123456@10.10.1.102:27017/logdb"
}
}
if
[type_xi09wnk] ==
"nginxacclog"
{
mongodb{
collection =>
"nginx_accress%{years_dik3k}%{months_dik3k}%{days_dik3k}"
isodate =>
true
database =>
"logdb"
uri =>
"mongodb://log_user:123456@10.10.1.102:27017/logdb"
}
}
}
|