ELK logstash处理流程(23rd)

  • A+
所属分类:ELK

logstash处理事件有三个阶段:input ---> filter ---> output。input产生事件,filter 对事件进行修改,output输出到其它地方。input和output支持解码,可以对进入的或者退出管道的数据进行编码或解码而无需单独经过过滤器处理。

input

常用的input有:

  • file:从文件系统中读取文件,类似于Linux下的tail -0F。
  • syslog:监听在514端口的系统日志信息,并解析成RFC3164格式。
  • redis:从redis服务器读取,同时使用redis channel和redis list。
  • beats: 通过Filebeat发送事件。

其它的一些input有:

logstash-input-beats

logstash-input-couchdb_changes

logstash-input-elasticsearch

logstash-input-eventlog

logstash-input-exec

logstash-input-file

logstash-input-ganglia

logstash-input-gelf

logstash-input-generator

logstash-input-graphite

logstash-input-heartbeat

logstash-input-http

logstash-input-imap

logstash-input-irc

logstash-input-jdbc

logstash-input-kafka

logstash-input-log4j

logstash-input-lumberjack

logstash-input-pipe

logstash-input-rabbitmq

logstash-input-redis

logstash-input-s3

logstash-input-snmptrap

logstash-input-sqs

logstash-input-stdin

logstash-input-syslog

logstash-input-tcp

logstash-input-twitter

logstash-input-udp

logstash-input-unix

logstash-input-xmpp

logstash-input-zeromq

filter

filter是logstash管道中间处理的设备。可以结合条件语句对符合标准的事件进行处理。

一些有用的过滤器如下:

  • grok: 解析和结构化任何文本。Grok 目前是logstash最好的方式对非结构化日志数据解析成结构化和可查询化。logstash内置了120个匹配模式,满足大部分需求。
  • mutate: 在事件字段执行一般的转换。可以重命名、删除、替换和修改事件字段。
  • drop: 完全丢弃事件,如debug事件。
  • clone: 复制事件,可能添加或者删除字段。
  • geoip: 添加有关IP地址地理位置信息。

其它filter有:

logstash-filter-anonymize

logstash-filter-checksum

logstash-filter-clone

logstash-filter-csv

logstash-filter-date

logstash-filter-dns

logstash-filter-drop

logstash-filter-fingerprint

logstash-filter-geoip

logstash-filter-grok

logstash-filter-json

logstash-filter-kv

logstash-filter-metrics

logstash-filter-multiline

logstash-filter-mutate

logstash-filter-ruby

logstash-filter-sleep

logstash-filter-split

logstash-filter-syslog_pri

logstash-filter-throttle

logstash-filter-urldecode

logstash-filter-useragent

logstash-filter-uuid

logstash-filter-xml

output

output是logstash管道的最后一个阶段。一个事件可以经过多个output。但是一旦所有输出处理完,该事件已经执行完。

常用的output有:

  • elasticsearch: 发送事件数据到 Elasticsearch。如果要将数据保存在一个高效、便捷、易于查询的格式,elasticsearch将是不二人选。
  • file: 将事件数据写入到磁盘文件上。
  • graphite: 发送事件数据到graphite。http://graphite.wikidot.com/
  • statsd: 发送事件数据到 statsd。

其它output有:

logstash-output-cloudwatch

logstash-output-csv

logstash-output-elasticsearch

logstash-output-email

logstash-output-exec

logstash-output-file

logstash-output-ganglia

logstash-output-gelf

logstash-output-graphite

logstash-output-hipchat

logstash-output-http

logstash-output-irc

logstash-output-juggernaut

logstash-output-kafka

logstash-output-lumberjack

logstash-output-nagios

logstash-output-nagios_nsca

logstash-output-null

logstash-output-opentsdb

logstash-output-pagerduty

logstash-output-pipe

logstash-output-rabbitmq

logstash-output-redis

logstash-output-s3

logstash-output-sns

logstash-output-sqs

logstash-output-statsd

logstash-output-stdout

logstash-output-tcp

logstash-output-udp

logstash-output-xmpp

logstash-output-zeromq

Coders

大众化的编码器有json、msgpack、plain(text)。

  • json: 以json格式编码或者解码数据。
  • multiline: 合并多行文本事件,如java异常和堆栈跟踪信息到一个单一事件。

其它coders有:

logstash-codec-collectd

logstash-codec-dots

logstash-codec-edn

logstash-codec-edn_lines

logstash-codec-es_bulk

logstash-codec-fluent

logstash-codec-graphite

logstash-codec-json

logstash-codec-json_lines

logstash-codec-line

logstash-codec-msgpack

logstash-codec-multiline

logstash-codec-netflow

logstash-codec-oldlogstashjson

logstash-codec-plain

logstash-codec-rubydebug

故障容错

事件从一个管道到另一个管道使用内部的Ruby SizedQueue队列实现的。一个SizedQueue有最大的项目数。当队列达到最大值,所有的写入队列将会被阻塞。

logstash设置每个队列大小为20。这意味着最多20个事件可以挂起进入下一个阶段,这可以防止数据丢失和保持logstash作为一个数据存储系统。这些内部队列不用于长期存放信息。

小队列意味着当logstash任务繁重或者管道临时有问题时,更容易堵塞。当出现问题时,要么队列不限制要么丢弃信息。队列不限制时,会无限的增长一直超出内存大小,导致崩溃,从而队列中的所有信息丢失。在多数情况下,丢弃消息也是不希望接受的。

大多数output会不断尝试受故障影响的事件。output失败或者下游服务的问题如磁盘满、权限问题、网络故障、服务中止。

如果output失败,output线程等待直到output能成功发送消息。output停止从output队列读取,这意味着事件填满了队列。

当output队列满了,过滤器是被阻塞的,因此它们不能写入新的事件到输出队列。虽然写入到output队列被阻塞了,过滤器停止从filter队列读取。最终,可能会导致filter队列(input--->filter)满。

一个满的filter队列,阻塞input写入到filter。这将导致所有input停止处理数据无论是新的事件。

在理想的情况下,这种行为类似于当tcp窗口关闭为0。没有新的数据发送,因为接收器还没有处理完当前队列的数据,直到下游(output)问题解决,消息重新流动起来。

线程模型

当前logstash线程模型是:

input threads | filter worker threads | output worker

过滤器是可选的,没有filter是这样的:

input threads | output worker

每个input运行在自身的一个线程。这可以防止繁忙的input阻塞慢的。

filter线程模型是工人模型,每个工人接收到一个事件,并应用于所有过滤器,再将其发送到output队列。在CPU可扩展性,因为许多过滤器是CPU密集型的。filter workers默认是1,在启动logstash时,可以通过-w指定。

output当前工作模式是一个线程。output接收事件的顺序是以配置文件定义的输出顺序。

output在发布它们之前可能会决定临时缓存事件。这方面elasticsearch output是一个例子,缓存事件和刷新它们使用一个单独的线程。该机制(缓存很多事件和写使用单独的线程)可以改善性能,因为可以防止logstash管道一直等待elasticsearch响应。

资源使用

logstash通常至少有3个线程,如果没有filter只有2个。一个input线程,一个filter worker线程,和一个output线程。如果看到logstash使用多个CPU,这就是原因。如果想要了解每个线程在做神马,应该看看Debugging Java Performance这篇文章。java线程有名称,可以使用jstack和top找出谁在使用这些资源。

这篇全理论,枯燥无味。

weinxin
微信公众号
扫一扫关注运维生存时间公众号,获取最新技术文章~
默北

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: