Drop useless AckedRequest interface (#113255)

Almost every implementation of `AckedRequest` is an
`AcknowledgedRequest` too, and the distinction is rather confusing.
Moreover the other implementations of `AckedRequest` are a potential
source of `null` timeouts that we'd like to get rid of. This commit
simplifies the situation by dropping the unnecessary `AckedRequest`
interface entirely.
This commit is contained in:
David Turner 2024-09-20 12:33:07 +01:00 committed by GitHub
parent 5530caa94d
commit 6ff138f558
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 86 additions and 113 deletions

View file

@ -19,7 +19,6 @@ import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -102,22 +101,9 @@ public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase {
private void updateClusterState(Function<ClusterState, ClusterState> transformationFn) {
final TimeValue timeout = TimeValue.timeValueSeconds(10);
final AckedRequest ackedRequest = new AckedRequest() {
@Override
public TimeValue ackTimeout() {
return timeout;
}
@Override
public TimeValue masterNodeTimeout() {
return timeout;
}
};
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
internalCluster().getAnyMasterNodeInstance(ClusterService.class)
.submitUnbatchedStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(ackedRequest, future) {
.submitUnbatchedStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(timeout, timeout, future) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return transformationFn.apply(currentState);

View file

@ -9,7 +9,6 @@
package org.elasticsearch.action.support.master;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
@ -23,9 +22,7 @@ import static org.elasticsearch.core.TimeValue.timeValueSeconds;
* Abstract base class for action requests that track acknowledgements of cluster state updates: such a request is acknowledged only once
* the cluster state update is committed and all relevant nodes have applied it and acknowledged its application to the elected master..
*/
public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Request>> extends MasterNodeRequest<Request>
implements
AckedRequest {
public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Request>> extends MasterNodeRequest<Request> {
public static final TimeValue DEFAULT_ACK_TIMEOUT = timeValueSeconds(30);
@ -74,7 +71,6 @@ public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Requ
/**
* @return the current ack timeout as a {@link TimeValue}
*/
@Override
public final TimeValue ackTimeout() {
return ackTimeout;
}

View file

@ -9,8 +9,8 @@
package org.elasticsearch.cluster;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.TimeValue;
@ -23,21 +23,38 @@ import org.elasticsearch.core.TimeValue;
public abstract class AckedClusterStateUpdateTask extends ClusterStateUpdateTask implements ClusterStateAckListener {
private final ActionListener<AcknowledgedResponse> listener;
private final AckedRequest request;
private final TimeValue ackTimeout;
protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<? extends AcknowledgedResponse> listener) {
this(Priority.NORMAL, request, listener);
protected AckedClusterStateUpdateTask(AcknowledgedRequest<?> request, ActionListener<? extends AcknowledgedResponse> listener) {
this(Priority.NORMAL, request.masterNodeTimeout(), request.ackTimeout(), listener);
}
protected AckedClusterStateUpdateTask(
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<? extends AcknowledgedResponse> listener
) {
this(Priority.NORMAL, masterNodeTimeout, ackTimeout, listener);
}
protected AckedClusterStateUpdateTask(
Priority priority,
AcknowledgedRequest<?> request,
ActionListener<? extends AcknowledgedResponse> listener
) {
this(priority, request.masterNodeTimeout(), request.ackTimeout(), listener);
}
@SuppressWarnings("unchecked")
protected AckedClusterStateUpdateTask(
Priority priority,
AckedRequest request,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
ActionListener<? extends AcknowledgedResponse> listener
) {
super(priority, request.masterNodeTimeout());
super(priority, masterNodeTimeout);
this.listener = (ActionListener<AcknowledgedResponse>) listener;
this.request = request;
this.ackTimeout = ackTimeout;
}
/**
@ -81,6 +98,6 @@ public abstract class AckedClusterStateUpdateTask extends ClusterStateUpdateTask
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
*/
public final TimeValue ackTimeout() {
return request.ackTimeout();
return ackTimeout;
}
}

View file

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.ack;
import org.elasticsearch.core.TimeValue;
/**
* Identifies a cluster state update request with acknowledgement support
*/
public interface AckedRequest {
/**
* Returns the acknowledgement timeout
*/
TimeValue ackTimeout();
/**
* Returns the timeout for the request to be completed on the master node
*/
TimeValue masterNodeTimeout();
}

View file

@ -15,7 +15,7 @@ import org.elasticsearch.core.TimeValue;
* Base class to be used when needing to update the cluster state
* Contains the basic fields that are always needed
*/
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> implements AckedRequest {
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> {
private TimeValue ackTimeout;
private TimeValue masterNodeTimeout;
@ -23,7 +23,6 @@ public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequ
/**
* Returns the maximum time interval to wait for acknowledgements
*/
@Override
public TimeValue ackTimeout() {
return ackTimeout;
}
@ -41,7 +40,6 @@ public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequ
* Returns the maximum time interval to wait for the request to
* be completed on the master node
*/
@Override
public TimeValue masterNodeTimeout() {
return masterNodeTimeout;
}

View file

@ -99,7 +99,12 @@ public class MetadataCreateDataStreamService {
var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
submitUnbatchedTask(
"create-data-stream [" + request.name + "]",
new AckedClusterStateUpdateTask(Priority.HIGH, request, delegate.clusterStateUpdate()) {
new AckedClusterStateUpdateTask(
Priority.HIGH,
request.masterNodeTimeout(),
request.ackTimeout(),
delegate.clusterStateUpdate()
) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// When we're manually creating a data stream (i.e. not an auto creation), we don't need to initialize the failure store

View file

@ -296,7 +296,12 @@ public class MetadataCreateIndexService {
var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
submitUnbatchedTask(
"create-index [" + request.index() + "], cause [" + request.cause() + "]",
new AckedClusterStateUpdateTask(Priority.URGENT, request, delegate.clusterStateUpdate()) {
new AckedClusterStateUpdateTask(
Priority.URGENT,
request.masterNodeTimeout(),
request.ackTimeout(),
delegate.clusterStateUpdate()
) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {

View file

@ -105,7 +105,12 @@ public class MetadataMigrateToDataStreamService {
var delegate = new AllocationActionListener<>(listener, threadContext);
submitUnbatchedTask(
"migrate-to-data-stream [" + request.aliasName + "]",
new AckedClusterStateUpdateTask(Priority.HIGH, request, delegate.clusterStateUpdate()) {
new AckedClusterStateUpdateTask(
Priority.HIGH,
request.masterNodeTimeout(),
request.ackTimeout(),
delegate.clusterStateUpdate()
) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {

View file

@ -15,6 +15,7 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@ -27,7 +28,6 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalMasterServiceTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
@ -1571,7 +1571,7 @@ public class MasterServiceTests extends ESTestCase {
masterService.submitUnbatchedStateUpdateTask(
"test2",
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.ZERO, null), null) {
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.ZERO, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT), null) {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).build();
@ -1623,7 +1623,7 @@ public class MasterServiceTests extends ESTestCase {
masterService.submitUnbatchedStateUpdateTask(
"test2",
new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, null), null) {
new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT), null) {
@Override
public ClusterState execute(ClusterState currentState) {
threadPool.getThreadContext().addResponseHeader(responseHeaderName, responseHeaderValue);
@ -1678,7 +1678,10 @@ public class MasterServiceTests extends ESTestCase {
masterService.submitUnbatchedStateUpdateTask(
"test2",
new AckedClusterStateUpdateTask(ackedRequest(MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, null), null) {
new AckedClusterStateUpdateTask(
ackedRequest(MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT),
null
) {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).build();
@ -2657,20 +2660,15 @@ public class MasterServiceTests extends ESTestCase {
}
/**
* Returns a plain {@link AckedRequest} that does not implement any functionality outside of the timeout getters.
* Returns a plain {@link AcknowledgedRequest} that does not implement any functionality outside of the timeout getters.
*/
public static AckedRequest ackedRequest(TimeValue ackTimeout, TimeValue masterNodeTimeout) {
return new AckedRequest() {
@Override
public TimeValue ackTimeout() {
return ackTimeout;
public static AcknowledgedRequest<?> ackedRequest(TimeValue ackTimeout, TimeValue masterNodeTimeout) {
class BareAcknowledgedRequest extends AcknowledgedRequest<BareAcknowledgedRequest> {
BareAcknowledgedRequest() {
super(masterNodeTimeout, ackTimeout);
}
@Override
public TimeValue masterNodeTimeout() {
return masterNodeTimeout;
}
};
}
return new BareAcknowledgedRequest();
}
/**

View file

@ -9,11 +9,11 @@ package org.elasticsearch.xpack.core.ilm;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Nullable;
@ -36,7 +36,7 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
public static AckedClusterStateUpdateTask wrap(
OperationModeUpdateTask task,
AckedRequest request,
AcknowledgedRequest<?> request,
ActionListener<AcknowledgedResponse> listener
) {
return new AckedClusterStateUpdateTask(task.priority(), request, listener) {

View file

@ -16,14 +16,12 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -69,42 +67,35 @@ public class TransportWatcherServiceAction extends AcknowledgedTransportMasterNo
final boolean manuallyStopped = request.getCommand() == WatcherServiceRequest.Command.STOP;
final String source = manuallyStopped ? "update_watcher_manually_stopped" : "update_watcher_manually_started";
// TODO: make WatcherServiceRequest a real AckedRequest so that we have both a configurable timeout and master node timeout like
// we do elsewhere
submitUnbatchedTask(source, new AckedClusterStateUpdateTask(new AckedRequest() {
@Override
public TimeValue ackTimeout() {
return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
}
// TODO: make WatcherServiceRequest a real AcknowledgedRequest so that we have both a configurable timeout and master node timeout
// like we do elsewhere
submitUnbatchedTask(
source,
new AckedClusterStateUpdateTask(request.masterNodeTimeout(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, listener) {
@Override
public ClusterState execute(ClusterState clusterState) {
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
@Override
public TimeValue masterNodeTimeout() {
return request.masterNodeTimeout();
}
}, listener) {
@Override
public ClusterState execute(ClusterState clusterState) {
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
WatcherMetadata newWatcherMetadata = new WatcherMetadata(manuallyStopped);
WatcherMetadata currentMetadata = clusterState.metadata().custom(WatcherMetadata.TYPE);
WatcherMetadata newWatcherMetadata = new WatcherMetadata(manuallyStopped);
WatcherMetadata currentMetadata = clusterState.metadata().custom(WatcherMetadata.TYPE);
// adhere to the contract of returning the original state if nothing has changed
if (newWatcherMetadata.equals(currentMetadata)) {
return clusterState;
} else {
ClusterState.Builder builder = new ClusterState.Builder(clusterState);
builder.metadata(Metadata.builder(clusterState.getMetadata()).putCustom(WatcherMetadata.TYPE, newWatcherMetadata));
return builder.build();
}
}
// adhere to the contract of returning the original state if nothing has changed
if (newWatcherMetadata.equals(currentMetadata)) {
return clusterState;
} else {
ClusterState.Builder builder = new ClusterState.Builder(clusterState);
builder.metadata(Metadata.builder(clusterState.getMetadata()).putCustom(WatcherMetadata.TYPE, newWatcherMetadata));
return builder.build();
@Override
public void onFailure(Exception e) {
logger.error(() -> format("could not update watcher stopped status to [%s], source [%s]", manuallyStopped, source), e);
listener.onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
logger.error(() -> format("could not update watcher stopped status to [%s], source [%s]", manuallyStopped, source), e);
listener.onFailure(e);
}
});
);
}
@Override