mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
[logstash bridge]: stableapi bootstrap (#108171)
* [logstash bridge]: stableapi bootstrap Adds a new `logstash-bridge` project in `/libs` that exports api-stable wrappers for the elasticsearch-internal types that Logstash's Elastic Integration Filter relies on to provide ingest pipeline execution inside of Logstash. These bridge classes prevent Elasticsearch-internal refactorings from breaking the Logstash-owned project. * Update docs/changelog/108171.yaml * rename StableAPI -> StableBridgeAPI
This commit is contained in:
parent
fd7a184330
commit
ea4ad5e7de
19 changed files with 865 additions and 0 deletions
3
.github/CODEOWNERS
vendored
3
.github/CODEOWNERS
vendored
|
@ -20,6 +20,9 @@ x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monito
|
||||||
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet @elastic/fleet
|
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet @elastic/fleet
|
||||||
x-pack/plugin/core/src/main/resources/fleet-* @elastic/fleet
|
x-pack/plugin/core/src/main/resources/fleet-* @elastic/fleet
|
||||||
|
|
||||||
|
# Logstash
|
||||||
|
libs/logstash-bridge @elastic/logstash
|
||||||
|
|
||||||
# Kibana Security
|
# Kibana Security
|
||||||
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java @elastic/kibana-security
|
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java @elastic/kibana-security
|
||||||
|
|
||||||
|
|
5
docs/changelog/108171.yaml
Normal file
5
docs/changelog/108171.yaml
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
pr: 108171
|
||||||
|
summary: "add Elastic-internal stable bridge api for use by Logstash"
|
||||||
|
area: Infra/Core
|
||||||
|
type: enhancement
|
||||||
|
issues: []
|
8
libs/logstash-bridge/README.md
Normal file
8
libs/logstash-bridge/README.md
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
## Logstash Bridge
|
||||||
|
|
||||||
|
This package contains bridge functionality to ensure that Logstash's Elastic Integration plugin
|
||||||
|
has access to the minimal subset of Elasticsearch to perform its functions without relying on
|
||||||
|
other Elasticsearch internals.
|
||||||
|
|
||||||
|
If a change is introduced in a separate Elasticsearch project that causes this project to fail,
|
||||||
|
please consult with members of @elastic/logstash to chart a path forward.
|
24
libs/logstash-bridge/build.gradle
Normal file
24
libs/logstash-bridge/build.gradle
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
apply plugin: 'elasticsearch.build'
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
compileOnly project(':server')
|
||||||
|
compileOnly project(':libs:elasticsearch-core')
|
||||||
|
compileOnly project(':libs:elasticsearch-plugin-api')
|
||||||
|
compileOnly project(':libs:elasticsearch-x-content')
|
||||||
|
compileOnly project(':modules:lang-painless')
|
||||||
|
compileOnly project(':modules:lang-painless:spi')
|
||||||
|
compileOnly project(':modules:lang-mustache')
|
||||||
|
compileOnly project(':modules:ingest-common')
|
||||||
|
// compileOnly project(':modules:ingest-geoip')
|
||||||
|
}
|
||||||
|
|
||||||
|
tasks.named('forbiddenApisMain').configure {
|
||||||
|
replaceSignatureFiles 'jdk-signatures'
|
||||||
|
}
|
27
libs/logstash-bridge/src/main/java/module-info.java
Normal file
27
libs/logstash-bridge/src/main/java/module-info.java
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** Elasticsearch Logstash Bridge. */
|
||||||
|
module org.elasticsearch.logstashbridge {
|
||||||
|
requires org.elasticsearch.base;
|
||||||
|
requires org.elasticsearch.grok;
|
||||||
|
requires org.elasticsearch.server;
|
||||||
|
requires org.elasticsearch.painless;
|
||||||
|
requires org.elasticsearch.painless.spi;
|
||||||
|
requires org.elasticsearch.mustache;
|
||||||
|
requires org.elasticsearch.xcontent;
|
||||||
|
|
||||||
|
exports org.elasticsearch.logstashbridge;
|
||||||
|
exports org.elasticsearch.logstashbridge.common;
|
||||||
|
exports org.elasticsearch.logstashbridge.core;
|
||||||
|
exports org.elasticsearch.logstashbridge.env;
|
||||||
|
exports org.elasticsearch.logstashbridge.ingest;
|
||||||
|
exports org.elasticsearch.logstashbridge.plugins;
|
||||||
|
exports org.elasticsearch.logstashbridge.script;
|
||||||
|
exports org.elasticsearch.logstashbridge.threadpool;
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@code StableBridgeAPI} is the stable bridge to an Elasticsearch API, and can produce instances
|
||||||
|
* from the actual API that they mirror. As part of the LogstashBridge project, these classes are relied
|
||||||
|
* upon by the "Elastic Integration Filter Plugin" for Logstash and their external shapes mut not change
|
||||||
|
* without coordination with the maintainers of that project.
|
||||||
|
*
|
||||||
|
* @param <T> the actual type of the Elasticsearch API being mirrored
|
||||||
|
*/
|
||||||
|
public interface StableBridgeAPI<T> {
|
||||||
|
T unwrap();
|
||||||
|
|
||||||
|
static <T> T unwrapNullable(final StableBridgeAPI<T> nullableStableBridgeAPI) {
|
||||||
|
if (Objects.isNull(nullableStableBridgeAPI)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return nullableStableBridgeAPI.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
static <K, T> Map<K, T> unwrap(final Map<K, ? extends StableBridgeAPI<T>> bridgeMap) {
|
||||||
|
return bridgeMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().unwrap()));
|
||||||
|
}
|
||||||
|
|
||||||
|
static <K, T, B extends StableBridgeAPI<T>> Map<K, B> wrap(final Map<K, T> rawMap, final Function<T, B> wrapFunction) {
|
||||||
|
return rawMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> wrapFunction.apply(e.getValue())));
|
||||||
|
}
|
||||||
|
|
||||||
|
static <T, B extends StableBridgeAPI<T>> B wrap(final T delegate, final Function<T, B> wrapFunction) {
|
||||||
|
if (Objects.isNull(delegate)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return wrapFunction.apply(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class Proxy<T> implements StableBridgeAPI<T> {
|
||||||
|
protected final T delegate;
|
||||||
|
|
||||||
|
protected Proxy(final T delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T unwrap() {
|
||||||
|
return delegate;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,50 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.common;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
|
||||||
|
public class SettingsBridge extends StableBridgeAPI.Proxy<Settings> {
|
||||||
|
|
||||||
|
public static SettingsBridge wrap(final Settings delegate) {
|
||||||
|
return new SettingsBridge(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Builder builder() {
|
||||||
|
return Builder.wrap(Settings.builder());
|
||||||
|
}
|
||||||
|
|
||||||
|
public SettingsBridge(final Settings delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Settings unwrap() {
|
||||||
|
return this.delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Builder extends StableBridgeAPI.Proxy<Settings.Builder> {
|
||||||
|
static Builder wrap(final Settings.Builder delegate) {
|
||||||
|
return new Builder(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Builder(final Settings.Builder delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder put(final String key, final String value) {
|
||||||
|
this.delegate.put(key, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SettingsBridge build() {
|
||||||
|
return new SettingsBridge(this.delegate.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.core;
|
||||||
|
|
||||||
|
import org.elasticsearch.core.IOUtils;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
|
||||||
|
public class IOUtilsBridge {
|
||||||
|
public static void closeWhileHandlingException(final Iterable<? extends Closeable> objects) {
|
||||||
|
IOUtils.closeWhileHandlingException(objects);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void closeWhileHandlingException(final Closeable closeable) {
|
||||||
|
IOUtils.closeWhileHandlingException(closeable);
|
||||||
|
}
|
||||||
|
}
|
33
libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java
vendored
Normal file
33
libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java
vendored
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.env;
|
||||||
|
|
||||||
|
import org.elasticsearch.env.Environment;
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.logstashbridge.common.SettingsBridge;
|
||||||
|
|
||||||
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
public class EnvironmentBridge extends StableBridgeAPI.Proxy<Environment> {
|
||||||
|
public static EnvironmentBridge wrap(final Environment delegate) {
|
||||||
|
return new EnvironmentBridge(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public EnvironmentBridge(final SettingsBridge settingsBridge, final Path configPath) {
|
||||||
|
this(new Environment(settingsBridge.unwrap(), configPath));
|
||||||
|
}
|
||||||
|
|
||||||
|
private EnvironmentBridge(final Environment delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Environment unwrap() {
|
||||||
|
return this.delegate;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
|
||||||
|
import org.elasticsearch.logstashbridge.script.TemplateScriptBridge;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class ConfigurationUtilsBridge {
|
||||||
|
public static TemplateScriptBridge.Factory compileTemplate(
|
||||||
|
final String processorType,
|
||||||
|
final String processorTag,
|
||||||
|
final String propertyName,
|
||||||
|
final String propertyValue,
|
||||||
|
final ScriptServiceBridge scriptServiceBridge
|
||||||
|
) {
|
||||||
|
return new TemplateScriptBridge.Factory(
|
||||||
|
ConfigurationUtils.compileTemplate(processorType, processorTag, propertyName, propertyValue, scriptServiceBridge.unwrap())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String readStringProperty(
|
||||||
|
final String processorType,
|
||||||
|
final String processorTag,
|
||||||
|
final Map<String, Object> configuration,
|
||||||
|
final String propertyName
|
||||||
|
) {
|
||||||
|
return ConfigurationUtils.readStringProperty(processorType, processorTag, configuration, propertyName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Boolean readBooleanProperty(
|
||||||
|
final String processorType,
|
||||||
|
final String processorTag,
|
||||||
|
final Map<String, Object> configuration,
|
||||||
|
final String propertyName,
|
||||||
|
final boolean defaultValue
|
||||||
|
) {
|
||||||
|
return ConfigurationUtils.readBooleanProperty(processorType, processorTag, configuration, propertyName, defaultValue);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.LogstashInternalBridge;
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.logstashbridge.script.MetadataBridge;
|
||||||
|
import org.elasticsearch.logstashbridge.script.TemplateScriptBridge;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
|
||||||
|
public class IngestDocumentBridge extends StableBridgeAPI.Proxy<IngestDocument> {
|
||||||
|
|
||||||
|
public static String INGEST_KEY = IngestDocument.INGEST_KEY;
|
||||||
|
|
||||||
|
public static IngestDocumentBridge wrap(final IngestDocument ingestDocument) {
|
||||||
|
if (ingestDocument == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new IngestDocumentBridge(ingestDocument);
|
||||||
|
}
|
||||||
|
|
||||||
|
public IngestDocumentBridge(final Map<String, Object> sourceAndMetadata, final Map<String, Object> ingestMetadata) {
|
||||||
|
this(new IngestDocument(sourceAndMetadata, ingestMetadata));
|
||||||
|
}
|
||||||
|
|
||||||
|
private IngestDocumentBridge(IngestDocument inner) {
|
||||||
|
super(inner);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MetadataBridge getMetadata() {
|
||||||
|
return new MetadataBridge(delegate.getMetadata());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getSource() {
|
||||||
|
return delegate.getSource();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean updateIndexHistory(final String index) {
|
||||||
|
return delegate.updateIndexHistory(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getIndexHistory() {
|
||||||
|
return Set.copyOf(delegate.getIndexHistory());
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isReroute() {
|
||||||
|
return LogstashInternalBridge.isReroute(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void resetReroute() {
|
||||||
|
LogstashInternalBridge.resetReroute(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getIngestMetadata() {
|
||||||
|
return Map.copyOf(delegate.getIngestMetadata());
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> T getFieldValue(final String fieldName, final Class<T> type) {
|
||||||
|
return delegate.getFieldValue(fieldName, type);
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> T getFieldValue(final String fieldName, final Class<T> type, final boolean ignoreMissing) {
|
||||||
|
return delegate.getFieldValue(fieldName, type, ignoreMissing);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String renderTemplate(final TemplateScriptBridge.Factory templateScriptFactory) {
|
||||||
|
return delegate.renderTemplate(templateScriptFactory.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFieldValue(final String path, final Object value) {
|
||||||
|
delegate.setFieldValue(path, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeField(final String path) {
|
||||||
|
delegate.removeField(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
// public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
|
||||||
|
public void executePipeline(final PipelineBridge pipelineBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
|
||||||
|
this.delegate.executePipeline(pipelineBridge.unwrap(), (unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
|
||||||
|
public class PipelineBridge extends StableBridgeAPI.Proxy<Pipeline> {
|
||||||
|
public static PipelineBridge wrap(final Pipeline pipeline) {
|
||||||
|
return new PipelineBridge(pipeline);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static PipelineBridge create(
|
||||||
|
String id,
|
||||||
|
Map<String, Object> config,
|
||||||
|
Map<String, ProcessorBridge.Factory> processorFactories,
|
||||||
|
ScriptServiceBridge scriptServiceBridge
|
||||||
|
) throws Exception {
|
||||||
|
return wrap(
|
||||||
|
Pipeline.create(id, config, StableBridgeAPI.unwrap(processorFactories), StableBridgeAPI.unwrapNullable(scriptServiceBridge))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PipelineBridge(final Pipeline delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return delegate.getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
|
||||||
|
this.delegate.execute(
|
||||||
|
StableBridgeAPI.unwrapNullable(ingestDocumentBridge),
|
||||||
|
(unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
import org.elasticsearch.ingest.PipelineConfiguration;
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.xcontent.XContentType;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class PipelineConfigurationBridge extends StableBridgeAPI.Proxy<PipelineConfiguration> {
|
||||||
|
public PipelineConfigurationBridge(final PipelineConfiguration delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PipelineConfigurationBridge(final String pipelineId, final String jsonEncodedConfig) {
|
||||||
|
this(new PipelineConfiguration(pipelineId, new BytesArray(jsonEncodedConfig), XContentType.JSON));
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return delegate.getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getConfigAsMap() {
|
||||||
|
return delegate.getConfigAsMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return delegate.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return delegate.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(final Object obj) {
|
||||||
|
if (this == obj) {
|
||||||
|
return true;
|
||||||
|
} else if (obj instanceof PipelineConfigurationBridge other) {
|
||||||
|
return delegate.equals(other.delegate);
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,150 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.core.TimeValue;
|
||||||
|
import org.elasticsearch.ingest.IngestService;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.logstashbridge.env.EnvironmentBridge;
|
||||||
|
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
|
||||||
|
import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
|
||||||
|
public interface ProcessorBridge extends StableBridgeAPI<Processor> {
|
||||||
|
String getType();
|
||||||
|
|
||||||
|
String getTag();
|
||||||
|
|
||||||
|
String getDescription();
|
||||||
|
|
||||||
|
boolean isAsync();
|
||||||
|
|
||||||
|
void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer<IngestDocumentBridge, Exception> handler) throws Exception;
|
||||||
|
|
||||||
|
static ProcessorBridge wrap(final Processor delegate) {
|
||||||
|
return new Wrapped(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
class Wrapped extends StableBridgeAPI.Proxy<Processor> implements ProcessorBridge {
|
||||||
|
public Wrapped(final Processor delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType() {
|
||||||
|
return unwrap().getType();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTag() {
|
||||||
|
return unwrap().getTag();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return unwrap().getDescription();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAsync() {
|
||||||
|
return unwrap().isAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler)
|
||||||
|
throws Exception {
|
||||||
|
delegate.execute(
|
||||||
|
StableBridgeAPI.unwrapNullable(ingestDocumentBridge),
|
||||||
|
(id, e) -> handler.accept(IngestDocumentBridge.wrap(id), e)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Parameters extends StableBridgeAPI.Proxy<Processor.Parameters> {
|
||||||
|
|
||||||
|
public Parameters(
|
||||||
|
final EnvironmentBridge environmentBridge,
|
||||||
|
final ScriptServiceBridge scriptServiceBridge,
|
||||||
|
final ThreadPoolBridge threadPoolBridge
|
||||||
|
) {
|
||||||
|
this(
|
||||||
|
new Processor.Parameters(
|
||||||
|
environmentBridge.unwrap(),
|
||||||
|
scriptServiceBridge.unwrap(),
|
||||||
|
null,
|
||||||
|
threadPoolBridge.unwrap().getThreadContext(),
|
||||||
|
threadPoolBridge.unwrap()::relativeTimeInMillis,
|
||||||
|
(delay, command) -> threadPoolBridge.unwrap()
|
||||||
|
.schedule(command, TimeValue.timeValueMillis(delay), threadPoolBridge.unwrap().generic()),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
threadPoolBridge.unwrap().generic()::execute,
|
||||||
|
IngestService.createGrokThreadWatchdog(environmentBridge.unwrap(), threadPoolBridge.unwrap())
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Parameters(final Processor.Parameters delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Processor.Parameters unwrap() {
|
||||||
|
return this.delegate;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Factory extends StableBridgeAPI<Processor.Factory> {
|
||||||
|
ProcessorBridge create(
|
||||||
|
Map<String, ProcessorBridge.Factory> registry,
|
||||||
|
String processorTag,
|
||||||
|
String description,
|
||||||
|
Map<String, Object> config
|
||||||
|
) throws Exception;
|
||||||
|
|
||||||
|
static Factory wrap(final Processor.Factory delegate) {
|
||||||
|
return new Wrapped(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default Processor.Factory unwrap() {
|
||||||
|
final Factory stableAPIFactory = this;
|
||||||
|
return (registry, tag, description, config) -> stableAPIFactory.create(
|
||||||
|
StableBridgeAPI.wrap(registry, Factory::wrap),
|
||||||
|
tag,
|
||||||
|
description,
|
||||||
|
config
|
||||||
|
).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
class Wrapped extends StableBridgeAPI.Proxy<Processor.Factory> implements Factory {
|
||||||
|
private Wrapped(final Processor.Factory delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProcessorBridge create(
|
||||||
|
final Map<String, Factory> registry,
|
||||||
|
final String processorTag,
|
||||||
|
final String description,
|
||||||
|
final Map<String, Object> config
|
||||||
|
) throws Exception {
|
||||||
|
return ProcessorBridge.wrap(this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Processor.Factory unwrap() {
|
||||||
|
return this.delegate;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.plugins;
|
||||||
|
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
|
||||||
|
import org.elasticsearch.plugins.IngestPlugin;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public interface IngestPluginBridge {
|
||||||
|
Map<String, ProcessorBridge.Factory> getProcessors(ProcessorBridge.Parameters parameters);
|
||||||
|
|
||||||
|
static Wrapped wrap(final IngestPlugin delegate) {
|
||||||
|
return new Wrapped(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
class Wrapped extends StableBridgeAPI.Proxy<IngestPlugin> implements IngestPluginBridge, Closeable {
|
||||||
|
|
||||||
|
private Wrapped(final IngestPlugin delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, ProcessorBridge.Factory> getProcessors(final ProcessorBridge.Parameters parameters) {
|
||||||
|
return StableBridgeAPI.wrap(this.delegate.getProcessors(parameters.unwrap()), ProcessorBridge.Factory::wrap);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IngestPlugin unwrap() {
|
||||||
|
return this.delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (this.delegate instanceof Closeable closeableDelegate) {
|
||||||
|
closeableDelegate.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,63 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.script;
|
||||||
|
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.script.Metadata;
|
||||||
|
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
|
||||||
|
public class MetadataBridge extends StableBridgeAPI.Proxy<Metadata> {
|
||||||
|
public MetadataBridge(final Metadata delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getIndex() {
|
||||||
|
return delegate.getIndex();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIndex(final String index) {
|
||||||
|
delegate.setIndex(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return delegate.getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setId(final String id) {
|
||||||
|
delegate.setId(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getVersion() {
|
||||||
|
return delegate.getVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setVersion(final long version) {
|
||||||
|
delegate.setVersion(version);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getVersionType() {
|
||||||
|
return delegate.getVersionType();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setVersionType(final String versionType) {
|
||||||
|
delegate.setVersionType(versionType);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRouting() {
|
||||||
|
return delegate.getRouting();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRouting(final String routing) {
|
||||||
|
delegate.setRouting(routing);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ZonedDateTime getNow() {
|
||||||
|
return delegate.getNow();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,68 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.script;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.logstashbridge.common.SettingsBridge;
|
||||||
|
import org.elasticsearch.painless.PainlessPlugin;
|
||||||
|
import org.elasticsearch.painless.PainlessScriptEngine;
|
||||||
|
import org.elasticsearch.painless.spi.Whitelist;
|
||||||
|
import org.elasticsearch.script.IngestConditionalScript;
|
||||||
|
import org.elasticsearch.script.IngestScript;
|
||||||
|
import org.elasticsearch.script.ScriptContext;
|
||||||
|
import org.elasticsearch.script.ScriptEngine;
|
||||||
|
import org.elasticsearch.script.ScriptModule;
|
||||||
|
import org.elasticsearch.script.ScriptService;
|
||||||
|
import org.elasticsearch.script.mustache.MustacheScriptEngine;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.LongSupplier;
|
||||||
|
|
||||||
|
public class ScriptServiceBridge extends StableBridgeAPI.Proxy<ScriptService> implements Closeable {
|
||||||
|
public ScriptServiceBridge wrap(final ScriptService delegate) {
|
||||||
|
return new ScriptServiceBridge(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScriptServiceBridge(final SettingsBridge settingsBridge, final LongSupplier timeProvider) {
|
||||||
|
super(getScriptService(settingsBridge.unwrap(), timeProvider));
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScriptServiceBridge(ScriptService delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ScriptService getScriptService(final Settings settings, final LongSupplier timeProvider) {
|
||||||
|
final List<Whitelist> painlessBaseWhitelist = getPainlessBaseWhiteList();
|
||||||
|
final Map<ScriptContext<?>, List<Whitelist>> scriptContexts = Map.of(
|
||||||
|
IngestScript.CONTEXT,
|
||||||
|
painlessBaseWhitelist,
|
||||||
|
IngestConditionalScript.CONTEXT,
|
||||||
|
painlessBaseWhitelist
|
||||||
|
);
|
||||||
|
final Map<String, ScriptEngine> scriptEngines = Map.of(
|
||||||
|
PainlessScriptEngine.NAME,
|
||||||
|
new PainlessScriptEngine(settings, scriptContexts),
|
||||||
|
MustacheScriptEngine.NAME,
|
||||||
|
new MustacheScriptEngine()
|
||||||
|
);
|
||||||
|
return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Whitelist> getPainlessBaseWhiteList() {
|
||||||
|
return PainlessPlugin.baseWhiteList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
this.delegate.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.script;
|
||||||
|
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.script.TemplateScript;
|
||||||
|
|
||||||
|
public class TemplateScriptBridge {
|
||||||
|
public static class Factory extends StableBridgeAPI.Proxy<TemplateScript.Factory> {
|
||||||
|
public static Factory wrap(final TemplateScript.Factory delegate) {
|
||||||
|
return new Factory(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Factory(final TemplateScript.Factory delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TemplateScript.Factory unwrap() {
|
||||||
|
return this.delegate;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.logstashbridge.threadpool;
|
||||||
|
|
||||||
|
import org.elasticsearch.logstashbridge.StableBridgeAPI;
|
||||||
|
import org.elasticsearch.logstashbridge.common.SettingsBridge;
|
||||||
|
import org.elasticsearch.telemetry.metric.MeterRegistry;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class ThreadPoolBridge extends StableBridgeAPI.Proxy<ThreadPool> {
|
||||||
|
|
||||||
|
public ThreadPoolBridge(final SettingsBridge settingsBridge) {
|
||||||
|
this(new ThreadPool(settingsBridge.unwrap(), MeterRegistry.NOOP));
|
||||||
|
}
|
||||||
|
|
||||||
|
public ThreadPoolBridge(final ThreadPool delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean terminate(final ThreadPoolBridge pool, final long timeout, final TimeUnit timeUnit) {
|
||||||
|
return ThreadPool.terminate(pool.unwrap(), timeout, timeUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long relativeTimeInMillis() {
|
||||||
|
return delegate.relativeTimeInMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long absoluteTimeInMillis() {
|
||||||
|
return delegate.absoluteTimeInMillis();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue