ELK栈初见-2

我们在之前聊完了ELK栈第一个部分ElasticSearch的索引链程序Lucene , 也对搜索引擎有了一个大体的认知, 解下来就来看看剩下的组件吧.

ELK栈是怎么工作的?

一图胜千言.

ELK_mspaint

我们在我们的Server上安装收集日志的Agent (也可以叫做shipper), 通过一个消息队列发送给Logstash, 这里的消息队列一般是redis, 当然也可以是其他典型的MQ. Logstash可以进行数据清洗, 接着把处理的数据发送到我们的ES集群中, 最后, 由Kibana提供一个美观用户友好的管理接口.

接下来就来看看这个Logstash吧.

Logstash

Logstash基于JRuby研发, 因此也是需要跑在JVM之上的. 对于Logstash的工作模式, 其实就是一个**pipeline**. 这个管道就是承载输入输出过滤操作的载体. 首先, 输入将事件发送到一个默认存在于内存中的中央队列(也可以是磁盘), 每一个pipeline工作线程过会从这个队列中抓取批量事件, 接着按照我们定义的过滤器来执行, 最后发送到输出中. 默认情况下, 这些事件都是存在于内存中的, 因此一旦发生错误, 数据就会丢失. 所以Logstash也可以将发送的事件持久化到磁盘上. 我们Logstash的Server端是严重基于插件工作的. 插件用来定义输入, 编码, 过滤器, 输出等等.

安装Logstash很简单, 我们之前已经加入了elastic的yum仓库, 因此直接安装就可以了.

安装完成后, 我们可以来看一下安装生成配置文件:

1
2
3
4
5
6
7
[root@node1 logstash]# rpm -qc logstash
/etc/logstash/jvm.options
/etc/logstash/log4j2.properties
/etc/logstash/logstash-sample.conf
/etc/logstash/logstash.yml
/etc/logstash/pipelines.yml
/etc/logstash/startup.option

是不是感觉和Elasticsearch很类似? 这里我们主要关注的就是官方给出的conf示例和logstash.yml就行了.

简单的看一下这个示例配置, 你会发现, 定义一个logstash最基本的方法就是给出一个输入和一个输出就好了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@node1 ~]# cat /etc/logstash/logstash-sample.conf 
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
beats {
port => 5044
}
}

output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}

这里能很清楚的看到, 这个配置文件是域配置类型的, 很简单就能读懂, 通过beats来采集日志, 并且把它输出到一个本机的ES中. 我们可以仿照着这个配置文件写一个最基本的标准输入输出做两端的配置然后跑一下试试:

1
2
3
4
5
6
7
8
[root@node1 logstash]# cat config/conf.d/simple.conf 
input {
stdin {}
}

output {
stdout {}
}

效果如下:

1
2
3
4
5
6
7
Hello, Logstash!
{
"host" => "node1",
"message" => "Hello, Logstash!",
"@version" => "1",
"@timestamp" => 2020-05-24T21:02:44.793Z
}

这里面会显示来源主机, 完整信息, 版本号和时间戳.

当然, 除了输入输出, 我们也可以配置过滤器. Logstash支持四种类型的插件配置:

  • input
  • filter
  • codec
  • output

这些插件都会有各种配置项, 而这些配置项支持的数据类型有:

1
2
3
4
5
6
7
8
9
Array: [item1, item2, ...]
Boolean: true: false
Bytes
Codec: 定义编码器
Hash: 字典
Number
Password
Path: 文件系统路径
String: 字符串

另外也支持: 字段引用: [], 条件判断(包括正则和包含, 并且可以使用复合表达式)

既然我们说Logstash是基于插件的, 因此我们要研究的东西就是插件了.

Logstash的各种插件

Input

File

从指定的文件中读取事件流, 这个插件的工作性质类似于tail -1 -f. 这个插件使用FileWatch(Ruby Gem库)监听文件的变化. 而文件读取位置一些信息会被保存在一个叫做.sincedb的数据库, 这样就保证了不会出现漏读的情况.

此外, File还可以识别日志的滚动, 跟随文件到最新的.

一个最简单的例子:

1
2
3
4
5
6
7
input {
file {
path => ["/var/log/messages"]
type -> "system"
start_position => "beginning"
}
}

其中type属性是所有插件共用的, 对于file插件来说, 只有path是一个必要项. 接下来我们还是补充一个标准输出. 好让logstash可以运行.

然后我们在另外一个SSH终端登出登入一下就会看到有变化了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
"@timestamp" => 2020-05-24T22:49:49.574Z,
"message" => "May 25 06:49:49 node1 systemd-logind: Removed session 31.",
"host" => "node1",
"path" => "/var/log/messages",
"@version" => "1",
"type" => "system"
}
{
"@timestamp" => 2020-05-24T22:49:57.715Z,
"message" => "May 25 06:49:56 node1 systemd: Started Session 32 of user root.",
"host" => "node1",
"path" => "/var/log/messages",
"@version" => "1",
"type" => "system"
}
{
"@timestamp" => 2020-05-24T22:49:57.717Z,
"message" => "May 25 06:49:56 node1 systemd-logind: New session 32 of user root.",
"host" => "node1",
"path" => "/var/log/messages",
"@version" => "1",
"type" => "system"
}

udp

我们也可以通过udp协议从网络连接来读取message. 通过传递logstash需要监听的地址和端口就可以了. udp使用也很简单, 只需要指明必要的端口号即可.

怎么测试效果呢?我们可以找另外一个主机然后安装一个collectd. 这个程序就可以周期性的发送收集到的系统信息. 通过配置一个Network插件, 配置上我们logstash的地址和端口就可以了. 这里就跳过了.

redis

当然也可以从redis中读取数据, 支持redis channel和lists两种方式.

目前logstash支持的输入插件十分繁多, 需要用时还是直接官网查询就可.

Filter

接下来来看看过滤插件, 之前也说过了, filter是logstash在将event通过output发出之前对其实现某些处理功能. 插件繁多, 我们这里就先介绍一个: grok.

grok可以说是在我们必然会使用到的一个插件, 他可以将非结构化的文本数据变成结构化的, 方便日后分析.

另外, 还有另外一个也可以做到这一点的插件, 叫做Dissect. 区别是, 他不使用正则表达式, 因此更快速, 更适用于当日志文件是重复的情况下.

默认情况下, logstash已经内置支持了大约120种的模式, 包括我们熟悉的mysql, nginx, apache httpd等等等等. 基于这些模式, 我们进行匹配就可以将文本信息进行拆解.

举个例子, apache httpd的pattern就是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
HTTPDUSER %{EMAILADDRESS}|%{USER}
HTTPDERROR_DATE %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}

# Log formats
HTTPD_COMMONLOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
HTTPD_COMBINEDLOG %{HTTPD_COMMONLOG} %{QS:referrer} %{QS:agent}

# Error logs
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:message}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel}\] \[pid %{POSINT:pid}(:tid %{NUMBER:tid})?\]( \(%{POSINT:proxy_errorcode}\)%{DATA:proxy_message}:)?( \[client %{IPORHOST:clientip}:%{POSINT:clientport}\])?( %{DATA:errorcode}:)? %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}

# Deprecated
COMMONAPACHELOG %{HTTPD_COMMONLOG}
COMBINEDAPACHELOG %{HTTPD_COMBINEDLOG}

grok语法格式就是%{SYNTAX:SEMANTIC} 也就是预定义的模式名称和匹配到的文本的自定义标识符. 如果没有我们需要的, 就需要自定义了.

我们来用一下试试. 首先写好配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@node1 conf.d]# cat fromhttpd.conf 
input {
file {
path => "/var/log/http/access_log"
}
}

filter {
grok {
match => { "message" => "%{HTTPD_COMMONLOG}" }
}
}

output {
stdout {}
}

接着我们随便访问一次看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"@version" => "1",
"httpversion" => "1.1",
"clientip" => "192.168.10.101",
"bytes" => "4897",
"timestamp" => "25/May/2020:07:48:47 +0800",
"message" => "192.168.10.101 - - [25/May/2020:07:48:47 +0800] \"GET / HTTP/1.1\" 403 4897 \"-\" \"curl/7.29.0\"",
"verb" => "GET",
"path" => "/var/log/httpd/access_log",
"response" => "403",
"auth" => "-",
"ident" => "-",
"@timestamp" => 2020-05-24T23:48:47.965Z,
"request" => "/",
"host" => "node1"
}

可以看到, 整条信息会被当做message, 其他的已经被拆分出来了.

Output

接下来我们再聊聊输出.

redis

你会发现, redis既可以作为输入, 也可以作为输出. 回到我们一开始说的ELK的架构, 对于logstash的agent端而言, redis就是他的输出, 而对于server端, redis就是输入了. 简单的说, redis就是一个broker.

redis的配置也是十分简单的. 一般情况下我们只要说明一下数据类型和key就可以.

这里我们先略过, 来看一下另一个十分重要的output然后我们会在下一节综合使用这些插件.

elasticsearch

重要的输出插件当然就是本家的ES了. ES也没有什么必要的值, 一般情况下我们可以传递index的值以及es的位置就好了. 另外这个地方也可以使用https, 只要加上CA证书相关的配置就可以了.

ELK实例

接下来我们就来完整的把这个架构跑一下.

首先我们来说下实验的环境, 我们使用2个节点, 其中:

Node1 (192.168.10.101): httpd+logstash+redis

Node2 (192.168.10.102): logstash+elasticsearch+kibana

话不多说, 现在就开始配置两个节点.

在节点1上, 我们配置logstash的配置文件像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@node1 ~]# cat /usr/share/logstash/config/conf.d/fromhttpd.conf 
input {
file {
path => "/var/log/httpd/access_log"
type => "access"
}
}

filter {
grok {
match => { "message" => "%{HTTPD_COMMONLOG}" }
}
}

output {
redis {
data_type => "list"
host => "192.168.10.101"
key => "httpd-%{type}"
}
}

在节点2上, 我们的logstash配置像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@node2 ~]# cat /usr/share/logstash/config/conf.d/toes.conf 
input {
redis {
host => "192.168.10.101"
data_type => "list"
key => "httpd-access"
}
}

output {
elasticsearch {
hosts => ["192.168.10.102"]
index => "elk-%{+yyyy.MM.dd}"
}
}

接着我们启动所有程序, 刷新一下web page, 这样日志就会增加, 然后由node1的logstash进行了处理推到了redis中. 接着, node2的logstash从redis中拿取数据交给了ES节点. 然后由kibana进行展示.

初次启动kibana需要让我们配置index和排序依据, 配置好了之后, 我们就可以看到记录了:

kibana_discover_show_1

我们可以搜索筛选, 上面还有时间和数量的展示图. 左侧有我们所有的域. 此外, 我们还以使用kibana进行数据可视化.

当然, 之前就说过, 我们可以使用kibana来对我们的集群进行管理, 监控

kibana_discover_show_1

以上.