Real-Time Log Analysis, Part 6: Logstash

Logstash is a powerful data-processing tool: it can transport data, parse and transform it, and format the output, and it comes with a rich plugin ecosystem. It is commonly used for log processing.
Official documentation: https://www.elastic.co/guide/en/logstash/current/index.html
Chinese documentation: https://kibana.logstash.es/content/logstash/

In this part we use Logstash to consume messages from Kafka and output them to the Elasticsearch cluster.

Environment

  • OS: CentOS 7
  • IP: 192.168.2.233

Tune Linux kernel parameters
See the "Tune Linux kernel parameters" section of Part 2: Openresty in this series.

Install the JDK

sudo yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel

sudo vim /etc/profile and add the following content (note the additional JAVACMD environment variable):

JAVA_HOME=/usr/lib/jvm/java
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
JAVACMD=/usr/bin/java
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH JAVACMD PATH

Load the profile:

source /etc/profile

Verify the installation:

java -version

Set the hostname

sudo hostnamectl set-hostname logstash01

Installation

sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

sudo vim /etc/yum.repos.d/logstash.repo and add the following content:

[logstash-5.x]
name=Elastic repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

Install:

sudo yum install logstash

Enable Logstash at boot:

sudo systemctl enable logstash

  • Installation layout:
    Config directory: /etc/logstash/
    Note: custom pipeline configs go under /etc/logstash/conf.d and must end in .conf
    Log directory: /var/log/logstash/
    Bin directory: /usr/share/logstash/bin (during testing you can start Logstash manually from here, e.g. sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf)
    Elasticsearch template directory: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.7-java/lib/logstash/outputs/elasticsearch

  • Plugin management:
    The official plugins are listed at https://github.com/logstash-plugins/

  1. Plugin help: sudo /usr/share/logstash/bin/logstash-plugin --help
  2. List plugins: sudo /usr/share/logstash/bin/logstash-plugin list
  3. Install a plugin: sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-mongodb
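
The same tool can also show installed plugin versions and upgrade individual plugins, which is handy when checking whether a particular plugin option exists in your version. A quick sketch (logstash-input-kafka is just an example plugin name here):

sudo /usr/share/logstash/bin/logstash-plugin list --verbose                 # list plugins together with their versions
sudo /usr/share/logstash/bin/logstash-plugin update logstash-input-kafka   # upgrade a single plugin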

Configuration

sudo vim /etc/hosts and add the following (the Kafka cluster hosts; see Part 3: Kafka):

192.168.2.230 kafka01
192.168.2.231 kafka02
192.168.2.232 kafka03
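
Optionally, confirm that the brokers are reachable on port 9092 from this machine before going further (this assumes nc is installed, e.g. from the nmap-ncat package):

nc -vz kafka01 9092   # should report the connection succeeded
nc -vz kafka02 9092
nc -vz kafka03 9092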

sudo vim /etc/logstash/jvm.options and adjust the following parameters to control the JVM heap size (usually set to about half of the system memory):

-Xms2g
-Xmx2g
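
To see how much memory the machine actually has before picking the heap size, a quick check:

free -h   # total system memory; set -Xms/-Xmx to roughly half of it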

sudo vim /etc/logstash/logstash.yml and configure the following:

path.data: /var/lib/logstash
# best set equal to the number of CPU cores
pipeline.workers: 2
pipeline.output.workers: 2
# Logstash accumulates pipeline.batch.size events before flushing them in one batch (size this carefully based on your traffic; production systems usually tune it to reach maximum throughput)
pipeline.batch.size: 20000
# maximum milliseconds each pipeline worker waits while assembling a batch; the default is 5 ms (usually no need to change)
pipeline.batch.delay: 5
path.config: /etc/logstash/conf.d
http.host: "192.168.2.233"
log.level: info
path.logs: /var/log/logstash
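
Since pipeline.workers is best matched to the CPU core count (as noted in the comment above), you can check the core count first:

nproc                               # number of available CPU cores
grep -c ^processor /proc/cpuinfo    # same information from /proc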

sudo vim /etc/logstash/conf.d/click_to_elk.conf and add the following (for the specific filter rules, please refer to the Logstash documentation; the filter stage is the core of Logstash):

input {
  kafka {
    codec => "json"
    topics => ["we_click_access"]
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    group_id => "we_click_access__group"
    client_id => "logstash01"
    max_poll_records => "20000"   # each poll fetches at most this many records
    poll_timeout_ms => 1000       # how long each poll waits for data, in milliseconds
    auto_offset_reset => "latest"
    request_timeout_ms => "40000"
    consumer_threads => 3         # number of consumer threads; matching the topic's partition count is most efficient (default 1)
    enable_auto_commit => "true"
    exclude_internal_topics => "true"
    fetch_max_wait_ms => "500"
    fetch_min_bytes => "1"
    heartbeat_interval_ms => "3000"
    max_partition_fetch_bytes => "1048576"
    reconnect_backoff_ms => "100"
    retry_backoff_ms => "100"
    session_timeout_ms => "30000"
  }
}
filter {
  grok {
    match => {
      message => '%{IPORHOST:remote_addr} - %{NOTSPACE} \[%{HTTPDATE:timestamp}\] "(?:%{WORD} %{NOTSPACE:request}(?: HTTP/%{NUMBER})?|%{DATA})" %{NUMBER:status} (?:%{NUMBER}|-) "%{DATA:referer}" "%{DATA}" "%{DATA}" "%{DATA:wesub}"'
    }
    remove_field => ["message", "source", "offset", "beat", "input_type"]
  }
  ruby {
    init => "@kname = ['url_path','url_args']"
    code => "
      if event.get('referer') == '-'
        event.set('referer', 0)
      else
        event.set('referer', 1)
      end
      new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
      new_event.remove('@timestamp')
      event.append(new_event)
    "
  }
  kv {
    prefix => ""
    source => "url_args"
    field_split => "&"
    include_keys => [ "pubid", "campid", "gaid" ]
    remove_field => [ "url_args", "url_path", "request" ]
  }
  date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
    remove_field => [ "timestamp" ]
  }
  dissect {
    mapping => {
      "wesub" => '%{wesub_clickid}:,%{?wesub_campid}:,%{?wesub_pubid}:,%{wesub_offer_geo}:,%{wesub_payout}:,%{wesub_publisher_payout}:,%{wesub_source}:,%{?wesub_sub}::,%{?wesub_sub2}::,%{?wesub_sub3}'
    }
    remove_field => ["wesub"]
  }
  mutate {
    convert => {
      "status" => "integer"
      "pubid" => "integer"
      "campid" => "integer"
      "wesub_payout" => "float"
      "wesub_publisher_payout" => "float"
      "wesub_source" => "integer"
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
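
Before the test run below, you can optionally have Logstash validate the configuration syntax and exit; the --config.test_and_exit flag (short form -t) only parses the config without starting the pipeline:

sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/click_to_elk.conf --config.test_and_exit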

Test run

sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/click_to_elk.conf

Now hit the OpenResty endpoint set up earlier:
http://192.168.2.236/v1/ad/click/?pubid=123&campid=456&gaid=abcdefg
Logstash will print something like the following:

{
"remote_addr" => "192.168.2.182",
"referer" => 0,
"wesub_clickid" => "3fe63613-83b1-474b-bd29-2d21d8709980",
"wesub_offer_geo" => "XY",
"campid" => 456,
"gaid" => "abcdefg",
"wesub_publisher_payout" => 0.252,
"type" => "we_click_access",
"wesub_source" => 2,
"@timestamp" => 2017-09-06T09:14:33.000Z,
"wesub_payout" => 0.252,
"@version" => "1",
"pubid" => 123,
"status" => 200
}

With that, we have gone all the way from the raw OpenResty log line

192.168.2.182 - - [06/Sep/2017:09:14:33 +0000] "GET /v1/ad/click/?pubid=123&campid=456&gaid=abcdefg HTTP/1.1" 200 43 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.101 Safari/537.36" "-" "3fe63613-83b1-474b-bd29-2d21d8709980:,12345:,45678:,XY:,0.252:,0.252:,2:,sub1::sub2,::,sub3"

to the structured output above after Logstash's transformations. (Thunderous applause here!)

All that remains is to point the Logstash output at the Elasticsearch cluster.
sudo vim /etc/logstash/conf.d/click_to_elk.conf and change the output section to:

output {
  if [type] == "we_click_access" {
    elasticsearch {
      hosts => ["192.168.2.234:9200", "192.168.2.235:9200", "192.168.2.236:9200"]
      index => "logstash-click-%{+YYYY.MM.dd}"
      document_type => "%{type}"
      template => "/etc/logstash/elasticsearch-template-es5x-click.json"
      template_name => "elasticsearch-template-es5x-click"
      template_overwrite => true
      http_compression => true
    }
  }
}
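
Before restarting Logstash it is worth confirming that the Elasticsearch cluster is reachable from this host; a minimal check against one of the nodes listed above:

curl -XGET '192.168.2.234:9200/_cluster/health?pretty'   # expect status green or yellow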

Set up the Elasticsearch template file.
sudo vim /etc/logstash/elasticsearch-template-es5x-click.json and add the following content.
Note: in production you may want to tune the number_of_shards, number_of_replicas, and index.refresh_interval settings.

{
  "template" : "logstash-click*",
  "version" : 50001,
  "settings" : {
    "number_of_shards" : 3,
    "number_of_replicas" : 0,
    "index.refresh_interval" : "60s"
  },
  "mappings" : {
    "_default_" : {
      "_all" : {"enabled" : true, "norms" : false},
      "dynamic_templates" : [ {
        "message_field" : {
          "path_match" : "message",
          "match_mapping_type" : "string",
          "mapping" : {
            "type" : "text",
            "norms" : false
          }
        }
      }, {
        "string_fields" : {
          "match" : "*",
          "match_mapping_type" : "string",
          "mapping" : {
            "type" : "text", "norms" : false
          }
        }
      } ],
      "properties" : {
        "@timestamp": { "type": "date", "include_in_all": false },
        "@version": { "type": "keyword", "include_in_all": false },
        "geoip" : {
          "dynamic": true,
          "properties" : {
            "ip": { "type": "ip" },
            "location" : { "type" : "geo_point" },
            "latitude" : { "type" : "half_float" },
            "longitude" : { "type" : "half_float" }
          }
        },
        "campid": {
          "type": "long"
        },
        "referer": {
          "type": "long"
        },
        "wesub_payout": {
          "type": "half_float"
        },
        "wesub_publisher_payout": {
          "type": "half_float"
        },
        "remote_addr": {
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          },
          "type": "text"
        },
        "gaid": {
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          },
          "type": "text"
        },
        "wesub_source": {
          "type": "long"
        },
        "pubid": {
          "type": "long"
        },
        "type": {
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          },
          "type": "text"
        },
        "status": {
          "type": "long"
        },
        "wesub_offer_geo": {
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          },
          "type": "text"
        },
        "wesub_clickid": {
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          },
          "type": "text"
        }
      }
    }
  }
}
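
Once Logstash starts with the new output config, it should push this template to Elasticsearch (template_overwrite => true). You can verify that the template landed by querying any ES node:

curl -XGET '192.168.2.234:9200/_template/elasticsearch-template-es5x-click?pretty'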

Start for real

sudo service logstash start
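
To confirm the service came up and is shipping data, check the service status, tail the Logstash log, and list the daily indices. The logstash-plain.log file name is the 5.x default; adjust it if your logging configuration differs:

sudo systemctl status logstash
sudo tail -f /var/log/logstash/logstash-plain.log
curl -XGET '192.168.2.234:9200/_cat/indices/logstash-click-*?v'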

Monitoring

Check Logstash's runtime stats, including how much data the input has consumed and how long it took, how long each filter takes, how much data the output has emitted and how long it took, and so on.

curl -XGET '192.168.2.233:9600/_node/stats?pretty'   # shows Logstash's overall status and throughput

Because the filter stage here uses grok, which is relatively expensive, it may become the bottleneck. If the filters are taking most of the time, first try optimizing the filter configuration, or consider adding more Logstash nodes.
If the Elasticsearch output is taking a long time and capping overall throughput, look at tuning the Elasticsearch side instead.
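
To narrow down where the time is going, the monitoring API also exposes per-subsystem endpoints (on 5.x the pipeline stats live under _node/stats/pipeline; later versions use _node/stats/pipelines), plus a hot-threads view, for example:

curl -XGET '192.168.2.233:9600/_node/stats/pipeline?pretty'   # per-plugin event counts and durations
curl -XGET '192.168.2.233:9600/_node/stats/jvm?pretty'        # heap usage and GC stats
curl -XGET '192.168.2.233:9600/_node/hot_threads?pretty'      # busiest threads inside Logstash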
