diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java index 7dcda9c2b003..84b0ea4a5937 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java @@ -35,7 +35,6 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction; import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; @@ -66,7 +65,6 @@ public class Downsample extends Plugin implements ActionPlugin, PersistentTaskPl @Override public List> getActions() { return List.of( - new ActionHandler<>(DownsampleIndexerAction.INSTANCE, TransportDownsampleIndexerAction.class), new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class), new ActionHandler<>( DownsampleShardPersistentTaskExecutor.DelegatingAction.INSTANCE, diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java index b5ac4b0ae37a..148a32851b0a 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java @@ -25,8 +25,6 @@ import java.util.Map; * - Add a constant for its name, following the naming conventions for metrics. * - Register it in method {@link #doStart}. * - Add a function for recording its value. - * - If needed, inject {@link DownsampleMetrics} to the action containing the logic - * that records the metric value. For reference, see {@link TransportDownsampleIndexerAction}. */ public class DownsampleMetrics extends AbstractLifecycleComponent { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 2c08dcd9017f..4aaead13f753 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -103,7 +103,7 @@ import static org.elasticsearch.xpack.core.ilm.DownsampleAction.DOWNSAMPLED_INDE /** * The master downsample action that coordinates * - creating the downsample index - * - instantiating {@link DownsampleShardIndexer}s to index downsample documents + * - instantiating {@link org.elasticsearch.persistent.PersistentTasksExecutor} to start a persistent downsample task * - cleaning up state */ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAction { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java deleted file mode 100644 index 8c396c493495..000000000000 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -package org.elasticsearch.xpack.downsample; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.client.internal.OriginSettingClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.injection.guice.Inject; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction; -import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; -import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; -import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * A {@link TransportBroadcastAction} that downsamples all the shards of a source index into a new downsample index. - * - * TODO: Enforce that we don't retry on another replica if we throw an error after sending some buckets. - */ -public class TransportDownsampleIndexerAction extends TransportBroadcastAction< - DownsampleIndexerAction.Request, - DownsampleIndexerAction.Response, - DownsampleIndexerAction.ShardDownsampleRequest, - DownsampleIndexerAction.ShardDownsampleResponse> { - - private final Client client; - private final IndicesService indicesService; - - private final DownsampleMetrics downsampleMetrics; - - @Inject - public TransportDownsampleIndexerAction( - Client client, - ClusterService clusterService, - TransportService transportService, - IndicesService indicesService, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - DownsampleMetrics downsampleMetrics - ) { - super( - DownsampleIndexerAction.NAME, - clusterService, - transportService, - actionFilters, - indexNameExpressionResolver, - DownsampleIndexerAction.Request::new, - DownsampleIndexerAction.ShardDownsampleRequest::new, - transportService.getThreadPool().executor(Downsample.DOWNSAMPLE_TASK_THREAD_POOL_NAME) - ); - this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN); - this.indicesService = indicesService; - this.downsampleMetrics = downsampleMetrics; - } - - @Override - protected List shards(ClusterState clusterState, DownsampleIndexerAction.Request request, String[] concreteIndices) { - if (concreteIndices.length > 1) { - throw new IllegalArgumentException("multiple indices: " + Arrays.toString(concreteIndices)); - } - - final List groups = clusterService.operationRouting().searchShards(clusterState, concreteIndices, null, null); - for (ShardIterator group : groups) { - // fails fast if any non-active groups - if (group.size() == 0) { - throw new NoShardAvailableActionException(group.shardId()); - } - } - return groups; - } - - @Override - protected ClusterBlockException checkGlobalBlock(ClusterState state, DownsampleIndexerAction.Request request) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); - } - - @Override - protected ClusterBlockException checkRequestBlock( - ClusterState state, - DownsampleIndexerAction.Request request, - String[] concreteIndices - ) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices); - } - - @Override - protected void doExecute( - Task task, - DownsampleIndexerAction.Request request, - ActionListener listener - ) { - new Async(task, request, listener).start(); - } - - @Override - protected DownsampleIndexerAction.ShardDownsampleRequest newShardRequest( - int numShards, - ShardRouting shard, - DownsampleIndexerAction.Request request - ) { - return new DownsampleIndexerAction.ShardDownsampleRequest(shard.shardId(), request); - } - - @Override - protected DownsampleIndexerAction.ShardDownsampleResponse shardOperation( - DownsampleIndexerAction.ShardDownsampleRequest request, - Task task - ) throws IOException { - IndexService indexService = indicesService.indexService(request.shardId().getIndex()); - DownsampleShardIndexer indexer = new DownsampleShardIndexer( - (DownsampleShardTask) task, - client, - indexService, - downsampleMetrics, - request.shardId(), - request.getDownsampleIndex(), - request.getRollupConfig(), - request.getMetricFields(), - request.getLabelFields(), - request.getDimensionFields(), - new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) - ); - return indexer.execute(); - } - - @Override - protected DownsampleIndexerAction.ShardDownsampleResponse readShardResponse(StreamInput in) throws IOException { - return new DownsampleIndexerAction.ShardDownsampleResponse(in); - } - - @Override - protected DownsampleIndexerAction.Response newResponse( - DownsampleIndexerAction.Request request, - AtomicReferenceArray shardsResponses, - ClusterState clusterState - ) { - long numIndexed = 0; - int successfulShards = 0; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - throw new ElasticsearchException("missing shard"); - } else if (shardResponse instanceof DownsampleIndexerAction.ShardDownsampleResponse r) { - successfulShards++; - numIndexed += r.getNumIndexed(); - } else if (shardResponse instanceof Exception e) { - throw new ElasticsearchException(e); - } else { - assert false : "unknown response [" + shardResponse + "]"; - throw new IllegalStateException("unknown response [" + shardResponse + "]"); - } - } - return new DownsampleIndexerAction.Response(true, shardsResponses.length(), successfulShards, 0, numIndexed); - } - - private class Async extends AsyncBroadcastAction { - private final DownsampleIndexerAction.Request request; - private final ActionListener listener; - - protected Async(Task task, DownsampleIndexerAction.Request request, ActionListener listener) { - super(task, request, listener); - this.request = request; - this.listener = listener; - } - - @Override - protected void finishHim() { - try { - DownsampleIndexerAction.Response resp = newResponse(request, shardsResponses, clusterService.state()); - listener.onResponse(resp); - } catch (Exception e) { - listener.onFailure(e); - } - } - } -} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 6ea522a4276a..4c9a5bee5577 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -556,7 +556,6 @@ public class Constants { "indices:admin/xpack/ccr/put_follow", "indices:admin/xpack/ccr/unfollow", "indices:admin/xpack/downsample", - "indices:admin/xpack/downsample_indexer", "indices:data/read/downsample_delegate", "indices:data/read/async_search/delete", "indices:data/read/async_search/get",