[[ingest]]
= Ingest pipelines

Ingest pipelines let you perform common transformations on your data before
indexing. For example, you can use pipelines to remove fields, extract values
from text, and enrich your data.

A pipeline consists of a series of configurable tasks called
<<processors,processors>>. Each processor runs sequentially, making specific
changes to incoming documents. After the processors have run, {es} adds the
transformed documents to your data stream or index.

image::images/ingest/ingest-process.svg[Ingest pipeline diagram,align="center"]

You can create and manage ingest pipelines using {kib}'s **Ingest Pipelines**
feature or the <<ingest-apis,ingest APIs>>. {es} stores pipelines in the
<<cluster-state,cluster state>>.

[discrete]
[[ingest-prerequisites]]
=== Prerequisites

* Nodes with the <<node-ingest-node,`ingest`>> node role handle pipeline
processing. To use ingest pipelines, your cluster must have at least one node
with the `ingest` role. For heavy ingest loads, we recommend creating
<<node-ingest-node,dedicated ingest nodes>>.

* If the {es} security features are enabled, you must have the `manage_pipeline`
<<privileges-list-cluster,cluster privilege>> to manage ingest pipelines. To use
{kib}'s **Ingest Pipelines** feature, you also need the
`cluster:monitor/nodes/info` cluster privilege.

* Pipelines including the `enrich` processor require additional setup. See
<<ingest-enriching-data>>.

[discrete]
[[create-manage-ingest-pipelines]]
=== Create and manage pipelines

In {kib}, open the main menu and click **Stack Management > Ingest
Pipelines**. From the list view, you can:

* View a list of your pipelines and drill down into details
* Edit or clone existing pipelines
* Delete pipelines

[role="screenshot"]
image::images/ingest/ingest-pipeline-list.png[Kibana's Ingest Pipelines list view,align="center"]

To create a pipeline, click **Create pipeline > New pipeline**. For an example
tutorial, see <<common-log-format-example>>.

TIP: The **New pipeline from CSV** option lets you use a CSV to create an ingest
pipeline that maps custom data to the {ecs-ref}[Elastic Common Schema (ECS)].
Mapping your custom data to ECS makes the data easier to search and lets you
reuse visualizations from other datasets. To get started, check
{ecs-ref}/ecs-converting.html[Map custom data to ECS].

You can also use the <<ingest-apis,ingest APIs>> to create and manage pipelines.
The following <<put-pipeline-api,create pipeline API>> request creates
a pipeline containing two <<set-processor,`set`>> processors followed by a
<<lowercase-processor,`lowercase`>> processor. The processors run sequentially
in the order specified.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "description": "My optional pipeline description",
  "processors": [
    {
      "set": {
        "description": "My optional processor description",
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "description": "Set 'my-boolean-field' to true",
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}
----
// TESTSETUP
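
Once a pipeline exists, you can retrieve its definition with the get pipeline
API. The following request is a minimal sketch that fetches the `my-pipeline`
definition created above; omit the pipeline name to list every pipeline in the
cluster.

[source,console]
----
GET _ingest/pipeline/my-pipeline
----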

[discrete]
[[manage-pipeline-versions]]
=== Manage pipeline versions

When you create or update a pipeline, you can specify an optional `version`
integer. You can use this version number with the
<<put-pipeline-api-query-params,`if_version`>> parameter to conditionally
update the pipeline. When the `if_version` parameter is specified, a successful
update increments the pipeline's version.

[source,console]
----
PUT _ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "processors": [ ... ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
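
The following request is a hedged sketch of a conditional update. It assumes
the stored `my-pipeline-id` pipeline currently has version `1` and only
replaces it if that version still matches; on success, {es} increments the
version as described above.

[source,console]
----
PUT _ingest/pipeline/my-pipeline-id?if_version=1
{
  "processors": [
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}
----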

To unset the `version` number using the API, replace or update the pipeline
without specifying the `version` parameter.

[discrete]
[[test-pipeline]]
=== Test a pipeline

Before using a pipeline in production, we recommend you test it using sample
documents. When creating or editing a pipeline in {kib}, click **Add
documents**. In the **Documents** tab, provide sample documents and click **Run
the pipeline**.

[role="screenshot"]
image::images/ingest/test-a-pipeline.png[Test a pipeline in Kibana,align="center"]

You can also test pipelines using the <<simulate-pipeline-api,simulate pipeline
API>>. You can specify a configured pipeline in the request path. For example,
the following request tests `my-pipeline`.

[source,console]
----
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}
----

Alternatively, you can specify a pipeline and its processors in the request
body.

[source,console]
----
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "my-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}
----

The API returns transformed documents:

[source,console-result]
----
{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "my-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:03.000Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "my-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:04.000Z"
        }
      }
    }
  ]
}
----
// TESTRESPONSE[s/"2099-03-07T11:04:03.000Z"/$body.docs.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2099-03-07T11:04:04.000Z"/$body.docs.1.doc._ingest.timestamp/]
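
To see how each processor changes a document as it moves through the pipeline,
you can add the `verbose` query parameter to a simulate request. This is a
minimal sketch; the response then reports the result of each processor rather
than only the final documents.

[source,console]
----
POST _ingest/pipeline/my-pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    }
  ]
}
----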

[discrete]
[[add-pipeline-to-indexing-request]]
=== Add a pipeline to an indexing request

Use the `pipeline` query parameter to apply a pipeline to documents in
<<docs-index_,individual>> or <<docs-bulk,bulk>> indexing requests.

[source,console]
----
POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
----
// TEST[setup:my_data_stream]
// TEST[teardown:data_stream_cleanup]

You can also use the `pipeline` parameter with the <<docs-update-by-query,update
by query>> or <<docs-reindex,reindex>> APIs.

[source,console]
----
POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}
----
// TEST[setup:my_data_stream]
// TEST[teardown:data_stream_cleanup]

[discrete]
[[set-default-pipeline]]
=== Set a default pipeline

Use the <<index-default-pipeline,`index.default_pipeline`>> index setting to set
a default pipeline. {es} applies this pipeline to indexing requests if no
`pipeline` parameter is specified.
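
For example, the following sketch creates an index that routes all indexing
requests through `my-pipeline` unless a request sets its own `pipeline`
parameter. The index name `my-index-000001` is only an assumption for
illustration.

[source,console]
----
PUT my-index-000001
{
  "settings": {
    "index.default_pipeline": "my-pipeline"
  }
}
----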

[discrete]
[[set-final-pipeline]]
=== Set a final pipeline

Use the <<index-final-pipeline,`index.final_pipeline`>> index setting to set a
final pipeline. {es} applies this pipeline after the request or default
pipeline, even if neither is specified.
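
As a hedged sketch, the following request updates the hypothetical
`my-index-000001` index so that `my-pipeline` always runs last, after any
request or default pipeline.

[source,console]
----
PUT my-index-000001/_settings
{
  "index.final_pipeline": "my-pipeline"
}
----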

[discrete]
[[pipelines-for-beats]]
=== Pipelines for {beats}

To add an ingest pipeline to an Elastic Beat, specify the `pipeline`
parameter under `output.elasticsearch` in `<BEAT_NAME>.yml`. For example,
for {filebeat}, you'd specify `pipeline` in `filebeat.yml`.

[source,yaml]
----
output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: my-pipeline
----

[discrete]
[[pipelines-for-fleet-elastic-agent]]
=== Pipelines for {fleet} and {agent}

{fleet-guide}/index.html[{fleet}] automatically adds ingest pipelines for its
integrations. {fleet} applies these pipelines using <<index-templates,index
templates>> that include <<set-default-pipeline,pipeline index settings>>. {es}
matches these templates to your {fleet} data streams based on the
{fleet-guide}/data-streams.html#data-streams-naming-scheme[stream's naming
scheme].

WARNING: Do not change {fleet}'s ingest pipelines or use custom pipelines for
your {fleet} integrations. Doing so can break your {fleet} data streams.

{fleet} doesn't provide an ingest pipeline for the **Custom logs** integration.
You can safely specify a pipeline for this integration in one of two ways: an
<<pipeline-custom-logs-index-template,index template>> or a
<<pipeline-custom-logs-configuration,custom configuration>>.

[[pipeline-custom-logs-index-template]]
**Option 1: Index template**

// tag::create-name-custom-logs-pipeline[]
. <<create-manage-ingest-pipelines,Create>> and <<test-pipeline,test>> your
ingest pipeline. Name your pipeline `logs-<dataset-name>-default`. This makes
tracking the pipeline for your integration easier.
+
--
For example, the following request creates a pipeline for the `my_app` dataset.
The pipeline's name is `logs-my_app-default`.

[source,console]
----
PUT _ingest/pipeline/logs-my_app-default
{
  "description": "Pipeline for `my_app` dataset",
  "processors": [ ... ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
--
// end::create-name-custom-logs-pipeline[]

. Create an <<index-templates,index template>> that includes your pipeline in
the <<index-default-pipeline,`index.default_pipeline`>> or
<<index-final-pipeline,`index.final_pipeline`>> index setting. Ensure the
template is <<create-index-template,data stream enabled>>. The
template's index pattern should match `logs-<dataset-name>-*`.
+
--
You can create this template using {kib}'s <<manage-index-templates,**Index
Management**>> feature or the <<indices-put-template,create index template
API>>.

For example, the following request creates a template matching `logs-my_app-*`.
The template uses a component template that contains the
`index.default_pipeline` index setting.

[source,console]
----
# Creates a component template for index settings
PUT _component_template/logs-my_app-settings
{
  "template": {
    "settings": {
      "index.default_pipeline": "logs-my_app-default",
      "index.lifecycle.name": "logs"
    }
  }
}

# Creates an index template matching `logs-my_app-*`
PUT _index_template/logs-my_app-template
{
  "index_patterns": ["logs-my_app-*"],
  "data_stream": { },
  "priority": 500,
  "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"]
}
----
// TEST[continued]
// TEST[s/, "logs-my_app-mappings"//]
--
// tag::name-custom-logs-dataset[]
. When adding or editing your **Custom logs** integration in {fleet},
click **Configure integration > Custom log file > Advanced options**.

. In **Dataset name**, specify your dataset's name. {fleet} will add new data
for the integration to the resulting `logs-<dataset-name>-default` data stream.
+
For example, if your dataset's name was `my_app`, {fleet} adds new data to the
`logs-my_app-default` data stream.
// end::name-custom-logs-dataset[]
+
[role="screenshot"]
image::images/ingest/custom-logs.png[Set up custom log integration in Fleet,align="center"]

. Use the <<indices-rollover-index,rollover API>> to roll over your data stream.
This ensures {es} applies the index template and its pipeline settings to any
new data for the integration.
+
--
////
[source,console]
----
PUT _data_stream/logs-my_app-default
----
// TEST[continued]
////

[source,console]
----
POST logs-my_app-default/_rollover/
----
// TEST[continued]

////
[source,console]
----
DELETE _data_stream/*
DELETE _index_template/*
----
// TEST[continued]
////
--

[[pipeline-custom-logs-configuration]]
**Option 2: Custom configuration**

include::ingest.asciidoc[tag=create-name-custom-logs-pipeline]

include::ingest.asciidoc[tag=name-custom-logs-dataset]

. In **Custom Configurations**, specify your pipeline in the `pipeline` policy
setting.
+
[role="screenshot"]
image::images/ingest/custom-logs-pipeline.png[Custom pipeline configuration for custom log integration,align="center"]

**{agent} standalone**

If you run {agent} standalone, you can apply pipelines using an
<<index-templates,index template>> that includes the
<<index-default-pipeline,`index.default_pipeline`>> or
<<index-final-pipeline,`index.final_pipeline`>> index setting. Alternatively,
you can specify the `pipeline` policy setting in your `elastic-agent.yml`
configuration. See {fleet-guide}/install-standalone-elastic-agent.html[Install standalone {agent}s].

[discrete]
[[access-source-fields]]
=== Access source fields in a processor

Processors have read and write access to an incoming document's source fields.
To access a field key in a processor, use its field name. The following `set`
processor accesses `my-long-field`.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    }
  ]
}
----

You can also prepend the `_source` prefix.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}
----

Use dot notation to access object fields.

IMPORTANT: If your document contains flattened objects, use the
<<dot-expand-processor,`dot_expander`>> processor to expand them first. Other
ingest processors cannot access flattened objects.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "description": "Expand 'my-object-field.my-property'",
        "field": "my-object-field.my-property"
      }
    },
    {
      "set": {
        "description": "Set 'my-object-field.my-property' to 10",
        "field": "my-object-field.my-property",
        "value": 10
      }
    }
  ]
}
----

[[template-snippets]]
Several processor parameters support https://mustache.github.io[Mustache]
template snippets. To access field values in a template snippet, enclose the
field name in triple curly brackets: `{{{field-name}}}`. You can use template
snippets to dynamically set field names.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set dynamic '<service>' field to 'code' value",
        "field": "{{{service}}}",
        "value": "{{{code}}}"
      }
    }
  ]
}
----

[discrete]
[[access-metadata-fields]]
=== Access metadata fields in a processor

Processors can access the following metadata fields by name:

* `_index`
* `_id`
* `_routing`
* `_dynamic_templates`

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set '_routing' to 'geoip.country_iso_code' value",
        "field": "_routing",
        "value": "{{{geoip.country_iso_code}}}"
      }
    }
  ]
}
----

Use a Mustache template snippet to access metadata field values. For example,
`{{{_routing}}}` retrieves a document's routing value.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Use geo_point dynamic template for address field",
        "field": "_dynamic_templates",
        "value": {
          "address": "geo_point"
        }
      }
    }
  ]
}
----

The `set` processor above tells {es} to use the dynamic template named
`geo_point` for the field `address` if this field is not already defined in the
index's mapping. This processor overrides the dynamic template for the field
`address` if one is already defined in the bulk request, but has no effect on
other dynamic
templates defined in the bulk request.

WARNING: If you <<create-document-ids-automatically,automatically generate>>
document IDs, you cannot use `{{{_id}}}` in a processor. {es} assigns
auto-generated `_id` values after ingest.

[discrete]
[[access-ingest-metadata]]
=== Access ingest metadata in a processor

Ingest processors can add and access ingest metadata using the `_ingest` key.

Unlike source and metadata fields, {es} does not index ingest metadata fields by
default. {es} also allows source fields that start with an `_ingest` key. If
your data includes such source fields, use `_source._ingest` to access them.

Pipelines only create the `_ingest.timestamp` ingest metadata field by default.
This field contains a timestamp of when {es} received the document's indexing
request. To index `_ingest.timestamp` or other ingest metadata fields, use the
`set` processor.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}
----

[discrete]
[[handling-pipeline-failures]]
=== Handling pipeline failures

A pipeline's processors run sequentially. By default, pipeline processing stops
when one of these processors fails or encounters an error.

To ignore a processor failure and run the pipeline's remaining processors, set
`ignore_failure` to `true`.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}
----

Use the `on_failure` parameter to specify a list of processors to run
immediately after a processor failure. If `on_failure` is specified, {es} then
runs the pipeline's remaining processors, even if the `on_failure`
configuration is empty.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
----

Nest a list of `on_failure` processors for nested error handling.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
----

You can also specify `on_failure` for a pipeline. If a processor without an
`on_failure` value fails, {es} uses this pipeline-level parameter as a fallback.
{es} will not attempt to run the pipeline's remaining processors.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]

Additional information about the pipeline failure may be available in the
document metadata fields `on_failure_message`, `on_failure_processor_type`,
`on_failure_processor_tag`, and `on_failure_pipeline`. These fields are
accessible only from within an `on_failure` block.

The following example uses the metadata fields to include information about
pipeline failures in documents.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]

[discrete]
[[conditionally-run-processor]]
=== Conditionally run a processor

Each processor supports an optional `if` condition, written as a
{painless}/painless-guide.html[Painless script]. If provided, the processor only
runs when the `if` condition is `true`.

IMPORTANT: `if` condition scripts run in Painless's
{painless}/painless-ingest-processor-context.html[ingest processor context]. In
`if` conditions, `ctx` values are read-only.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      }
    }
  ]
}
----

If the <<script-painless-regex-enabled,`script.painless.regex.enabled`>> cluster
setting is enabled, you can use regular expressions in your `if` condition
scripts. For supported syntax, see {painless}/painless-regexes.html[Painless
regular expressions].

TIP: If possible, avoid using regular expressions. Expensive regular expressions
can slow indexing speeds.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
        "if": "ctx.url?.scheme =~ /^http[^s]/",
        "field": "url.insecure",
        "value": true
      }
    }
  ]
}
----

You must specify `if` conditions as valid JSON on a single line. However, you
can use the {kibana-ref}/console-kibana.html#configuring-console[{kib}
console]'s triple quote syntax to write and debug larger scripts.

TIP: If possible, avoid using complex or expensive `if` condition scripts.
Expensive condition scripts can slow indexing speeds.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                  if (tag.toLowerCase().contains('prod')) {
                      return false;
                  }
              }
            }
            return true;
        """
      }
    }
  ]
}
----

You can also specify a <<modules-scripting-stored-scripts,stored script>> as the
`if` condition.

[source,console]
----
PUT _scripts/my-prod-tag-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null){
        for (String tag : tags) {
            if (tag.toLowerCase().contains('prod')) {
                return false;
            }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": { "id": "my-prod-tag-script" }
      }
    }
  ]
}
----

////
[source,console]
----
DELETE _scripts/my-prod-tag-script
DELETE _ingest/pipeline/my-pipeline
----
// TEST[continued]
////

Incoming documents often contain object fields. If a processor script attempts
to access a field whose parent object does not exist, {es} returns a
`NullPointerException`. To avoid these exceptions, use
{painless}/painless-operators-reference.html#null-safe-operator[null safe
operators], such as `?.`, and write your scripts to be null safe.

For example, `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe.
`ctx.network?.name` can return null. Rewrite the script as
`'Guest'.equalsIgnoreCase(ctx.network?.name)`, which is null safe because
`Guest` is always non-null.
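
As an illustrative sketch of that rewrite, the following `drop` processor uses
the null safe comparison in its `if` condition:

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "'Guest'.equalsIgnoreCase(ctx.network?.name)"
      }
    }
  ]
}
----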

If you can't rewrite a script to be null safe, include an explicit null check.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that contain 'network.name' of 'Guest'",
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}
----

[discrete]
[[conditionally-apply-pipelines]]
=== Conditionally apply pipelines

Combine an `if` condition with the <<pipeline-processor,`pipeline`>> processor
to apply other pipelines to documents based on your criteria. You can use this
pipeline as the <<set-default-pipeline,default pipeline>> in an
<<index-templates,index template>> used to configure multiple data streams or
indices.

[source,console]
----
PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
----

[discrete]
[[get-pipeline-usage-stats]]
=== Get pipeline usage statistics

Use the <<cluster-nodes-stats,node stats>> API to get global and per-pipeline
ingest statistics. Use these stats to determine which pipelines run most
frequently or spend the most time processing.

[source,console]
----
GET _nodes/stats/ingest?filter_path=nodes.*.ingest
----
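
To focus on a single pipeline, you can narrow the `filter_path` further. The
following sketch assumes a pipeline named `my-pipeline` and the usual layout of
the node stats response, where per-pipeline counters live under
`ingest.pipelines`.

[source,console]
----
GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines.my-pipeline
----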

include::ingest/common-log-format-example.asciidoc[]
include::ingest/enrich.asciidoc[]
include::ingest/processors.asciidoc[]