[[transformation]]
== Transforming Data
With over 200 plugins in the Logstash plugin ecosystem, it's sometimes
challenging to choose the best plugin to meet your data processing needs.
In this section, we've collected a list of popular plugins and organized them
according to their processing capabilities:
* <<core-operations>>
* <<data-deserialization>>
* <<field-extraction>>
* <<lookup-enrichment>>
Also see <<filter-plugins>> and <<codec-plugins>> for the full list of available
data processing plugins.
[[core-operations]]
=== Performing Core Operations
The plugins described in this section are useful for core operations, such as
mutating and dropping events.
<<plugins-filters-date,date filter>>::
Parses dates from fields to use as Logstash timestamps for events.
+
The following config parses a field called `logdate` to set the Logstash
timestamp:
+
[source,json]
--------------------------------------------------------------------------------
filter {
date {
match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
}
}
--------------------------------------------------------------------------------
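+
If the timestamps arrive in more than one format, you can list several
patterns; the first pattern that successfully parses the field is used. For
example, this config also accepts ISO8601 timestamps:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  date {
    match => [ "logdate", "MMM dd yyyy HH:mm:ss", "ISO8601" ]
  }
}
--------------------------------------------------------------------------------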
<<plugins-filters-drop,drop filter>>::
Drops events. This filter is typically used in combination with conditionals.
+
The following config drops `debug` level log messages:
+
[source,json]
--------------------------------------------------------------------------------
filter {
if [loglevel] == "debug" {
drop { }
}
}
--------------------------------------------------------------------------------
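+
The drop filter can also drop only a percentage of the matching events, which
is useful for sampling noisy data. For example, the following config drops
roughly 40% of the `debug` level messages:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  if [loglevel] == "debug" {
    drop { percentage => 40 }
  }
}
--------------------------------------------------------------------------------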
<<plugins-filters-fingerprint,fingerprint filter>>::
Fingerprints fields by applying a consistent hash.
+
The following config fingerprints the `IP`, `@timestamp`, and `message` fields
and adds the hash to a metadata field called `generated_id`:
+
[source,json]
--------------------------------------------------------------------------------
filter {
fingerprint {
source => ["IP", "@timestamp", "message"]
method => "SHA1"
key => "0123"
target => "[@metadata][generated_id]"
}
}
--------------------------------------------------------------------------------
<<plugins-filters-mutate,mutate filter>>::
Performs general mutations on fields. You can rename, remove, replace, and
modify fields in your events.
+
The following config renames the `HOSTORIP` field to `client_ip`:
+
[source,json]
--------------------------------------------------------------------------------
filter {
mutate {
rename => { "HOSTORIP" => "client_ip" }
}
}
--------------------------------------------------------------------------------
+
The following config strips leading and trailing whitespace from the specified
fields:
+
[source,json]
--------------------------------------------------------------------------------
filter {
mutate {
strip => ["field1", "field2"]
}
}
--------------------------------------------------------------------------------
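+
The following config (the field name is illustrative) converts the `bytes`
field from a string to an integer:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  mutate {
    convert => { "bytes" => "integer" }
  }
}
--------------------------------------------------------------------------------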
<<plugins-filters-ruby,ruby filter>>::
Executes Ruby code.
+
The following config executes Ruby code that cancels 90% of the events:
+
[source,json]
--------------------------------------------------------------------------------
filter {
ruby {
code => "event.cancel if rand <= 0.90"
}
}
--------------------------------------------------------------------------------
[[data-deserialization]]
=== Deserializing Data
The plugins described in this section are useful for deserializing data into
Logstash events.
<<plugins-codecs-avro,avro codec>>::
Reads serialized Avro records as Logstash events. This plugin deserializes
individual Avro records. It is not for reading Avro files. Avro files have a
unique format that must be handled upon input.
+
The following config deserializes input from Kafka:
+
[source,json]
----------------------------------
input {
  kafka {
    codec => avro {
      schema_uri => "/tmp/schema.avsc"
    }
  }
}
...
----------------------------------
<<plugins-filters-csv,csv filter>>::
Parses comma-separated value data into individual fields. By default, the
filter autogenerates field names (column1, column2, and so on), or you can specify
a list of names. You can also change the column separator.
+
The following config parses CSV data into the field names specified in the
`columns` field:
+
[source,json]
--------------------------------------------------------------------------------
filter {
csv {
separator => ","
columns => [ "Transaction Number", "Date", "Description", "Amount Debit", "Amount Credit", "Balance" ]
}
}
--------------------------------------------------------------------------------
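+
To parse semicolon-separated data and let the filter autogenerate the column
names (`column1`, `column2`, and so on), omit `columns` and change the
separator:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  csv {
    separator => ";"
  }
}
--------------------------------------------------------------------------------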
<<plugins-codecs-fluent,fluent codec>>::
Reads the Fluentd `msgpack` schema.
+
The following config decodes logs received from `fluent-logger-ruby`:
+
[source,json]
--------------------------------------------------------------------------------
input {
tcp {
codec => fluent
port => 4000
}
}
--------------------------------------------------------------------------------
<<plugins-codecs-json,json codec>>::
Decodes (via inputs) and encodes (via outputs) JSON formatted content, creating
one event per element in a JSON array.
+
The following config decodes the JSON formatted content in a file:
+
[source,json]
--------------------------------------------------------------------------------
input {
  file {
    path => "/path/to/myfile.json"
    codec => "json"
  }
}
--------------------------------------------------------------------------------
<<plugins-codecs-protobuf,protobuf codec>>::
Reads protobuf encoded messages and converts them to Logstash events. Requires
the protobuf definitions to be compiled as Ruby files. You can compile them by
using the
https://github.com/codekitchen/ruby-protocol-buffers[ruby-protoc compiler].
+
The following config decodes events from a Kafka stream:
+
[source,json]
--------------------------------------------------------------------------------
input {
  kafka {
    zk_connect => "127.0.0.1"
    topic_id => "your_topic_goes_here"
    codec => protobuf {
      class_name => "Animal::Unicorn"
      include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
    }
  }
}
--------------------------------------------------------------------------------
<<plugins-filters-xml,xml filter>>::
Parses XML into fields.
+
The following config parses the whole XML document stored in the `message` field:
+
[source,json]
--------------------------------------------------------------------------------
filter {
xml {
source => "message"
}
}
--------------------------------------------------------------------------------
[[field-extraction]]
=== Extracting Fields and Wrangling Data
The plugins described in this section are useful for extracting fields and
parsing unstructured data into fields.
<<plugins-filters-dissect,dissect filter>>::
Extracts unstructured event data into fields by using delimiters. The dissect
filter does not use regular expressions and is very fast. However, if the
structure of the data varies from line to line, the grok filter is more
suitable.
+
For example, let's say you have a log that contains the following message:
+
[source,json]
--------------------------------------------------------------------------------
Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...
--------------------------------------------------------------------------------
+
The following config dissects the message:
+
[source,json]
--------------------------------------------------------------------------------
filter {
dissect {
mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
}
}
--------------------------------------------------------------------------------
+
After the dissect filter is applied, the event will be dissected into the following
fields:
+
[source,json]
--------------------------------------------------------------------------------
{
"msg" => "Starting system activity accounting tool...",
"@timestamp" => 2017-04-26T19:33:39.257Z,
"src" => "localhost",
"@version" => "1",
"host" => "localhost.localdomain",
"pid" => "1",
"message" => "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...",
"type" => "stdin",
"prog" => "systemd",
"ts" => "Apr 26 12:20:02"
}
--------------------------------------------------------------------------------
<<plugins-filters-kv,kv filter>>::
Parses key-value pairs.
+
For example, let's say you have a log message that contains the following
key-value pairs:
+
[source,json]
--------------------------------------------------------------------------------
ip=1.2.3.4 error=REFUSED
--------------------------------------------------------------------------------
+
The following config parses the key-value pairs into fields:
+
[source,json]
--------------------------------------------------------------------------------
filter {
kv { }
}
--------------------------------------------------------------------------------
+
After the filter is applied, the event in the example will have these fields:
+
* `ip: 1.2.3.4`
* `error: REFUSED`
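+
The kv filter can also split on other delimiters. For example, the following
config (the field name is illustrative) parses URL query strings stored in a
`request_args` field:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  kv {
    source => "request_args"
    field_split => "&"
  }
}
--------------------------------------------------------------------------------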
<<plugins-filters-grok,grok filter>>::
Parses unstructured event data into fields. This tool is perfect for syslog
logs, Apache and other webserver logs, MySQL logs, and in general, any log
format that is generally written for humans and not computer consumption.
Grok works by combining text patterns into something that matches your
logs.
+
For example, let's say you have an HTTP request log that contains
the following message:
+
[source,json]
--------------------------------------------------------------------------------
55.3.244.1 GET /index.html 15824 0.043
--------------------------------------------------------------------------------
+
The following config parses the message into fields:
+
[source,json]
--------------------------------------------------------------------------------
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
--------------------------------------------------------------------------------
+
After the filter is applied, the event in the example will have these fields:
+
* `client: 55.3.244.1`
* `method: GET`
* `request: /index.html`
* `bytes: 15824`
* `duration: 0.043`
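+
Grok can also convert captured values to numeric types as it matches. For
example, appending `:int` or `:float` to a pattern stores the field as a
number instead of a string:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:float}" }
  }
}
--------------------------------------------------------------------------------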
TIP: If you need help building grok patterns, try out the
{kibana-ref}/xpack-grokdebugger.html[Grok Debugger]. The Grok Debugger is an
{xpack} feature under the Basic License and is therefore *free to use*.
[[lookup-enrichment]]
=== Enriching Data with Lookups
These plugins can help you enrich data with additional information, such as
GeoIP and user agent details:
* <<plugins-filters-dns,dns filter>>
* <<plugins-filters-elasticsearch,elasticsearch filter>>
* <<plugins-filters-geoip,geoip filter>>
* <<plugins-filters-http,http filter>>
* <<plugins-filters-jdbc_static,jdbc_static filter>>
* <<plugins-filters-jdbc_streaming,jdbc_streaming filter>>
* <<plugins-filters-memcached,memcached filter>>
* <<plugins-filters-translate,translate filter>>
* <<plugins-filters-useragent,useragent filter>>
[float]
[[lookup-plugins]]
=== Lookup plugins
<<plugins-filters-dns,dns filter>>::
Performs a standard or reverse DNS lookup.
+
The following config performs a reverse lookup on the address in the
`source_host` field and replaces it with the domain name:
+
[source,json]
--------------------------------------------------------------------------------
filter {
dns {
reverse => [ "source_host" ]
action => "replace"
}
}
--------------------------------------------------------------------------------
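+
A standard (forward) lookup works the same way with the `resolve` option. The
following config (the field name is illustrative) resolves the hostname in the
`server_name` field to an IP address:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  dns {
    resolve => [ "server_name" ]
    action => "replace"
  }
}
--------------------------------------------------------------------------------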
<<plugins-filters-elasticsearch,elasticsearch filter>>::
Copies fields from previous log events in Elasticsearch to current events.
+
The following config shows a complete example of how this filter might
be used. Whenever Logstash receives an "end" event, it uses this Elasticsearch
filter to find the matching "start" event based on some operation identifier.
Then it copies the `@timestamp` field from the "start" event into a new field on
the "end" event. Finally, using a combination of the date filter and the
ruby filter, the code in the example calculates the time duration in hours
between the two events.
+
[source,json]
--------------------------------------------------
if [type] == "end" {
elasticsearch {
hosts => ["es-server"]
query => "type:start AND operation:%{[opid]}"
fields => { "@timestamp" => "started" }
}
date {
match => ["[started]", "ISO8601"]
target => "[started]"
}
ruby {
code => 'event.set("duration_hrs", (event.get("@timestamp") - event.get("started")) / 3600) rescue nil'
}
}
--------------------------------------------------
<<plugins-filters-geoip,geoip filter>>::
Adds geographical information about the location of IP addresses. For example:
+
[source,json]
--------------------------------------------------------------------------------
filter {
geoip {
source => "clientip"
}
}
--------------------------------------------------------------------------------
+
After the geoip filter is applied, the event will be enriched with geoip
fields, such as the country, region, city, and location coordinates
associated with the client IP address.
<<plugins-filters-http,http filter>>::
Integrates with external web services/REST APIs, and
enables lookup enrichment against any HTTP service or endpoint.
The <<plugins-filters-http,http filter>> is well suited to many enrichment use
cases, such as social APIs, sentiment APIs, security feed APIs, and business
service APIs.
+
[source,txt]
-----
filter {
  http {
    url => "http://example.com"
    verb => "GET"
    body => {
      "user-id" => "%{user}"
      "api-key" => "%{api_key}"
    }
    body_format => "json"
    headers => {
      "Content-type" => "application/json"
    }
    target_body => "new_field"
  }
}
-----
<<plugins-filters-jdbc_static,jdbc_static filter>>::
Enriches events with data pre-loaded from a remote database.
+
The following example fetches data from a remote database, caches it in a local
database, and uses lookups to enrich events with data cached in the local
database.
+
["source","json",subs="callouts"]
-----
filter {
jdbc_static {
loaders => [ <1>
{
id => "remote-servers"
query => "select ip, descr from ref.local_ips order by ip"
local_table => "servers"
},
{
id => "remote-users"
query => "select firstname, lastname, userid from ref.local_users order by userid"
local_table => "users"
}
]
local_db_objects => [ <2>
{
name => "servers"
index_columns => ["ip"]
columns => [
["ip", "varchar(15)"],
["descr", "varchar(255)"]
]
},
{
name => "users"
index_columns => ["userid"]
columns => [
["firstname", "varchar(255)"],
["lastname", "varchar(255)"],
["userid", "int"]
]
}
]
local_lookups => [ <3>
{
id => "local-servers"
query => "select descr as description from servers WHERE ip = :ip"
parameters => {ip => "[from_ip]"}
target => "server"
},
{
id => "local-users"
query => "select firstname, lastname from users WHERE userid = :id"
parameters => {id => "[loggedin_userid]"}
target => "user" <4>
}
]
# using add_field here to add & rename values to the event root
add_field => { server_name => "%{[server][0][description]}" }
add_field => { user_firstname => "%{[user][0][firstname]}" } <5>
add_field => { user_lastname => "%{[user][0][lastname]}" } <5>
remove_field => ["server", "user"]
jdbc_user => "logstash"
jdbc_password => "example"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
}
}
-----
<1> Queries an external database to fetch the dataset that will be cached
locally.
<2> Defines the columns, types, and indexes used to build the local database
structure. The column names and types should match the external database.
<3> Performs lookup queries on the local database to enrich the events.
<4> Specifies the event field that will store the looked-up data. If the lookup
returns multiple columns, the data is stored as a JSON object within the field.
<5> Takes data from the JSON object and stores it in top-level event fields for
easier analysis in Kibana.
<<plugins-filters-jdbc_streaming,jdbc_streaming filter>>::
Enriches events with database data.
+
The following example executes a SQL query and stores the result set in a field
called `country_details`:
+
[source,json]
--------------------------------------------------------------------------------
filter {
jdbc_streaming {
jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
jdbc_user => "me"
jdbc_password => "secret"
statement => "select * from WORLD.COUNTRY WHERE Code = :code"
parameters => { "code" => "country_code"}
target => "country_details"
}
}
--------------------------------------------------------------------------------
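+
The result set is stored under the target field as an array of rows, so
individual columns can be referenced by row index and column name. For
example, this sketch (the `Name` column is assumed from the sample
`WORLD.COUNTRY` table) copies the country name to a top-level field:
+
[source,json]
--------------------------------------------------------------------------------
filter {
  mutate {
    add_field => { "country_name" => "%{[country_details][0][Name]}" }
  }
}
--------------------------------------------------------------------------------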
<<plugins-filters-memcached,memcached filter>>::
Enables key/value lookup enrichment against a Memcached object caching system.
It supports both read (GET) and write (SET) operations. It is a notable addition
for security analytics use cases. For example, you can use this plugin to query
for a value, and set it if not found.
+
The following sketch (field names are illustrative) looks up the value stored
in Memcached under a key derived from the `src_ip` field and writes it to a
`threat_indicator` field on the event:
+
[source,txt]
-----
filter {
  memcached {
    hosts => ["localhost:11211"]
    get => { "%{[src_ip]}" => "[threat_indicator]" }
  }
}
-----
<<plugins-filters-translate,translate filter>>::
Replaces field contents based on replacement values specified in a hash or file.
Currently supports these file types: YAML, JSON, and CSV.
+
The following example takes the value of the `response_code` field, translates
it to a description based on the values specified in the dictionary, and then
removes the `response_code` field from the event:
+
[source,json]
--------------------------------------------------------------------------------
filter {
translate {
field => "response_code"
destination => "http_response"
dictionary => {
"200" => "OK"
"403" => "Forbidden"
"404" => "Not Found"
"408" => "Request Timeout"
}
remove_field => "response_code"
}
}
--------------------------------------------------------------------------------
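+
The dictionary can also be loaded from a YAML, JSON, or CSV file instead of
being defined inline. For example (the path is illustrative):
+
[source,json]
--------------------------------------------------------------------------------
filter {
  translate {
    field => "response_code"
    destination => "http_response"
    dictionary_path => "/etc/logstash/http_responses.yml"
  }
}
--------------------------------------------------------------------------------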
<<plugins-filters-useragent,useragent filter>>::
Parses user agent strings into fields.
+
The following example takes the user agent string in the `agent` field, parses
it into user agent fields, and adds the user agent fields to a new field called
`user_agent`. It also removes the original `agent` field:
+
[source,json]
--------------------------------------------------------------------------------
filter {
useragent {
source => "agent"
target => "user_agent"
remove_field => "agent"
}
}
--------------------------------------------------------------------------------
+
After the filter is applied, the event will be enriched with user agent fields.
For example:
+
[source,json]
--------------------------------------------------------------------------------
"user_agent": {
"os": "Mac OS X 10.12",
"major": "50",
"minor": "0",
"os_minor": "12",
"os_major": "10",
"name": "Firefox",
"os_name": "Mac OS X",
"device": "Other"
}
--------------------------------------------------------------------------------