Logstash 介绍

1. 日志分析挑战

日志由各种各样的资源生成,比如系统,应用程序,设备等等。通常有两部分构成:

  • 时间戳(事件发生的时间)
  • 数据(事件信息)

日志通常的作用:

  • 故障排查:当应用或系统出现问题时,首先要查找的就是日志。
  • 了解系统/应用程序行为:当系统或应用运行时,我们不知道里面发生了什么,这时,就通常依赖日志来进行查看
  • 审计:用于记录是否正常用户登陆,以及非法尝试登陆的记录。
  • 预测分析:随着大数据以及人工智能的发展,预测分析变得越来越流行。通过产生的历史数据,来预测未来可能发生的事情。预测分析使组织变得积极主动和前瞻性思维。预测分析的一些用例包括:建议用户购买 什么商品,检测是否欺诈,优化营销活动等等。

通过上面的了解,我们知道日志的好处,同样,也有一些挑战:

  • 没有一致的格式:由于系统和应用的繁多,产生的日志也是五花八门,由于格式的不同,搜索也将变得困难。
  • 日志分散:一般情况下,日志都各自存储在本地服务器,随着设备资源的增多,日志也变得越来越多。
  • 没有一致的时间格式:每个系统/应用程序都以自己的方式定义记录时间,因此很多确定事件发生的确切事件。
  • 非结构化数据:由于日志格式的不同,也就是没有统一的日志规范,很难对数据进行搜索分析。

2. Logstash 介绍

Logstash是一个开源数据收集引擎,具有实时流水线功能。Logstash可以动态统一来自不同来源的数据,并将数据规范化为您选择的目的地。

虽然Logstash最初推动了日志收集方面的创新,但其功能远远超出了该用例。任何类型的事件都可以通过广泛的输入,过滤和输出插件进行丰富和转换,许多本机编解码器进一步简化了摄取过程。

Logstash 的一些显著特点如下:

  • 可插拔的管道架构:Logstash 包含超过200个由Elastic和开放源码社区开发的插件,这些插件可用于混合、匹配和编排不同的输入、过滤器和输出,同时构建数据处理管道。
  • 日志和指标:处理所有类型的日志记录数据,如 Apache、Nginx、系统和窗口事件日志。享受与Filebeat互补的安全日志转发功能,通过TCP和UDP从Ganglia、collectd、NetFlow、JMX和许多其他基础设施和应用程序平台收集指标。Logstash数据处理管道可以很容易地水平伸缩,并且从Logstash 5开始支持持久队列,因此能够可靠地处理大量传入事件/数据。
  • 协同:目前 Logstash 与 Elasticsearch、Beats和Kibana有很强的协同作用,因此可以轻松构建端到端日志分析解决方案。

Logstach 架构

Logstash 事件处理管道分为三个阶段:输入,过滤和输出。其中输入和输出是必需元素。

  • Inputs:一般比较常用的是 file 或 beats 。
  • Filters:主要是用来筛选数据,格式化数据等。
  • Outputs:这里用的比较多的是 elasticsearch 。

输入和输出支持编解码器,它允许您在数据进入或退出管道时对数据进行编码或解码,而无需使用单独的过滤器。

默认情况下,Logstash 使用管道中的内存有界队列(过滤输入和输出)来缓冲事件。如果 Logstash 意外终止,那么存储在内存中的事件就可能会丢失。为了防止数据丢失,Logstash 同样支持事件的持久化。

[success]可以通过配置文件 logstash.yml 中的 queue.type: persisted 设置持久化。

Logstash 管道配置文件以 .conf 结尾,主要包括三个部分:

1
2
3
4
5
6
7
8
9
10
11
input {

}

fileter {

}

output {

}

每个部分都包含一个或多个插件配置。

//例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ vim simple.conf
input {
stdin {}
}

filter {
mutate {
uppercase => [ "message" ]
}
}

output {
stdout {
codec => rubydebug
}
}

//运行

1
logstash -f ./simple.conf

Logstach 插件

Logstash 包含丰富的输入、过滤、输出插件。默认情况下,作为 Logstash 的一部分,许多通用插件都是开箱即用的。

1. 插件的常见操作

1
2
3
4
5
$ /usr/share/logstash/bin/logstsh-plugin list          //显示所有已安装的插件
$ logstash-plugin list --group filter //显示关于 filter 插件组
$ logstash-plugin list 'grok' //显示包含 grok 的插件
$ logstash-plugin install logstash-output-email
$ logstash-plugin update logstash-output-s3

(1)Input plugins

输入插件用于配置一组事件,这些事件将被提供给Logstash。该插件允许您配置单个或多个输入源。它充当第一部分,这在Logstash配置文件中是必需的。

插件列表:https://www.elastic.co/guide/en/logstash/7.3/input-plugins.html

1)File

文件插件用于逐行的从文件中传输事件。它的工作方式与 tail -f 类似。对于每个文件,它跟踪文件中的更改,并且只发送上次读取之后的数据,还会自动检测日志的切割。

文件插件会记录每个文件的当前位置。它是通过一个名为 sincedb 的单独文件来记录,位置为 /usr/share/logstash/data/plugins/inputs/file/.sincedb* ,可以通过修改 sincedb_path 插件来进行覆盖。

[success]有的时候,也许没那么快显示出来,可以稍后看,文件内容分别为:inode、主设备号、次要设备号、文件大小、偏移量(不肯定)、文件

//例1

1
2
3
4
5
6
7
8
9
10
11
input {
file {
path => "/var/log/secure"
}
}

output {
stdout {
codec => rubydebug
}
}

//例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
input {
file {
path => ["/var/log/secure", "/var/log/*.log"]
start_position => "beginning"
exclude => [".csv"]
type => "applogs"
}
}

output {
stdout {
codec => rubydebug
}
}

参数 start_position => "beginning"sincedb_path => "NULL"将强制文件从头开始流式传输。

[success] ‘type => “applogs”‘ 添加新的附加字段。在转换过滤器插件中的事件或标识输出中的事件时,添加其他字段会很有帮助。

2)Beats

Beats是一组轻量级守护进程的集合,它们收集来自服务器的操作数据,并将其发送到配置输出,如Logstash、Elasticsearch、Redis等。常见的 Beats 如:Metricbeat、Filebeat、Winlogbeat 等等。

通过使用 beats输入插件,我们可以让 Logstash 监听一个 beats 连接所需的端口。

//例1

1
2
3
4
5
6
7
8
9
10
11
input {
beats {
port => 5044
}
}

output {
stdout {
codec => rubydebug
}
}

端口是这个插件唯一需要的设置。

//例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
input {
beats {
port => 5044
}
beats {
port => 5055
}
}

output {
stdout {
codec => rubydebug
}
}

[success]-r 在运行Logstash时使用该标志允许您在对其进行更改并保存时自动重新加载配置。这在测试新配置时很有用,因为您可以对其进行修改,以便每次对配置进行更改时都不需要手动启动Logstash。

(2)Filter plugins

过滤器插件用于对数据执行转换过滤。它允许您组合一个或多个插件,插件的顺序定义了数据转换的顺序。它充当输入和输出之间的中间部分,是Logstash配置中的一个可选部分。

插件列表:https://www.elastic.co/guide/en/logstash/7.3/filter-plugins.html

[danger]这个后面会单独详细介绍,知识点比较多

(3)Output plugins

输出插件用于将数据发送到目标。输出插件允许您配置单个或多个输出源。它们充当最后一部分,这在Logstash配置文件中是必需的。

插件列表:https://www.elastic.co/guide/en/logstash/7.3/output-plugins.html

1)Elasticsearch

此插件是将事件/日志数据从 Logstash 推送到 Elasticsearch 的推荐方法。不配置 elasticsearch 主机的话,默认会尝试连接 localhost:9200 。

例1:

1
2
3
4
5
6
7
8
9
input {
stdin{
}
}

output {
elasticsearch {
}
}

例2:

1
2
3
4
5
6
7
8
9
10
11
12
input {
stdin{
}
}

output {
elasticsearch {
index => "company"
# document_type => "employee" # 该类型新版本已经取消
hosts => "192.168.20.177:9200"
}
}

如果未指定索引,默认情况下,索引模式是 logstash-%(+YYYY.MM.dd) ,document_type 是 _doc

(4)Codec pluginsCodec

插件用于输入和输出编码或解码。输入编解码器提供了在数据输入之前对其进行解码。输出编解码器提供在数据离开输出之前对其进行编码。

插件列表:https://www.elastic.co/guide/en/logstash/7.3/codec-plugins.html

1)JSON

如果数据由.json文档组成,并且用于以.json格式编码(如果用于输出插件)或解码(如果用于输入插件)数据,则此编解码器非常有用。如果发送的数据是JSON数组的根,那么将创建多个事件(即每个元素一个)。

1
2
3
4
5
input{
stdin{
codec => "json"
}
}

[success]如果有多个JSON记录,并且这些记录由\n分隔,那么使用json_lines编解码器。

如果json编解码器从无效json输入接收有效负载,那么它将返回到纯文本并添加一个JSON parse error标记。

2) Rubydebug

这个编解码器将使用Ruby Awesome Print库输出Logstash事件数据。

1
2
3
4
5
output{
stdout{
codec => "rubydebug"
}
}

Filter

过滤器插件用于对数据执行转换。它允许我们组合一个或多个插件,插件的顺序定义了数据转换的顺序。

从输入插件生成的事件经过filter部分中定义的每个插件,在此期间,它根据定义的插件转换事件。最后,它被发送到输出插件以将事件发送到适当的目的地。

下面我们看一些常用的过滤器插件。

1. CSV filter

这个过滤器对于解析 .csv 文件很有用。这个插件接受一个包含CSV数据的事件,解析并将其存储为单独的字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ cat amazon2.csv
"id","title","description","manufacturer","price"
"b000jz4hqo"," clickart",," broderbund",7.9
"b0006zf55o","ca international ","oem arcserve backup","computer associates",8.99
"b00004tkvy"," noah's",,"victory multimedia",3

$ cat csv.conf
input {
file {
path => "/root/amazon2.csv"
start_position => "beginning"
}
}

filter {
csv {
autodetect_column_names => true
}
}

output {
stdout {
codec => rubydebug
}
}

2. Mutate filter

此过滤器对字段执行一般的转换。事件中的字段可以重命名、转换、剥离和修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$ cat csv2.conf
input {
file {
path => "/root/amazon2.csv"
start_position => "beginning"
sincedb_path => "NULL"
}
}

filter {
csv {
autodetect_column_names => true
}
mutate {
convert => {
"price" => "integer"
}
rename => {
"id" => "newid"
"title" => "biaoti"
}
gsub => [
"newid", "b", "B"
]
strip => [ "description","biaoti"]
uppercase => [ "manufacturer" ]
}
}


output {
stdout {
codec => rubydebug
}
}

convert:更改字段的数据类型。可以转换的类型有:整数、字符串、浮点数和布尔值。
rename:重命名一个或多个字段。后续的操作需使用重命名后的新字段
gsub:根据字段值匹配正则表达式,并用替换字符串替换所有的匹配。
strip:用于去除开头和结尾的空白。
uppercase:将字符串转为大写。

类似地,还有更多的操作,如 lowercase、update、replace、join 和 merge。

参考:https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

3. Grok 过滤器

这是一个强大的,经常用的插件,用于解析所述非结构化数据转换为结构化数据,从而使数据易于查询/过滤。简单来说,Grok是一种根据模式匹配一行的方法(基于正则表达式),并将该行的特定部分映射到专用字段。grok模式的一般语法如下:

1
2
%{PATTERN:FIELDNAME}          //基本匹配模式,默认字段是字符串
%{PATTERN:FIELDNAME:type} //字段类型转换为指定类型

默认情况下,Logstash ships只有大约120个模式。这些模式是可重用和可扩展的。您可以通过组合现有模式创建自定义模式。这些模式基于Oniguruma正则表达式库。

//模式由标签和正则表达式组成

1
USERNAME [a-zA-Z0-9._-]+

//模式还可以包含其他模式

1
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

[success]模式的完整列表:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns , 或者: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

//如果模式不符合自己的需求,还可以使用正则表达式

1
(?<field_name>regex)

例1:

1
regex (?<phone>\d\d\d\d\d\d\d\d\d\d\d)        //匹配 12345678901

例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
$ cat test.log
2019-08-19T09:03:30.000+00:00 user1 001 this is test data
2019-08-19T09:05:30.000+00:00 user2 003 this is test error data

$ cat grok_test.conf
input {
file {
path => "/root/test.log"
start_position => "beginning"
sincedb_path => "NULL"
}
}

filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:eventtime} %{USERNAME:userid} %{GREEDYDATA:data}"
}
}
}

output {
stdout {
codec => rubydebug
}
}

输出如下:

1
2
3
4
5
6
7
8
9
10
11
{
"userid" => "user1",
"@timestamp" => 2019-08-19T02:09:39.890Z,
"eventtime" => "2019-08-19T09:03:30.000+00:00",
"path" => "/root/test.log",
"message" => "2019-08-19T09:03:30.000+00:00 user1 001 this is test data",
"data" => "001 this is test data",
"@version" => "1",
"host" => "c720177.xiodi.cn"
}
...

如果模式与文本不匹配,它将向 tags 字段添加 _grokparsefailure 标记。

1
2
3
4
5
6
7
8
9
10
{
"host" => "c720177.xiodi.cn",
"@version" => "1",
"message" => "2019-08-19T09:03:30.000+00:00 user1 001 this is test data",
"@timestamp" => 2019-08-19T01:13:00.646Z,
"path" => "/root/test.log",
"tags" => [
[0] "_grokparsefailure"
]
}

[success]需要注意的是,匹配的时候,要注意空格是否都已经匹配。

Debug grok:http://grokdebug.herokuapp.com/ ,现在 kibana 内置也有类似的工具。