Adds example plugin for custom ingest processor (#112282)

* Adds example plugin for custom ingest processor

Adds an example for creating a plugin with a simple custom ingest
processor. The example processor repeats the value of an expected filed
in a document, or ignores it if the expected field does not exist.

Closes #111539
This commit is contained in:
Sam Xiao 2024-09-06 12:05:52 -04:00 committed by GitHub
parent 6ff02460e1
commit 7cd6de76f4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 220 additions and 2 deletions

View file

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
apply plugin: 'elasticsearch.esplugin'
apply plugin: 'elasticsearch.yaml-rest-test'
esplugin {
name 'custom-processor'
description 'An example plugin showing how to register a custom ingest processor'
classname 'org.elasticsearch.example.customprocessor.ExampleProcessorPlugin'
licenseFile rootProject.file('SSPL-1.0+ELASTIC-LICENSE-2.0.txt')
noticeFile rootProject.file('NOTICE.txt')
}
dependencies {
yamlRestTestRuntimeOnly "org.apache.logging.log4j:log4j-core:${log4jVersion}"
}

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.example.customprocessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import java.util.Map;
public class ExampleProcessorPlugin extends Plugin implements IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Map.of(ExampleRepeatProcessor.TYPE, new ExampleRepeatProcessor.Factory());
}
}

View file

@ -0,0 +1,53 @@
package org.elasticsearch.example.customprocessor;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.Map;
/**
* Example of adding an ingest processor with a plugin.
*/
public class ExampleRepeatProcessor extends AbstractProcessor {
public static final String TYPE = "repeat";
public static final String FIELD_KEY_NAME = "field";
private final String field;
ExampleRepeatProcessor(String tag, String description, String field) {
super(tag, description);
this.field = field;
}
@Override
public IngestDocument execute(IngestDocument document) {
Object val = document.getFieldValue(field, Object.class, true);
if (val instanceof String string) {
String repeated = string.concat(string);
document.setFieldValue(field, repeated);
}
return document;
}
@Override
public String getType() {
return TYPE;
}
public static class Factory implements Processor.Factory {
@Override
public ExampleRepeatProcessor create(
Map<String, Processor.Factory> registry,
String tag,
String description,
Map<String, Object> config
) {
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, FIELD_KEY_NAME);
return new ExampleRepeatProcessor(tag, description, field);
}
}
}

View file

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.example.customprocessor;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
/**
* {@link ExampleProcessorClientYamlTestSuiteIT} executes the plugin's REST API integration tests.
* <p>
* The tests can be executed using the command: ./gradlew :custom-processor:yamlRestTest
* <p>
* This class extends {@link ESClientYamlSuiteTestCase}, which takes care of parsing the YAML files
* located in the src/yamlRestTest/resources/rest-api-spec/test/ directory and validates them against the
* custom REST API definition files located in src/yamlRestTest/resources/rest-api-spec/api/.
* <p>
* Once validated, {@link ESClientYamlSuiteTestCase} executes the REST tests against a single node
* integration cluster which has the plugin already installed by the Gradle build script.
* </p>
*/
public class ExampleProcessorClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public ExampleProcessorClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
// The test executes all the test candidates by default
// see ESClientYamlSuiteTestCase.REST_TESTS_SUITE
return ESClientYamlSuiteTestCase.createParameters();
}
}

View file

@ -0,0 +1,15 @@
"Custom processor is present":
- do:
ingest.put_pipeline:
id: pipeline1
body: >
{
"processors": [
{
"repeat" : {
"field": "test"
}
}
]
}
- match: { acknowledged: true }

View file

@ -0,0 +1,59 @@
setup:
- do:
ingest.put_pipeline:
id: pipeline1
body: >
{
"processors": [
{
"repeat" : {
"field": "to_repeat"
}
}
]
}
---
teardown:
- do:
ingest.delete_pipeline:
id: pipeline1
ignore: 404
- do:
indices.delete:
index: index1
ignore: 404
---
"Process document":
# index a document with field to be processed
- do:
index:
id: doc1
index: index1
pipeline: pipeline1
body: { to_repeat: "foo" }
- match: { result: "created" }
# validate document is processed
- do:
get:
index: index1
id: doc1
- match: { _source: { to_repeat: "foofoo" } }
---
"Does not process document without field":
# index a document without field to be processed
- do:
index:
id: doc1
index: index1
pipeline: pipeline1
body: { field1: "foo" }
- match: { result: "created" }
# validate document is not processed
- do:
get:
index: index1
id: doc1
- match: { _source: { field1: "foo" } }