elasticsearch/docs/reference/ingest.asciidoc
2021-03-22 10:43:44 -04:00


[[ingest]]
= Ingest pipelines

Ingest pipelines let you perform common transformations on your data before
indexing. For example, you can use pipelines to remove fields, extract values
from text, and enrich your data.

A pipeline consists of a series of configurable tasks called
<<processors,processors>>. Each processor runs sequentially, making specific
changes to incoming documents. After the processors have run, {es} adds the
transformed documents to your data stream or index.

image::images/ingest/ingest-process.svg[Ingest pipeline diagram,align="center"]

You can create and manage ingest pipelines using {kib}'s **Ingest Node
Pipelines** feature or the <<ingest-apis,ingest APIs>>. {es} stores pipelines in
the <<cluster-state,cluster state>>.

[discrete]
[[ingest-prerequisites]]
=== Prerequisites

* Nodes with the <<node-ingest-node,`ingest`>> node role handle pipeline
processing. To use ingest pipelines, your cluster must have at least one node
with the `ingest` role. For heavy ingest loads, we recommend creating
<<node-ingest-node,dedicated ingest nodes>>.

* If the {es} security features are enabled, you must have the `manage_pipeline`
<<privileges-list-cluster,cluster privilege>> to manage ingest pipelines. To use
{kib}'s **Ingest Node Pipelines** feature, you also need the
`cluster:monitor/nodes/info` cluster privilege.

* Pipelines including the `enrich` processor require additional setup. See
<<ingest-enriching-data>>.

[discrete]
[[create-manage-ingest-pipelines]]
=== Create and manage pipelines

In {kib}, open the main menu and click **Stack Management** > **Ingest Node
Pipelines**. From the list view, you can:

* View a list of your pipelines and drill down into details
* Edit or clone existing pipelines
* Delete pipelines

To create a new pipeline, click **Create a pipeline**. For an example tutorial,
see <<common-log-format-example>>.

[role="screenshot"]
image::images/ingest/ingest-pipeline-list.png[Kibana's Ingest Node Pipelines list view,align="center"]

You can also use the <<ingest-apis,ingest APIs>> to create and manage pipelines.
The following <<put-pipeline-api,create pipeline API>> request creates
a pipeline containing two <<set-processor,`set`>> processors followed by a
<<lowercase-processor,`lowercase`>> processor. The processors run sequentially
in the order specified.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "description": "My pipeline description",
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}
----
// TESTSETUP
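
The ingest APIs also cover the list view's other actions. As a minimal sketch,
the following requests use the get pipeline and delete pipeline APIs to retrieve
the `my-pipeline` definition and to delete a pipeline. The `my-old-pipeline`
name is a placeholder assumed for illustration, not a pipeline defined elsewhere
on this page.

[source,console]
----
GET _ingest/pipeline/my-pipeline

DELETE _ingest/pipeline/my-old-pipeline
----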

[discrete]
[[test-pipeline]]
=== Test a pipeline

Before using a pipeline in production, we recommend you test it using sample
documents. When creating or editing a pipeline in {kib}, click **Add
documents**. In the **Documents** tab, provide sample documents and click **Run
the pipeline**.

[role="screenshot"]
image::images/ingest/test-a-pipeline.png[Test a pipeline in Kibana,align="center"]

You can also test pipelines using the <<simulate-pipeline-api,simulate pipeline
API>>.

[source,console]
----
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}
----

The API returns transformed documents:

[source,console-result]
----
{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "my-long-field": 10,
          "my-boolean-field": true,
          "my-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:03.187Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "my-long-field": 10,
          "my-boolean-field": true,
          "my-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:03.188Z"
        }
      }
    }
  ]
}
----
// TESTRESPONSE[s/"2099-03-07T11:04:03.187Z"/$body.docs.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2099-03-07T11:04:03.188Z"/$body.docs.1.doc._ingest.timestamp/]
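
The simulate pipeline API can also accept a pipeline definition in the request
body, which may be useful for trying out processors before you store a pipeline.
The following is a sketch of that variant. It reuses the `lowercase` processor
and a sample document from the earlier examples.

[source,console]
----
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "my-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    }
  ]
}
----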

[discrete]
[[add-pipeline-to-indexing-request]]
=== Add a pipeline to an indexing request

Use the `pipeline` query parameter to apply a pipeline to documents in
<<docs-index_,individual>> or <<docs-bulk,bulk>> indexing requests.

[source,console]
----
POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-08T11:04:05.000Z", "my-keyword-field" : "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "my-keyword-field" : "bar" }
----

You can also use the `pipeline` parameter with the <<docs-update-by-query,update
by query>> or <<docs-reindex,reindex>> APIs.

[source,console]
----
POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}
----
// TEST[continued]

[discrete]
[[set-default-pipeline]]
=== Set a default pipeline

Use the <<index-default-pipeline,`index.default_pipeline`>> index setting to set
a default pipeline. {es} applies this pipeline if no `pipeline` parameter
is specified.
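
As a sketch, the following update index settings request sets `my-pipeline` as
the default pipeline for an existing index. The `my-index` name is a placeholder
assumed for illustration.

[source,console]
----
PUT my-index/_settings
{
  "index.default_pipeline": "my-pipeline"
}
----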

[discrete]
[[set-final-pipeline]]
=== Set a final pipeline

Use the <<index-final-pipeline,`index.final_pipeline`>> index setting to set a
final pipeline. {es} applies this pipeline after the request or default
pipeline, even if neither is specified.
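
The final pipeline can be set in the same way as any other index setting. In the
following sketch, both the `my-index` index and the `my-final-pipeline` pipeline
are placeholder names assumed for illustration.

[source,console]
----
PUT my-index/_settings
{
  "index.final_pipeline": "my-final-pipeline"
}
----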

[discrete]
[[access-source-fields]]
=== Access source fields in a processor

Processors have read and write access to an incoming document's source fields.
To access a field key in a processor, use its field name. The following `set`
processor accesses `my-long-field`.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    }
  ]
}
----

You can also prepend the `_source` prefix.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}
----

Use dot notation to access object fields.

IMPORTANT: If your document contains flattened objects, use the
<<dot-expand-processor,`dot_expander`>> processor to expand them first. Other
ingest processors cannot access flattened objects.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "field": "my-object-field.my-property"
      }
    },
    {
      "set": {
        "field": "my-object-field.my-property",
        "value": 10
      }
    }
  ]
}
----

[[template-snippets]]
To access field values, enclose the field name in double curly brackets `{{ }}`
to create a https://mustache.github.io[Mustache] template snippet. You can use
template snippets to dynamically set field names. The following processor uses
the `service` field's value as the name of the field to set and the `code`
field's value as that field's value.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "{{service}}",
        "value": "{{code}}"
      }
    }
  ]
}
----
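
To see how template snippets resolve, you could simulate the pipeline with a
sample document. In the following sketch, the `service` and `code` values are
made-up sample data. Based on the processor above, the simulated document should
gain a field named after the `service` value that holds the `code` value.

[source,console]
----
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "service": "billing",
        "code": "200"
      }
    }
  ]
}
----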

[discrete]
[[access-metadata-fields]]
=== Access metadata fields in a processor

Processors can access the following metadata fields by name:

* `_index`
* `_id`
* `_routing`

For example, the following `set` processor sets the document's routing value to
the `geoip.country_iso_code` field value.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors" : [
    {
      "set" : {
        "field": "_routing",
        "value": "{{geoip.country_iso_code}}"
      }
    }
  ]
}
----

Use a Mustache template snippet to access metadata field values. For example,
`{{_routing}}` retrieves a document's routing value.

WARNING: If you <<create-document-ids-automatically,automatically generate>>
document IDs, you cannot use `{{_id}}` in a processor. {es} assigns
auto-generated `_id` values after ingest.

[discrete]
[[access-ingest-metadata]]
=== Access ingest metadata in a processor

Ingest processors can add and access ingest metadata using the `_ingest` key.

Unlike source and metadata fields, {es} does not index ingest metadata fields by
default. {es} also allows source fields that start with an `_ingest` key. If
your data includes such source fields, use `_source._ingest` to access them.

Pipelines only create the `_ingest.timestamp` ingest metadata field by default.
This field contains a timestamp of when {es} received the document's indexing
request. To index `_ingest.timestamp` or other ingest metadata fields, use the
`set` processor.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "received",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
----

[discrete]
[[handling-pipeline-failures]]
=== Handling pipeline failures

A pipeline's processors run sequentially. By default, pipeline processing stops
when one of these processors fails or encounters an error.

To ignore a processor failure and run the pipeline's remaining processors, set
`ignore_failure` to `true`.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "ignore_failure": true
      }
    }
  ]
}
----

Use the `on_failure` parameter to specify a list of processors to run
immediately after a processor failure. If `on_failure` is specified, {es} then
runs the pipeline's remaining processors, even if the `on_failure`
configuration is empty.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "on_failure": [
          {
            "set": {
              "field": "error.message",
              "value": "field \"foo\" does not exist, cannot rename to \"bar\"",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
----

Nest a list of `on_failure` processors for nested error handling.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "on_failure": [
          {
            "set": {
              "field": "error.message",
              "value": "field \"foo\" does not exist, cannot rename to \"bar\"",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
----

You can also specify `on_failure` for a pipeline.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
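
Inside an `on_failure` block, additional details about the failure may be
available in the `_ingest.on_failure_message`,
`_ingest.on_failure_processor_type`, and `_ingest.on_failure_processor_tag`
ingest metadata fields. The following sketch records them in the document. The
`error_information` field name is an assumption chosen for illustration, and the
`lowercase` processor stands in for your pipeline's own processors.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}
----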

[discrete]
[[conditionally-run-processor]]
=== Conditionally run a processor

Each processor supports an optional `if` condition, written as a
{painless}/painless-guide.html[Painless script]. If provided, the processor only
runs when the `if` condition is `true`.

IMPORTANT: `if` condition scripts run in Painless's
{painless}/painless-ingest-processor-context.html[ingest processor context]. In
`if` conditions, `ctx` values are read-only.

The following <<drop-processor,`drop`>> processor uses an `if` condition to drop
documents with a `network_name` of `Guest`.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "if": "ctx?.network_name == 'Guest'"
      }
    }
  ]
}
----

If the static `script.painless.regex.enabled` cluster setting is enabled, you
can use regular expressions in your `if` condition scripts. For supported
syntax, see the {painless}/painless-regexes.html[Painless regexes]
documentation.

TIP: If possible, avoid using regular expressions. Expensive regular expressions
can slow indexing speeds.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
----

You must specify `if` conditions as valid JSON on a single line. However, you
can use the {kibana-ref}/console-kibana.html#configuring-console[{kib}
console]'s triple quote syntax to write and debug larger scripts.

TIP: If possible, avoid using complex or expensive `if` condition scripts.
Expensive condition scripts can slow indexing speeds.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "if": """
          Collection tags = ctx.tags;
          if (tags != null) {
            for (String tag : tags) {
              if (tag.toLowerCase().contains('prod')) {
                return false;
              }
            }
          }
          return true;
        """
      }
    }
  ]
}
----

You can also specify a <<modules-scripting-stored-scripts,stored script>> as the
`if` condition.

[source,console]
----
PUT _scripts/my-stored-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if (tags != null) {
        for (String tag : tags) {
          if (tag.toLowerCase().contains('prod')) {
            return false;
          }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "if": { "id": "my-stored-script" }
      }
    }
  ]
}
----

Incoming documents often contain object fields. If a processor script attempts
to access a field whose parent object does not exist, {es} returns a
`NullPointerException`. To avoid these exceptions, use
{painless}/painless-operators-reference.html#null-safe-operator[null safe
operators], such as `?.`, and write your scripts to be null safe.

For example, `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe.
`ctx.network?.name` can return null. Rewrite the script as
`'Guest'.equalsIgnoreCase(ctx.network?.name)`, which is null safe because
`Guest` is always non-null.

If you can't rewrite a script to be null safe, include an explicit null check.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}
----

[discrete]
[[conditionally-apply-pipelines]]
=== Conditionally apply pipelines

Combine an `if` condition with the <<pipeline-processor,`pipeline`>> processor
to apply other pipelines to documents based on your criteria. You can use this
pipeline as the <<set-default-pipeline,default pipeline>> in an
<<index-templates,index template>> used to configure multiple data streams or
indices.

The following pipeline applies different pipelines to incoming documents based
on the `service.name` field value.

[source,console]
----
PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
----
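
As a sketch of the index template approach, the following request creates an
index template that sets `one-pipeline-to-rule-them-all` as the default pipeline
for matching data streams. The template name, index pattern, and priority are
assumptions chosen for illustration.

[source,console]
----
PUT _index_template/my-index-template
{
  "index_patterns": [ "my-data-stream*" ],
  "data_stream": { },
  "priority": 500,
  "template": {
    "settings": {
      "index.default_pipeline": "one-pipeline-to-rule-them-all"
    }
  }
}
----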

[discrete]
[[get-pipeline-usage-stats]]
=== Get pipeline usage statistics

Use the <<cluster-nodes-stats,node stats>> API to get global and per-pipeline
ingest statistics. Use these stats to determine which pipelines run most
frequently or spend the most time processing.

[source,console]
----
GET _nodes/stats/ingest?filter_path=nodes.*.ingest
----
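
To narrow the response to a single pipeline, you can extend the `filter_path`
value. The following sketch assumes a pipeline named `my-pipeline` exists in the
cluster.

[source,console]
----
GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines.my-pipeline
----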

include::ingest/common-log-format-example.asciidoc[]

include::ingest/enrich.asciidoc[]

include::ingest/processors.asciidoc[]