From ea4ad5e7de9ca261dfd71f942d0970db0ce7424b Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 13 Jun 2024 10:32:46 -0700 Subject: [PATCH] [logstash bridge]: stableapi bootstrap (#108171) * [logstash bridge]: stableapi bootstrap Adds a new `logstash-bridge` project in `/libs` that exports api-stable wrappers for the elasticsearch-internal types that Logstash's Elastic Integration Filter relies on to provide ingest pipeline execution inside of Logstash. These bridge classes prevent Elasticsearch-internal refactorings from breaking the Logstash-owned project. * Update docs/changelog/108171.yaml * rename StableAPI -> StableBridgeAPI --- .github/CODEOWNERS | 3 + docs/changelog/108171.yaml | 5 + libs/logstash-bridge/README.md | 8 + libs/logstash-bridge/build.gradle | 24 +++ .../src/main/java/module-info.java | 27 ++++ .../logstashbridge/StableBridgeAPI.java | 60 +++++++ .../logstashbridge/common/SettingsBridge.java | 50 ++++++ .../logstashbridge/core/IOUtilsBridge.java | 22 +++ .../logstashbridge/env/EnvironmentBridge.java | 33 ++++ .../ingest/ConfigurationUtilsBridge.java | 47 ++++++ .../ingest/IngestDocumentBridge.java | 91 +++++++++++ .../logstashbridge/ingest/PipelineBridge.java | 47 ++++++ .../ingest/PipelineConfigurationBridge.java | 54 +++++++ .../ingest/ProcessorBridge.java | 150 ++++++++++++++++++ .../plugins/IngestPluginBridge.java | 47 ++++++ .../logstashbridge/script/MetadataBridge.java | 63 ++++++++ .../script/ScriptServiceBridge.java | 68 ++++++++ .../script/TemplateScriptBridge.java | 28 ++++ .../threadpool/ThreadPoolBridge.java | 38 +++++ 19 files changed, 865 insertions(+) create mode 100644 docs/changelog/108171.yaml create mode 100644 libs/logstash-bridge/README.md create mode 100644 libs/logstash-bridge/build.gradle create mode 100644 libs/logstash-bridge/src/main/java/module-info.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/StableBridgeAPI.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/IOUtilsBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ConfigurationUtilsBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/IngestDocumentBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestPluginBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/MetadataBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/threadpool/ThreadPoolBridge.java diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e1b476b65726..0f7e3073ed02 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -20,6 +20,9 @@ x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monito x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet @elastic/fleet x-pack/plugin/core/src/main/resources/fleet-* @elastic/fleet +# Logstash +libs/logstash-bridge @elastic/logstash + # Kibana Security x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java @elastic/kibana-security diff --git a/docs/changelog/108171.yaml b/docs/changelog/108171.yaml new file mode 100644 index 000000000000..1ec17bb3e411 --- /dev/null +++ b/docs/changelog/108171.yaml @@ -0,0 +1,5 @@ +pr: 108171 +summary: "add Elastic-internal stable bridge api for use by Logstash" +area: Infra/Core +type: enhancement +issues: [] diff --git a/libs/logstash-bridge/README.md b/libs/logstash-bridge/README.md new file mode 100644 index 000000000000..dd629724878b --- /dev/null +++ b/libs/logstash-bridge/README.md @@ -0,0 +1,8 @@ +## Logstash Bridge + +This package contains bridge functionality to ensure that Logstash's Elastic Integration plugin +has access to the minimal subset of Elasticsearch to perform its functions without relying on +other Elasticsearch internals. + +If a change is introduced in a separate Elasticsearch project that causes this project to fail, +please consult with members of @elastic/logstash to chart a path forward. diff --git a/libs/logstash-bridge/build.gradle b/libs/logstash-bridge/build.gradle new file mode 100644 index 000000000000..28fd6149fd7d --- /dev/null +++ b/libs/logstash-bridge/build.gradle @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +apply plugin: 'elasticsearch.build' + +dependencies { + compileOnly project(':server') + compileOnly project(':libs:elasticsearch-core') + compileOnly project(':libs:elasticsearch-plugin-api') + compileOnly project(':libs:elasticsearch-x-content') + compileOnly project(':modules:lang-painless') + compileOnly project(':modules:lang-painless:spi') + compileOnly project(':modules:lang-mustache') + compileOnly project(':modules:ingest-common') +// compileOnly project(':modules:ingest-geoip') +} + +tasks.named('forbiddenApisMain').configure { + replaceSignatureFiles 'jdk-signatures' +} diff --git a/libs/logstash-bridge/src/main/java/module-info.java b/libs/logstash-bridge/src/main/java/module-info.java new file mode 100644 index 000000000000..49b0e13c14cd --- /dev/null +++ b/libs/logstash-bridge/src/main/java/module-info.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +/** Elasticsearch Logstash Bridge. */ +module org.elasticsearch.logstashbridge { + requires org.elasticsearch.base; + requires org.elasticsearch.grok; + requires org.elasticsearch.server; + requires org.elasticsearch.painless; + requires org.elasticsearch.painless.spi; + requires org.elasticsearch.mustache; + requires org.elasticsearch.xcontent; + + exports org.elasticsearch.logstashbridge; + exports org.elasticsearch.logstashbridge.common; + exports org.elasticsearch.logstashbridge.core; + exports org.elasticsearch.logstashbridge.env; + exports org.elasticsearch.logstashbridge.ingest; + exports org.elasticsearch.logstashbridge.plugins; + exports org.elasticsearch.logstashbridge.script; + exports org.elasticsearch.logstashbridge.threadpool; +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/StableBridgeAPI.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/StableBridgeAPI.java new file mode 100644 index 000000000000..cdf2ab4ee7be --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/StableBridgeAPI.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge; + +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * A {@code StableBridgeAPI} is the stable bridge to an Elasticsearch API, and can produce instances + * from the actual API that they mirror. As part of the LogstashBridge project, these classes are relied + * upon by the "Elastic Integration Filter Plugin" for Logstash and their external shapes mut not change + * without coordination with the maintainers of that project. + * + * @param the actual type of the Elasticsearch API being mirrored + */ +public interface StableBridgeAPI { + T unwrap(); + + static T unwrapNullable(final StableBridgeAPI nullableStableBridgeAPI) { + if (Objects.isNull(nullableStableBridgeAPI)) { + return null; + } + return nullableStableBridgeAPI.unwrap(); + } + + static Map unwrap(final Map> bridgeMap) { + return bridgeMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().unwrap())); + } + + static > Map wrap(final Map rawMap, final Function wrapFunction) { + return rawMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> wrapFunction.apply(e.getValue()))); + } + + static > B wrap(final T delegate, final Function wrapFunction) { + if (Objects.isNull(delegate)) { + return null; + } + return wrapFunction.apply(delegate); + } + + abstract class Proxy implements StableBridgeAPI { + protected final T delegate; + + protected Proxy(final T delegate) { + this.delegate = delegate; + } + + @Override + public T unwrap() { + return delegate; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBridge.java new file mode 100644 index 000000000000..86fd0fcf7565 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBridge.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.common; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +public class SettingsBridge extends StableBridgeAPI.Proxy { + + public static SettingsBridge wrap(final Settings delegate) { + return new SettingsBridge(delegate); + } + + public static Builder builder() { + return Builder.wrap(Settings.builder()); + } + + public SettingsBridge(final Settings delegate) { + super(delegate); + } + + @Override + public Settings unwrap() { + return this.delegate; + } + + public static class Builder extends StableBridgeAPI.Proxy { + static Builder wrap(final Settings.Builder delegate) { + return new Builder(delegate); + } + + private Builder(final Settings.Builder delegate) { + super(delegate); + } + + public Builder put(final String key, final String value) { + this.delegate.put(key, value); + return this; + } + + public SettingsBridge build() { + return new SettingsBridge(this.delegate.build()); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/IOUtilsBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/IOUtilsBridge.java new file mode 100644 index 000000000000..810c671e5b8e --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/IOUtilsBridge.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.core; + +import org.elasticsearch.core.IOUtils; + +import java.io.Closeable; + +public class IOUtilsBridge { + public static void closeWhileHandlingException(final Iterable objects) { + IOUtils.closeWhileHandlingException(objects); + } + + public static void closeWhileHandlingException(final Closeable closeable) { + IOUtils.closeWhileHandlingException(closeable); + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java new file mode 100644 index 000000000000..8ae3ce2d33d2 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.env; + +import org.elasticsearch.env.Environment; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.common.SettingsBridge; + +import java.nio.file.Path; + +public class EnvironmentBridge extends StableBridgeAPI.Proxy { + public static EnvironmentBridge wrap(final Environment delegate) { + return new EnvironmentBridge(delegate); + } + + public EnvironmentBridge(final SettingsBridge settingsBridge, final Path configPath) { + this(new Environment(settingsBridge.unwrap(), configPath)); + } + + private EnvironmentBridge(final Environment delegate) { + super(delegate); + } + + @Override + public Environment unwrap() { + return this.delegate; + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ConfigurationUtilsBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ConfigurationUtilsBridge.java new file mode 100644 index 000000000000..2d7f5c27b16e --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ConfigurationUtilsBridge.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; +import org.elasticsearch.logstashbridge.script.TemplateScriptBridge; + +import java.util.Map; + +public class ConfigurationUtilsBridge { + public static TemplateScriptBridge.Factory compileTemplate( + final String processorType, + final String processorTag, + final String propertyName, + final String propertyValue, + final ScriptServiceBridge scriptServiceBridge + ) { + return new TemplateScriptBridge.Factory( + ConfigurationUtils.compileTemplate(processorType, processorTag, propertyName, propertyValue, scriptServiceBridge.unwrap()) + ); + } + + public static String readStringProperty( + final String processorType, + final String processorTag, + final Map configuration, + final String propertyName + ) { + return ConfigurationUtils.readStringProperty(processorType, processorTag, configuration, propertyName); + } + + public static Boolean readBooleanProperty( + final String processorType, + final String processorTag, + final Map configuration, + final String propertyName, + final boolean defaultValue + ) { + return ConfigurationUtils.readBooleanProperty(processorType, processorTag, configuration, propertyName, defaultValue); + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/IngestDocumentBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/IngestDocumentBridge.java new file mode 100644 index 000000000000..513503448539 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/IngestDocumentBridge.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.LogstashInternalBridge; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.script.MetadataBridge; +import org.elasticsearch.logstashbridge.script.TemplateScriptBridge; + +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +public class IngestDocumentBridge extends StableBridgeAPI.Proxy { + + public static String INGEST_KEY = IngestDocument.INGEST_KEY; + + public static IngestDocumentBridge wrap(final IngestDocument ingestDocument) { + if (ingestDocument == null) { + return null; + } + return new IngestDocumentBridge(ingestDocument); + } + + public IngestDocumentBridge(final Map sourceAndMetadata, final Map ingestMetadata) { + this(new IngestDocument(sourceAndMetadata, ingestMetadata)); + } + + private IngestDocumentBridge(IngestDocument inner) { + super(inner); + } + + public MetadataBridge getMetadata() { + return new MetadataBridge(delegate.getMetadata()); + } + + public Map getSource() { + return delegate.getSource(); + } + + public boolean updateIndexHistory(final String index) { + return delegate.updateIndexHistory(index); + } + + public Set getIndexHistory() { + return Set.copyOf(delegate.getIndexHistory()); + } + + public boolean isReroute() { + return LogstashInternalBridge.isReroute(delegate); + } + + public void resetReroute() { + LogstashInternalBridge.resetReroute(delegate); + } + + public Map getIngestMetadata() { + return Map.copyOf(delegate.getIngestMetadata()); + } + + public T getFieldValue(final String fieldName, final Class type) { + return delegate.getFieldValue(fieldName, type); + } + + public T getFieldValue(final String fieldName, final Class type, final boolean ignoreMissing) { + return delegate.getFieldValue(fieldName, type, ignoreMissing); + } + + public String renderTemplate(final TemplateScriptBridge.Factory templateScriptFactory) { + return delegate.renderTemplate(templateScriptFactory.unwrap()); + } + + public void setFieldValue(final String path, final Object value) { + delegate.setFieldValue(path, value); + } + + public void removeField(final String path) { + delegate.removeField(path); + } + + // public void executePipeline(Pipeline pipeline, BiConsumer handler) { + public void executePipeline(final PipelineBridge pipelineBridge, final BiConsumer handler) { + this.delegate.executePipeline(pipelineBridge.unwrap(), (unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e)); + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java new file mode 100644 index 000000000000..835e377c71b3 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; + +import java.util.Map; +import java.util.function.BiConsumer; + +public class PipelineBridge extends StableBridgeAPI.Proxy { + public static PipelineBridge wrap(final Pipeline pipeline) { + return new PipelineBridge(pipeline); + } + + public static PipelineBridge create( + String id, + Map config, + Map processorFactories, + ScriptServiceBridge scriptServiceBridge + ) throws Exception { + return wrap( + Pipeline.create(id, config, StableBridgeAPI.unwrap(processorFactories), StableBridgeAPI.unwrapNullable(scriptServiceBridge)) + ); + } + + public PipelineBridge(final Pipeline delegate) { + super(delegate); + } + + public String getId() { + return delegate.getId(); + } + + public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer handler) { + this.delegate.execute( + StableBridgeAPI.unwrapNullable(ingestDocumentBridge), + (unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e) + ); + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java new file mode 100644 index 000000000000..d2aff89d1f23 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.xcontent.XContentType; + +import java.util.Map; + +public class PipelineConfigurationBridge extends StableBridgeAPI.Proxy { + public PipelineConfigurationBridge(final PipelineConfiguration delegate) { + super(delegate); + } + + public PipelineConfigurationBridge(final String pipelineId, final String jsonEncodedConfig) { + this(new PipelineConfiguration(pipelineId, new BytesArray(jsonEncodedConfig), XContentType.JSON)); + } + + public String getId() { + return delegate.getId(); + } + + public Map getConfigAsMap() { + return delegate.getConfigAsMap(); + } + + @Override + public int hashCode() { + return delegate.hashCode(); + } + + @Override + public String toString() { + return delegate.toString(); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } else if (obj instanceof PipelineConfigurationBridge other) { + return delegate.equals(other.delegate); + } else { + return false; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java new file mode 100644 index 000000000000..7b88b12eb3c1 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.env.EnvironmentBridge; +import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; +import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge; + +import java.util.Map; +import java.util.function.BiConsumer; + +public interface ProcessorBridge extends StableBridgeAPI { + String getType(); + + String getTag(); + + String getDescription(); + + boolean isAsync(); + + void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer handler) throws Exception; + + static ProcessorBridge wrap(final Processor delegate) { + return new Wrapped(delegate); + } + + class Wrapped extends StableBridgeAPI.Proxy implements ProcessorBridge { + public Wrapped(final Processor delegate) { + super(delegate); + } + + @Override + public String getType() { + return unwrap().getType(); + } + + @Override + public String getTag() { + return unwrap().getTag(); + } + + @Override + public String getDescription() { + return unwrap().getDescription(); + } + + @Override + public boolean isAsync() { + return unwrap().isAsync(); + } + + @Override + public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer handler) + throws Exception { + delegate.execute( + StableBridgeAPI.unwrapNullable(ingestDocumentBridge), + (id, e) -> handler.accept(IngestDocumentBridge.wrap(id), e) + ); + } + } + + class Parameters extends StableBridgeAPI.Proxy { + + public Parameters( + final EnvironmentBridge environmentBridge, + final ScriptServiceBridge scriptServiceBridge, + final ThreadPoolBridge threadPoolBridge + ) { + this( + new Processor.Parameters( + environmentBridge.unwrap(), + scriptServiceBridge.unwrap(), + null, + threadPoolBridge.unwrap().getThreadContext(), + threadPoolBridge.unwrap()::relativeTimeInMillis, + (delay, command) -> threadPoolBridge.unwrap() + .schedule(command, TimeValue.timeValueMillis(delay), threadPoolBridge.unwrap().generic()), + null, + null, + threadPoolBridge.unwrap().generic()::execute, + IngestService.createGrokThreadWatchdog(environmentBridge.unwrap(), threadPoolBridge.unwrap()) + ) + ); + } + + private Parameters(final Processor.Parameters delegate) { + super(delegate); + } + + @Override + public Processor.Parameters unwrap() { + return this.delegate; + } + } + + interface Factory extends StableBridgeAPI { + ProcessorBridge create( + Map registry, + String processorTag, + String description, + Map config + ) throws Exception; + + static Factory wrap(final Processor.Factory delegate) { + return new Wrapped(delegate); + } + + @Override + default Processor.Factory unwrap() { + final Factory stableAPIFactory = this; + return (registry, tag, description, config) -> stableAPIFactory.create( + StableBridgeAPI.wrap(registry, Factory::wrap), + tag, + description, + config + ).unwrap(); + } + + class Wrapped extends StableBridgeAPI.Proxy implements Factory { + private Wrapped(final Processor.Factory delegate) { + super(delegate); + } + + @Override + public ProcessorBridge create( + final Map registry, + final String processorTag, + final String description, + final Map config + ) throws Exception { + return ProcessorBridge.wrap(this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config)); + } + + @Override + public Processor.Factory unwrap() { + return this.delegate; + } + } + } + +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestPluginBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestPluginBridge.java new file mode 100644 index 000000000000..a27eaa9063dd --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestPluginBridge.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.plugins; + +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.plugins.IngestPlugin; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; + +public interface IngestPluginBridge { + Map getProcessors(ProcessorBridge.Parameters parameters); + + static Wrapped wrap(final IngestPlugin delegate) { + return new Wrapped(delegate); + } + + class Wrapped extends StableBridgeAPI.Proxy implements IngestPluginBridge, Closeable { + + private Wrapped(final IngestPlugin delegate) { + super(delegate); + } + + public Map getProcessors(final ProcessorBridge.Parameters parameters) { + return StableBridgeAPI.wrap(this.delegate.getProcessors(parameters.unwrap()), ProcessorBridge.Factory::wrap); + } + + @Override + public IngestPlugin unwrap() { + return this.delegate; + } + + @Override + public void close() throws IOException { + if (this.delegate instanceof Closeable closeableDelegate) { + closeableDelegate.close(); + } + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/MetadataBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/MetadataBridge.java new file mode 100644 index 000000000000..4f0a712ca350 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/MetadataBridge.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.script; + +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.script.Metadata; + +import java.time.ZonedDateTime; + +public class MetadataBridge extends StableBridgeAPI.Proxy { + public MetadataBridge(final Metadata delegate) { + super(delegate); + } + + public String getIndex() { + return delegate.getIndex(); + } + + public void setIndex(final String index) { + delegate.setIndex(index); + } + + public String getId() { + return delegate.getId(); + } + + public void setId(final String id) { + delegate.setId(id); + } + + public long getVersion() { + return delegate.getVersion(); + } + + public void setVersion(final long version) { + delegate.setVersion(version); + } + + public String getVersionType() { + return delegate.getVersionType(); + } + + public void setVersionType(final String versionType) { + delegate.setVersionType(versionType); + } + + public String getRouting() { + return delegate.getRouting(); + } + + public void setRouting(final String routing) { + delegate.setRouting(routing); + } + + public ZonedDateTime getNow() { + return delegate.getNow(); + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java new file mode 100644 index 000000000000..ec5af0f7020a --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.script; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.common.SettingsBridge; +import org.elasticsearch.painless.PainlessPlugin; +import org.elasticsearch.painless.PainlessScriptEngine; +import org.elasticsearch.painless.spi.Whitelist; +import org.elasticsearch.script.IngestConditionalScript; +import org.elasticsearch.script.IngestScript; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.mustache.MustacheScriptEngine; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.LongSupplier; + +public class ScriptServiceBridge extends StableBridgeAPI.Proxy implements Closeable { + public ScriptServiceBridge wrap(final ScriptService delegate) { + return new ScriptServiceBridge(delegate); + } + + public ScriptServiceBridge(final SettingsBridge settingsBridge, final LongSupplier timeProvider) { + super(getScriptService(settingsBridge.unwrap(), timeProvider)); + } + + public ScriptServiceBridge(ScriptService delegate) { + super(delegate); + } + + private static ScriptService getScriptService(final Settings settings, final LongSupplier timeProvider) { + final List painlessBaseWhitelist = getPainlessBaseWhiteList(); + final Map, List> scriptContexts = Map.of( + IngestScript.CONTEXT, + painlessBaseWhitelist, + IngestConditionalScript.CONTEXT, + painlessBaseWhitelist + ); + final Map scriptEngines = Map.of( + PainlessScriptEngine.NAME, + new PainlessScriptEngine(settings, scriptContexts), + MustacheScriptEngine.NAME, + new MustacheScriptEngine() + ); + return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider); + } + + private static List getPainlessBaseWhiteList() { + return PainlessPlugin.baseWhiteList(); + } + + @Override + public void close() throws IOException { + this.delegate.close(); + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptBridge.java new file mode 100644 index 000000000000..715b357a4ee7 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptBridge.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.script; + +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.script.TemplateScript; + +public class TemplateScriptBridge { + public static class Factory extends StableBridgeAPI.Proxy { + public static Factory wrap(final TemplateScript.Factory delegate) { + return new Factory(delegate); + } + + public Factory(final TemplateScript.Factory delegate) { + super(delegate); + } + + @Override + public TemplateScript.Factory unwrap() { + return this.delegate; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/threadpool/ThreadPoolBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/threadpool/ThreadPoolBridge.java new file mode 100644 index 000000000000..13218a9b206a --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/threadpool/ThreadPoolBridge.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.logstashbridge.threadpool; + +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.common.SettingsBridge; +import org.elasticsearch.telemetry.metric.MeterRegistry; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.TimeUnit; + +public class ThreadPoolBridge extends StableBridgeAPI.Proxy { + + public ThreadPoolBridge(final SettingsBridge settingsBridge) { + this(new ThreadPool(settingsBridge.unwrap(), MeterRegistry.NOOP)); + } + + public ThreadPoolBridge(final ThreadPool delegate) { + super(delegate); + } + + public static boolean terminate(final ThreadPoolBridge pool, final long timeout, final TimeUnit timeUnit) { + return ThreadPool.terminate(pool.unwrap(), timeout, timeUnit); + } + + public long relativeTimeInMillis() { + return delegate.relativeTimeInMillis(); + } + + public long absoluteTimeInMillis() { + return delegate.absoluteTimeInMillis(); + } +}