[[transformation]]
== Transforming Data

With over 200 plugins in the Logstash plugin ecosystem, it's sometimes
challenging to choose the best plugin to meet your data processing needs.
In this section, we've collected a list of popular plugins and organized them
according to their processing capabilities:

* <<core-operations>>
* <<data-deserialization>>
* <<field-extraction>>
* <<lookup-enrichment>>

Also see <<filter-plugins>> and <<codec-plugins>> for the full list of available
data processing plugins.

[[core-operations]]
=== Performing Core Operations

The plugins described in this section are useful for core operations, such as
mutating and dropping events.

<<plugins-filters-date,date filter>>::

Parses dates from fields to use as Logstash timestamps for events.
+
The following config parses a field called `logdate` to set the Logstash
timestamp:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  date {
    match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
  }
}
--------------------------------------------------------------------------------

<<plugins-filters-drop,drop filter>>::

Drops events. This filter is typically used in combination with conditionals.
+
The following config drops `debug` level log messages:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  if [loglevel] == "debug" {
    drop { }
  }
}
--------------------------------------------------------------------------------

<<plugins-filters-fingerprint,fingerprint filter>>::

Fingerprints fields by applying a consistent hash.
+
The following config fingerprints the `IP`, `@timestamp`, and `message` fields
and adds the hash to a metadata field called `generated_id`:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  fingerprint {
    source => ["IP", "@timestamp", "message"]
    method => "SHA1"
    key => "0123"
    target => "[@metadata][generated_id]"
  }
}
--------------------------------------------------------------------------------

<<plugins-filters-mutate,mutate filter>>::

Performs general mutations on fields. You can rename, remove, replace, and
modify fields in your events.
+
The following config renames the `HOSTORIP` field to `client_ip`:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  mutate {
    rename => { "HOSTORIP" => "client_ip" }
  }
}
--------------------------------------------------------------------------------
+
The following config strips leading and trailing whitespace from the specified
fields:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  mutate {
    strip => ["field1", "field2"]
  }
}
--------------------------------------------------------------------------------

<<plugins-filters-ruby,ruby filter>>::

Executes Ruby code.
+
The following config executes Ruby code that cancels 90% of the events:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  ruby {
    code => "event.cancel if rand <= 0.90"
  }
}
--------------------------------------------------------------------------------

[[data-deserialization]]
=== Deserializing Data

The plugins described in this section are useful for deserializing data into
Logstash events.

<<plugins-codecs-avro,avro codec>>::

Reads serialized Avro records as Logstash events. This plugin deserializes
individual Avro records. It is not for reading Avro files. Avro files have a
unique format that must be handled upon input.
+
The following config deserializes input from Kafka:
+
[source,json]
----------------------------------
input {
  kafka {
    codec => avro {
      schema_uri => "/tmp/schema.avsc"
    }
  }
}
...
----------------------------------

<<plugins-filters-csv,csv filter>>::

Parses comma-separated value data into individual fields. By default, the
filter autogenerates field names (`column1`, `column2`, and so on), or you can
specify a list of names. You can also change the column separator.
+
The following config parses CSV data into the field names specified in the
`columns` field:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  csv {
    separator => ","
    columns => [ "Transaction Number", "Date", "Description", "Amount Debit", "Amount Credit", "Balance" ]
  }
}
--------------------------------------------------------------------------------
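+
For example, the following input line (illustrative data):
+
[source,json]
--------------------------------------------------------------------------------
1001,2017-04-26,Coffee,4.50,0.00,1020.50
--------------------------------------------------------------------------------
+
would be parsed into these fields:
+
* `Transaction Number: 1001`
* `Date: 2017-04-26`
* `Description: Coffee`
* `Amount Debit: 4.50`
* `Amount Credit: 0.00`
* `Balance: 1020.50`
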
<<plugins-codecs-fluent,fluent codec>>::

Reads the Fluentd `msgpack` schema.
+
The following config decodes logs received from `fluent-logger-ruby`:
+
[source,json]
--------------------------------------------------------------------------------
input {
  tcp {
    codec => fluent
    port => 4000
  }
}
--------------------------------------------------------------------------------

<<plugins-codecs-json,json codec>>::

Decodes (via inputs) and encodes (via outputs) JSON formatted content, creating
one event per element in a JSON array.
+
The following config decodes the JSON formatted content in a file:
+
[source,json]
--------------------------------------------------------------------------------
input {
  file {
    path => "/path/to/myfile.json"
    codec => "json"
  }
}
--------------------------------------------------------------------------------

<<plugins-codecs-protobuf,protobuf codec>>::

Reads protobuf encoded messages and converts them to Logstash events. Requires
the protobuf definitions to be compiled as Ruby files. You can compile them by
using the
https://github.com/codekitchen/ruby-protocol-buffers[ruby-protoc compiler].
+
The following config decodes events from a Kafka stream:
+
[source,json]
--------------------------------------------------------------------------------
input {
  kafka {
    zk_connect => "127.0.0.1"
    topic_id => "your_topic_goes_here"
    codec => protobuf {
      class_name => "Animal::Unicorn"
      include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
    }
  }
}
--------------------------------------------------------------------------------

<<plugins-filters-xml,xml filter>>::

Parses XML into fields.
+
The following config parses the whole XML document stored in the `message` field:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  xml {
    source => "message"
  }
}
--------------------------------------------------------------------------------

[[field-extraction]]
=== Extracting Fields and Wrangling Data

The plugins described in this section are useful for extracting fields and
parsing unstructured data into fields.

<<plugins-filters-dissect,dissect filter>>::

Extracts unstructured event data into fields by using delimiters. The dissect
filter does not use regular expressions and is very fast. However, if the
structure of the data varies from line to line, the grok filter is more
suitable.
+
For example, let's say you have a log that contains the following message:
+
[source,json]
--------------------------------------------------------------------------------
Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...
--------------------------------------------------------------------------------
+
The following config dissects the message:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  dissect {
    mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
  }
}
--------------------------------------------------------------------------------
+
After the dissect filter is applied, the event will be dissected into the following
fields:
+
[source,json]
--------------------------------------------------------------------------------
{
           "msg" => "Starting system activity accounting tool...",
    "@timestamp" => 2017-04-26T19:33:39.257Z,
           "src" => "localhost",
      "@version" => "1",
          "host" => "localhost.localdomain",
           "pid" => "1",
       "message" => "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...",
          "type" => "stdin",
          "prog" => "systemd",
            "ts" => "Apr 26 12:20:02"
}
--------------------------------------------------------------------------------

<<plugins-filters-kv,kv filter>>::

Parses key-value pairs.
+
For example, let's say you have a log message that contains the following
key-value pairs:
+
[source,json]
--------------------------------------------------------------------------------
ip=1.2.3.4 error=REFUSED
--------------------------------------------------------------------------------
+
The following config parses the key-value pairs into fields:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  kv { }
}
--------------------------------------------------------------------------------
+
After the filter is applied, the event in the example will have these fields:
+
* `ip: 1.2.3.4`
* `error: REFUSED`

<<plugins-filters-grok,grok filter>>::

Parses unstructured event data into fields. This tool is perfect for syslog
logs, Apache and other webserver logs, MySQL logs, and in general, any log
format that is generally written for humans and not computer consumption.
Grok works by combining text patterns into something that matches your
logs.
+
For example, let's say you have an HTTP request log that contains
the following message:
+
[source,json]
--------------------------------------------------------------------------------
55.3.244.1 GET /index.html 15824 0.043
--------------------------------------------------------------------------------
+
The following config parses the message into fields:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
--------------------------------------------------------------------------------
+
After the filter is applied, the event in the example will have these fields:
+
* `client: 55.3.244.1`
* `method: GET`
* `request: /index.html`
* `bytes: 15824`
* `duration: 0.043`

TIP: If you need help building grok patterns, try the
{kibana-ref}/xpack-grokdebugger.html[Grok Debugger]. The Grok Debugger is an
{xpack} feature under the Basic License and is therefore *free to use*.

[[lookup-enrichment]]
=== Enriching Data with Lookups

These plugins can help you enrich data with
additional info, such as GeoIP and user agent info:

* <<dns-def,dns filter>>
* <<es-def,elasticsearch filter>>
* <<geoip-def,geoip filter>>
* <<http-def,http filter>>
* <<jdbc-static-def,jdbc_static filter>>
* <<jdbc-stream-def,jdbc_streaming filter>>
* <<memcached-def,memcached filter>>
* <<translate-def,translate filter>>
* <<useragent-def,useragent filter>>

[float]
[[lookup-plugins]]
=== Lookup plugins

[[dns-def]]dns filter::

The <<plugins-filters-dns,dns filter plugin>> performs a standard or reverse DNS lookup.
+
The following config performs a reverse lookup on the address in the
`source_host` field and replaces it with the domain name:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  dns {
    reverse => [ "source_host" ]
    action => "replace"
  }
}
--------------------------------------------------------------------------------

[[es-def]]elasticsearch filter::

The <<plugins-filters-elasticsearch,elasticsearch filter>> copies fields from previous log events in Elasticsearch to current events.
+
The following config shows a complete example of how this filter might
be used. Whenever Logstash receives an "end" event, it uses this Elasticsearch
filter to find the matching "start" event based on some operation identifier.
Then it copies the `@timestamp` field from the "start" event into a new field on
the "end" event. Finally, using a combination of the date filter and the
ruby filter, the code in the example calculates the time duration in hours
between the two events.
+
[source,json]
--------------------------------------------------
if [type] == "end" {
  elasticsearch {
    hosts => ["es-server"]
    query => "type:start AND operation:%{[opid]}"
    fields => { "@timestamp" => "started" }
  }
  date {
    match => ["[started]", "ISO8601"]
    target => "[started]"
  }
  ruby {
    code => 'event.set("duration_hrs", (event.get("@timestamp") - event.get("started")) / 3600) rescue nil'
  }
}
--------------------------------------------------

[[geoip-def]]geoip filter::

The <<plugins-filters-geoip,geoip filter>> adds geographical information about the location of IP addresses. For example:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  geoip {
    source => "clientip"
  }
}
--------------------------------------------------------------------------------
+
After the geoip filter is applied, the event will be enriched with geoip fields
such as `[geoip][country_name]`, `[geoip][city_name]`, and `[geoip][location]`.
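+
For example, the enriched event might carry a `geoip` object along these lines
(illustrative values; the exact set of fields depends on the GeoIP database and
the address being looked up):
+
[source,json]
--------------------------------------------------------------------------------
"geoip" => {
  "ip" => "203.0.113.10",
  "country_name" => "United States",
  "region_name" => "California",
  "city_name" => "San Francisco",
  "timezone" => "America/Los_Angeles"
}
--------------------------------------------------------------------------------
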
[[http-def]]http filter::

The <<plugins-filters-http,http filter>> integrates with external web
services/REST APIs, and enables lookup enrichment against any HTTP service or
endpoint. This plugin is well suited for many enrichment use cases, such as
social APIs, sentiment APIs, security feed APIs, and business service APIs.
//+
//[source,txt]
//-----
//filter {
//  http {
//    url => "http://example.com"
//    verb => GET
//    body => {
//      "user-id" => "%{user}"
//      "api-key" => "%{api_key}"
//    }
//    body_format => "json"
//    headers => {
//      "Content-type" => "application/json"
//    }
//    target_body => "new_field"
//  }
//}
//-----
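+
A minimal sketch of how such a lookup might be configured (the endpoint URL,
field names, and header values here are hypothetical; the settings used are the
plugin's `url`, `verb`, `body`, `body_format`, `headers`, and `target_body`
options):
+
[source,json]
--------------------------------------------------------------------------------
filter {
  http {
    # POST the user id from the event to a hypothetical enrichment endpoint
    url => "https://api.example.com/enrich"
    verb => "POST"
    body => {
      "user-id" => "%{user}"
    }
    body_format => "json"
    headers => {
      "Content-type" => "application/json"
    }
    # store the parsed response body in a new field on the event
    target_body => "[enrichment]"
  }
}
--------------------------------------------------------------------------------
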
[[jdbc-static-def]]jdbc_static filter::

The <<plugins-filters-jdbc_static,jdbc_static filter>> enriches events with data pre-loaded from a remote database.
+
The following example fetches data from a remote database, caches it in a local
database, and uses lookups to enrich events with data cached in the local
database.
+
["source","json",subs="callouts"]
-----
filter {
  jdbc_static {
    loaders => [ <1>
      {
        id => "remote-servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      },
      {
        id => "remote-users"
        query => "select firstname, lastname, userid from ref.local_users order by userid"
        local_table => "users"
      }
    ]
    local_db_objects => [ <2>
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      },
      {
        name => "users"
        index_columns => ["userid"]
        columns => [
          ["firstname", "varchar(255)"],
          ["lastname", "varchar(255)"],
          ["userid", "int"]
        ]
      }
    ]
    local_lookups => [ <3>
      {
        id => "local-servers"
        query => "select descr as description from servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      },
      {
        id => "local-users"
        query => "select firstname, lastname from users WHERE userid = :id"
        parameters => {id => "[loggedin_userid]"}
        target => "user" <4>
      }
    ]
    # using add_field here to add & rename values to the event root
    add_field => { server_name => "%{[server][0][description]}" }
    add_field => { user_firstname => "%{[user][0][firstname]}" } <5>
    add_field => { user_lastname => "%{[user][0][lastname]}" }
    remove_field => ["server", "user"]
    jdbc_user => "logstash"
    jdbc_password => "example"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
  }
}
-----
<1> Queries an external database to fetch the dataset that will be cached
locally.
<2> Defines the columns, types, and indexes used to build the local database
structure. The column names and types should match the external database.
<3> Performs lookup queries on the local database to enrich the events.
<4> Specifies the event field that will store the looked-up data. If the lookup
returns multiple columns, the data is stored as a JSON object within the field.
<5> Takes data from the JSON object and stores it in top-level event fields for
easier analysis in Kibana.

[[jdbc-stream-def]]jdbc_streaming filter::

The <<plugins-filters-jdbc_streaming,jdbc_streaming filter>> enriches events with database data.
+
The following example executes a SQL query and stores the result set in a field
called `country_details`:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "select * from WORLD.COUNTRY WHERE Code = :code"
    parameters => { "code" => "country_code" }
    target => "country_details"
  }
}
--------------------------------------------------------------------------------

[[memcached-def]]memcached filter::

The <<plugins-filters-memcached,memcached filter>> enables key/value lookup
enrichment against a Memcached object caching system.
It supports both read (GET) and write (SET) operations. It is a notable addition
for security analytics use cases.
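+
A minimal sketch of a GET lookup (the `hosts` and `get` settings are the
plugin's basic options; the key pattern and target field here are hypothetical):
+
[source,json]
--------------------------------------------------------------------------------
filter {
  memcached {
    hosts => ["localhost:11211"]
    # read the value stored under the "threat-<source IP>" key and
    # copy it into the [threat_intel] field on the event
    get => { "threat-%{src_ip}" => "[threat_intel]" }
  }
}
--------------------------------------------------------------------------------
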
[[translate-def]]translate filter::

The <<plugins-filters-translate,translate filter>> replaces field contents based on replacement values specified in a hash or file.
It currently supports these file types: YAML, JSON, and CSV.
+
The following example takes the value of the `response_code` field, translates
it to a description based on the values specified in the dictionary, and then
removes the `response_code` field from the event:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  translate {
    field => "response_code"
    destination => "http_response"
    dictionary => {
      "200" => "OK"
      "403" => "Forbidden"
      "404" => "Not Found"
      "408" => "Request Timeout"
    }
    remove_field => "response_code"
  }
}
--------------------------------------------------------------------------------

[[useragent-def]]useragent filter::

The <<plugins-filters-useragent,useragent filter>> parses user agent strings into fields.
+
The following example takes the user agent string in the `agent` field, parses
it into user agent fields, and adds the user agent fields to a new field called
`user_agent`. It also removes the original `agent` field:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  useragent {
    source => "agent"
    target => "user_agent"
    remove_field => "agent"
  }
}
--------------------------------------------------------------------------------
+
After the filter is applied, the event will be enriched with user agent fields.
For example:
+
[source,json]
--------------------------------------------------------------------------------
"user_agent": {
  "os": "Mac OS X 10.12",
  "major": "50",
  "minor": "0",
  "os_minor": "12",
  "os_major": "10",
  "name": "Firefox",
  "os_name": "Mac OS X",
  "device": "Other"
}
--------------------------------------------------------------------------------