Streams - Log's Enable, Disable and Status endpoints (#129474)

* Enable And Disable Endpoint

* Status Endpoint

* Integration Tests

* REST Spec

* REST Spec tests

* Some documentation

* Update docs/changelog/129474.yaml

* Fix failing security test

* PR Fixes

* PR Fixes - Add missing feature flag name to YAML spec

* PR Fixes - Fix support for timeout and master_timeout parameters

* PR Fixes - Make the REST handler validation happy with the new params

* Delete docs/changelog/129474.yaml

* PR Fixes - Switch to local metadata action type and improve request handling

* PR Fixes - Make enable / disable endpoint cancellable

* PR Fixes - Switch timeout param name for status endpoint

* PR Fixes - Switch timeout param name for status endpoint in spec

* PR Fixes - Enforce local only use for status action

* PR Fixes - Refactor StreamsMetadata into server

* PR Fixes - Add streams module to multi project YAML test suite

* PR Fixes - Add streams cluster module to multi project YAML test suite
This commit is contained in:
Luke Whiting 2025-06-19 11:48:44 +01:00 committed by GitHub
parent 6858c32529
commit 1ccf1c6806
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 957 additions and 0 deletions

View file

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
apply plugin: 'elasticsearch.test-with-dependencies'
apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.yaml-rest-compat-test'
esplugin {
description = 'The module adds support for the wired streams functionality including logs ingest'
classname = 'org.elasticsearch.rest.streams.StreamsPlugin'
}
restResources {
restApi {
include '_common', 'streams'
}
}
configurations {
basicRestSpecs {
attributes {
attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE)
}
}
}
artifacts {
basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test"))
}
dependencies {
testImplementation project(path: ':test:test-clusters')
}

View file

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.rest.streams;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.StreamsMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.transport.MockTransportService;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.is;
public class TestToggleIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(StreamsPlugin.class, MockTransportService.TestPlugin.class);
}
public void testLogStreamToggle() throws IOException, ExecutionException, InterruptedException {
boolean[] testParams = new boolean[] { true, false, true };
for (boolean enable : testParams) {
doLogStreamToggleTest(enable);
}
}
private void doLogStreamToggleTest(boolean enable) throws IOException, ExecutionException, InterruptedException {
LogsStreamsActivationToggleAction.Request request = new LogsStreamsActivationToggleAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
enable
);
AcknowledgedResponse acknowledgedResponse = client().execute(LogsStreamsActivationToggleAction.INSTANCE, request).get();
ElasticsearchAssertions.assertAcked(acknowledgedResponse);
ClusterStateRequest state = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(state).get();
ProjectMetadata projectMetadata = clusterStateResponse.getState().metadata().getProject(ProjectId.DEFAULT);
assertThat(projectMetadata.<StreamsMetadata>custom(StreamsMetadata.TYPE).isLogsEnabled(), is(enable));
}
}

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
module org.elasticsearch.rest.root {
requires org.elasticsearch.server;
requires org.elasticsearch.xcontent;
requires org.apache.lucene.core;
requires org.elasticsearch.base;
requires org.apache.logging.log4j;
exports org.elasticsearch.rest.streams;
exports org.elasticsearch.rest.streams.logs;
}

View file

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.rest.streams;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction;
import org.elasticsearch.rest.streams.logs.RestStreamsStatusAction;
import org.elasticsearch.rest.streams.logs.StreamsStatusAction;
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* This plugin provides the Streams feature which builds upon data streams to
* provide the user with a more "batteries included" experience for ingesting large
* streams of data, such as logs.
*/
public class StreamsPlugin extends Plugin implements ActionPlugin {
@Override
public List<RestHandler> getRestHandlers(
Settings settings,
NamedWriteableRegistry namedWriteableRegistry,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster,
Predicate<NodeFeature> clusterSupportsFeature
) {
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
}
return Collections.emptyList();
}
@Override
public List<ActionHandler> getActions() {
return List.of(
new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class),
new ActionHandler(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class)
);
}
}

View file

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.rest.streams.logs;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Map;
public class LogsStreamsActivationToggleAction {
public static ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("cluster:admin/streams/logs/toggle");
public static class Request extends AcknowledgedRequest<Request> {
private final boolean enable;
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, boolean enable) {
super(masterNodeTimeout, ackTimeout);
this.enable = enable;
}
public Request(StreamInput in) throws IOException {
super(in);
this.enable = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(enable);
}
@Override
public String toString() {
return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}';
}
public boolean shouldEnable() {
return enable;
}
@Override
public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers);
}
}
}

View file

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.rest.streams.logs;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.rest.RestRequest.Method.POST;
@ServerlessScope(Scope.PUBLIC)
public class RestSetLogStreamsEnabledAction extends BaseRestHandler {
public static final Set<String> SUPPORTED_PARAMS = Set.of(RestUtils.REST_MASTER_TIMEOUT_PARAM, RestUtils.REST_TIMEOUT_PARAM);
@Override
public String getName() {
return "streams_logs_set_enabled_action";
}
@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_streams/logs/_enable"), new Route(POST, "/_streams/logs/_disable"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
final boolean enabled = request.path().endsWith("_enable");
assert enabled || request.path().endsWith("_disable");
LogsStreamsActivationToggleAction.Request activationRequest = new LogsStreamsActivationToggleAction.Request(
RestUtils.getMasterNodeTimeout(request),
RestUtils.getAckTimeout(request),
enabled
);
return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
LogsStreamsActivationToggleAction.INSTANCE,
activationRequest,
new RestToXContentListener<>(restChannel)
);
}
@Override
public Set<String> supportedQueryParameters() {
return SUPPORTED_PARAMS;
}
}

View file

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.rest.streams.logs;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.rest.RestRequest.Method.GET;
@ServerlessScope(Scope.PUBLIC)
public class RestStreamsStatusAction extends BaseRestHandler {
public static final Set<String> SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM);
@Override
public String getName() {
return "streams_status_action";
}
@Override
public List<RestHandler.Route> routes() {
return List.of(new RestHandler.Route(GET, "/_streams/status"));
}
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request));
return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
StreamsStatusAction.INSTANCE,
statusRequest,
new RestToXContentListener<>(restChannel)
);
}
@Override
public Set<String> supportedQueryParameters() {
return SUPPORTED_PARAMS;
}
}

View file

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.rest.streams.logs;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
public class StreamsStatusAction {
public static ActionType<Response> INSTANCE = new ActionType<>("cluster:admin/streams/status");
public static class Request extends LocalClusterStateRequest {
protected Request(TimeValue masterTimeout) {
super(masterTimeout);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "Streams status request", parentTaskId, headers);
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private final boolean logs_enabled;
public Response(boolean logsEnabled) {
logs_enabled = logsEnabled;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
TransportAction.localOnly();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startObject("logs");
builder.field("enabled", logs_enabled);
builder.endObject();
builder.endObject();
return builder;
}
}
}

View file

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.rest.streams.logs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SequentialAckingBatchedTaskExecutor;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.StreamsMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Locale;
/**
* Transport action to toggle the activation state of logs streams in a project / cluster.
*/
public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportMasterNodeAction<LogsStreamsActivationToggleAction.Request> {
private static final Logger logger = LogManager.getLogger(TransportLogsStreamsToggleActivation.class);
private final ProjectResolver projectResolver;
private final MasterServiceTaskQueue<StreamsMetadataUpdateTask> taskQueue;
@Inject
public TransportLogsStreamsToggleActivation(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
LogsStreamsActivationToggleAction.INSTANCE.name(),
transportService,
clusterService,
threadPool,
actionFilters,
LogsStreamsActivationToggleAction.Request::new,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.taskQueue = clusterService.createTaskQueue(
"streams-update-state-queue",
Priority.NORMAL,
new SequentialAckingBatchedTaskExecutor<>()
);
this.projectResolver = projectResolver;
}
@Override
protected void masterOperation(
Task task,
LogsStreamsActivationToggleAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
ProjectId projectId = projectResolver.getProjectId();
StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
boolean currentlyEnabled = streamsState.isLogsEnabled();
boolean shouldEnable = request.shouldEnable();
if (shouldEnable != currentlyEnabled) {
StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable);
String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");
taskQueue.submitTask(taskName, updateTask, updateTask.timeout());
} else {
logger.debug("Logs streams are already in the requested state: {}", shouldEnable);
listener.onResponse(AcknowledgedResponse.TRUE);
}
}
@Override
protected ClusterBlockException checkBlock(LogsStreamsActivationToggleAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask {
private final ProjectId projectId;
private final boolean enabled;
StreamsMetadataUpdateTask(
AcknowledgedRequest<?> request,
ActionListener<? extends AcknowledgedResponse> listener,
ProjectId projectId,
boolean enabled
) {
super(request, listener);
this.projectId = projectId;
this.enabled = enabled;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return currentState.copyAndUpdateProject(
projectId,
builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled))
);
}
}
}

View file

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.rest.streams.logs;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.StreamsMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
* Transport action to retrieve the status of logs streams in a project / cluster.
* Results are broken down by stream type. Currently only logs streams are implemented.
*/
public class TransportStreamsStatusAction extends TransportLocalProjectMetadataAction<
StreamsStatusAction.Request,
StreamsStatusAction.Response> {
@Inject
public TransportStreamsStatusAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
StreamsStatusAction.INSTANCE.name(),
actionFilters,
transportService.getTaskManager(),
clusterService,
threadPool.executor(ThreadPool.Names.MANAGEMENT),
projectResolver
);
}
@Override
protected ClusterBlockException checkBlock(StreamsStatusAction.Request request, ProjectState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
@Override
protected void localClusterStateOperation(
Task task,
StreamsStatusAction.Request request,
ProjectState state,
ActionListener<StreamsStatusAction.Response> listener
) {
StreamsMetadata streamsState = state.metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
boolean logsEnabled = streamsState.isLogsEnabled();
StreamsStatusAction.Response response = new StreamsStatusAction.Response(logsEnabled);
listener.onResponse(response);
}
}

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.streams;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.ClassRule;
public class StreamsYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public StreamsYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return ESClientYamlSuiteTestCase.createParameters();
}
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").build();
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
}

View file

@ -0,0 +1,43 @@
---
"Basic toggle of logs state enable to disable and back":
- do:
streams.logs_enable: { }
- is_true: acknowledged
- do:
streams.status: { }
- is_true: logs.enabled
- do:
streams.logs_disable: { }
- is_true: acknowledged
- do:
streams.status: { }
- is_false: logs.enabled
- do:
streams.logs_enable: { }
- is_true: acknowledged
- do:
streams.status: { }
- is_true: logs.enabled
---
"Check for repeated toggle to same state":
- do:
streams.logs_enable: { }
- is_true: acknowledged
- do:
streams.status: { }
- is_true: logs.enabled
- do:
streams.logs_enable: { }
- is_true: acknowledged
- do:
streams.status: { }
- is_true: logs.enabled

View file

@ -0,0 +1,37 @@
{
"streams.logs_disable": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-logs-disable.html",
"description": "Disable the Logs Streams feature for this cluster"
},
"stability": "stable",
"visibility": "feature_flag",
"feature_flag": "logs_stream",
"headers": {
"accept": [
"application/json",
"text/plain"
]
},
"url": {
"paths": [
{
"path": "/_streams/logs/_disable",
"methods": [
"POST"
]
}
]
},
"params": {
"timeout": {
"type": "time",
"description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error."
},
"master_timeout": {
"type": "time",
"description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error."
}
}
}
}

View file

@ -0,0 +1,37 @@
{
"streams.logs_enable": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-logs-enable.html",
"description": "Enable the Logs Streams feature for this cluster"
},
"stability": "stable",
"visibility": "feature_flag",
"feature_flag": "logs_stream",
"headers": {
"accept": [
"application/json",
"text/plain"
]
},
"url": {
"paths": [
{
"path": "/_streams/logs/_enable",
"methods": [
"POST"
]
}
]
},
"params": {
"timeout": {
"type": "time",
"description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error."
},
"master_timeout": {
"type": "time",
"description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error."
}
}
}
}

View file

@ -0,0 +1,32 @@
{
"streams.status": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-status.html",
"description": "Return the current status of the streams feature for each streams type"
},
"stability": "stable",
"visibility": "feature_flag",
"feature_flag": "logs_stream",
"headers": {
"accept": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_streams/status",
"methods": [
"GET"
]
}
]
},
"params": {
"mater_timeout": {
"type": "time",
"description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error."
}
}
}
}

View file

@ -306,6 +306,7 @@ public class TransportVersions {
public static final TransportVersion PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP = def(9_101_0_00);
public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_REMOVE_ERROR_PARSING = def(9_102_0_00);
public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_BATCH_SIZE = def(9_103_0_00);
public static final TransportVersion STREAMS_LOGS_SUPPORT = def(9_104_0_00);
/*
* STOP! READ THIS FIRST! No, really,

View file

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetadataMappingService;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.StreamsMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -300,6 +301,10 @@ public class ClusterModule extends AbstractModule {
// Health API
entries.addAll(HealthNodeTaskExecutor.getNamedWriteables());
entries.addAll(HealthMetadataService.getNamedWriteables());
// Streams
registerProjectCustom(entries, StreamsMetadata.TYPE, StreamsMetadata::new, StreamsMetadata::readDiffFrom);
return entries;
}

View file

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster;
import org.elasticsearch.core.Tuple;
/**
* A task executor that executes tasks sequentially, allowing each task to acknowledge the cluster state update.
* This executor is used for tasks that need to be executed one after another, where each task can produce a new
* cluster state and can listen for acknowledgments.
*
* @param <Task> The type of the task that extends {@link AckedClusterStateUpdateTask}.
*/
public class SequentialAckingBatchedTaskExecutor<Task extends AckedClusterStateUpdateTask> extends SimpleBatchedAckListenerTaskExecutor<
Task> {
@Override
public Tuple<ClusterState, ClusterStateAckListener> executeTask(Task task, ClusterState clusterState) throws Exception {
return Tuple.tuple(task.execute(clusterState), task);
}
}

View file

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.xcontent.ToXContent;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Objects;
/**
* Metadata for the Streams feature, which allows enabling or disabling logs for data streams.
* This class implements the Metadata.ProjectCustom interface to allow it to be stored in the cluster state.
*/
public class StreamsMetadata extends AbstractNamedDiffable<Metadata.ProjectCustom> implements Metadata.ProjectCustom {
public static final String TYPE = "streams";
public static final StreamsMetadata EMPTY = new StreamsMetadata(false);
public boolean logsEnabled;
public StreamsMetadata(StreamInput in) throws IOException {
logsEnabled = in.readBoolean();
}
public StreamsMetadata(boolean logsEnabled) {
this.logsEnabled = logsEnabled;
}
public boolean isLogsEnabled() {
return logsEnabled;
}
@Override
public EnumSet<Metadata.XContentContext> context() {
return Metadata.ALL_CONTEXTS;
}
@Override
public String getWriteableName() {
return TYPE;
}
@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.STREAMS_LOGS_SUPPORT;
}
public static NamedDiff<Metadata.ProjectCustom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Metadata.ProjectCustom.class, TYPE, in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(logsEnabled);
}
@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(ChunkedToXContentHelper.chunk((builder, bParams) -> builder.field("logs_enabled", logsEnabled)));
}
@Override
public boolean equals(Object o) {
if ((o instanceof StreamsMetadata that)) {
return logsEnabled == that.logsEnabled;
} else {
return false;
}
}
@Override
public int hashCode() {
return Objects.hashCode(logsEnabled);
}
}

View file

@ -72,6 +72,8 @@ public class Constants {
"cluster:admin/script_language/get",
"cluster:admin/scripts/painless/context",
"cluster:admin/scripts/painless/execute",
"cluster:admin/streams/logs/toggle",
"cluster:admin/streams/status",
"cluster:admin/synonyms/delete",
"cluster:admin/synonyms/get",
"cluster:admin/synonyms/put",

View file

@ -1,4 +1,13 @@
apply plugin: 'elasticsearch.internal-yaml-rest-test'
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import org.elasticsearch.gradle.util.GradleUtils
dependencies {
@ -13,6 +22,7 @@ dependencies {
clusterModules project(':modules:data-streams')
clusterModules project(':modules:lang-mustache')
clusterModules project(':modules:parent-join')
clusterModules project(':modules:streams')
clusterModules project(xpackModule('stack'))
clusterModules project(xpackModule('ilm'))
clusterModules project(xpackModule('mapper-constant-keyword'))
@ -21,6 +31,7 @@ dependencies {
restTestConfig project(path: ':modules:data-streams', configuration: "basicRestSpecs")
restTestConfig project(path: ':modules:ingest-common', configuration: "basicRestSpecs")
restTestConfig project(path: ':modules:reindex', configuration: "basicRestSpecs")
restTestConfig project(path: ':modules:streams', configuration: "basicRestSpecs")
}
// let the yamlRestTests see the classpath of test

View file

@ -37,6 +37,7 @@ public class CoreWithMultipleProjectsClientYamlTestSuiteIT extends MultipleProje
.module("test-multi-project")
.module("lang-mustache")
.module("parent-join")
.module("streams")
.setting("test.multi_project.enabled", "true")
.setting("xpack.security.enabled", "true")
.setting("xpack.watcher.enabled", "false")