[[advanced-pipeline]]
=== Setting Up an Advanced Logstash Pipeline
In most use cases, a Logstash pipeline has one or more input, filter, and output plugins. The scenarios in this section
build Logstash configuration files to specify these plugins and discuss what each plugin is doing.
The Logstash configuration file defines your _Logstash pipeline_. When you start a Logstash instance, use the
`-f <path/to/file>` option to specify the configuration file that defines that instance's pipeline.
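For example, if your pipeline definition is saved in a file named `my-pipeline.conf` (a hypothetical file name used
here for illustration) in the Logstash home directory, you would start that pipeline like this:
[source,shell]
--------------------------------------------------------------------------------
# my-pipeline.conf is a placeholder; substitute the path to your own configuration file
bin/logstash -f my-pipeline.conf
--------------------------------------------------------------------------------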
A Logstash pipeline has two required elements, `input` and `output`, and one optional element, `filter`. The input
plugins consume data from a source, the filter plugins modify the data as you specify, and the output plugins write
the data to a destination.
image::static/images/basic_logstash_pipeline.png[]
The following text represents the skeleton of a configuration pipeline:
[source,shell]
--------------------------------------------------------------------------------
# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
}
--------------------------------------------------------------------------------
This skeleton is non-functional, because the input and output sections don't have any valid options defined. The
examples in this tutorial build configuration files to address specific use cases.
Paste the skeleton into a file named `first-pipeline.conf` in your Logstash home directory.
[[parsing-into-es]]
==== Parsing Apache Logs into Elasticsearch
This example creates a Logstash pipeline that takes Apache web logs as input, parses those logs to create specific,
named fields from the logs, and writes the parsed data to an Elasticsearch cluster.
You can download the sample data set used in this example
https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz[here]. Unpack this file.
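If you need a command to unpack the archive, a typical invocation looks like this (assuming the downloaded file is in
your current directory):
[source,shell]
--------------------------------------------------------------------------------
# produces logstash-tutorial.log in the current directory
gunzip logstash-tutorial.log.gz
--------------------------------------------------------------------------------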
[float]
[[configuring-file-input]]
==== Configuring Logstash for File Input
To start your Logstash pipeline, configure the Logstash instance to read from a file using the
{logstash}plugins-inputs-file.html[file] input plugin.
Edit the `first-pipeline.conf` file to add the following text:
[source,json]
--------------------------------------------------------------------------------
input {
    file {
        path => "/path/to/logstash-tutorial.log"
        start_position => beginning <1>
        ignore_older => 0 <2>
    }
}
--------------------------------------------------------------------------------
<1> The default behavior of the file input plugin is to monitor a file for new information, in a manner similar to the
UNIX `tail -f` command. To change this default behavior and process the entire file, we need to specify the position
where Logstash starts processing the file.
<2> The default behavior of the file input plugin is to ignore files whose last modification time is more than 86400 seconds (one day) ago. To change this default behavior and process the tutorial file (whose modification date may be much older than a day), we need to tell the plugin not to ignore old files.
Replace `/path/to/` with the actual path to the location of `logstash-tutorial.log` in your file system.
[float]
[[configuring-grok-filter]]
===== Parsing Web Logs with the Grok Filter Plugin
The {logstash}plugins-filters-grok.html[`grok`] filter plugin is one of several plugins that are available by default in
Logstash. For details on how to manage Logstash plugins, see the <<working-with-plugins,reference documentation>> for
the plugin manager.
Because the `grok` filter plugin looks for patterns in the incoming log data, configuration requires you to make
decisions about how to identify the patterns that are of interest to your use case. A representative line from the web
server log sample looks like this:
[source,shell]
--------------------------------------------------------------------------------
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
--------------------------------------------------------------------------------
The IP address at the beginning of the line is easy to identify, as is the timestamp in brackets. In this tutorial, use
the `%{COMBINEDAPACHELOG}` grok pattern, which structures lines from the Apache log using the following schema:
[horizontal]
*Information*:: *Field Name*
IP Address:: `clientip`
User ID:: `ident`
User Authentication:: `auth`
Timestamp:: `timestamp`
HTTP Verb:: `verb`
Request body:: `request`
HTTP Version:: `httpversion`
HTTP Status Code:: `response`
Bytes served:: `bytes`
Referrer URL:: `referrer`
User agent:: `agent`
Edit the `first-pipeline.conf` file to add the following text:
[source,json]
--------------------------------------------------------------------------------
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
--------------------------------------------------------------------------------
After processing, the sample line has the following JSON representation:
[source,json]
--------------------------------------------------------------------------------
{
    "clientip" : "83.149.9.216",
    "ident" : "-",
    "auth" : "-",
    "timestamp" : "04/Jan/2015:05:13:42 +0000",
    "verb" : "GET",
    "request" : "/presentations/logstash-monitorama-2013/images/kibana-search.png",
    "httpversion" : "1.1",
    "response" : "200",
    "bytes" : "203023",
    "referrer" : "http://semicomplete.com/presentations/logstash-monitorama-2013/",
    "agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
}
--------------------------------------------------------------------------------
[float]
[[indexing-parsed-data-into-elasticsearch]]
===== Indexing Parsed Data into Elasticsearch
Now that the web logs are broken down into specific fields, the Logstash pipeline can index the data into an
Elasticsearch cluster. Edit the `first-pipeline.conf` file to add the following text after the `input` section:
[source,json]
--------------------------------------------------------------------------------
output {
    elasticsearch {
    }
}
--------------------------------------------------------------------------------
With this configuration, Logstash uses the http protocol to connect to Elasticsearch. The above example assumes that
Logstash and Elasticsearch are running on the same instance. You can specify a remote Elasticsearch instance by using
the `hosts` configuration option, for example `hosts => "es-machine:9200"`.
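As a sketch, an output section that points to a remote instance might look like the following, where `es-machine` is a
placeholder for your Elasticsearch host name:
[source,json]
--------------------------------------------------------------------------------
output {
    elasticsearch {
        # "es-machine" is a placeholder host name; 9200 is the default Elasticsearch port
        hosts => "es-machine:9200"
    }
}
--------------------------------------------------------------------------------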
[float]
[[configuring-geoip-plugin]]
===== Enhancing Your Data with the Geoip Filter Plugin
In addition to parsing log data for better searches, filter plugins can derive supplementary information from existing
data. As an example, the {logstash}plugins-filters-geoip.html[`geoip`] plugin looks up IP addresses, derives geographic
location information from the addresses, and adds that location information to the logs.
Configure your Logstash instance to use the `geoip` filter plugin by adding the following lines to the `filter` section
of the `first-pipeline.conf` file:
[source,json]
--------------------------------------------------------------------------------
geoip {
    source => "clientip"
}
--------------------------------------------------------------------------------
The `geoip` plugin configuration requires data that is already defined as separate fields. Make sure that the `geoip`
section is after the `grok` section of the configuration file.
Specify the name of the field that contains the IP address to look up. In this tutorial, the field name is `clientip`.
[float]
[[testing-initial-pipeline]]
===== Testing Your Initial Pipeline
At this point, your `first-pipeline.conf` file has input, filter, and output sections properly configured, and looks
like this:
[source,json]
--------------------------------------------------------------------------------
input {
    file {
        path => "/path/to/logstash-tutorial.log"
        start_position => beginning
        ignore_older => 0
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {}
    stdout {}
}
--------------------------------------------------------------------------------
To verify your configuration, run the following command:
[source,shell]
--------------------------------------------------------------------------------
bin/logstash -f first-pipeline.conf --configtest
--------------------------------------------------------------------------------
The `--configtest` option parses your configuration file and reports any errors. When the configuration file passes
the configuration test, start Logstash with the following command:
[source,shell]
--------------------------------------------------------------------------------
bin/logstash -f first-pipeline.conf
--------------------------------------------------------------------------------
Try a test query to Elasticsearch based on the fields created by the `grok` filter plugin:
[source,shell]
--------------------------------------------------------------------------------
curl -XGET 'localhost:9200/logstash-$DATE/_search?q=response=200'
--------------------------------------------------------------------------------
Replace $DATE with the current date, in YYYY.MM.DD format.
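For example, if the data was indexed on July 30, 2015 (the date used in the sample output below), the query would be:
[source,shell]
--------------------------------------------------------------------------------
curl -XGET 'localhost:9200/logstash-2015.07.30/_search?q=response=200'
--------------------------------------------------------------------------------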
Since our sample has just one 200 HTTP response, we get one hit back:
[source,json]
--------------------------------------------------------------------------------
{"took":2,
"timed_out":false,
"_shards":{"total":5,
"successful":5,
"failed":0},
"hits":{"total":1,
"max_score":1.5351382,
"hits":[{"_index":"logstash-2015.07.30",
"_type":"logs",
"_id":"AU7gqOky1um3U6ZomFaF",
"_score":1.5351382,
"_source":{"message":"83.149.9.216 - - [04/Jan/2015:05:13:45 +0000] \"GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1\" 200 52878 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"@version":"1",
"@timestamp":"2015-07-30T20:30:41.265Z",
"host":"localhost",
"path":"/path/to/logstash-tutorial-dataset",
"clientip":"83.149.9.216",
"ident":"-",
"auth":"-",
"timestamp":"04/Jan/2015:05:13:45 +0000",
"verb":"GET",
"request":"/presentations/logstash-monitorama-2013/images/frontend-response-codes.png",
"httpversion":"1.1",
"response":"200",
"bytes":"52878",
"referrer":"\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
"agent":"\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
}
}]
}
}
--------------------------------------------------------------------------------
Try another search for the geographic information derived from the IP address:
[source,shell]
--------------------------------------------------------------------------------
curl -XGET 'localhost:9200/logstash-$DATE/_search?q=geoip.city_name=Buffalo'
--------------------------------------------------------------------------------
Replace $DATE with the current date, in YYYY.MM.DD format.
Only one of the log entries comes from Buffalo, so the query produces a single response:
[source,json]
--------------------------------------------------------------------------------
{"took":3,
"timed_out":false,
"_shards":{
"total":5,
"successful":5,
"failed":0},
"hits":{"total":1,
"max_score":1.03399,
"hits":[{"_index":"logstash-2015.07.31",
"_type":"logs",
"_id":"AU7mK3CVSiMeBsJ0b_EP",
"_score":1.03399,
"_source":{
"message":"108.174.55.234 - - [04/Jan/2015:05:27:45 +0000] \"GET /?flav=rss20 HTTP/1.1\" 200 29941 \"-\" \"-\"",
"@version":"1",
"@timestamp":"2015-07-31T22:11:22.347Z",
"host":"localhost",
"path":"/path/to/logstash-tutorial-dataset",
"clientip":"108.174.55.234",
"ident":"-",
"auth":"-",
"timestamp":"04/Jan/2015:05:27:45 +0000",
"verb":"GET",
"request":"/?flav=rss20",
"httpversion":"1.1",
"response":"200",
"bytes":"29941",
"referrer":"\"-\"",
"agent":"\"-\"",
"geoip":{
"ip":"108.174.55.234",
"country_code2":"US",
"country_code3":"USA",
"country_name":"United States",
"continent_code":"NA",
"region_name":"NY",
"city_name":"Buffalo",
"postal_code":"14221",
"latitude":42.9864,
"longitude":-78.7279,
"dma_code":514,
"area_code":716,
"timezone":"America/New_York",
"real_region_name":"New York",
"location":[-78.7279,42.9864]
}
}
}]
}
}
--------------------------------------------------------------------------------
[[multiple-input-output-plugins]]
==== Multiple Input and Output Plugins
The information you need to manage often comes from several disparate sources, and use cases can require multiple
destinations for your data. Your Logstash pipeline can use multiple input and output plugins to handle these
requirements.
This example creates a Logstash pipeline that takes input from a Twitter feed and the Filebeat client, then
sends the information to an Elasticsearch cluster as well as writing the information directly to a file.
[float]
[[twitter-configuration]]
==== Reading from a Twitter feed
To add a Twitter feed, you need several pieces of information:
* A _consumer key_, which uniquely identifies your Twitter app (Logstash, in this case).
* A _consumer secret_, which serves as the password for your Twitter app.
* One or more _keywords_ to search in the incoming feed.
* An _oauth token_, which identifies the Twitter account using this app.
* An _oauth token secret_, which serves as the password of the Twitter account.
Visit https://dev.twitter.com/apps to set up a Twitter account and generate your consumer key and secret, as well as
your OAuth token and secret.
Use this information to add the following lines to the `input` section of the `first-pipeline.conf` file:
[source,json]
--------------------------------------------------------------------------------
twitter {
    consumer_key =>
    consumer_secret =>
    keywords =>
    oauth_token =>
    oauth_token_secret =>
}
--------------------------------------------------------------------------------
[float]
[[configuring-lsf]]
==== The Filebeat Client
The https://github.com/elastic/beats/tree/master/filebeat[Filebeat] client is a lightweight, resource-friendly tool that
collects logs from files on the server and forwards these logs to your Logstash instance for processing. Filebeat
communicates with your Logstash instance using the secure Beats protocol, which is based on the lumberjack protocol and
is designed for reliability and low latency. Filebeat uses the computing resources of the machine hosting the source
data, and the {logstash}plugins-inputs-beats.html[Beats input] plugin minimizes the resource demands on the Logstash
instance.
NOTE: In a typical use case, Filebeat runs on a separate machine from the machine running your
Logstash instance. For the purposes of this tutorial, Logstash and Filebeat are running on the
same machine.
The default Logstash installation includes the {logstash}plugins-inputs-beats.html[Beats input plugin], which is
designed to be resource-friendly. To install Filebeat on your data source machine, download the appropriate package
from the Filebeat https://www.elastic.co/downloads/beats/filebeat[product page].
Create a configuration file for Filebeat similar to the following example:
[source,shell]
--------------------------------------------------------------------------------
filebeat:
  prospectors:
    -
      paths:
        - "/path/to/sample-log" <1>
      fields:
        type: syslog
output:
  logstash:
    hosts: ["localhost:5043"]
    tls:
      certificate: /path/to/ssl-certificate.crt <2>
      certificate_key: /path/to/ssl-certificate.key
      certificate_authorities: /path/to/ssl-certificate.crt
    timeout: 15
<1> Path to the file or files that Filebeat processes.
<2> Path to the SSL certificate for the Logstash instance.
Save this configuration file as `filebeat.yml`.
Configure your Logstash instance to use the Filebeat input plugin by adding the following lines to the `input` section
of the `first-pipeline.conf` file:
[source,json]
--------------------------------------------------------------------------------
beats {
    port => "5043"
    ssl => true
    ssl_certificate => "/path/to/ssl-cert" <1>
    ssl_key => "/path/to/ssl-key" <2>
}
--------------------------------------------------------------------------------
<1> Path to the SSL certificate that the Logstash instance uses to authenticate itself to Filebeat.
<2> Path to the key for the SSL certificate.
[float]
[[logstash-file-output]]
==== Writing Logstash Data to a File
You can configure your Logstash pipeline to write data directly to a file with the
{logstash}plugins-outputs-file.html[`file`] output plugin.
Configure your Logstash instance to use the `file` output plugin by adding the following lines to the `output` section
of the `first-pipeline.conf` file:
[source,json]
--------------------------------------------------------------------------------
file {
    path => "/path/to/target/file"
}
--------------------------------------------------------------------------------
[float]
[[multiple-es-nodes]]
==== Writing to multiple Elasticsearch nodes
Writing to multiple Elasticsearch nodes lightens the resource demands on a given Elasticsearch node, as well as
providing redundant points of entry into the cluster when a particular node is unavailable.
To configure your Logstash instance to write to multiple Elasticsearch nodes, edit the output section of the `first-pipeline.conf` file to read:
[source,json]
--------------------------------------------------------------------------------
output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
}
--------------------------------------------------------------------------------
Use the IP addresses of three non-master nodes in your Elasticsearch cluster in the `hosts` line. When the `hosts`
parameter lists multiple IP addresses, Logstash load-balances requests across the list of addresses. Also note that
the default port for Elasticsearch is `9200` and can be omitted in the configuration above.
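For example, the following `hosts` entry (with placeholder addresses) mixes explicit and default ports; the node listed
without a port is reached on `9200`:
[source,json]
--------------------------------------------------------------------------------
# the 10.0.0.x addresses are placeholders for your own Elasticsearch nodes
hosts => ["10.0.0.1:9200", "10.0.0.2:9200", "10.0.0.3"]
--------------------------------------------------------------------------------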
[float]
[[testing-second-pipeline]]
===== Testing the Pipeline
At this point, your `first-pipeline.conf` file looks like this:
[source,json]
--------------------------------------------------------------------------------
input {
    twitter {
        consumer_key =>
        consumer_secret =>
        keywords =>
        oauth_token =>
        oauth_token_secret =>
    }
    beats {
        port => "5043"
        ssl => true
        ssl_certificate => "/path/to/ssl-cert"
        ssl_key => "/path/to/ssl-key"
    }
}
output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
    file {
        path => "/path/to/target/file"
    }
}
--------------------------------------------------------------------------------
Logstash is consuming data from the Twitter feed you configured, receiving data from Filebeat, and
indexing this information to three nodes in an Elasticsearch cluster as well as writing to a file.
At the data source machine, run Filebeat with the following command:
[source,shell]
--------------------------------------------------------------------------------
sudo ./filebeat -e -c filebeat.yml -d "publish"
--------------------------------------------------------------------------------
Filebeat will attempt to connect on port 5043. Until Logstash starts with an active Beats plugin, there
won't be any answer on that port, so any messages you see regarding failure to connect on that port are normal for now.
To verify your configuration, run the following command:
[source,shell]
--------------------------------------------------------------------------------
bin/logstash -f first-pipeline.conf --configtest
--------------------------------------------------------------------------------
The `--configtest` option parses your configuration file and reports any errors. When the configuration file passes
the configuration test, start Logstash with the following command:
[source,shell]
--------------------------------------------------------------------------------
bin/logstash -f first-pipeline.conf
--------------------------------------------------------------------------------
Use the `grep` utility to search in the target file to verify that information is present:
[source,shell]
--------------------------------------------------------------------------------
grep Mozilla /path/to/target/file
--------------------------------------------------------------------------------
Run an Elasticsearch query to find the same information in the Elasticsearch cluster:
[source,shell]
--------------------------------------------------------------------------------
curl -XGET 'localhost:9200/logstash-2015.07.30/_search?q=agent=Mozilla'
--------------------------------------------------------------------------------
[[stalled-shutdown]]
=== Stalled Shutdown Detection
Shutting down a running Logstash instance involves the following steps:
* Stop all input, filter and output plugins
* Process all in-flight events
* Terminate the Logstash process
The following conditions affect the shutdown process:
* An input plugin receiving data at a slow pace.
* A slow filter, like a Ruby filter executing `sleep(10000)` or an Elasticsearch filter that is executing a very heavy
query.
* A disconnected output plugin that is waiting to reconnect to flush in-flight events.
These situations make the duration and success of the shutdown process unpredictable.
Logstash has a stall detection mechanism that analyzes the behavior of the pipeline and plugins during shutdown.
This mechanism produces periodic information about the count of inflight events in internal queues and a list of busy
worker threads.
To enable Logstash to forcibly terminate in the case of a stalled shutdown, use the `--allow-unsafe-shutdown` flag when
you start Logstash.
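For example, to start the pipeline from the earlier sections with forcible termination allowed:
[source,shell]
--------------------------------------------------------------------------------
bin/logstash -f first-pipeline.conf --allow-unsafe-shutdown
--------------------------------------------------------------------------------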
[[shutdown-stall-example]]
==== Stall Detection Example
In this example, slow filter execution prevents the pipeline from shutting down cleanly. Because Logstash is started
with the `--allow-unsafe-shutdown` flag, quitting with *Ctrl+C* results in an eventual shutdown that loses 20 events.
========
[source,shell]
% bin/logstash -e 'input { generator { } } filter { ruby { code => "sleep 10000" } } \
output { stdout { codec => dots } }' -w 1 --allow-unsafe-shutdown
Default settings used: Filter workers: 1
Logstash startup completed
^CSIGINT received. Shutting down the pipeline. {:level=>:warn}
Received shutdown signal, but pipeline is still waiting for in-flight events
to be processed. Sending another ^C will force quit Logstash, but this may cause
data loss. {:level=>:warn}
{:level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"input_to_filter"=>20, "total"=>20},
"STALLING_THREADS"=>
{["LogStash::Filters::Ruby", {"code"=>"sleep 10000"}]=>[{"thread_id"=>15,
"name"=>"|filterworker.0", "current_call"=>"
(ruby filter code):1:in `sleep'"}]}}
The shutdown process appears to be stalled due to busy or blocked plugins. Check
the logs for more information.
{:level=>:error}
{:level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"input_to_filter"=>20, "total"=>20},
"STALLING_THREADS"=>
{["LogStash::Filters::Ruby", {"code"=>"sleep 10000"}]=>[{"thread_id"=>15,
"name"=>"|filterworker.0", "current_call"=>"
(ruby filter code):1:in `sleep'"}]}}
{:level=>:warn, "INFLIGHT_EVENT_COUNT"=>{"input_to_filter"=>20, "total"=>20},
"STALLING_THREADS"=>
{["LogStash::Filters::Ruby", {"code"=>"sleep 10000"}]=>[{"thread_id"=>15,
"name"=>"|filterworker.0", "current_call"=>"
(ruby filter code):1:in `sleep'"}]}}
Forcefully quitting logstash.. {:level=>:fatal}
========
When `--allow-unsafe-shutdown` isn't enabled, Logstash continues to run and produce these reports periodically.