From 1ccf1c680676c2ca22c7d30acdd3822e60fcf817 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 19 Jun 2025 11:48:44 +0100 Subject: [PATCH] Streams - Log's Enable, Disable and Status endpoints (#129474) * Enable And Disable Endpoint * Status Endpoint * Integration Tests * REST Spec * REST Spec tests * Some documentation * Update docs/changelog/129474.yaml * Fix failing security test * PR Fixes * PR Fixes - Add missing feature flag name to YAML spec * PR Fixes - Fix support for timeout and master_timeout parameters * PR Fixes - Make the REST handler validation happy with the new params * Delete docs/changelog/129474.yaml * PR Fixes - Switch to local metadata action type and improve request handling * PR Fixes - Make enable / disable endpoint cancellable * PR Fixes - Switch timeout param name for status endpoint * PR Fixes - Switch timeout param name for status endpoint in spec * PR Fixes - Enforce local only use for status action * PR Fixes - Refactor StreamsMetadata into server * PR Fixes - Add streams module to multi project YAML test suite * PR Fixes - Add streams cluster module to multi project YAML test suite --- modules/streams/build.gradle | 41 ++++++ .../rest/streams/TestToggleIT.java | 62 +++++++++ .../streams/src/main/java/module-info.java | 19 +++ .../rest/streams/StreamsPlugin.java | 69 ++++++++++ .../LogsStreamsActivationToggleAction.java | 63 +++++++++ .../logs/RestSetLogStreamsEnabledAction.java | 64 +++++++++ .../streams/logs/RestStreamsStatusAction.java | 57 +++++++++ .../streams/logs/StreamsStatusAction.java | 67 ++++++++++ .../TransportLogsStreamsToggleActivation.java | 121 ++++++++++++++++++ .../logs/TransportStreamsStatusAction.java | 69 ++++++++++ .../streams/StreamsYamlTestSuiteIT.java | 37 ++++++ .../test/streams/logs/10_basic.yml | 43 +++++++ .../api/streams.logs_disable.json | 37 ++++++ .../api/streams.logs_enable.json | 37 ++++++ .../rest-api-spec/api/streams.status.json | 32 +++++ .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/cluster/ClusterModule.java | 5 + .../SequentialAckingBatchedTaskExecutor.java | 27 ++++ .../cluster/metadata/StreamsMetadata.java | 92 +++++++++++++ .../xpack/security/operator/Constants.java | 2 + .../build.gradle | 11 ++ ...MultipleProjectsClientYamlTestSuiteIT.java | 1 + 22 files changed, 957 insertions(+) create mode 100644 modules/streams/build.gradle create mode 100644 modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java create mode 100644 modules/streams/src/main/java/module-info.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java create mode 100644 modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java create mode 100644 modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json create mode 100644 server/src/main/java/org/elasticsearch/cluster/SequentialAckingBatchedTaskExecutor.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/metadata/StreamsMetadata.java diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle new file mode 100644 index 000000000000..fd56a627026b --- /dev/null +++ b/modules/streams/build.gradle @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +apply plugin: 'elasticsearch.test-with-dependencies' +apply plugin: 'elasticsearch.internal-cluster-test' +apply plugin: 'elasticsearch.internal-yaml-rest-test' +apply plugin: 'elasticsearch.internal-java-rest-test' +apply plugin: 'elasticsearch.yaml-rest-compat-test' + +esplugin { + description = 'The module adds support for the wired streams functionality including logs ingest' + classname = 'org.elasticsearch.rest.streams.StreamsPlugin' +} + +restResources { + restApi { + include '_common', 'streams' + } +} + +configurations { + basicRestSpecs { + attributes { + attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE) + } + } +} + +artifacts { + basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test")) +} + +dependencies { + testImplementation project(path: ':test:test-clusters') +} diff --git a/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java b/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java new file mode 100644 index 000000000000..be4a5a33cfa9 --- /dev/null +++ b/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.StreamsMetadata; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.test.transport.MockTransportService; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.is; + +public class TestToggleIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(StreamsPlugin.class, MockTransportService.TestPlugin.class); + } + + public void testLogStreamToggle() throws IOException, ExecutionException, InterruptedException { + boolean[] testParams = new boolean[] { true, false, true }; + for (boolean enable : testParams) { + doLogStreamToggleTest(enable); + } + } + + private void doLogStreamToggleTest(boolean enable) throws IOException, ExecutionException, InterruptedException { + LogsStreamsActivationToggleAction.Request request = new LogsStreamsActivationToggleAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + enable + ); + + AcknowledgedResponse acknowledgedResponse = client().execute(LogsStreamsActivationToggleAction.INSTANCE, request).get(); + ElasticsearchAssertions.assertAcked(acknowledgedResponse); + + ClusterStateRequest state = new ClusterStateRequest(TEST_REQUEST_TIMEOUT); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(state).get(); + ProjectMetadata projectMetadata = clusterStateResponse.getState().metadata().getProject(ProjectId.DEFAULT); + + assertThat(projectMetadata.custom(StreamsMetadata.TYPE).isLogsEnabled(), is(enable)); + } + +} diff --git a/modules/streams/src/main/java/module-info.java b/modules/streams/src/main/java/module-info.java new file mode 100644 index 000000000000..1a48a5dbc4fb --- /dev/null +++ b/modules/streams/src/main/java/module-info.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +module org.elasticsearch.rest.root { + requires org.elasticsearch.server; + requires org.elasticsearch.xcontent; + requires org.apache.lucene.core; + requires org.elasticsearch.base; + requires org.apache.logging.log4j; + + exports org.elasticsearch.rest.streams; + exports org.elasticsearch.rest.streams.logs; +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java new file mode 100644 index 000000000000..4e3a531e20d5 --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction; +import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction; +import org.elasticsearch.rest.streams.logs.RestStreamsStatusAction; +import org.elasticsearch.rest.streams.logs.StreamsStatusAction; +import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation; +import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction; + +import java.util.Collections; +import java.util.List; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * This plugin provides the Streams feature which builds upon data streams to + * provide the user with a more "batteries included" experience for ingesting large + * streams of data, such as logs. + */ +public class StreamsPlugin extends Plugin implements ActionPlugin { + + @Override + public List getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + if (DataStream.LOGS_STREAM_FEATURE_FLAG) { + return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction()); + } + return Collections.emptyList(); + } + + @Override + public List getActions() { + return List.of( + new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class), + new ActionHandler(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class) + ); + } +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java new file mode 100644 index 000000000000..07eda61784c8 --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Map; + +public class LogsStreamsActivationToggleAction { + + public static ActionType INSTANCE = new ActionType<>("cluster:admin/streams/logs/toggle"); + + public static class Request extends AcknowledgedRequest { + + private final boolean enable; + + public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, boolean enable) { + super(masterNodeTimeout, ackTimeout); + this.enable = enable; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.enable = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(enable); + } + + @Override + public String toString() { + return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}'; + } + + public boolean shouldEnable() { + return enable; + } + + @Override + public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers); + } + } +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java new file mode 100644 index 000000000000..cdf429ccf02e --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.util.List; +import java.util.Set; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +@ServerlessScope(Scope.PUBLIC) +public class RestSetLogStreamsEnabledAction extends BaseRestHandler { + + public static final Set SUPPORTED_PARAMS = Set.of(RestUtils.REST_MASTER_TIMEOUT_PARAM, RestUtils.REST_TIMEOUT_PARAM); + + @Override + public String getName() { + return "streams_logs_set_enabled_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_streams/logs/_enable"), new Route(POST, "/_streams/logs/_disable")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final boolean enabled = request.path().endsWith("_enable"); + assert enabled || request.path().endsWith("_disable"); + + LogsStreamsActivationToggleAction.Request activationRequest = new LogsStreamsActivationToggleAction.Request( + RestUtils.getMasterNodeTimeout(request), + RestUtils.getAckTimeout(request), + enabled + ); + + return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + LogsStreamsActivationToggleAction.INSTANCE, + activationRequest, + new RestToXContentListener<>(restChannel) + ); + } + + @Override + public Set supportedQueryParameters() { + return SUPPORTED_PARAMS; + } + +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java new file mode 100644 index 000000000000..a9b3f5b1efe7 --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +@ServerlessScope(Scope.PUBLIC) +public class RestStreamsStatusAction extends BaseRestHandler { + + public static final Set SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM); + + @Override + public String getName() { + return "streams_status_action"; + } + + @Override + public List routes() { + return List.of(new RestHandler.Route(GET, "/_streams/status")); + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request)); + return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + StreamsStatusAction.INSTANCE, + statusRequest, + new RestToXContentListener<>(restChannel) + ); + } + + @Override + public Set supportedQueryParameters() { + return SUPPORTED_PARAMS; + } +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java new file mode 100644 index 000000000000..95a1783ac045 --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; + +public class StreamsStatusAction { + + public static ActionType INSTANCE = new ActionType<>("cluster:admin/streams/status"); + + public static class Request extends LocalClusterStateRequest { + protected Request(TimeValue masterTimeout) { + super(masterTimeout); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "Streams status request", parentTaskId, headers); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final boolean logs_enabled; + + public Response(boolean logsEnabled) { + logs_enabled = logsEnabled; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + TransportAction.localOnly(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + + builder.startObject("logs"); + builder.field("enabled", logs_enabled); + builder.endObject(); + + builder.endObject(); + return builder; + } + } +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java new file mode 100644 index 000000000000..95b71fb73b25 --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SequentialAckingBatchedTaskExecutor; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.StreamsMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Locale; + +/** + * Transport action to toggle the activation state of logs streams in a project / cluster. + */ +public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportLogsStreamsToggleActivation.class); + + private final ProjectResolver projectResolver; + private final MasterServiceTaskQueue taskQueue; + + @Inject + public TransportLogsStreamsToggleActivation( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver + ) { + super( + LogsStreamsActivationToggleAction.INSTANCE.name(), + transportService, + clusterService, + threadPool, + actionFilters, + LogsStreamsActivationToggleAction.Request::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.taskQueue = clusterService.createTaskQueue( + "streams-update-state-queue", + Priority.NORMAL, + new SequentialAckingBatchedTaskExecutor<>() + ); + this.projectResolver = projectResolver; + } + + @Override + protected void masterOperation( + Task task, + LogsStreamsActivationToggleAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + ProjectId projectId = projectResolver.getProjectId(); + StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + boolean currentlyEnabled = streamsState.isLogsEnabled(); + boolean shouldEnable = request.shouldEnable(); + if (shouldEnable != currentlyEnabled) { + StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable); + String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable"); + taskQueue.submitTask(taskName, updateTask, updateTask.timeout()); + } else { + logger.debug("Logs streams are already in the requested state: {}", shouldEnable); + listener.onResponse(AcknowledgedResponse.TRUE); + } + } + + @Override + protected ClusterBlockException checkBlock(LogsStreamsActivationToggleAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask { + private final ProjectId projectId; + private final boolean enabled; + + StreamsMetadataUpdateTask( + AcknowledgedRequest request, + ActionListener listener, + ProjectId projectId, + boolean enabled + ) { + super(request, listener); + this.projectId = projectId; + this.enabled = enabled; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return currentState.copyAndUpdateProject( + projectId, + builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled)) + ); + } + } +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java new file mode 100644 index 000000000000..e389dd8c402b --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.StreamsMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * Transport action to retrieve the status of logs streams in a project / cluster. + * Results are broken down by stream type. Currently only logs streams are implemented. + */ +public class TransportStreamsStatusAction extends TransportLocalProjectMetadataAction< + StreamsStatusAction.Request, + StreamsStatusAction.Response> { + + @Inject + public TransportStreamsStatusAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver + ) { + super( + StreamsStatusAction.INSTANCE.name(), + actionFilters, + transportService.getTaskManager(), + clusterService, + threadPool.executor(ThreadPool.Names.MANAGEMENT), + projectResolver + ); + } + + @Override + protected ClusterBlockException checkBlock(StreamsStatusAction.Request request, ProjectState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected void localClusterStateOperation( + Task task, + StreamsStatusAction.Request request, + ProjectState state, + ActionListener listener + ) { + StreamsMetadata streamsState = state.metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + boolean logsEnabled = streamsState.isLogsEnabled(); + StreamsStatusAction.Response response = new StreamsStatusAction.Response(logsEnabled); + listener.onResponse(response); + } +} diff --git a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java new file mode 100644 index 000000000000..7a6e4665e9e6 --- /dev/null +++ b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.streams; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.junit.ClassRule; + +public class StreamsYamlTestSuiteIT extends ESClientYamlSuiteTestCase { + public StreamsYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return ESClientYamlSuiteTestCase.createParameters(); + } + + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } +} diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml new file mode 100644 index 000000000000..3c1bc7d6c53c --- /dev/null +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml @@ -0,0 +1,43 @@ +--- +"Basic toggle of logs state enable to disable and back": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + streams.logs_disable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_false: logs.enabled + + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + +--- +"Check for repeated toggle to same state": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json new file mode 100644 index 000000000000..20f0e0c2feba --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json @@ -0,0 +1,37 @@ +{ + "streams.logs_disable": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-logs-disable.html", + "description": "Disable the Logs Streams feature for this cluster" + }, + "stability": "stable", + "visibility": "feature_flag", + "feature_flag": "logs_stream", + "headers": { + "accept": [ + "application/json", + "text/plain" + ] + }, + "url": { + "paths": [ + { + "path": "/_streams/logs/_disable", + "methods": [ + "POST" + ] + } + ] + }, + "params": { + "timeout": { + "type": "time", + "description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error." + }, + "master_timeout": { + "type": "time", + "description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error." + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json new file mode 100644 index 000000000000..adaf544d9b60 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json @@ -0,0 +1,37 @@ +{ + "streams.logs_enable": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-logs-enable.html", + "description": "Enable the Logs Streams feature for this cluster" + }, + "stability": "stable", + "visibility": "feature_flag", + "feature_flag": "logs_stream", + "headers": { + "accept": [ + "application/json", + "text/plain" + ] + }, + "url": { + "paths": [ + { + "path": "/_streams/logs/_enable", + "methods": [ + "POST" + ] + } + ] + }, + "params": { + "timeout": { + "type": "time", + "description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error." + }, + "master_timeout": { + "type": "time", + "description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error." + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json new file mode 100644 index 000000000000..722ddbee6675 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json @@ -0,0 +1,32 @@ +{ + "streams.status": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-status.html", + "description": "Return the current status of the streams feature for each streams type" + }, + "stability": "stable", + "visibility": "feature_flag", + "feature_flag": "logs_stream", + "headers": { + "accept": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_streams/status", + "methods": [ + "GET" + ] + } + ] + }, + "params": { + "mater_timeout": { + "type": "time", + "description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error." + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 494283835ca7..2d9f72441316 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -306,6 +306,7 @@ public class TransportVersions { public static final TransportVersion PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP = def(9_101_0_00); public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_REMOVE_ERROR_PARSING = def(9_102_0_00); public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_BATCH_SIZE = def(9_103_0_00); + public static final TransportVersion STREAMS_LOGS_SUPPORT = def(9_104_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 33a7139213f9..c4bc58b3d183 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.metadata.MetadataMappingService; import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.StreamsMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.DelayedAllocationService; import org.elasticsearch.cluster.routing.ShardRouting; @@ -300,6 +301,10 @@ public class ClusterModule extends AbstractModule { // Health API entries.addAll(HealthNodeTaskExecutor.getNamedWriteables()); entries.addAll(HealthMetadataService.getNamedWriteables()); + + // Streams + registerProjectCustom(entries, StreamsMetadata.TYPE, StreamsMetadata::new, StreamsMetadata::readDiffFrom); + return entries; } diff --git a/server/src/main/java/org/elasticsearch/cluster/SequentialAckingBatchedTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/SequentialAckingBatchedTaskExecutor.java new file mode 100644 index 000000000000..70f745c1b0c3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/SequentialAckingBatchedTaskExecutor.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.core.Tuple; + +/** + * A task executor that executes tasks sequentially, allowing each task to acknowledge the cluster state update. + * This executor is used for tasks that need to be executed one after another, where each task can produce a new + * cluster state and can listen for acknowledgments. + * + * @param The type of the task that extends {@link AckedClusterStateUpdateTask}. + */ +public class SequentialAckingBatchedTaskExecutor extends SimpleBatchedAckListenerTaskExecutor< + Task> { + @Override + public Tuple executeTask(Task task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), task); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/StreamsMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/StreamsMetadata.java new file mode 100644 index 000000000000..71dd9eeaff24 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/StreamsMetadata.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.xcontent.ToXContent; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Objects; + +/** + * Metadata for the Streams feature, which allows enabling or disabling logs for data streams. + * This class implements the Metadata.ProjectCustom interface to allow it to be stored in the cluster state. + */ +public class StreamsMetadata extends AbstractNamedDiffable implements Metadata.ProjectCustom { + + public static final String TYPE = "streams"; + public static final StreamsMetadata EMPTY = new StreamsMetadata(false); + + public boolean logsEnabled; + + public StreamsMetadata(StreamInput in) throws IOException { + logsEnabled = in.readBoolean(); + } + + public StreamsMetadata(boolean logsEnabled) { + this.logsEnabled = logsEnabled; + } + + public boolean isLogsEnabled() { + return logsEnabled; + } + + @Override + public EnumSet context() { + return Metadata.ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.STREAMS_LOGS_SUPPORT; + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.ProjectCustom.class, TYPE, in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(logsEnabled); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat(ChunkedToXContentHelper.chunk((builder, bParams) -> builder.field("logs_enabled", logsEnabled))); + } + + @Override + public boolean equals(Object o) { + if ((o instanceof StreamsMetadata that)) { + return logsEnabled == that.logsEnabled; + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hashCode(logsEnabled); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 891a0badfd74..72f50b070dca 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -72,6 +72,8 @@ public class Constants { "cluster:admin/script_language/get", "cluster:admin/scripts/painless/context", "cluster:admin/scripts/painless/execute", + "cluster:admin/streams/logs/toggle", + "cluster:admin/streams/status", "cluster:admin/synonyms/delete", "cluster:admin/synonyms/get", "cluster:admin/synonyms/put", diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 8e5462059792..39115556d290 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -1,4 +1,13 @@ apply plugin: 'elasticsearch.internal-yaml-rest-test' +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + import org.elasticsearch.gradle.util.GradleUtils dependencies { @@ -13,6 +22,7 @@ dependencies { clusterModules project(':modules:data-streams') clusterModules project(':modules:lang-mustache') clusterModules project(':modules:parent-join') + clusterModules project(':modules:streams') clusterModules project(xpackModule('stack')) clusterModules project(xpackModule('ilm')) clusterModules project(xpackModule('mapper-constant-keyword')) @@ -21,6 +31,7 @@ dependencies { restTestConfig project(path: ':modules:data-streams', configuration: "basicRestSpecs") restTestConfig project(path: ':modules:ingest-common', configuration: "basicRestSpecs") restTestConfig project(path: ':modules:reindex', configuration: "basicRestSpecs") + restTestConfig project(path: ':modules:streams', configuration: "basicRestSpecs") } // let the yamlRestTests see the classpath of test diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/src/yamlRestTest/java/org/elasticsearch/multiproject/test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/src/yamlRestTest/java/org/elasticsearch/multiproject/test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java index 1ec091c84553..da0432a3e3c5 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/src/yamlRestTest/java/org/elasticsearch/multiproject/test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/src/yamlRestTest/java/org/elasticsearch/multiproject/test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java @@ -37,6 +37,7 @@ public class CoreWithMultipleProjectsClientYamlTestSuiteIT extends MultipleProje .module("test-multi-project") .module("lang-mustache") .module("parent-join") + .module("streams") .setting("test.multi_project.enabled", "true") .setting("xpack.security.enabled", "true") .setting("xpack.watcher.enabled", "false")