From d4dd78cc3d0b1106ac131a4ca71aacbbce71d0f6 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 2 Sep 2019 08:17:03 +0200 Subject: [PATCH] Allow ingest processors access to node client. (#46077) This is the first PR that merges changes made to server module from the enrich branch (see #32789) into the master branch. The plan is to merge changes made to the server module separately from the pr that will merge enrich into master, so that these changes can be reviewed in isolation. --- .../org/elasticsearch/ingest/IngestService.java | 5 +++-- .../java/org/elasticsearch/ingest/Processor.java | 9 ++++++++- .../src/main/java/org/elasticsearch/node/Node.java | 3 ++- .../elasticsearch/ingest/IngestServiceTests.java | 13 +++++++++---- .../snapshots/SnapshotResiliencyTests.java | 2 +- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 2c16ebef4672..07a92767dbdc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -86,7 +87,7 @@ public class IngestService implements ClusterStateApplier { public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, - List ingestPlugins) { + List ingestPlugins, Client client) { this.clusterService = clusterService; this.scriptService = scriptService; this.processorFactories = processorFactories( @@ -96,7 +97,7 @@ public class IngestService implements ClusterStateApplier { threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC - ), this + ), this, client ) ); this.threadPool = threadPool; diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index c064ddb35a12..10bd530e3c1e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -110,9 +111,14 @@ public interface Processor { */ public final BiFunction scheduler; + /** + * Provides access to the node client + */ + public final Client client; + public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService) { + IngestService ingestService, Client client) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; @@ -120,6 +126,7 @@ public interface Processor { this.relativeTimeSupplier = relativeTimeSupplier; this.scheduler = scheduler; this.ingestService = ingestService; + this.client = client; } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index e612fa96966b..cb7888eddee9 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -366,7 +366,8 @@ public class Node implements Closeable { new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings()) .newHashPublisher()); final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, - scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); + scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), + pluginsService.filterPlugins(IngestPlugin.class), client); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 3974bda06311..85ae970a7882 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -106,8 +107,9 @@ public class IngestServiceTests extends ESTestCase { public void testIngestPlugin() { ThreadPool tp = mock(ThreadPool.class); + Client client = mock(Client.class); IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null, - null, Collections.singletonList(DUMMY_PLUGIN)); + null, Collections.singletonList(DUMMY_PLUGIN), client); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); @@ -115,18 +117,20 @@ public class IngestServiceTests extends ESTestCase { public void testIngestPluginDuplicate() { ThreadPool tp = mock(ThreadPool.class); + Client client = mock(Client.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new IngestService(mock(ClusterService.class), tp, null, null, - null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))); + null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client)); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } public void testExecuteIndexPipelineDoesNotExist() { ThreadPool threadPool = mock(ThreadPool.class); + Client client = mock(Client.class); final ExecutorService executorService = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(executorService); IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, - null, Collections.singletonList(DUMMY_PLUGIN)); + null, Collections.singletonList(DUMMY_PLUGIN), client); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final SetOnce failure = new SetOnce<>(); @@ -1138,6 +1142,7 @@ public class IngestServiceTests extends ESTestCase { private static IngestService createWithProcessors(Map processors) { ThreadPool threadPool = mock(ThreadPool.class); + Client client = mock(Client.class); final ExecutorService executorService = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(executorService); return new IngestService(mock(ClusterService.class), threadPool, null, null, @@ -1146,7 +1151,7 @@ public class IngestServiceTests extends ESTestCase { public Map getProcessors(final Processor.Parameters parameters) { return processors; } - })); + }), client); } private class IngestDocumentMatcher extends ArgumentMatcher { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index fb95c3d3d602..e7e65f54bb71 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1045,7 +1045,7 @@ public class SnapshotResiliencyTests extends ESTestCase { new IngestService( clusterService, threadPool, environment, scriptService, new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), - Collections.emptyList()), + Collections.emptyList(), client), client, actionFilters, indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) ));