mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-24 23:27:25 -04:00
Add copy_from parameter for set ingest processor (#63540)
This commit is contained in:
parent
39a86438cd
commit
b17ce85f13
7 changed files with 240 additions and 21 deletions
|
@ -13,7 +13,8 @@ its value will be replaced with the provided one.
|
|||
|======
|
||||
| Name | Required | Default | Description
|
||||
| `field` | yes | - | The field to insert, upsert, or update. Supports <<accessing-template-fields,template snippets>>.
|
||||
| `value` | yes | - | The value to be set for the field. Supports <<accessing-template-fields,template snippets>>.
|
||||
| `value` | yes* | - | The value to be set for the field. Supports <<accessing-template-fields,template snippets>>. May specify only one of `value` or `copy_from`.
|
||||
| `copy_from` | no | - | The origin field which will be copied to `field`, cannot set `value` simultaneously. Supported data types are `boolean`, `number`, `array`, `object`, `string`, `date`, etc.
|
||||
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
|
||||
| `ignore_empty_value` | no | `false` | If `true` and `value` is a <<accessing-template-fields,template snippet>> that evaluates to `null` or the empty string, the processor quietly exits without modifying the document
|
||||
include::common-options.asciidoc[]
|
||||
|
@ -87,3 +88,54 @@ Result:
|
|||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/2019-03-11T21:54:37.909224Z/$body.docs.0.doc._ingest.timestamp/]
|
||||
The contents of a field including complex values such as arrays and objects can be copied to another field using `copy_from`:
|
||||
[source,console]
|
||||
--------------------------------------------------
|
||||
PUT _ingest/pipeline/set_bar
|
||||
{
|
||||
"description": "sets the value of bar from the field foo",
|
||||
"processors": [
|
||||
{
|
||||
"set": {
|
||||
"field": "bar",
|
||||
"copy_from": "foo"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
POST _ingest/pipeline/set_bar/_simulate
|
||||
{
|
||||
"docs": [
|
||||
{
|
||||
"_source": {
|
||||
"foo": ["foo1", "foo2"]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
Result:
|
||||
|
||||
[source,console-result]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"docs" : [
|
||||
{
|
||||
"doc" : {
|
||||
"_index" : "_index",
|
||||
"_id" : "_id",
|
||||
"_source" : {
|
||||
"bar": ["foo1", "foo2"],
|
||||
"foo": ["foo1", "foo2"]
|
||||
},
|
||||
"_ingest" : {
|
||||
"timestamp" : "2020-09-30T12:55:17.742795Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/2020-09-30T12:55:17.742795Z/$body.docs.0.doc._ingest.timestamp/]
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.elasticsearch.script.TemplateScript;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
|
||||
|
||||
/**
|
||||
* Processor that adds new fields with their corresponding values. If the field is already present, its value
|
||||
* will be replaced with the provided one.
|
||||
|
@ -40,18 +42,20 @@ public final class SetProcessor extends AbstractProcessor {
|
|||
private final boolean overrideEnabled;
|
||||
private final TemplateScript.Factory field;
|
||||
private final ValueSource value;
|
||||
private final String copyFrom;
|
||||
private final boolean ignoreEmptyValue;
|
||||
|
||||
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
|
||||
this(tag, description, field, value, true, false);
|
||||
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom) {
|
||||
this(tag, description, field, value, copyFrom, true, false);
|
||||
}
|
||||
|
||||
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean overrideEnabled,
|
||||
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom, boolean overrideEnabled,
|
||||
boolean ignoreEmptyValue) {
|
||||
super(tag, description);
|
||||
this.overrideEnabled = overrideEnabled;
|
||||
this.field = field;
|
||||
this.value = value;
|
||||
this.copyFrom = copyFrom;
|
||||
this.ignoreEmptyValue = ignoreEmptyValue;
|
||||
}
|
||||
|
||||
|
@ -67,6 +71,10 @@ public final class SetProcessor extends AbstractProcessor {
|
|||
return value;
|
||||
}
|
||||
|
||||
public String getCopyFrom() {
|
||||
return copyFrom;
|
||||
}
|
||||
|
||||
public boolean isIgnoreEmptyValue() {
|
||||
return ignoreEmptyValue;
|
||||
}
|
||||
|
@ -74,7 +82,12 @@ public final class SetProcessor extends AbstractProcessor {
|
|||
@Override
|
||||
public IngestDocument execute(IngestDocument document) {
|
||||
if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) {
|
||||
document.setFieldValue(field, value, ignoreEmptyValue);
|
||||
if (copyFrom != null) {
|
||||
Object fieldValue = document.getFieldValue(copyFrom, Object.class, ignoreEmptyValue);
|
||||
document.setFieldValue(field, fieldValue, ignoreEmptyValue);
|
||||
} else {
|
||||
document.setFieldValue(field, value, ignoreEmptyValue);
|
||||
}
|
||||
}
|
||||
return document;
|
||||
}
|
||||
|
@ -96,16 +109,30 @@ public final class SetProcessor extends AbstractProcessor {
|
|||
public SetProcessor create(Map<String, Processor.Factory> registry, String processorTag,
|
||||
String description, Map<String, Object> config) throws Exception {
|
||||
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
|
||||
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
|
||||
String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");
|
||||
ValueSource valueSource = null;
|
||||
if (copyFrom == null) {
|
||||
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
|
||||
valueSource = ValueSource.wrap(value, scriptService);
|
||||
} else {
|
||||
Object value = config.remove("value");
|
||||
if (value != null) {
|
||||
throw newConfigurationException(TYPE, processorTag, "copy_from",
|
||||
"cannot set both `copy_from` and `value` in the same processor");
|
||||
}
|
||||
}
|
||||
|
||||
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true);
|
||||
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
|
||||
"field", field, scriptService);
|
||||
boolean ignoreEmptyValue = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_empty_value", false);
|
||||
|
||||
return new SetProcessor(
|
||||
processorTag,
|
||||
description,
|
||||
compiledTemplate,
|
||||
ValueSource.wrap(value, scriptService),
|
||||
valueSource,
|
||||
copyFrom,
|
||||
overrideEnabled,
|
||||
ignoreEmptyValue);
|
||||
}
|
||||
|
|
|
@ -140,7 +140,7 @@ public class ForEachProcessorTests extends ESTestCase {
|
|||
ForEachProcessor processor = new ForEachProcessor(
|
||||
"_tag", null, "values", new SetProcessor("_tag",
|
||||
null, new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
|
||||
(model) -> model.get("other")), false);
|
||||
(model) -> model.get("other"), null), false);
|
||||
processor.execute(ingestDocument, (result, e) -> {});
|
||||
|
||||
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
|
||||
|
|
|
@ -112,4 +112,25 @@ public class SetProcessorFactoryTests extends ESTestCase {
|
|||
assertThat(exception.getMetadata("es.processor_tag").get(0), equalTo(processorTag));
|
||||
}
|
||||
|
||||
public void testCreateWithCopyFrom() throws Exception {
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", "field1");
|
||||
config.put("copy_from", "field2");
|
||||
String processorTag = randomAlphaOfLength(10);
|
||||
SetProcessor setProcessor = factory.create(null, processorTag, null, config);
|
||||
assertThat(setProcessor.getTag(), equalTo(processorTag));
|
||||
assertThat(setProcessor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
|
||||
assertThat(setProcessor.getCopyFrom(), equalTo("field2"));
|
||||
}
|
||||
|
||||
public void testCreateWithCopyFromAndValue() throws Exception {
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", "field1");
|
||||
config.put("copy_from", "field2");
|
||||
config.put("value", "value1");
|
||||
String processorTag = randomAlphaOfLength(10);
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> factory.create(null, processorTag, null, config));
|
||||
assertThat(exception.getMessage(), equalTo("[copy_from] cannot set both `copy_from` and `value` in the same processor"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
|
@ -38,7 +39,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
|
||||
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
|
||||
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
|
||||
Processor processor = createSetProcessor(fieldName, fieldValue, null, true, false);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
|
||||
|
@ -50,7 +51,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
|
||||
String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue);
|
||||
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
|
||||
Processor processor = createSetProcessor(fieldName, fieldValue, null, true, false);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
|
||||
|
@ -59,7 +60,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
public void testSetFieldsTypeMismatch() throws Exception {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
ingestDocument.setFieldValue("field", "value");
|
||||
Processor processor = createSetProcessor("field.inner", "value", true, false);
|
||||
Processor processor = createSetProcessor("field.inner", "value", null, true, false);
|
||||
try {
|
||||
processor.execute(ingestDocument);
|
||||
fail("processor execute should have failed");
|
||||
|
@ -73,7 +74,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
|
||||
String fieldName = RandomDocumentPicks.randomFieldName(random());
|
||||
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
|
||||
Processor processor = createSetProcessor(fieldName, fieldValue, false, false);
|
||||
Processor processor = createSetProcessor(fieldName, fieldValue, null, false, false);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
|
||||
|
@ -83,7 +84,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
|
||||
Object fieldValue = "foo";
|
||||
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
|
||||
Processor processor = createSetProcessor(fieldName, "bar", false, false);
|
||||
Processor processor = createSetProcessor(fieldName, "bar", null, false, false);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
|
||||
|
@ -94,7 +95,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
Object fieldValue = null;
|
||||
Object newValue = "bar";
|
||||
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
|
||||
Processor processor = createSetProcessor(fieldName, newValue, false, false);
|
||||
Processor processor = createSetProcessor(fieldName, newValue, null, false, false);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue));
|
||||
|
@ -102,7 +103,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
|
||||
public void testSetMetadataExceptVersion() throws Exception {
|
||||
Metadata randomMetadata = randomFrom(Metadata.INDEX, Metadata.ID, Metadata.ROUTING);
|
||||
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false);
|
||||
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", null, true, false);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(randomMetadata.getFieldName(), String.class), Matchers.equalTo("_value"));
|
||||
|
@ -110,7 +111,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
|
||||
public void testSetMetadataVersion() throws Exception {
|
||||
long version = randomNonNegativeLong();
|
||||
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false);
|
||||
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, null, true, false);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(Metadata.VERSION.getFieldName(), Long.class), Matchers.equalTo(version));
|
||||
|
@ -118,7 +119,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
|
||||
public void testSetMetadataVersionType() throws Exception {
|
||||
String versionType = randomFrom("internal", "external", "external_gte");
|
||||
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false);
|
||||
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, null, true, false);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
|
||||
|
@ -126,7 +127,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
|
||||
public void testSetMetadataIfSeqNo() throws Exception {
|
||||
long ifSeqNo = randomNonNegativeLong();
|
||||
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false);
|
||||
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, null, true, false);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo));
|
||||
|
@ -134,14 +135,29 @@ public class SetProcessorTests extends ESTestCase {
|
|||
|
||||
public void testSetMetadataIfPrimaryTerm() throws Exception {
|
||||
long ifPrimaryTerm = randomNonNegativeLong();
|
||||
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false);
|
||||
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, null, true, false);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
|
||||
}
|
||||
|
||||
private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled, boolean ignoreEmptyValue) {
|
||||
public void testCopyFromOtherField() throws Exception {
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
|
||||
document.put("field", fieldValue);
|
||||
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
|
||||
|
||||
Processor processor = createSetProcessor(fieldName, null, "field", true, false);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
|
||||
}
|
||||
|
||||
private static Processor createSetProcessor(String fieldName, Object fieldValue, String copyFrom, boolean overrideEnabled,
|
||||
boolean ignoreEmptyValue) {
|
||||
return new SetProcessor(randomAlphaOfLength(10), null, new TestTemplateService.MockTemplateScript.Factory(fieldName),
|
||||
ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled, ignoreEmptyValue);
|
||||
ValueSource.wrap(fieldValue, TestTemplateService.instance()), copyFrom, overrideEnabled, ignoreEmptyValue);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,3 +101,79 @@ teardown:
|
|||
pipeline: 1
|
||||
require_alias: true
|
||||
body: { foo: bar }
|
||||
|
||||
---
|
||||
"Test set processor with copy_from":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "1"
|
||||
body: >
|
||||
{
|
||||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "copied_foo_object",
|
||||
"copy_from" : "foo_object"
|
||||
}
|
||||
},
|
||||
{
|
||||
"set" : {
|
||||
"field": "copied_foo_array",
|
||||
"copy_from" : "foo_array"
|
||||
}
|
||||
},
|
||||
{
|
||||
"set" : {
|
||||
"field": "copied_foo_string",
|
||||
"copy_from" : "foo_string"
|
||||
}
|
||||
},
|
||||
{
|
||||
"set" : {
|
||||
"field": "copied_foo_number",
|
||||
"copy_from" : "foo_number"
|
||||
}
|
||||
},
|
||||
{
|
||||
"set" : {
|
||||
"field": "copied_foo_boolean",
|
||||
"copy_from" : "foo_boolean"
|
||||
}
|
||||
},
|
||||
{
|
||||
"set" : {
|
||||
"field": "foo_nochange",
|
||||
"copy_from" : "foo_none",
|
||||
"ignore_empty_value" : true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
id: 1
|
||||
pipeline: "1"
|
||||
body: {
|
||||
foo_object: {
|
||||
"hello": "world"
|
||||
},
|
||||
foo_array: [1, 2, 3],
|
||||
foo_string: "bla bla",
|
||||
foo_number: 3,
|
||||
foo_boolean: true,
|
||||
foo_nochange: "no change"
|
||||
}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
id: 1
|
||||
- match: { _source.copied_foo_object.hello: "world" }
|
||||
- match: { _source.copied_foo_array.0: 1 }
|
||||
- match: { _source.copied_foo_string: "bla bla" }
|
||||
- match: { _source.copied_foo_number: 3 }
|
||||
- is_true: _source.copied_foo_boolean
|
||||
- match: { _source.foo_nochange: "no change" }
|
||||
|
|
|
@ -488,6 +488,33 @@ public final class IngestDocument {
|
|||
setFieldValue(fieldPathTemplate.newInstance(model).execute(), value, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the provided value to the provided path in the document.
|
||||
* Any non existing path element will be created. If the last element is a list,
|
||||
* the value will replace the existing list.
|
||||
* @param fieldPathTemplate Resolves to the path with dot-notation within the document
|
||||
* @param value The value to put in for the path key
|
||||
* @param ignoreEmptyValue The flag to determine whether to exit quietly when the value produced by TemplatedValue is null or empty
|
||||
* @throws IllegalArgumentException if the path is null, empty, invalid or if the value cannot be set to the
|
||||
* item identified by the provided path.
|
||||
*/
|
||||
public void setFieldValue(TemplateScript.Factory fieldPathTemplate, Object value, boolean ignoreEmptyValue) {
|
||||
Map<String, Object> model = createTemplateModel();
|
||||
if (ignoreEmptyValue) {
|
||||
if (value == null) {
|
||||
return;
|
||||
}
|
||||
if (value instanceof String){
|
||||
String valueStr = (String) value;
|
||||
if (valueStr.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setFieldValue(fieldPathTemplate.newInstance(model).execute(), value, false);
|
||||
}
|
||||
|
||||
private void setFieldValue(String path, Object value, boolean append) {
|
||||
setFieldValue(path, value, append, true);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue