Make TransportRequest an interface (#126733)

In order to support a future TransportRequest variant that accepts the
response type, TransportRequest needs to be an interface. This commit
adds AbstractTransportRequest as a concrete implementation and makes
TransportRequest a simple interface that joints together the parent
interfaces from TransportMessage.

Note that this was done entirely in Intellij using structural find and
replace.
This commit is contained in:
Ryan Ernst 2025-04-14 14:22:28 -07:00 committed by GitHub
parent b917d9a1e0
commit 83ce15ae06
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
103 changed files with 473 additions and 434 deletions

View file

@ -18,7 +18,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
@ -187,7 +187,7 @@ public class GetDatabaseConfigurationAction extends ActionType<Response> {
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
private final String[] databaseIds;

View file

@ -21,7 +21,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
@ -68,7 +68,7 @@ public class GeoIpStatsAction {
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
public NodeRequest(StreamInput in) throws IOException {
super(in);
}

View file

@ -17,6 +17,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.Authentication.RealmRef;
@ -47,7 +48,7 @@ public class CustomAuthorizationEngineTests extends ESTestCase {
public void testAuthorizeRunAs() {
final String action = "cluster:monitor/foo";
final TransportRequest request = new TransportRequest() {
final TransportRequest request = new AbstractTransportRequest() {
};
CustomAuthorizationEngine engine = new CustomAuthorizationEngine();
// unauthorized
@ -181,7 +182,7 @@ public class CustomAuthorizationEngineTests extends ESTestCase {
private RequestInfo getRequestInfo() {
final String action = "cluster:monitor/foo";
final TransportRequest request = new TransportRequest() {
final TransportRequest request = new AbstractTransportRequest() {
};
final Authentication authentication = Authentication.newRealmAuthentication(
new User("joe", "custom_superuser"),

View file

@ -11,11 +11,11 @@ package org.elasticsearch.action;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
public abstract class ActionRequest extends TransportRequest {
public abstract class ActionRequest extends AbstractTransportRequest {
public ActionRequest() {
super();

View file

@ -25,7 +25,7 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -110,7 +110,7 @@ public class TransportNodesCapabilitiesAction extends TransportNodesAction<
return new NodeCapability(supported, transportService.getLocalNode());
}
public static class NodeCapabilitiesRequest extends TransportRequest {
public static class NodeCapabilitiesRequest extends AbstractTransportRequest {
private final RestRequest.Method method;
private final String path;
private final Set<String> parameters;

View file

@ -21,13 +21,14 @@ import org.elasticsearch.features.FeatureService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
@UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // this can be removed in v10. It may be called by v8 nodes to v9 nodes.
@UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA)
// this can be removed in v10. It may be called by v8 nodes to v9 nodes.
public class TransportNodesFeaturesAction extends TransportNodesAction<
NodesFeaturesRequest,
NodesFeaturesResponse,
@ -82,7 +83,7 @@ public class TransportNodesFeaturesAction extends TransportNodesAction<
return new NodeFeatures(featureService.getNodeFeatures().keySet(), transportService.getLocalNode());
}
public static class NodeFeaturesRequest extends TransportRequest {
public static class NodeFeaturesRequest extends AbstractTransportRequest {
public NodeFeaturesRequest(StreamInput in) throws IOException {
super(in);
}

View file

@ -25,8 +25,8 @@ import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.monitor.jvm.HotThreads;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.LeakTracker;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -106,7 +106,7 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction<
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
final HotThreads.RequestOptions requestOptions;

View file

@ -21,7 +21,7 @@ import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -95,7 +95,7 @@ public class TransportNodesInfoAction extends TransportNodesAction<
);
}
public static class NodeInfoRequest extends TransportRequest {
public static class NodeInfoRequest extends AbstractTransportRequest {
private final NodesInfoMetrics nodesInfoMetrics;

View file

@ -24,8 +24,8 @@ import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.LeakTracker;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.Arrays;
@ -82,7 +82,7 @@ public class NodesReloadSecureSettingsRequest extends BaseNodesRequest {
return new NodeRequest(secureSettingsPassword, refs);
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
@Nullable
private final SecureString secureSettingsPassword;

View file

@ -12,7 +12,7 @@ package org.elasticsearch.action.admin.cluster.node.shutdown;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.Collection;
@ -21,8 +21,8 @@ import java.util.Set;
/**
* A node-specific request derived from the corresponding {@link PrevalidateShardPathRequest}.
*/
public class NodePrevalidateShardPathRequest extends TransportRequest {
*/
public class NodePrevalidateShardPathRequest extends AbstractTransportRequest {
private final Set<ShardId> shardIds;

View file

@ -34,7 +34,7 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;
@ -179,7 +179,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<
);
}
public static class NodeStatsRequest extends TransportRequest {
public static class NodeStatsRequest extends AbstractTransportRequest {
private final NodesStatsRequestParameters nodesStatsRequestParameters;

View file

@ -21,7 +21,7 @@ import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
@ -85,7 +85,7 @@ public class TransportNodesUsageAction extends TransportNodesAction<
return new NodeUsage(clusterService.localNode(), System.currentTimeMillis(), sinceTime, restUsage, aggsUsage);
}
public static class NodeUsageRequest extends TransportRequest {
public static class NodeUsageRequest extends AbstractTransportRequest {
final boolean restActions;
final boolean aggregations;

View file

@ -29,7 +29,7 @@ import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -148,7 +148,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
private final List<Snapshot> snapshots;
@ -157,7 +157,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<
snapshots = in.readCollectionAsList(Snapshot::new);
}
NodeRequest(TransportNodesSnapshotsStatus.Request request) {
NodeRequest(Request request) {
snapshots = Arrays.asList(request.snapshots);
}

View file

@ -57,10 +57,10 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterConnection;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.usage.SearchUsageHolder;
@ -319,7 +319,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
);
}
public static class ClusterStatsNodeRequest extends TransportRequest {
public static class ClusterStatsNodeRequest extends AbstractTransportRequest {
ClusterStatsNodeRequest() {}

View file

@ -11,14 +11,14 @@ package org.elasticsearch.action.admin.indices.dangling.find;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
/**
* Used when querying every node in the cluster for a specific dangling index.
*/
public class NodeFindDanglingIndexRequest extends TransportRequest {
public class NodeFindDanglingIndexRequest extends AbstractTransportRequest {
private final String indexUUID;
public NodeFindDanglingIndexRequest(String indexUUID) {

View file

@ -11,14 +11,14 @@ package org.elasticsearch.action.admin.indices.dangling.list;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
/**
* Used when querying every node in the cluster for dangling indices, in response to a list request.
*/
public class NodeListDanglingIndicesRequest extends TransportRequest {
public class NodeListDanglingIndicesRequest extends AbstractTransportRequest {
/**
* Filter the response by index UUID. Leave as null to find all indices.
*/

View file

@ -26,8 +26,8 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
@ -100,7 +100,7 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
// TODO: Remove this transition in 9.0
private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
private static class PreShardSyncedFlushRequest extends TransportRequest {
private static class PreShardSyncedFlushRequest extends AbstractTransportRequest {
private final ShardId shardId;
private PreShardSyncedFlushRequest(StreamInput in) throws IOException {

View file

@ -29,7 +29,7 @@ import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.ArrayList;
@ -41,7 +41,7 @@ import java.util.Map;
/**
* Node-level request used during can-match phase
*/
public class CanMatchNodeRequest extends TransportRequest implements IndicesRequest {
public class CanMatchNodeRequest extends AbstractTransportRequest implements IndicesRequest {
private final SearchSourceBuilder source;
private final List<Shard> shards;

View file

@ -50,13 +50,13 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.LeakTracker;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
@ -286,7 +286,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
/**
* Request for starting the query phase for multiple shards.
*/
public static final class NodeQueryRequest extends TransportRequest implements IndicesRequest {
public static final class NodeQueryRequest extends AbstractTransportRequest implements IndicesRequest {
private final List<ShardToQuery> shards;
private final SearchRequest searchRequest;
private final Map<String, AliasFilter> aliasFilters;
@ -368,7 +368,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
* Check if, based on already collected results, a shard search can be updated with a lower search threshold than is current set.
* When the query executes via batched execution, data nodes this take into account the results of queries run against shards local
* to the datanode. On the coordinating node results received from all data nodes are taken into account.
*
* <p>
* See {@link BottomSortValuesCollector} for details.
*/
private static ShardSearchRequest tryRewriteWithUpdatedSortValue(

View file

@ -48,11 +48,11 @@ import org.elasticsearch.search.rank.feature.RankFeatureResult;
import org.elasticsearch.search.rank.feature.RankFeatureShardRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@ -67,7 +67,7 @@ import java.util.concurrent.Executor;
import java.util.function.BiFunction;
/**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
* An encapsulation of {@link SearchService} operations exposed through
* transport.
*/
public class SearchTransportService {
@ -323,7 +323,7 @@ public class SearchTransportService {
return new HashMap<>(clientConnections);
}
static class ScrollFreeContextRequest extends TransportRequest {
static class ScrollFreeContextRequest extends AbstractTransportRequest {
private final ShardSearchContextId contextId;
ScrollFreeContextRequest(ShardSearchContextId contextId) {
@ -347,7 +347,7 @@ public class SearchTransportService {
}
private static class ClearScrollContextsRequest extends TransportRequest {
private static class ClearScrollContextsRequest extends AbstractTransportRequest {
ClearScrollContextsRequest() {}
ClearScrollContextsRequest(StreamInput in) throws IOException {

View file

@ -39,10 +39,10 @@ import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
@ -92,12 +92,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
ShardOpenReaderRequest::new,
new ShardOpenReaderRequestHandler()
);
TransportActionProxy.registerProxyAction(
transportService,
OPEN_SHARD_READER_CONTEXT_NAME,
false,
TransportOpenPointInTimeAction.ShardOpenReaderResponse::new
);
TransportActionProxy.registerProxyAction(transportService, OPEN_SHARD_READER_CONTEXT_NAME, false, ShardOpenReaderResponse::new);
}
@Override
@ -278,7 +273,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
}
}
private static final class ShardOpenReaderRequest extends TransportRequest implements IndicesRequest {
private static final class ShardOpenReaderRequest extends AbstractTransportRequest implements IndicesRequest {
final ShardId shardId;
final OriginalIndices originalIndices;
final TimeValue keepAlive;

View file

@ -15,11 +15,11 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
public abstract class BroadcastShardRequest extends TransportRequest implements IndicesRequest {
public abstract class BroadcastShardRequest extends AbstractTransportRequest implements IndicesRequest {
private final ShardId shardId;

View file

@ -42,8 +42,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@ -222,7 +222,7 @@ public abstract class TransportBroadcastByNodeAction<
* Resolves a list of concrete index names. Override this if index names should be resolved differently than normal.
*
* @param clusterState the cluster state
* @param request the underlying request
* @param request the underlying request
* @return a list of concrete index names that this action should operate on
*/
protected String[] resolveConcreteIndexNames(ClusterState clusterState, Request request) {
@ -465,7 +465,7 @@ public abstract class TransportBroadcastByNodeAction<
}.run(task, shards.iterator(), listener);
}
class NodeRequest extends TransportRequest implements IndicesRequest {
class NodeRequest extends AbstractTransportRequest implements IndicesRequest {
private final Request indicesLevelRequest;
private final List<ShardRouting> shards;
private final String nodeId;

View file

@ -9,13 +9,14 @@
package org.elasticsearch.action.support.master;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -26,13 +27,13 @@ import java.util.Map;
* MasterNodeRequest#masterTerm()} sent out over the wire.
* <p>
* Note that in production this is <i>only</i> used for sending the request out, so there's no need to preserve other marker interfaces such
* as {@link org.elasticsearch.action.IndicesRequest} or {@link org.elasticsearch.action.IndicesRequest.Replaceable} on the wrapped request.
* as {@link IndicesRequest} or {@link IndicesRequest.Replaceable} on the wrapped request.
* The receiving node will deserialize a request without a wrapper, with the correct interfaces and the appropriate master term stored
* directly in {@link MasterNodeRequest#masterTerm()}. However in tests sometimes we want to intercept the request as it's being sent, for
* which it may be necessary to use the test utility {@code MasterNodeRequestHelper#unwrapTermOverride} to remove the wrapper and access the
* inner request.
*/
class TermOverridingMasterNodeRequest extends TransportRequest {
class TermOverridingMasterNodeRequest extends AbstractTransportRequest {
private static final Logger logger = LogManager.getLogger(TermOverridingMasterNodeRequest.class);

View file

@ -63,6 +63,7 @@ import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RawIndexingDataTransportRequest;
import org.elasticsearch.transport.TransportChannel;
@ -84,7 +85,7 @@ import static org.elasticsearch.core.Strings.format;
/**
* Base class for requests that should be executed on a primary copy followed by replica copies.
* Subclasses can resolve the target shard and provide implementation for primary and replica operations.
*
* <p>
* The action samples cluster state on the receiving node to reroute to node with primary copy and on the
* primary node to validate request before primary operation followed by sampling state again for resolving
* nodes with replica copies to perform replication.
@ -793,7 +794,7 @@ public abstract class TransportReplicationAction<
* Responsible for routing and retrying failed operations on the primary.
* The actual primary operation is done in {@link ReplicationOperation} on the
* node with primary copy.
*
* <p>
* Resolves index and shard id for the request before routing it to target node
*/
final class ReroutePhase extends AbstractRunnable {
@ -1331,12 +1332,16 @@ public abstract class TransportReplicationAction<
}
}
/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest
/**
* a wrapper class to encapsulate a request when being sent to a specific allocation id
**/
public static class ConcreteShardRequest<R extends TransportRequest> extends AbstractTransportRequest
implements
RawIndexingDataTransportRequest {
/** {@link AllocationId#getId()} of the shard this request is sent to **/
/**
* {@link AllocationId#getId()} of the shard this request is sent to
**/
private final String targetAllocationID;
private final long primaryTerm;
private final R request;
@ -1346,7 +1351,7 @@ public abstract class TransportReplicationAction<
// is only true if sentFromLocalReroute is true.
private final boolean localRerouteInitiatedByNodeClient;
public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
public ConcreteShardRequest(Reader<R> requestReader, StreamInput in) throws IOException {
targetAllocationID = in.readString();
primaryTerm = in.readVLong();
sentFromLocalReroute = false;
@ -1460,7 +1465,7 @@ public abstract class TransportReplicationAction<
private final long globalCheckpoint;
private final long maxSeqNoOfUpdatesOrDeletes;
public ConcreteReplicaRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
public ConcreteReplicaRequest(Reader<R> requestReader, StreamInput in) throws IOException {
super(requestReader, in);
globalCheckpoint = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();

View file

@ -30,8 +30,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@ -243,10 +243,11 @@ public abstract class TransportTasksAction<
/**
* Perform the required operation on the task. It is OK start an asynchronous operation or to throw an exception but not both.
*
* @param actionTask The related transport action task. Can be used to create a task ID to handle upstream transport cancellations.
* @param request the original transport request
* @param task the task on which the operation is taking place
* @param listener the listener to signal.
* @param request the original transport request
* @param task the task on which the operation is taking place
* @param listener the listener to signal.
*/
protected abstract void taskOperation(
CancellableTask actionTask,
@ -271,7 +272,7 @@ public abstract class TransportTasksAction<
}
}
private class NodeTaskRequest extends TransportRequest {
private class NodeTaskRequest extends AbstractTransportRequest {
private final TasksRequest tasksRequest;
protected NodeTaskRequest(StreamInput in) throws IOException {

View file

@ -49,10 +49,12 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
@ -172,13 +174,13 @@ public class ShardStateAction {
* that the shard should be failed because a write made it into the primary but was not replicated to this shard copy. If the shard
* does not exist anymore but still has an entry in the in-sync set, remove its allocation id from the in-sync set.
*
* @param shardId shard id of the shard to fail
* @param allocationId allocation id of the shard to fail
* @param primaryTerm the primary term associated with the primary shard that is failing the shard. Must be strictly positive.
* @param markAsStale whether or not to mark a failing shard as stale (eg. removing from in-sync set) when failing the shard.
* @param message the reason for the failure
* @param failure the underlying cause of the failure
* @param listener callback upon completion of the request
* @param shardId shard id of the shard to fail
* @param allocationId allocation id of the shard to fail
* @param primaryTerm the primary term associated with the primary shard that is failing the shard. Must be strictly positive.
* @param markAsStale whether or not to mark a failing shard as stale (eg. removing from in-sync set) when failing the shard.
* @param message the reason for the failure
* @param failure the underlying cause of the failure
* @param listener callback upon completion of the request
*/
public void remoteShardFailed(
final ShardId shardId,
@ -203,7 +205,7 @@ public class ShardStateAction {
/**
* Clears out {@link #remoteShardStateUpdateDeduplicator}. Called by
* {@link org.elasticsearch.indices.cluster.IndicesClusterStateService} in case of a master failover to enable sending fresh requests
* {@link IndicesClusterStateService} in case of a master failover to enable sending fresh requests
* to the new master right away on master failover.
* This method is best effort in so far that it might clear out valid requests in edge cases during master failover. This is not an
* issue functionally and merely results in some unnecessary transport requests.
@ -308,7 +310,7 @@ public class ShardStateAction {
@Override
public ClusterState execute(BatchExecutionContext<FailedShardUpdateTask> batchExecutionContext) throws Exception {
List<ClusterStateTaskExecutor.TaskContext<FailedShardUpdateTask>> tasksToBeApplied = new ArrayList<>();
List<TaskContext<FailedShardUpdateTask>> tasksToBeApplied = new ArrayList<>();
List<FailedShard> failedShardsToBeApplied = new ArrayList<>();
List<StaleShard> staleShardsToBeApplied = new ArrayList<>();
final ClusterState initialState = batchExecutionContext.initialState();
@ -443,7 +445,7 @@ public class ShardStateAction {
}
}
public static class FailedShardEntry extends TransportRequest {
public static class FailedShardEntry extends AbstractTransportRequest {
final ShardId shardId;
final String allocationId;
final long primaryTerm;
@ -611,7 +613,8 @@ public class ShardStateAction {
/**
* Holder of the pair of time ranges needed in cluster state - one for @timestamp, the other for 'event.ingested'.
* Since 'event.ingested' was added well after @timestamp, it can be UNKNOWN when @timestamp range is present.
* @param timestampRange range for @timestamp
*
* @param timestampRange range for @timestamp
* @param eventIngestedRange range for event.ingested
*/
record ClusterStateTimeRanges(IndexLongFieldRange timestampRange, IndexLongFieldRange eventIngestedRange) {}
@ -833,7 +836,7 @@ public class ShardStateAction {
}
}
public static class StartedShardEntry extends TransportRequest {
public static class StartedShardEntry extends AbstractTransportRequest {
final ShardId shardId;
final String allocationId;
final long primaryTerm;

View file

@ -30,12 +30,12 @@ import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportResponseHandler;
@ -434,7 +434,7 @@ public final class FollowersChecker {
}
}
public static class FollowerCheckRequest extends TransportRequest {
public static class FollowerCheckRequest extends AbstractTransportRequest {
private final long term;

View file

@ -42,9 +42,9 @@ import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
@ -200,7 +200,7 @@ public class JoinHelper {
/**
* Saves information about a join failure. The failure information may be logged later via either {@link FailedJoinAttempt#logNow}
* or {@link FailedJoinAttempt#lastFailedJoinAttempt}.
*
* <p>
* Package-private for testing.
*/
static class FailedJoinAttempt {
@ -212,7 +212,7 @@ public class JoinHelper {
/**
* @param destination the master node targeted by the join request.
* @param joinRequest the join request that was sent to the perceived master node.
* @param exception the error response received in reply to the join request attempt.
* @param exception the error response received in reply to the join request attempt.
*/
FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, ElasticsearchException exception) {
this.destination = destination;
@ -610,7 +610,7 @@ public class JoinHelper {
static final String PENDING_JOIN_CONNECT_FAILED = "failed to connect";
static final String PENDING_JOIN_FAILED = "failed";
static class JoinPingRequest extends TransportRequest {
static class JoinPingRequest extends AbstractTransportRequest {
JoinPingRequest() {}
JoinPingRequest(StreamInput in) throws IOException {

View file

@ -12,14 +12,14 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
public class JoinRequest extends TransportRequest {
public class JoinRequest extends AbstractTransportRequest {
/**
* The sending (i.e. joining) node.

View file

@ -28,13 +28,13 @@ import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportResponseHandler;
@ -397,7 +397,7 @@ public class LeaderChecker {
}
}
static class LeaderCheckRequest extends TransportRequest {
static class LeaderCheckRequest extends AbstractTransportRequest {
private final DiscoveryNode sender;

View file

@ -12,12 +12,12 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.Objects;
public class PreVoteRequest extends TransportRequest {
public class PreVoteRequest extends AbstractTransportRequest {
private final DiscoveryNode sourceNode;
private final long currentTerm;

View file

@ -11,19 +11,19 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
/**
* Represents the action of requesting a join vote (see {@link Join}) from a node.
*
* <p>
* A {@link StartJoinRequest} is broadcast to each node in the cluster, requesting
* that each node join the new cluster formed around the master candidate node in a
* new term. The sender is either the new master candidate or the current master
* abdicating to another eligible node in the cluster.
*/
public class StartJoinRequest extends TransportRequest {
public class StartJoinRequest extends AbstractTransportRequest {
private final DiscoveryNode masterCandidateNode;

View file

@ -12,11 +12,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
abstract class TermVersionRequest extends TransportRequest implements Writeable {
abstract class TermVersionRequest extends AbstractTransportRequest implements Writeable {
protected final DiscoveryNode sourceNode;
protected final long term;
protected final long version;

View file

@ -12,13 +12,13 @@ package org.elasticsearch.discovery;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public class PeersRequest extends TransportRequest {
public class PeersRequest extends AbstractTransportRequest {
private final DiscoveryNode sourceNode;
private final List<DiscoveryNode> knownPeers;

View file

@ -36,8 +36,8 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@ -251,7 +251,7 @@ public class LocalAllocateDangledIndices {
clusterService.submitUnbatchedStateUpdateTask(source, task);
}
public static class AllocateDangledRequest extends TransportRequest {
public static class AllocateDangledRequest extends AbstractTransportRequest {
DiscoveryNode fromNode;
IndexMetadata[] indices;

View file

@ -38,7 +38,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@ -229,7 +229,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
private final ShardId shardId;
@Nullable

View file

@ -21,7 +21,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.metrics.Counters;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.List;
@ -50,7 +50,7 @@ public class HealthApiStatsAction extends ActionType<HealthApiStatsAction.Respon
return "health_api_stats";
}
public static class Node extends TransportRequest {
public static class Node extends AbstractTransportRequest {
public Node(StreamInput in) throws IOException {
super(in);

View file

@ -13,14 +13,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
/**
* The request object to handoff the primary context to the relocation target.
*/
class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
class RecoveryHandoffPrimaryContextRequest extends AbstractTransportRequest {
private final long recoveryId;
private final ShardId shardId;

View file

@ -12,11 +12,11 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
public abstract class RecoveryTransportRequest extends TransportRequest {
public abstract class RecoveryTransportRequest extends AbstractTransportRequest {
private final long requestSeqNo;

View file

@ -12,14 +12,14 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
/**
* Represents a request for starting a peer recovery.
*/
public class ReestablishRecoveryRequest extends TransportRequest {
public class ReestablishRecoveryRequest extends AbstractTransportRequest {
private final long recoveryId;
private final ShardId shardId;

View file

@ -17,14 +17,14 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
/**
* Represents a request for starting a peer recovery.
*/
public class StartRecoveryRequest extends TransportRequest {
public class StartRecoveryRequest extends AbstractTransportRequest {
private final long recoveryId;
private final ShardId shardId;

View file

@ -47,9 +47,9 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
@ -462,7 +462,7 @@ public final class IndicesStore implements ClusterStateListener, Closeable {
}
private static class ShardActiveRequest extends TransportRequest {
private static class ShardActiveRequest extends AbstractTransportRequest {
private final TimeValue timeout;
private final ClusterName clusterName;
private final String indexUUID;

View file

@ -11,6 +11,7 @@ package org.elasticsearch.indices.store;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexNotFoundException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionType;
@ -45,7 +46,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -142,7 +143,7 @@ public class TransportNodesListShardStoreMetadata extends TransportNodesAction<
);
exists = true;
return storeFilesMetadata;
} catch (org.apache.lucene.index.IndexNotFoundException e) {
} catch (IndexNotFoundException e) {
logger.trace(() -> "[" + shardId + "] node is missing index, responding with empty", e);
return StoreFilesMetadata.EMPTY;
} catch (IOException e) {
@ -306,7 +307,7 @@ public class TransportNodesListShardStoreMetadata extends TransportNodesAction<
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
private final ShardId shardId;
@Nullable

View file

@ -26,7 +26,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -88,7 +88,7 @@ public final class IndexModeStatsActionType extends ActionType<IndexModeStatsAct
}
}
public static final class NodeRequest extends TransportRequest {
public static final class NodeRequest extends AbstractTransportRequest {
NodeRequest() {
}

View file

@ -23,7 +23,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.rank.RankDocShardInfo;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.List;
@ -33,7 +33,7 @@ import java.util.Map;
* Shard level fetch base request. Holds all the info needed to execute a fetch.
* Used with search scroll as the original request doesn't hold indices.
*/
public class ShardFetchRequest extends TransportRequest {
public class ShardFetchRequest extends AbstractTransportRequest {
private final ShardSearchContextId contextId;

View file

@ -16,12 +16,12 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.Map;
public class InternalScrollSearchRequest extends TransportRequest {
public class InternalScrollSearchRequest extends AbstractTransportRequest {
private ShardSearchContextId contextId;

View file

@ -51,7 +51,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.ArrayList;
@ -68,7 +68,7 @@ import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_D
* It provides all the methods that the {@link SearchContext} needs.
* Provides a cache key based on its content that can be used to cache shard level response.
*/
public class ShardSearchRequest extends TransportRequest implements IndicesRequest {
public class ShardSearchRequest extends AbstractTransportRequest implements IndicesRequest {
private final String clusterAlias;
private final ShardId shardId;
private final int shardRequestIndex;

View file

@ -22,12 +22,12 @@ import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.Map;
public class QuerySearchRequest extends TransportRequest implements IndicesRequest {
public class QuerySearchRequest extends AbstractTransportRequest implements IndicesRequest {
private final ShardSearchContextId contextId;
private final AggregatedDfs dfs;

View file

@ -18,7 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.List;
@ -29,7 +29,7 @@ import java.util.stream.IntStream;
* Shard level request for extracting all needed feature for a global reranker
*/
public class RankFeatureShardRequest extends TransportRequest implements IndicesRequest {
public class RankFeatureShardRequest extends AbstractTransportRequest implements IndicesRequest {
private final OriginalIndices originalIndices;
private final ShardSearchRequest shardSearchRequest;

View file

@ -27,12 +27,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
@ -299,7 +299,7 @@ public class TaskCancellationService {
return cause instanceof NodeDisconnectedException || cause instanceof NodeNotConnectedException;
}
private static class BanParentTaskRequest extends TransportRequest {
private static class BanParentTaskRequest extends AbstractTransportRequest {
private final TaskId parentTaskId;
private final boolean ban;
@ -375,7 +375,7 @@ public class TaskCancellationService {
}
}
private static class CancelChildRequest extends TransportRequest {
private static class CancelChildRequest extends AbstractTransportRequest {
private final TaskId parentTaskId;
private final long childRequestId;

View file

@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.net.InetSocketAddress;
public abstract class AbstractTransportRequest extends TransportMessage implements TransportRequest {
@Nullable // set by the transport service on inbound messages; unset on outbound messages
private InetSocketAddress remoteAddress;
/**
* Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent".
*/
private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
/**
* Request ID. Defaults to -1, meaning "no request ID is set".
*/
private volatile long requestId = -1;
public AbstractTransportRequest() {}
public AbstractTransportRequest(StreamInput in) throws IOException {
parentTaskId = TaskId.readFromStream(in);
}
@Override
public void remoteAddress(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Nullable // set by the transport service on inbound messages; unset on outbound messages
@Override
public InetSocketAddress remoteAddress() {
return remoteAddress;
}
/**
* Set a reference to task that created this request.
*/
@Override
public void setParentTask(TaskId taskId) {
this.parentTaskId = taskId;
}
/**
* Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
*/
@Override
public TaskId getParentTask() {
return parentTaskId;
}
/**
* Set the request ID of this request.
*/
@Override
public void setRequestId(long requestId) {
this.requestId = requestId;
}
@Override
public long getRequestId() {
return requestId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
parentTaskId.writeTo(out);
}
@Override
public String toString() {
return getClass().getName() + "/" + getParentTask();
}
}

View file

@ -21,7 +21,7 @@ import java.io.IOException;
* A specialized, bytes only request, that can potentially be optimized on the network
* layer, specifically for the same large buffer send to several nodes.
*/
public class BytesTransportRequest extends TransportRequest {
public class BytesTransportRequest extends AbstractTransportRequest {
final ReleasableBytesReference bytes;
private final TransportVersion version;

View file

@ -125,7 +125,7 @@ public final class TransportActionProxy {
}
}
static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
static class ProxyRequest<T extends TransportRequest> extends AbstractTransportRequest {
final T wrapped;
final DiscoveryNode targetNode;

View file

@ -337,7 +337,7 @@ final class TransportHandshaker {
}
}
static final class HandshakeRequest extends TransportRequest {
static final class HandshakeRequest extends AbstractTransportRequest {
/**
* The {@link TransportVersion#current()} of the requesting node.

View file

@ -9,81 +9,14 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.net.InetSocketAddress;
public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest {
public interface TransportRequest extends Writeable, RefCounted, TaskAwareRequest {
void remoteAddress(InetSocketAddress remoteAddress);
@Nullable // set by the transport service on inbound messages; unset on outbound messages
private InetSocketAddress remoteAddress;
/**
* Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent".
*/
private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
/**
* Request ID. Defaults to -1, meaning "no request ID is set".
*/
private volatile long requestId = -1;
public TransportRequest() {}
public TransportRequest(StreamInput in) throws IOException {
parentTaskId = TaskId.readFromStream(in);
}
public void remoteAddress(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Nullable // set by the transport service on inbound messages; unset on outbound messages
public InetSocketAddress remoteAddress() {
return remoteAddress;
}
/**
* Set a reference to task that created this request.
*/
@Override
public void setParentTask(TaskId taskId) {
this.parentTaskId = taskId;
}
/**
* Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
*/
@Override
public TaskId getParentTask() {
return parentTaskId;
}
/**
* Set the request ID of this request.
*/
@Override
public void setRequestId(long requestId) {
this.requestId = requestId;
}
@Override
public long getRequestId() {
return requestId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
parentTaskId.writeTo(out);
}
@Override
public String toString() {
return getClass().getName() + "/" + getParentTask();
}
InetSocketAddress remoteAddress();
}

View file

@ -141,7 +141,9 @@ public class TransportService extends AbstractLifecycleComponent
private final RemoteClusterService remoteClusterService;
/** if set will call requests sent to this id to shortcut and executed locally */
/**
* if set will call requests sent to this id to shortcut and executed locally
*/
volatile DiscoveryNode localNode = null;
private final Transport.Connection localNodeConnection = new Transport.Connection() {
@Override
@ -220,7 +222,8 @@ public class TransportService extends AbstractLifecycleComponent
* Build the service.
*
* @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
* updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING}
* and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
*/
public TransportService(
Settings settings,
@ -424,7 +427,7 @@ public class TransportService extends AbstractLifecycleComponent
/**
* Start accepting incoming requests.
*
* <p>
* The transport service starts before it's ready to accept incoming requests because we need to know the address(es) to which we are
* bound, which means we have to actually bind to them and start accepting incoming connections. However until this method is called we
* reject any incoming requests, including handshakes, by closing the connection.
@ -484,7 +487,7 @@ public class TransportService extends AbstractLifecycleComponent
* Connect to the specified node with the given connection profile.
* The ActionListener will be called on the calling thread or the generic thread pool.
*
* @param node the node to connect to
* @param node the node to connect to
* @param listener the action listener to notify
*/
public void connectToNode(DiscoveryNode node, ActionListener<Releasable> listener) throws ConnectTransportException {
@ -495,9 +498,9 @@ public class TransportService extends AbstractLifecycleComponent
* Connect to the specified node with the given connection profile.
* The ActionListener will be called on the calling thread or the generic thread pool.
*
* @param node the node to connect to
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param listener the action listener to notify
* @param listener the action listener to notify
*/
public void connectToNode(
final DiscoveryNode node,
@ -540,9 +543,10 @@ public class TransportService extends AbstractLifecycleComponent
* Establishes a new connection to the given node. The connection is NOT maintained by this service, it's the callers
* responsibility to close the connection once it goes out of scope.
* The ActionListener will be called on the calling thread or the generic thread pool.
* @param node the node to connect to
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use
* @param listener the action listener to notify
* @param listener the action listener to notify
*/
public void openConnection(
final DiscoveryNode node,
@ -567,7 +571,7 @@ public class TransportService extends AbstractLifecycleComponent
* @param handshakeTimeout handshake timeout
* @param listener action listener to notify
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
* @throws IllegalStateException if the handshake failed
*/
public void handshake(
final Transport.Connection connection,
@ -584,10 +588,10 @@ public class TransportService extends AbstractLifecycleComponent
* name on the target node doesn't match the local cluster name.
* The ActionListener will be called on the calling thread or the generic thread pool.
*
* @param connection the connection to a specific node
* @param handshakeTimeout handshake timeout
* @param connection the connection to a specific node
* @param handshakeTimeout handshake timeout
* @param clusterNamePredicate cluster name validation predicate
* @param listener action listener to notify
* @param listener action listener to notify
* @throws IllegalStateException if the handshake failed
*/
public void handshake(
@ -641,7 +645,7 @@ public class TransportService extends AbstractLifecycleComponent
return transport.newNetworkBytesStream();
}
static class HandshakeRequest extends TransportRequest {
static class HandshakeRequest extends AbstractTransportRequest {
public static final HandshakeRequest INSTANCE = new HandshakeRequest();
@ -884,6 +888,7 @@ public class TransportService extends AbstractLifecycleComponent
/**
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
*
* @throws NodeNotConnectedException if the given node is not connected
*/
public Transport.Connection getConnection(DiscoveryNode node) {
@ -1189,10 +1194,10 @@ public class TransportService extends AbstractLifecycleComponent
/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param requestReader a callable to be used construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
* @param action The action the request handler is associated with
* @param requestReader a callable to be used construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(
String action,
@ -1219,7 +1224,7 @@ public class TransportService extends AbstractLifecycleComponent
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param requestReader The request class that will be used to construct new instances for streaming
* @param requestReader The request class that will be used to construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
@ -1262,7 +1267,9 @@ public class TransportService extends AbstractLifecycleComponent
}
}
/** called by the {@link Transport} implementation once a request has been sent */
/**
* called by the {@link Transport} implementation once a request has been sent
*/
@Override
public void onRequestSent(
DiscoveryNode node,
@ -1286,7 +1293,9 @@ public class TransportService extends AbstractLifecycleComponent
}
}
/** called by the {@link Transport} implementation once a response was sent to calling node */
/**
* called by the {@link Transport} implementation once a response was sent to calling node
*/
@Override
public void onResponseSent(long requestId, String action) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
@ -1294,7 +1303,9 @@ public class TransportService extends AbstractLifecycleComponent
}
}
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
/**
* called by the {@link Transport} implementation after an exception was sent as a response to an incoming request
*/
@Override
public void onResponseSent(long requestId, String action, Exception e) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {

View file

@ -33,9 +33,9 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ReachabilityChecker;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.FakeTcpChannel;
import org.elasticsearch.transport.TestTransportChannels;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -62,7 +62,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class CancellableTasksTests extends TaskManagerTestCase {
public static class CancellableNodeRequest extends TransportRequest {
public static class CancellableNodeRequest extends AbstractTransportRequest {
protected String requestName;
public CancellableNodeRequest() {

View file

@ -40,6 +40,7 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
@ -161,7 +162,7 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
protected final String requestName;
protected final boolean shouldBlock;

View file

@ -46,7 +46,7 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ReachabilityChecker;
import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
@ -78,7 +78,7 @@ import static org.hamcrest.Matchers.nullValue;
public class TransportTasksActionTests extends TaskManagerTestCase {
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
protected String requestName;
public NodeRequest(StreamInput in) throws IOException {

View file

@ -38,8 +38,8 @@ import org.elasticsearch.test.ReachabilityChecker;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.LeakTracker;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;
import org.junit.After;
@ -507,7 +507,7 @@ public class TransportNodesActionTests extends ESTestCase {
}
}
private static class TestNodeRequest extends TransportRequest {
private static class TestNodeRequest extends AbstractTransportRequest {
private final RefCounted refCounted = AbstractRefCounted.of(() -> {});
TestNodeRequest() {}

View file

@ -25,10 +25,10 @@ import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.EmptyRequest;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
@ -133,7 +133,7 @@ public class BanFailureLoggingTests extends TaskManagerTestCase {
childTransportService.registerRequestHandler(
"internal:testAction[c]",
threadPool.executor(ThreadPool.Names.MANAGEMENT), // busy-wait for cancellation but not on a transport thread
(StreamInput in) -> new TransportRequest(in) {
(StreamInput in) -> new AbstractTransportRequest(in) {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);

View file

@ -31,6 +31,7 @@ import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.FakeTcpChannel;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransportChannel;
@ -42,7 +43,6 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
@ -282,7 +282,7 @@ public class TaskManagerTests extends ESTestCase {
* Check that registering a task also causes tracing to be started on that task.
*/
public void testRegisterTaskStartsTracing() {
final Tracer mockTracer = Mockito.mock(Tracer.class);
final Tracer mockTracer = mock(Tracer.class);
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer);
final Task task = taskManager.register("testType", "testAction", new TaskAwareRequest() {
@ -306,7 +306,7 @@ public class TaskManagerTests extends ESTestCase {
* Check that unregistering a task also causes tracing to be stopped on that task.
*/
public void testUnregisterTaskStopsTracing() {
final Tracer mockTracer = Mockito.mock(Tracer.class);
final Tracer mockTracer = mock(Tracer.class);
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer);
final Task task = taskManager.register("testType", "testAction", new TaskAwareRequest() {
@ -332,7 +332,7 @@ public class TaskManagerTests extends ESTestCase {
* Check that registering and executing a task also causes tracing to be started and stopped on that task.
*/
public void testRegisterAndExecuteStartsAndStopsTracing() {
final Tracer mockTracer = Mockito.mock(Tracer.class);
final Tracer mockTracer = mock(Tracer.class);
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer);
final Task task = taskManager.registerAndExecute(
@ -370,7 +370,7 @@ public class TaskManagerTests extends ESTestCase {
}
public void testRegisterWithEnabledDisabledTracing() {
final Tracer mockTracer = Mockito.mock(Tracer.class);
final Tracer mockTracer = mock(Tracer.class);
final TaskManager taskManager = spy(new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer));
taskManager.register("type", "action", makeTaskRequest(true, 123), false);
@ -390,7 +390,7 @@ public class TaskManagerTests extends ESTestCase {
verify(taskManager, times(1)).startTrace(any(), any());
}
static class CancellableRequest extends TransportRequest {
static class CancellableRequest extends AbstractTransportRequest {
private final String requestId;
CancellableRequest(String requestId) {

View file

@ -28,7 +28,7 @@ public class ResultDeduplicatorTests extends ESTestCase {
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failureCount = new AtomicInteger();
Exception failure = randomBoolean() ? new TransportException("simulated") : null;
final TransportRequest request = new TransportRequest() {
final TransportRequest request = new AbstractTransportRequest() {
@Override
public void setParentTask(final TaskId taskId) {}
};

View file

@ -400,7 +400,7 @@ public class TransportActionProxyTests extends ESTestCase {
latch.await();
}
public static class SimpleTestRequest extends TransportRequest {
public static class SimpleTestRequest extends AbstractTransportRequest {
final boolean cancellable;
final String sourceNode;

View file

@ -246,7 +246,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
version,
threadPool,
clusterSettings,
Collections.emptySet(),
emptySet(),
interceptor
);
service.start();
@ -1472,7 +1472,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
}
public static class StringMessageRequest extends TransportRequest implements RawIndexingDataTransportRequest {
public static class StringMessageRequest extends AbstractTransportRequest implements RawIndexingDataTransportRequest {
private String message;
private long timeout;
@ -1533,7 +1533,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
}
public static class Version0Request extends TransportRequest {
public static class Version0Request extends AbstractTransportRequest {
int value1;
@ -2053,7 +2053,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
}
public static class TestRequest extends TransportRequest {
public static class TestRequest extends AbstractTransportRequest {
String info;
int resendCount;
@ -3228,7 +3228,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testActionStats() throws Exception {
final String ACTION = "internal:action";
class Request extends TransportRequest {
class Request extends AbstractTransportRequest {
final int refSize;
Request(int refSize) {
@ -3477,7 +3477,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
* Connect to the specified node with the default connection profile
*
* @param service service to connect from
* @param node the node to connect to
* @param node the node to connect to
*/
public static void connectToNode(TransportService service, DiscoveryNode node) throws ConnectTransportException {
connectToNode(service, node, null);
@ -3486,8 +3486,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
/**
* Connect to the specified node with the given connection profile
*
* @param service service to connect from
* @param node the node to connect to
* @param service service to connect from
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
*/
public static void connectToNode(TransportService service, DiscoveryNode node, ConnectionProfile connectionProfile) {
@ -3512,8 +3512,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
/**
* Establishes and returns a new connection to the given node from the given {@link TransportService}.
*
* @param service service to connect from
* @param node the node to connect to
* @param service service to connect from
* @param node the node to connect to
* @param connectionProfile the connection profile to use
*/
public static Transport.Connection openConnection(TransportService service, DiscoveryNode node, ConnectionProfile connectionProfile) {

View file

@ -17,7 +17,7 @@ import java.io.IOException;
* A transport request with an empty payload. Not really entirely empty: all transport requests include the parent task ID, a request ID,
* and the remote address (if applicable).
*/
public final class EmptyRequest extends TransportRequest {
public final class EmptyRequest extends AbstractTransportRequest {
public EmptyRequest() {}
public EmptyRequest(StreamInput in) throws IOException {

View file

@ -13,7 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class TestRequest extends TransportRequest {
public class TestRequest extends AbstractTransportRequest {
String value;

View file

@ -620,7 +620,7 @@ public class DisruptableMockTransportTests extends ESTestCase {
);
}
private class TestRequest extends TransportRequest {
private class TestRequest extends AbstractTransportRequest {
private final RefCounted refCounted;
TestRequest() {

View file

@ -16,7 +16,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.common.stats.EnumCounters;
@ -81,7 +81,7 @@ public class AnalyticsStatsAction extends ActionType<AnalyticsStatsAction.Respon
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
public NodeRequest(StreamInput in) throws IOException {
super(in);
}
@ -109,9 +109,7 @@ public class AnalyticsStatsAction extends ActionType<AnalyticsStatsAction.Respon
}
public EnumCounters<Item> getStats() {
List<EnumCounters<Item>> countersPerNode = getNodes().stream()
.map(AnalyticsStatsAction.NodeResponse::getStats)
.collect(Collectors.toList());
List<EnumCounters<Item>> countersPerNode = getNodes().stream().map(NodeResponse::getStats).collect(Collectors.toList());
return EnumCounters.merge(Item.class, countersPerNode);
}

View file

@ -35,7 +35,7 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -171,7 +171,7 @@ public class NodesDataTiersUsageTransportAction extends TransportNodesAction<
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
public NodeRequest(StreamInput in) throws IOException {
super(in);

View file

@ -18,7 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
@ -56,7 +56,7 @@ public class GetInferenceDiagnosticsAction extends ActionType<GetInferenceDiagno
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
public NodeRequest(StreamInput in) throws IOException {
super(in);
}

View file

@ -10,7 +10,7 @@ package org.elasticsearch.xpack.core.security.action;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
@ -41,7 +41,7 @@ public class ClearSecurityCacheRequest extends BaseNodesRequest {
return keys;
}
public static class Node extends TransportRequest {
public static class Node extends AbstractTransportRequest {
private String cacheName;
private String[] keys;

View file

@ -10,7 +10,7 @@ package org.elasticsearch.xpack.core.security.action.privilege;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
@ -41,7 +41,7 @@ public class ClearPrivilegesCacheRequest extends BaseNodesRequest {
return clearRolesCache;
}
public static class Node extends TransportRequest {
public static class Node extends AbstractTransportRequest {
private String[] applicationNames;
private boolean clearRolesCache;

View file

@ -9,7 +9,7 @@ package org.elasticsearch.xpack.core.security.action.realm;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
@ -23,14 +23,14 @@ public class ClearRealmCacheRequest extends BaseNodesRequest {
}
/**
* @return {@code true} if this request targets realms, {@code false} otherwise.
* @return {@code true} if this request targets realms, {@code false} otherwise.
*/
public boolean allRealms() {
return realms == null || realms.length == 0;
}
/**
* @return The realms that should be evicted. Empty array indicates all realms.
* @return The realms that should be evicted. Empty array indicates all realms.
*/
public String[] realms() {
return realms;
@ -40,7 +40,7 @@ public class ClearRealmCacheRequest extends BaseNodesRequest {
* Sets the realms for which caches will be evicted. When not set all the caches of all realms will be
* evicted.
*
* @param realms The realm names
* @param realms The realm names
*/
public ClearRealmCacheRequest realms(String... realms) {
this.realms = realms;
@ -48,14 +48,14 @@ public class ClearRealmCacheRequest extends BaseNodesRequest {
}
/**
* @return {@code true} if this request targets users, {@code false} otherwise.
* @return {@code true} if this request targets users, {@code false} otherwise.
*/
public boolean allUsernames() {
return usernames == null || usernames.length == 0;
}
/**
* @return The usernames of the users that should be evicted. Empty array indicates all users.
* @return The usernames of the users that should be evicted. Empty array indicates all users.
*/
public String[] usernames() {
return usernames;
@ -72,7 +72,7 @@ public class ClearRealmCacheRequest extends BaseNodesRequest {
return this;
}
public static class Node extends TransportRequest {
public static class Node extends AbstractTransportRequest {
private String[] realms;
private String[] usernames;

View file

@ -9,7 +9,7 @@ package org.elasticsearch.xpack.core.security.action.role;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
@ -27,7 +27,7 @@ public class ClearRolesCacheRequest extends BaseNodesRequest {
/**
* Sets the roles for which caches will be evicted. When not set all the roles will be evicted from the cache.
*
* @param names The role names
* @param names The role names
*/
public ClearRolesCacheRequest names(String... names) {
this.names = names;
@ -41,7 +41,7 @@ public class ClearRolesCacheRequest extends BaseNodesRequest {
return names;
}
public static class Node extends TransportRequest {
public static class Node extends AbstractTransportRequest {
private String[] names;
public Node(StreamInput in) throws IOException {

View file

@ -10,7 +10,7 @@ package org.elasticsearch.xpack.core.security.action.service;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
@ -29,7 +29,7 @@ public class GetServiceAccountCredentialsNodesRequest extends BaseNodesRequest {
this.serviceName = serviceName;
}
public static class Node extends TransportRequest {
public static class Node extends AbstractTransportRequest {
private final String namespace;
private final String serviceName;

View file

@ -16,7 +16,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.common.stats.EnumCounters;
@ -76,7 +76,7 @@ public class SpatialStatsAction extends ActionType<SpatialStatsAction.Response>
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
public NodeRequest(StreamInput in) throws IOException {
super(in);
}
@ -104,9 +104,7 @@ public class SpatialStatsAction extends ActionType<SpatialStatsAction.Response>
}
public EnumCounters<Item> getStats() {
List<EnumCounters<Item>> countersPerNode = getNodes().stream()
.map(SpatialStatsAction.NodeResponse::getStats)
.collect(Collectors.toList());
List<EnumCounters<Item>> countersPerNode = getNodes().stream().map(NodeResponse::getStats).collect(Collectors.toList());
return EnumCounters.merge(Item.class, countersPerNode);
}

View file

@ -15,7 +15,7 @@ import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.Collections;
@ -25,7 +25,7 @@ import java.util.Set;
* Internal terms enum request executed directly against a specific node, querying potentially many
* shards in one request
*/
public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest {
public class NodeTermsEnumRequest extends AbstractTransportRequest implements IndicesRequest {
private final String field;
private final String string;

View file

@ -19,8 +19,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformSchedulerStats;
@ -95,7 +94,7 @@ public class GetTransformNodeStatsAction extends ActionType<GetTransformNodeStat
}
}
public static class NodeStatsRequest extends TransportRequest {
public static class NodeStatsRequest extends AbstractTransportRequest {
public NodeStatsRequest() {}
@ -134,7 +133,7 @@ public class GetTransformNodeStatsAction extends ActionType<GetTransformNodeStat
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SCHEDULER_STATS_FIELD_NAME, schedulerStats);
return builder.endObject();

View file

@ -9,7 +9,7 @@ package org.elasticsearch.xpack.core.watcher.transport.actions.stats;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
@ -55,7 +55,7 @@ public class WatcherStatsRequest extends BaseNodesRequest {
return "watcher_stats";
}
public static class Node extends TransportRequest {
public static class Node extends AbstractTransportRequest {
private boolean includeCurrentWatches;
private boolean includeQueuedWatches;

View file

@ -20,7 +20,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.async.AsyncSearchIndexServiceTests.TestAsyncResponse;
import org.junit.Before;
@ -113,7 +113,7 @@ public class AsyncResultsServiceTests extends ESSingleNodeTestCase {
}
}
public class TestRequest extends TransportRequest {
public class TestRequest extends AbstractTransportRequest {
private final String string;
public TestRequest(String string) {

View file

@ -12,7 +12,7 @@ import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
import java.io.IOException;
@ -31,7 +31,7 @@ public class NodesDeprecationCheckAction extends ActionType<NodesDeprecationChec
super(NAME);
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
public NodeRequest() {}

View file

@ -18,7 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
@ -27,7 +27,7 @@ import java.util.List;
import java.util.Objects;
/**
Resets deprecation indexing rate limiting cache on each node.
* Resets deprecation indexing rate limiting cache on each node.
*/
public class DeprecationCacheResetAction extends ActionType<DeprecationCacheResetAction.Response> {
public static final DeprecationCacheResetAction INSTANCE = new DeprecationCacheResetAction();
@ -96,7 +96,7 @@ public class DeprecationCacheResetAction extends ActionType<DeprecationCacheRese
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
public NodeRequest(StreamInput in) throws IOException {
super(in);
}

View file

@ -22,7 +22,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
@ -51,7 +51,7 @@ public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorSt
}
}
public static class NodeRequest extends TransportRequest {
public static class NodeRequest extends AbstractTransportRequest {
NodeRequest() {}

View file

@ -10,7 +10,7 @@ package org.elasticsearch.xpack.eql.plugin;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
@ -38,7 +38,7 @@ public class EqlStatsRequest extends BaseNodesRequest {
return "eql_stats";
}
static class NodeStatsRequest extends TransportRequest {
static class NodeStatsRequest extends AbstractTransportRequest {
boolean includeStats;
NodeStatsRequest(StreamInput in) throws IOException {

View file

@ -12,13 +12,13 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public final class ExchangeRequest extends TransportRequest {
public final class ExchangeRequest extends AbstractTransportRequest {
private final String exchangeId;
private final boolean sourcesFinished;

View file

@ -33,9 +33,9 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -201,7 +201,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
}
}
private static class OpenExchangeRequest extends TransportRequest {
private static class OpenExchangeRequest extends AbstractTransportRequest {
private final String sessionId;
private final int exchangeBuffer;

View file

@ -59,6 +59,7 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
@ -88,33 +89,33 @@ import java.util.stream.IntStream;
* page against another index that <strong>must</strong> have only a single
* shard.
* <p>
* This registers a {@link TransportRequestHandler} so we can handle requests
* to join data that isn't local to the node, but it is much faster if the
* data is already local.
* This registers a {@link TransportRequestHandler} so we can handle requests
* to join data that isn't local to the node, but it is much faster if the
* data is already local.
* </p>
* <p>
* The join process spawns a {@link Driver} per incoming page which runs in
* two or three stages:
* The join process spawns a {@link Driver} per incoming page which runs in
* two or three stages:
* </p>
* <p>
* Stage 1: Finding matching document IDs for the input page. This stage is done
* by the {@link EnrichQuerySourceOperator}. The output page of this stage is
* represented as {@code [DocVector, IntBlock: positions of the input terms]}.
* Stage 1: Finding matching document IDs for the input page. This stage is done
* by the {@link EnrichQuerySourceOperator}. The output page of this stage is
* represented as {@code [DocVector, IntBlock: positions of the input terms]}.
* </p>
* <p>
* Stage 2: Extracting field values for the matched document IDs. The output page
* is represented as
* {@code [DocVector, IntBlock: positions, Block: field1, Block: field2,...]}.
* Stage 2: Extracting field values for the matched document IDs. The output page
* is represented as
* {@code [DocVector, IntBlock: positions, Block: field1, Block: field2,...]}.
* </p>
* <p>
* Stage 3: Optionally this combines the extracted values based on positions and filling
* nulls for positions without matches. This is done by {@link MergePositionsOperator}.
* The output page is represented as {@code [Block: field1, Block: field2,...]}.
* Stage 3: Optionally this combines the extracted values based on positions and filling
* nulls for positions without matches. This is done by {@link MergePositionsOperator}.
* The output page is represented as {@code [Block: field1, Block: field2,...]}.
* </p>
* <p>
* The {@link Page#getPositionCount()} of the output {@link Page} is equal to the
* {@link Page#getPositionCount()} of the input page. In other words - it returns
* the same number of rows that it was sent no matter how many documents match.
* The {@link Page#getPositionCount()} of the output {@link Page} is equal to the
* {@link Page#getPositionCount()} of the input page. In other words - it returns
* the same number of rows that it was sent no matter how many documents match.
* </p>
*/
public abstract class AbstractLookupService<R extends AbstractLookupService.Request, T extends AbstractLookupService.TransportRequest> {
@ -478,7 +479,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
}
}
abstract static class TransportRequest extends org.elasticsearch.transport.TransportRequest implements IndicesRequest {
abstract static class TransportRequest extends AbstractTransportRequest implements IndicesRequest {
final String sessionId;
final ShardId shardId;
/**

View file

@ -24,11 +24,11 @@ import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@ -64,8 +64,8 @@ import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithF
* Resolves enrich policies across clusters in several steps:
* 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}.
* 2. Sends out {@link LookupRequest} to each cluster to resolve policies. Internally, a remote cluster handles the lookup in two steps:
* - 2.1 Ensures the caller has permission to access the enrich policies.
* - 2.2 For each found enrich policy, uses {@link IndexResolver} to resolve the mappings of the concrete enrich index.
* - 2.1 Ensures the caller has permission to access the enrich policies.
* - 2.2 For each found enrich policy, uses {@link IndexResolver} to resolve the mappings of the concrete enrich index.
* 3. For each unresolved policy, combines the lookup results to compute the actual enrich policy and mappings depending on the enrich mode.
* This approach requires at most one cross-cluster call for each cluster.
*/
@ -168,6 +168,7 @@ public class EnrichPolicyResolver {
/**
* Resolve an enrich policy by merging the lookup responses from the target clusters.
*
* @return a resolved enrich policy or an error
*/
private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
@ -341,7 +342,7 @@ public class EnrichPolicyResolver {
}
}
private static class LookupRequest extends TransportRequest {
private static class LookupRequest extends AbstractTransportRequest {
private final String clusterAlias;
private final Collection<String> policyNames;
@ -376,6 +377,7 @@ public class EnrichPolicyResolver {
/**
* Use this constructor when the remote cluster is unavailable to indicate inability to do the enrich policy lookup
*
* @param connectionError Exception received when trying to connect to a remote cluster
*/
LookupResponse(Exception connectionError) {

View file

@ -20,7 +20,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
import org.elasticsearch.xpack.esql.session.Configuration;
@ -35,7 +35,7 @@ import java.util.Objects;
* via {@link ExchangeService#openExchange} before sending this request to the remote cluster. The coordinator on the main cluster
* will poll pages from this sink. Internally, this compute will trigger sub-computes on data nodes via {@link DataNodeRequest}.
*/
final class ClusterComputeRequest extends TransportRequest implements IndicesRequest.Replaceable {
final class ClusterComputeRequest extends AbstractTransportRequest implements IndicesRequest.Replaceable {
private final String clusterAlias;
private final String sessionId;
private final Configuration configuration;
@ -46,10 +46,10 @@ final class ClusterComputeRequest extends TransportRequest implements IndicesReq
/**
* A request to start a compute on a remote cluster.
*
* @param clusterAlias the cluster alias of this remote cluster
* @param sessionId the sessionId in which the output pages will be placed in the exchange sink specified by this id
* @param configuration the configuration for this compute
* @param plan the physical plan to be executed
* @param clusterAlias the cluster alias of this remote cluster
* @param sessionId the sessionId in which the output pages will be placed in the exchange sink specified by this id
* @param configuration the configuration for this compute
* @param plan the physical plan to be executed
*/
ClusterComputeRequest(String clusterAlias, String sessionId, Configuration configuration, RemoteClusterPlan plan) {
this.clusterAlias = clusterAlias;

View file

@ -40,9 +40,9 @@ import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
@ -75,10 +75,10 @@ import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_
/**
* Once query is parsed and validated it is scheduled for execution by {@code org.elasticsearch.xpack.esql.plugin.ComputeService#execute}
* This method is responsible for splitting physical plan into coordinator and data node plans.
*
* <p>
* Coordinator plan is immediately executed locally (using {@code org.elasticsearch.xpack.esql.plugin.ComputeService#runCompute})
* and is prepared to collect and merge pages from data nodes into the final query result.
*
* <p>
* Data node plan is passed to {@code org.elasticsearch.xpack.esql.plugin.DataNodeComputeHandler#startComputeOnDataNodes}
* that is responsible for
* <ul>
@ -510,7 +510,7 @@ public class ComputeService {
}
}
private static class ComputeGroupTaskRequest extends TransportRequest {
private static class ComputeGroupTaskRequest extends AbstractTransportRequest {
private final Supplier<String> parentDescription;
ComputeGroupTaskRequest(TaskId parentTask, Supplier<String> description) {

View file

@ -25,8 +25,8 @@ import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@ -43,7 +43,7 @@ import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField.NO_INDEX_PLACEHOLDER;
import static org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField.NO_INDICES_OR_ALIASES_ARRAY;
final class DataNodeRequest extends TransportRequest implements IndicesRequest.Replaceable {
final class DataNodeRequest extends AbstractTransportRequest implements IndicesRequest.Replaceable {
private static final Logger logger = LogManager.getLogger(DataNodeRequest.class);
private final String sessionId;

View file

@ -10,7 +10,7 @@ package org.elasticsearch.xpack.esql.plugin;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import java.io.IOException;
@ -38,7 +38,7 @@ public class EsqlStatsRequest extends BaseNodesRequest {
return "esql_stats";
}
static class NodeStatsRequest extends TransportRequest {
static class NodeStatsRequest extends AbstractTransportRequest {
boolean includeStats;
NodeStatsRequest(StreamInput in) throws IOException {

View file

@ -18,7 +18,7 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.TrainedModelCacheInfoAction;
import org.elasticsearch.xpack.core.ml.action.TrainedModelCacheInfoAction.Response.CacheInfo;
@ -85,7 +85,7 @@ public class TransportTrainedModelCacheInfoAction extends TransportNodesAction<
);
}
public static class NodeModelCacheInfoRequest extends TransportRequest {
public static class NodeModelCacheInfoRequest extends AbstractTransportRequest {
NodeModelCacheInfoRequest() {}

View file

@ -19,7 +19,7 @@ import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryStatsSnapshot;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -78,7 +78,7 @@ public final class TransportClearRepositoriesStatsArchiveAction extends Transpor
return new RepositoriesNodeMeteringResponse(clusterService.localNode(), clearedStats);
}
static final class ClearRepositoriesStatsArchiveNodeRequest extends TransportRequest {
static final class ClearRepositoriesStatsArchiveNodeRequest extends AbstractTransportRequest {
private final long maxVersionToClear;
ClearRepositoriesStatsArchiveNodeRequest(long maxVersionToClear) {

View file

@ -17,7 +17,7 @@ import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -75,7 +75,7 @@ public final class TransportRepositoriesStatsAction extends TransportNodesAction
return new RepositoriesNodeMeteringResponse(clusterService.localNode(), repositoriesService.repositoriesStats());
}
static final class RepositoriesNodeStatsRequest extends TransportRequest {
static final class RepositoriesNodeStatsRequest extends AbstractTransportRequest {
RepositoriesNodeStatsRequest() {}
RepositoriesNodeStatsRequest(StreamInput in) throws IOException {

View file

@ -24,7 +24,7 @@ import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService;
@ -107,7 +107,7 @@ public class TransportSearchableSnapshotCacheStoresAction extends TransportNodes
}
}
public static final class NodeRequest extends TransportRequest {
public static final class NodeRequest extends AbstractTransportRequest {
private final SnapshotId snapshotId;
private final ShardId shardId;

View file

@ -26,7 +26,7 @@ import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.ToXContentObject;
@ -135,7 +135,7 @@ public class TransportSearchableSnapshotsNodeCachesStatsAction extends Transport
);
}
public static final class NodeRequest extends TransportRequest {
public static final class NodeRequest extends AbstractTransportRequest {
public NodeRequest() {}

View file

@ -39,9 +39,9 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.test.rest.FakeRestRequest.Builder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentBuilder;
@ -2981,7 +2981,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
private Tuple<Channel, RestRequest> prepareRestContent(String uri, InetSocketAddress remoteAddress, Map<String, String> params) {
final RestContent content = randomFrom(RestContent.values());
final FakeRestRequest.Builder builder = new Builder(NamedXContentRegistry.EMPTY);
final Builder builder = new Builder(NamedXContentRegistry.EMPTY);
if (content.hasContent()) {
builder.withContent(content.content(), XContentType.JSON);
}
@ -3005,7 +3005,9 @@ public class LoggingAuditTrailTests extends ESTestCase {
return new Tuple<>(channel, builder.build());
}
/** creates address without any lookups. hostname can be null, for missing */
/**
* creates address without any lookups. hostname can be null, for missing
*/
protected static InetAddress forge(String hostname, String address) throws IOException {
final byte bytes[] = InetAddress.getByName(address).getAddress();
return InetAddress.getByAddress(hostname, bytes);
@ -3054,7 +3056,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
return authentication;
}
static class MockRequest extends TransportRequest {
static class MockRequest extends AbstractTransportRequest {
MockRequest(ThreadContext threadContext) throws IOException {
if (randomBoolean()) {
@ -3265,7 +3267,9 @@ public class LoggingAuditTrailTests extends ESTestCase {
}
}
private record ApiKeyMetadataWithSerialization(Map<String, Object> metadata, String serialization) {};
private record ApiKeyMetadataWithSerialization(Map<String, Object> metadata, String serialization) {}
;
private ApiKeyMetadataWithSerialization randomApiKeyMetadataWithSerialization() {
final int metadataCase = randomInt(3);
@ -3288,7 +3292,9 @@ public class LoggingAuditTrailTests extends ESTestCase {
};
}
private record CrossClusterApiKeyAccessWithSerialization(String access, String serialization) {};
private record CrossClusterApiKeyAccessWithSerialization(String access, String serialization) {}
;
private CrossClusterApiKeyAccessWithSerialization randomCrossClusterApiKeyAccessWithSerialization() {
return randomFrom(

Some files were not shown because too many files have changed in this diff Show more