Fix support for infinite ?master_timeout (#107050)

Specifying `?master_timeout=-1` on an API which performs a cluster state
update means that the cluster state update task will never time out
while waiting in the pending tasks queue. However this parameter is also
re-used in a few places where a timeout of `-1` means something else,
typically to timeout immediately. This commit fixes those places so that
`?master_timeout=-1` consistently means to wait forever.
This commit is contained in:
David Turner 2024-04-10 18:32:38 +01:00 committed by GitHub
parent 1587dada58
commit ccbb5badce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 133 additions and 97 deletions

View file

@ -0,0 +1,5 @@
pr: 107050
summary: Fix support for infinite `?master_timeout`
area: Cluster Coordination
type: bug
issues: []

View file

@ -57,10 +57,7 @@ This API deletes a configured collection of
[[ccr-delete-auto-follow-pattern-query-params]] [[ccr-delete-auto-follow-pattern-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-delete-auto-follow-pattern-examples]] [[ccr-delete-auto-follow-pattern-examples]]
==== {api-examples-title} ==== {api-examples-title}

View file

@ -75,10 +75,7 @@ This API will return the specified auto-follow pattern collection.
[[ccr-get-auto-follow-pattern-query-params]] [[ccr-get-auto-follow-pattern-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-get-auto-follow-pattern-examples]] [[ccr-get-auto-follow-pattern-examples]]
==== {api-examples-title} ==== {api-examples-title}

View file

@ -43,10 +43,7 @@ meantime.
[[ccr-pause-auto-follow-pattern-query-params]] [[ccr-pause-auto-follow-pattern-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-pause-auto-follow-pattern-examples]] [[ccr-pause-auto-follow-pattern-examples]]
==== {api-examples-title} ==== {api-examples-title}

View file

@ -74,10 +74,7 @@ the new patterns.
[[ccr-put-auto-follow-pattern-query-params]] [[ccr-put-auto-follow-pattern-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-put-auto-follow-pattern-request-body]] [[ccr-put-auto-follow-pattern-request-body]]
==== {api-request-body-title} ==== {api-request-body-title}

View file

@ -38,10 +38,7 @@ have been deleted or closed in the meantime.
[[ccr-resume-auto-follow-pattern-query-params]] [[ccr-resume-auto-follow-pattern-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-resume-auto-follow-pattern-examples]] [[ccr-resume-auto-follow-pattern-examples]]
==== {api-examples-title} ==== {api-examples-title}

View file

@ -52,10 +52,7 @@ replication options and whether the follower indices are active or paused.
[[ccr-get-follow-info-query-params]] [[ccr-get-follow-info-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[role="child_attributes"] [role="child_attributes"]
[[ccr-get-follow-info-response-body]] [[ccr-get-follow-info-response-body]]

View file

@ -56,10 +56,7 @@ following task.
[[ccr-post-pause-follow-query-params]] [[ccr-post-pause-follow-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-post-pause-follow-examples]] [[ccr-post-pause-follow-examples]]
==== {api-examples-title} ==== {api-examples-title}

View file

@ -69,10 +69,7 @@ returns, the follower index will resume fetching operations from the leader inde
[[ccr-post-resume-follow-query-params]] [[ccr-post-resume-follow-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-post-resume-follow-request-body]] [[ccr-post-resume-follow-request-body]]
==== {api-request-body-title} ==== {api-request-body-title}

View file

@ -63,10 +63,7 @@ irreversible operation.
[[ccr-post-unfollow-query-params]] [[ccr-post-unfollow-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-post-unfollow-examples]] [[ccr-post-unfollow-examples]]
==== {api-examples-title} ==== {api-examples-title}

View file

@ -65,11 +65,7 @@ referenced leader index. When this API returns, the follower index exists, and
follower shard requires transferring all the remote Lucene segment files to follower shard requires transferring all the remote Lucene segment files to
the follower index. the follower index.
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[ccr-put-follow-request-body]] [[ccr-put-follow-request-body]]
==== {api-request-body-title} ==== {api-request-body-title}

View file

@ -56,10 +56,7 @@ shard-level stats as in the <<ccr-get-follow-stats,get follower stats API>>.
`timeout`:: `timeout`::
(Optional, time) Controls the amount of time to wait for results. Defaults to unlimited. (Optional, time) Controls the amount of time to wait for results. Defaults to unlimited.
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[role="child_attributes"] [role="child_attributes"]
[[ccr-get-stats-response-body]] [[ccr-get-stats-response-body]]

View file

@ -1218,9 +1218,9 @@ tag::timeoutparms[]
tag::master-timeout[] tag::master-timeout[]
`master_timeout`:: `master_timeout`::
(Optional, <<time-units, time units>>) (Optional, <<time-units, time units>>)
Period to wait for a connection to the master node. If no response is received Period to wait for the master node. If the master node is not available before
before the timeout expires, the request fails and returns an error. Defaults to the timeout expires, the request fails and returns an error. Defaults to `30s`.
`30s`. Can also be set to `-1` to indicate that the request should never timeout.
end::master-timeout[] end::master-timeout[]
tag::timeout[] tag::timeout[]

View file

@ -27,10 +27,7 @@ information, see <<security-privileges>>.
[[watcher-api-start-query-params]] [[watcher-api-start-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
//[[watcher-api-start-request-body]] //[[watcher-api-start-request-body]]
//==== {api-request-body-title} //==== {api-request-body-title}

View file

@ -27,10 +27,7 @@ information, see <<security-privileges>>.
[[watcher-api-stop-query-params]] [[watcher-api-stop-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
//[[watcher-api-stop-request-body]] //[[watcher-api-stop-request-body]]
//==== {api-request-body-title} //==== {api-request-body-title}

View file

@ -42,10 +42,7 @@ Name of the snapshot repository that both source and target snapshot belong to.
[[clone-snapshot-api-query-params]] [[clone-snapshot-api-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
`timeout`:: `timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for (Optional, <<time-units, time units>>) Specifies the period of time to wait for
@ -55,4 +52,4 @@ fails and returns an error. Defaults to `30s`.
`indices`:: `indices`::
(Required, string) (Required, string)
A comma-separated list of indices to include in the snapshot. A comma-separated list of indices to include in the snapshot.
<<api-multi-index,multi-target syntax>> is supported. <<api-multi-index,multi-target syntax>> is supported.

View file

@ -51,10 +51,7 @@ supported.
[[delete-snapshot-repo-api-query-params]] [[delete-snapshot-repo-api-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
`timeout`:: `timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for (Optional, <<time-units, time units>>) Specifies the period of time to wait for

View file

@ -59,10 +59,7 @@ cluster, omit this parameter or use `*` or `_all`.
only. If `false`, the request gets information from the master node. Defaults to only. If `false`, the request gets information from the master node. Defaults to
`false`. `false`.
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[role="child_attributes"] [role="child_attributes"]
[[get-snapshot-repo-api-response-body]] [[get-snapshot-repo-api-response-body]]

View file

@ -52,10 +52,7 @@ IMPORTANT: Several options for this API can be specified using a query parameter
or a request body parameter. If both parameters are specified, only the query or a request body parameter. If both parameters are specified, only the query
parameter is used. parameter is used.
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
`timeout`:: `timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for (Optional, <<time-units, time units>>) Specifies the period of time to wait for

View file

@ -47,10 +47,7 @@ Name of the snapshot repository to verify.
[[verify-snapshot-repo-api-query-params]] [[verify-snapshot-repo-api-query-params]]
==== {api-query-parms-title} ==== {api-query-parms-title}
`master_timeout`:: include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
`timeout`:: `timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for (Optional, <<time-units, time units>>) Specifies the period of time to wait for

View file

@ -81,9 +81,12 @@ public class UpdateDataStreamGlobalRetentionService {
List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams, List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
final ActionListener<UpdateDataStreamGlobalRetentionResponse> listener final ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
) { ) {
final var ackTimeout = request.masterNodeTimeout().millis() < 0 ? TimeValue.MAX_VALUE : request.masterNodeTimeout();
// NB a negative master node timeout means never to time out, but a negative ack timeout means to time out immediately.
// TODO when https://github.com/elastic/elasticsearch/issues/107044 is fixed, we can just use request.masterNodeTimeout() directly
taskQueue.submitTask( taskQueue.submitTask(
"remove-data-stream-global-retention", "remove-data-stream-global-retention",
new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, request.masterNodeTimeout()), new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, ackTimeout),
request.masterNodeTimeout() request.masterNodeTimeout()
); );
} }
@ -137,7 +140,7 @@ public class UpdateDataStreamGlobalRetentionService {
@Nullable DataStreamGlobalRetention globalRetention, @Nullable DataStreamGlobalRetention globalRetention,
List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams, List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
ActionListener<UpdateDataStreamGlobalRetentionResponse> listener, ActionListener<UpdateDataStreamGlobalRetentionResponse> listener,
TimeValue masterTimeout TimeValue ackTimeout
) implements ClusterStateTaskListener, ClusterStateAckListener { ) implements ClusterStateTaskListener, ClusterStateAckListener {
@Override @Override
@ -166,10 +169,5 @@ public class UpdateDataStreamGlobalRetentionService {
logger.debug("Failed to update global retention [{}] because timeout was reached", globalRetention); logger.debug("Failed to update global retention [{}] because timeout was reached", globalRetention);
listener.onResponse(UpdateDataStreamGlobalRetentionResponse.FAILED); listener.onResponse(UpdateDataStreamGlobalRetentionResponse.FAILED);
} }
@Override
public TimeValue ackTimeout() {
return masterTimeout;
}
} }
} }

View file

@ -121,6 +121,14 @@ public class RolloverIT extends ESIntegTestCase {
); );
} }
public void testInfiniteMasterNodeTimeout() {
assertAcked(prepareCreate("test_index-2").addAlias(new Alias("test_alias")).get());
indexDoc("test_index-2", "1", "field", "value");
flush("test_index-2");
final RolloverResponse response = indicesAdmin().prepareRolloverIndex("test_alias").setMasterNodeTimeout(TimeValue.MINUS_ONE).get();
assertTrue(response.isShardsAcknowledged());
}
public void testRolloverWithExplicitWriteIndex() throws Exception { public void testRolloverWithExplicitWriteIndex() throws Exception {
long beforeTime = client().threadPool().absoluteTimeInMillis() - 1000L; long beforeTime = client().threadPool().absoluteTimeInMillis() - 1000L;
assertAcked(prepareCreate("test_index-2").addAlias(new Alias("test_alias").writeIndex(true)).get()); assertAcked(prepareCreate("test_index-2").addAlias(new Alias("test_alias").writeIndex(true)).get());

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.rest.action.admin.cluster;
import org.elasticsearch.client.Request;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
public class RestClusterStateActionIT extends ESIntegTestCase {
@Override
protected boolean addMockHttpTransport() {
return false;
}
public void testInfiniteTimeOut() throws IOException {
final var request = new Request("GET", "/_cluster/state/none");
request.addParameter("master_timeout", "-1");
getRestClient().performRequest(request);
}
}

View file

@ -25,6 +25,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -688,6 +689,32 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
} }
} }
public void testInfiniteTimeout() throws Exception {
createRepository("test-repo", "mock");
createIndex("test-idx", 1, 0);
indexRandomDocs("test-idx", 10);
ensureGreen();
blockAllDataNodes("test-repo");
final ActionFuture<CreateSnapshotResponse> snapshotResponseFuture = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.execute();
try {
waitForBlockOnAnyDataNode("test-repo");
final List<SnapshotStatus> snapshotStatus = clusterAdmin().prepareSnapshotStatus("test-repo")
.setMasterNodeTimeout(TimeValue.MINUS_ONE)
.get()
.getSnapshots();
assertThat(snapshotStatus, hasSize(1));
assertEquals("test-snap", snapshotStatus.get(0).getSnapshot().getSnapshotId().getName());
// a timeout of a node-level request results in a successful response but without node-level details, so this checks no timeout:
assertThat(snapshotStatus.get(0).getShards().get(0).getStats().getTotalFileCount(), greaterThan(0));
assertFalse(snapshotResponseFuture.isDone());
} finally {
unblockAllDataNodes("test-repo");
snapshotResponseFuture.get(10, TimeUnit.SECONDS);
}
}
private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) { private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
return snapshotStatus.getIndices().get(indexName).getShards().get(0); return snapshotStatus.getIndices().get(indexName).getShards().get(0);
} }

View file

@ -141,7 +141,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
client.executeLocally( client.executeLocally(
TransportNodesSnapshotsStatus.TYPE, TransportNodesSnapshotsStatus.TYPE,
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)).snapshots(snapshots) new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)).snapshots(snapshots)
.timeout(request.masterNodeTimeout()), .timeout(request.masterNodeTimeout().millis() < 0 ? null : request.masterNodeTimeout()),
// fork to snapshot meta since building the response is expensive for large snapshots // fork to snapshot meta since building the response is expensive for large snapshots
new RefCountAwareThreadedActionListener<>( new RefCountAwareThreadedActionListener<>(
threadPool.executor(ThreadPool.Names.SNAPSHOT_META), threadPool.executor(ThreadPool.Names.SNAPSHOT_META),

View file

@ -514,6 +514,10 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
final var rolloverIndexName = rolloverResult.rolloverIndexName(); final var rolloverIndexName = rolloverResult.rolloverIndexName();
final var sourceIndexName = rolloverResult.sourceIndexName(); final var sourceIndexName = rolloverResult.sourceIndexName();
final var waitForActiveShardsTimeout = rolloverRequest.masterNodeTimeout().millis() < 0
? null
: rolloverRequest.masterNodeTimeout();
rolloverTaskContext.success(() -> { rolloverTaskContext.success(() -> {
// Now assuming we have a new state and the name of the rolled over index, we need to wait for the configured number of // Now assuming we have a new state and the name of the rolled over index, we need to wait for the configured number of
// active shards, as well as return the names of the indices that were rolled/created // active shards, as well as return the names of the indices that were rolled/created
@ -521,7 +525,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
clusterService, clusterService,
new String[] { rolloverIndexName }, new String[] { rolloverIndexName },
rolloverRequest.getCreateIndexRequest().waitForActiveShards(), rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(), waitForActiveShardsTimeout,
allocationActionMultiListener.delay(rolloverTask.listener()) allocationActionMultiListener.delay(rolloverTask.listener())
.map( .map(
isShardsAcknowledged -> new RolloverResponse( isShardsAcknowledged -> new RolloverResponse(

View file

@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.node.NodeClosedException;
@ -43,7 +44,7 @@ public enum ActiveShardsObserver {
ClusterService clusterService, ClusterService clusterService,
final String[] indexNames, final String[] indexNames,
final ActiveShardCount activeShardCount, final ActiveShardCount activeShardCount,
final TimeValue timeout, @Nullable final TimeValue timeout,
final ActionListener<Boolean> listener final ActionListener<Boolean> listener
) { ) {
if (activeShardCount == ActiveShardCount.NONE) { if (activeShardCount == ActiveShardCount.NONE) {

View file

@ -39,7 +39,8 @@ public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Reques
} }
/** /**
* A timeout value in case the master has not been discovered yet or disconnected. * Specifies how long to wait when the master has not been discovered yet, or is disconnected, or is busy processing other tasks. The
* value {@link TimeValue#MINUS_ONE} means to wait forever.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final Request masterNodeTimeout(TimeValue timeout) { public final Request masterNodeTimeout(TimeValue timeout) {
@ -48,12 +49,17 @@ public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Reques
} }
/** /**
* A timeout value in case the master has not been discovered yet or disconnected. * Specifies how long to wait when the master has not been discovered yet, or is disconnected, or is busy processing other tasks. The
* value {@link TimeValue#MINUS_ONE} means to wait forever.
*/ */
public final Request masterNodeTimeout(String timeout) { public final Request masterNodeTimeout(String timeout) {
return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout")); return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));
} }
/**
* @return how long to wait when the master has not been discovered yet, or is disconnected, or is busy processing other tasks. The
* value {@link TimeValue#MINUS_ONE} means to wait forever.
*/
public final TimeValue masterNodeTimeout() { public final TimeValue masterNodeTimeout() {
return this.masterNodeTimeout; return this.masterNodeTimeout;
} }

View file

@ -285,16 +285,22 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
private void retry(long currentStateVersion, final Throwable failure, final Predicate<ClusterState> statePredicate) { private void retry(long currentStateVersion, final Throwable failure, final Predicate<ClusterState> statePredicate) {
if (observer == null) { if (observer == null) {
final long remainingTimeoutMS = request.masterNodeTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime); final TimeValue timeout;
if (remainingTimeoutMS <= 0) { if (request.masterNodeTimeout().millis() < 0) {
logger.debug(() -> "timed out before retrying [" + actionName + "] after failure", failure); timeout = null;
listener.onFailure(new MasterNotDiscoveredException(failure)); } else {
return; final long remainingTimeoutMS = request.masterNodeTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime);
if (remainingTimeoutMS <= 0) {
logger.debug(() -> "timed out before retrying [" + actionName + "] after failure", failure);
listener.onFailure(new MasterNotDiscoveredException(failure));
return;
}
timeout = TimeValue.timeValueMillis(remainingTimeoutMS);
} }
this.observer = new ClusterStateObserver( this.observer = new ClusterStateObserver(
currentStateVersion, currentStateVersion,
clusterService.getClusterApplierService(), clusterService.getClusterApplierService(),
TimeValue.timeValueMillis(remainingTimeoutMS), timeout,
logger, logger,
threadPool.getThreadContext() threadPool.getThreadContext()
); );

View file

@ -152,6 +152,7 @@ public class RestClusterStateAction extends BaseRestHandler {
@Override @Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) { public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
if (request.local() == false if (request.local() == false
&& request.masterNodeTimeout().millis() >= 0
&& currentTimeMillisSupplier.getAsLong() - startTimeMillis > request.masterNodeTimeout().millis()) { && currentTimeMillisSupplier.getAsLong() - startTimeMillis > request.masterNodeTimeout().millis()) {
throw new ElasticsearchTimeoutException("Timed out getting cluster state"); throw new ElasticsearchTimeoutException("Timed out getting cluster state");
} }

View file

@ -477,6 +477,9 @@ public class TransportMasterNodeActionTests extends ESTestCase {
public void testMasterBecomesAvailable() throws ExecutionException, InterruptedException { public void testMasterBecomesAvailable() throws ExecutionException, InterruptedException {
Request request = new Request(); Request request = new Request();
if (randomBoolean()) {
request.masterNodeTimeout(TimeValue.MINUS_ONE);
}
setState(clusterService, ClusterStateCreationUtils.state(localNode, null, allNodes)); setState(clusterService, ClusterStateCreationUtils.state(localNode, null, allNodes));
PlainActionFuture<Response> listener = new PlainActionFuture<>(); PlainActionFuture<Response> listener = new PlainActionFuture<>();
ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool), null, request, listener); ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool), null, request, listener);

View file

@ -252,10 +252,14 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
delegate.onResponse(finalConfig); delegate.onResponse(finalConfig);
}); });
final var deleterTimeout = masterNodeTimeout.millis() < 0 ? TimeValue.MAX_VALUE : masterNodeTimeout;
// NB a negative masterNodeTimeout means never to time out, but recording dataframe analytics configs does not support infinite
// timeouts so we just use a very long timeout here instead
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
if (clusterState == null) { if (clusterState == null) {
logger.warn("Cannot update doc mapping because clusterState == null"); logger.warn("Cannot update doc mapping because clusterState == null");
configProvider.put(config, headers, masterNodeTimeout, auditingListener); configProvider.put(config, headers, deleterTimeout, auditingListener);
return; return;
} }
ElasticsearchMappings.addDocMappingIfMissing( ElasticsearchMappings.addDocMappingIfMissing(
@ -264,7 +268,7 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
client, client,
clusterState, clusterState,
masterNodeTimeout, masterNodeTimeout,
auditingListener.delegateFailureAndWrap((l, unused) -> configProvider.put(config, headers, masterNodeTimeout, l)), auditingListener.delegateFailureAndWrap((l, unused) -> configProvider.put(config, headers, deleterTimeout, l)),
MlConfigIndex.CONFIG_INDEX_MAPPINGS_VERSION MlConfigIndex.CONFIG_INDEX_MAPPINGS_VERSION
); );
} }