logstash简明实用教程

一、logstash是什么

Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
官方介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。
Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。

Logstash常用于日志关系系统中做日志采集设备;
logstash
系统结构
logstash系统结构
Logstash的事件(logstash将数据流中等每一条数据称之为一个event)处理流水线有三个主要角色完成:inputs –> filters –> outputs

输入(inpust):必须,负责产生事件(Inputs generate events),常用:File、syslog、redis、beats(如:Filebeats)
Logstash 支持 各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

过滤器(filters):可选,负责数据处理与转换(filters modify them),常用:grok、mutate、drop、clone、geoip
过滤器能实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

  • 利用 Grok 从非结构化数据中派生出结构
  • 从 IP 地址破译出地理坐标
  • 将 PII 数据匿名化,完全排除敏感字段
  • 简化整体处理,不受数据源、格式或架构的影响

输出(outpus):必须,负责数据输出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、statsd
Logstash 提供众多输出选择,可以将数据发送到指定的地方,并且能够灵活地解锁众多下游用例

其中inputs和outputs支持codecs(coder&decoder)在1.3.0 版之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入 期处理不同类型的数据,所以完整的数据流程应该是:input | decode | filter | encode | output;codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如:graphite、fluent、netflow、collectd,以及使用 msgpack、

二、为什么是logstash

Logstash和Flume对比
1、Logstash比较偏重于字段的预处理,在异常情况下可能会出现数据丢失,只是在运维日志场景下,一般认为这个可能不重要;而Flume偏重数据的传输,几乎没有数据的预处理,仅仅是数据的产生,封装成event然后传输;传输的时候flume比logstash多考虑了一些可靠性。因为数据会持久化在channel中,数据只有存储在下一个存储位置(可能是最终的存储位置,如HDFS;也可能是下一个Flume节点的channel),数据才会从当前的channel中删除。这个过程是通过事务来控制的,这样就保证了数据的可靠性。
2、Logstash有几十个插件,配置比较灵活,flume强调用户自定义开发;
3、Logstash的input和filter还有output之间都存在buffer,进行缓冲;Flume直接使用channel做持久化
4、Logstash性能以及资源消耗比较严重,且不支持缓存;
Logstash和Flume对比
详见参考:https://blog.csdn.net/songfeihu0810232/article/details/94406608

三、logstash安装与配置

1、logstash安装

环境要求:jdk1.8或以上
下载logstash软件包 logstash官方下载地址https://www.elastic.co/cn/downloads/logstash
下载后直接解压就可以用
如:tar zxvf logstash-7.6.0.tar.gz
解压后目录如下:
logstash目录
目录结构说明见官方文档:https://www.elastic.co/guide/en/logstash/7.1/dir-layout.html
解压以后可以对logstash进行简单的测试。

1
bin/logstash -e 'input { stdin { } } output { stdout {} }'

这条命令表示通过命令的方式指定logstash的输入输出分别是标准的输入输出,什么是标准的输入输出呢?就是控制台的输入输出。
等待启动成功,直接输入“hello world”
将会在控制台会有相应的输出
出现这些信息就表示logstash安装成功并且可以使用了。
logstash测试

2、在线安装logstash插件

虽然logstash默认安装了大部分的插件,但是有些插件没有默认安装,如logstash-output-syslog、logstash-output-jdbc

2.1. 安装Gem并更新

1
2
3
4
5
6
# yum install -y gem
# gem -v
2.0.14.1
# gem update --system
# gem -v
3.1.2

2.2 检查并修改镜像源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# gem sources -l
*** CURRENT SOURCES ***

https://rubygems.org/

# gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
https://gems.ruby-china.com/ added to sources
https://rubygems.org/ removed from sources

# cat ~/.gemrc
---
:backtrace: false
:bulk_threshold: 1000
:sources:
- https://gems.ruby-china.com/
:update_sources: true
:verbose: true
:concurrent_downloads: 8

请注意:国内的镜像站从https://gems.ruby-china.org 换成了 https://gems.ruby-china.com
!!!现在很多网上的资料就都是写的https://gems.ruby-china.org,导致很多人换了镜像源也装不上。

2.3. 修改 logstash的 gem 镜像源

cd到logstach的安装目录,可以看到Gemfile文件 vi Gemfile

1
2
3
4
5
6
# This is a Logstash generated Gemfile.
# If you modify this file manually all comments and formatting will be lost.

source "https://rubygems.org"
gem "logstash-core", :path => "./logstash-core"
......

更改默认的 https://rubygems.orghttps://gems.ruby-china.com

2.4、安装插件

安装插件

3、离线安装logstash插件

在实际应用过程中有些生产环境是封闭的网络环境,没法连接外网。这时候就需要离线安装插件。
说离线安装实际就是在一个有网络的环境下将插件装好,然后再装到离线环境中。
这里有两种方式,
一种就是在有网络的环境下将插件装好,将整个logsash包离线拷到生产环境。
另一种方式就是在有网络的环境下将插件装好后,将logstash的需要离线安装的插件打个离线包,然后再在生产环境进行离线安装。
离线插件包打包安装具体操作如下:

3.1 通过logstash-plugin prepare-offline-pack 命令将插件打成zip包

1
./logstash-plugin prepare-offline-pack --output logstash-output-syslog.zip logstash-output-syslog

打包
打包命令执行成功以后,可以看到在logstash的bin目录下有打成的离线插件zip包
打包后生成的离线包

3.2 通过logstash-plugin install 进行离线安装

1
bin/logstash-plugin install file:///home/logstash/logstash-7.6.2/tools/logstash-output-syslog.zip

安装后成功会有如下提示

1
2
Installing file: /home/logstash/logstash-7.6.2/tools/logstash-output-syslog.zip
Install successful

3.2 通过logstash-plugin list命令检查插件是否安装成功

logstash-plugin list

4、logstash简单配置说明

语法

Logstash 设计了自己的 DSL ——包括有区域,注释,数据类型(布尔值,字符串,数值,数组,哈希),条件判断,字段引用等。

区段(section)

Logstash 用 {} 来定义区域。区域内可以包括插件区域定义,你可以在一个区域内定义多个插件。插件区域内则可以定义键值对设置。示例如下:

1
2
3
4
input {
stdin {}
syslog {}
}

数据类型

Logstash 支持少量的数据值类型:

  • bool
    1
    debug => true
  • string
    1
    host => "hostname"
  • number
    1
    port => 514
  • array
    1
    match => ["datetime", "UNIX", "ISO8601"]
  • hash
    1
    2
    3
    4
    options => {
    key1 => "value1",
    key2 => "value2"
    }

    字段引用(field reference)

    如果想在 Logstash 配置中使用字段的值,只需要把字段的名字写在中括号 [] 里就行了,这就叫字段引用。
    对于 嵌套字段(也就是多维哈希表,或者叫哈希的哈希),每层的字段名都写在 [] 里就可以了。比如,你可以从 geoip 里这样获取 longitude 值:
    [geoip][location][0]
    logstash 的数组也支持倒序下标,即 [geoip][location][-1] 可以获取数组最后一个元素的值。
    Logstash 还支持变量内插,在字符串里使用字段引用的方法是这样:
    “the longitude is %{[geoip][location][0]}”

条件判断(condition)

表达式支持下面这些操作符:
equality, etc: ==, !=, <, >, <=, >=
regexp: =, !
inclusion: in, not in
boolean: and, or, nand, xor
unary: !()
通常来说,你都会在表达式里用到字段引用。比如:
if “_grokparsefailure” not in [tags] {
} else if [status] !~ /^2\d\d/ and [url] == “/noc.gif” {
} else {
}

命令行参数

Logstash 提供了一个 shell 脚本叫 logstash 方便快速运行。它支持一下参数:
-e
意即执行。我们在 “Hello World” 的时候已经用过这个参数了。事实上可以不写任何具体配置,直接运行 bin/logstash -e ‘’ 达到相同效果。这个参数的默认值是下面这样:
input {
stdin { }
}
output {
stdout { }
}
–config 或 -f
意即文件。真实运用中,我们会写很长的配置,甚至可能超过 shell 所能支持的 1024 个字符长度。所以我们必把配置固化到文件里,然后通过 bin/logstash -f agent.conf 这样的形式来运行。
此外,logstash 还提供一个方便我们规划和书写配置的小功能。你可以直接用 bin/logstash -f /etc/logstash.d/ 来运行。logstash 会自动读取 /etc/logstash.d/ 目录下所有的文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。
–configtest 或 -t
意即测试。用来测试 Logstash 读取到的配置文件语法是否能正常解析。Logstash 配置语法是用 grammar.treetop 定义的。尤其是使用了上一条提到的读取目录方式的读者,尤其要提前测试。
–log 或 -l
意即日志。Logstash 默认输出日志到标准错误。生产环境下你可以通过 bin/logstash -l logs/logstash.log 命令来统一存储日志。
–filterworkers 或 -w
意即工作线程。Logstash 会运行多个线程。你可以用 bin/logstash -w 5 这样的方式强制 Logstash 为过滤插件运行 5 个线程。

四、logstash使用的几种典型的应用场景

1、通过logstash将syslog日志原始日志转发

syslog日志原始日志转发
在logstash的bin目录下新建配置文件

1
vi test-pipeline.conf

编辑input和output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
input {
stdin{
type => "test-log"
}
syslog{
type => "test-log"
port => 514
}
}
output
{
stdout {
codec => rubydebug
}
syslog{
host => "192.168.2.185"
port => 514
}
}

这样就相当于把日志转发到了192.168.2.185这台机器的514端口
编辑好配置文件以后执行
./logstash -f test-pipeline.conf --config.test_and_exit 对配置文件进行检查,如果配置文件写得有问题,将会有错误提示。
配置文件检查没有问题后就可以启动logstash执行了./logstash -f test-pipeline.conf --config.reload.automatic
用udpsender工具往这台机器上发送日志信息,可以看到日志转发到192.168.2.185这台机器上了。
updsend发送日志
在控制台可以看到192.168.2.185,接收到了192.168.2.173转发过来的日志
控制台日志

2、通过rsyslog、logstash采集nginx等中间件的日志送到ES

logstash可以与rsyslog、filebeat等无缝结合采集nginx等中间件日志,送给数据存储。
通过rsyslog、logstash采集nginx等中间件的日志送到ES
具体参见:https://xiejava.gitee.io/posts/f3e97829/

3、通过logstash将日志入kafka再入mysql或ES

数据先放到kafka队列里缓存削峰,然后从kafka队列里读取数据到mysql或其他存储系统中进行保存。logstash将日志入kafka再入mysql或ES
具体参见:https://xiejava.gitee.io/posts/54e4fd14/

4、通过logstash进行日志补全后再转发或入库

采集原始日志以后,需要对原始日志进行调整合补齐,比如最常见的是根据IP来补齐IP的经纬度等信息。
logstash进行日志补全
这里就可以用logstash的geotip,也可以用其他的外部API接口,为了更具代表性的说明,这里调用的是第三方的http接口(http://ip-api.com/json/),这个接口也是可以自己定义的。
参考配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
input {
stdin { }
syslog {
port => "514"
}
}
filter {
grok{
#匹配获取IP
match => {"message" => "%{IPV4:ip}"}
}
http {
#调用外部接口获取IP的详细信息
url => "http://ip-api.com/json/%{ip}"
verb => "GET"
add_field => {
"new_field" => "new_static_value"
}
}
mutate {
replace => {
#这里对原始日志数据进行补全,如加了新的字段及从接口中获取的信息
"message" => "%{message}|%{ip}: My new message|%{new_field}|%{[body][as]}"
}
}
}
output {
stdout { }
syslog {
host => "192.168.2.173"
port => "7514"
}
}

通过http接口调用API取得数据,然后通过mutate重新组合补全信息,这里通过调用获取IP地址信息的API获取IP的信息,然后补全到原始日志中。
在这里插入图片描述

可以看出logstash是一个非常灵活好用的数据采集框架工具,可以通过简单的配置满足绝大多数数据采集场景的需求。

0%