Remove usages of TransportMessage (#126375)

This base class is kinda pointless: everywhere it's used we can either
be more specific (e.g. choosing between `TransportRequest` or
`TransportResponse`) or more general (e.g. choosing `Writeable`). This
commit removes all the usages apart from the `extends` clauses of its
direct descendants.
This commit is contained in:
David Turner 2025-04-07 18:50:28 +01:00 committed by GitHub
parent 212971a435
commit 5dc7ab77b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 31 additions and 30 deletions

View file

@ -32,8 +32,8 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* Handles inbound messages by first deserializing a {@link TransportMessage} from an {@link InboundMessage} and then passing * Handles inbound messages by first deserializing a {@link TransportRequest} or {@link TransportResponse} from an {@link InboundMessage}
* it to the appropriate handler. * and then passing it to the appropriate handler.
*/ */
public class InboundHandler { public class InboundHandler {

View file

@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig; import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.lucene.grouping.TopFieldGroups; import org.elasticsearch.lucene.grouping.TopFieldGroups;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
@ -70,7 +71,6 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportMessage;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -177,7 +177,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, topDocsList, from, size, reducedCompletionSuggestions).scoreDocs(); ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, topDocsList, from, size, reducedCompletionSuggestions).scoreDocs();
assertThat(sortedDocs.length, equalTo(accumulatedLength)); assertThat(sortedDocs.length, equalTo(accumulatedLength));
} finally { } finally {
results.asList().forEach(TransportMessage::decRef); results.asList().forEach(RefCounted::decRef);
} }
} }
@ -211,7 +211,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
} }
sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, topDocsList, from, size, Collections.emptyList()).scoreDocs(); sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, topDocsList, from, size, Collections.emptyList()).scoreDocs();
} finally { } finally {
results.asList().forEach(TransportMessage::decRef); results.asList().forEach(RefCounted::decRef);
} }
results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, useConstantScore); results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, useConstantScore);
try { try {
@ -231,7 +231,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f); assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f);
} }
} finally { } finally {
results.asList().forEach(TransportMessage::decRef); results.asList().forEach(RefCounted::decRef);
} }
} }
@ -342,10 +342,10 @@ public class SearchPhaseControllerTests extends ESTestCase {
assertThat(mergedResponse.profile(), is(anEmptyMap())); assertThat(mergedResponse.profile(), is(anEmptyMap()));
} }
} finally { } finally {
fetchResults.asList().forEach(TransportMessage::decRef); fetchResults.asList().forEach(RefCounted::decRef);
} }
} finally { } finally {
queryResults.asList().forEach(TransportMessage::decRef); queryResults.asList().forEach(RefCounted::decRef);
} }
} }
} }
@ -419,11 +419,11 @@ public class SearchPhaseControllerTests extends ESTestCase {
assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.sortedTopDocs().scoreDocs().length)); assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.sortedTopDocs().scoreDocs().length));
assertThat(mergedResponse.profile(), is(anEmptyMap())); assertThat(mergedResponse.profile(), is(anEmptyMap()));
} finally { } finally {
fetchResults.asList().forEach(TransportMessage::decRef); fetchResults.asList().forEach(RefCounted::decRef);
} }
} finally { } finally {
queryResults.asList().forEach(TransportMessage::decRef); queryResults.asList().forEach(RefCounted::decRef);
} }
} }
} }

View file

@ -10,12 +10,12 @@
package org.elasticsearch.search.profile; package org.elasticsearch.search.profile;
import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportMessage;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -77,7 +77,7 @@ public class SearchProfileResultsBuilderTests extends ESTestCase {
equalTo((long) searchPhase.size()) equalTo((long) searchPhase.size())
); );
} finally { } finally {
fetchPhase.forEach(TransportMessage::decRef); fetchPhase.forEach(RefCounted::decRef);
} }
} }

View file

@ -15,6 +15,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -269,7 +270,7 @@ public class InboundDecoderTests extends ESTestCase {
Compression.Scheme scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4); Compression.Scheme scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) { try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
final TransportMessage transportMessage = isRequest final Writeable transportMessage = isRequest
? new TestRequest(randomAlphaOfLength(100)) ? new TestRequest(randomAlphaOfLength(100))
: new TestResponse(randomAlphaOfLength(100)); : new TestResponse(randomAlphaOfLength(100));
final BytesReference totalBytes = OutboundHandler.serialize( final BytesReference totalBytes = OutboundHandler.serialize(

View file

@ -159,7 +159,7 @@ public class TransportActionProxyTests extends ESTestCase {
TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new); TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new);
// Node A -> Node B -> Node C: different versions - serialize the response // Node A -> Node B -> Node C: different versions - serialize the response
{ {
final List<TransportMessage> responses = Collections.synchronizedList(new ArrayList<>()); final List<TransportResponse> responses = Collections.synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
serviceB.addRequestHandlingBehavior( serviceB.addRequestHandlingBehavior(
TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.getProxyAction("internal:test"),
@ -212,7 +212,7 @@ public class TransportActionProxyTests extends ESTestCase {
{ {
AbstractSimpleTransportTestCase.connectToNode(serviceD, nodeB); AbstractSimpleTransportTestCase.connectToNode(serviceD, nodeB);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final List<TransportMessage> responses = Collections.synchronizedList(new ArrayList<>()); final List<TransportResponse> responses = Collections.synchronizedList(new ArrayList<>());
serviceB.addRequestHandlingBehavior( serviceB.addRequestHandlingBehavior(
TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.getProxyAction("internal:test"),
(handler, request, channel, task) -> handler.messageReceived( (handler, request, channel, task) -> handler.messageReceived(

View file

@ -9,7 +9,7 @@ package org.elasticsearch.xpack.core.security.authc;
import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpPreRequest; import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.transport.TransportRequest;
/** /**
* A AuthenticationFailureHandler is responsible for the handling of a request that has failed authentication. This must * A AuthenticationFailureHandler is responsible for the handling of a request that has failed authentication. This must
@ -51,7 +51,7 @@ public interface AuthenticationFailureHandler {
* @return ElasticsearchSecurityException with the appropriate headers and message * @return ElasticsearchSecurityException with the appropriate headers and message
*/ */
ElasticsearchSecurityException failedAuthentication( ElasticsearchSecurityException failedAuthentication(
TransportMessage message, TransportRequest message,
AuthenticationToken token, AuthenticationToken token,
String action, String action,
ThreadContext context ThreadContext context
@ -78,7 +78,7 @@ public interface AuthenticationFailureHandler {
* @param context The context of the request that failed authentication that could not be authenticated * @param context The context of the request that failed authentication that could not be authenticated
* @return ElasticsearchSecurityException with the appropriate headers and message * @return ElasticsearchSecurityException with the appropriate headers and message
*/ */
ElasticsearchSecurityException exceptionProcessingRequest(TransportMessage message, String action, Exception e, ThreadContext context); ElasticsearchSecurityException exceptionProcessingRequest(TransportRequest message, String action, Exception e, ThreadContext context);
/** /**
* This method is called when a REST request is received and no authentication token could be extracted AND anonymous * This method is called when a REST request is received and no authentication token could be extracted AND anonymous
@ -99,7 +99,7 @@ public interface AuthenticationFailureHandler {
* @param context The context of the request that failed authentication that could not be authenticated * @param context The context of the request that failed authentication that could not be authenticated
* @return ElasticsearchSecurityException with the appropriate headers and message * @return ElasticsearchSecurityException with the appropriate headers and message
*/ */
ElasticsearchSecurityException missingToken(TransportMessage message, String action, ThreadContext context); ElasticsearchSecurityException missingToken(TransportRequest message, String action, ThreadContext context);
/** /**
* This method is called when anonymous access is enabled, a request does not pass authorization with the anonymous * This method is called when anonymous access is enabled, a request does not pass authorization with the anonymous

View file

@ -11,7 +11,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpPreRequest; import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackField;
import java.util.ArrayList; import java.util.ArrayList;
@ -98,7 +98,7 @@ public class DefaultAuthenticationFailureHandler implements AuthenticationFailur
@Override @Override
public ElasticsearchSecurityException failedAuthentication( public ElasticsearchSecurityException failedAuthentication(
TransportMessage message, TransportRequest message,
AuthenticationToken token, AuthenticationToken token,
String action, String action,
ThreadContext context ThreadContext context
@ -118,7 +118,7 @@ public class DefaultAuthenticationFailureHandler implements AuthenticationFailur
@Override @Override
public ElasticsearchSecurityException exceptionProcessingRequest( public ElasticsearchSecurityException exceptionProcessingRequest(
TransportMessage message, TransportRequest message,
String action, String action,
Exception e, Exception e,
ThreadContext context ThreadContext context
@ -137,7 +137,7 @@ public class DefaultAuthenticationFailureHandler implements AuthenticationFailur
} }
@Override @Override
public ElasticsearchSecurityException missingToken(TransportMessage message, String action, ThreadContext context) { public ElasticsearchSecurityException missingToken(TransportRequest message, String action, ThreadContext context) {
return createAuthenticationError("missing authentication credentials for action [{}]", null, action); return createAuthenticationError("missing authentication credentials for action [{}]", null, action);
} }

View file

@ -13,7 +13,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.transport.TransportRequest;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -37,9 +37,9 @@ public class AuditUtil {
return ""; return "";
} }
public static Set<String> indices(TransportMessage message) { public static Set<String> indices(TransportRequest message) {
if (message instanceof IndicesRequest) { if (message instanceof IndicesRequest indicesRequest) {
return arrayToSetOrNull(((IndicesRequest) message).indices()); return arrayToSetOrNull(indicesRequest.indices());
} }
return null; return null;
} }

View file

@ -9,7 +9,7 @@ package org.elasticsearch.example.realm;
import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpPreRequest; import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.security.authc.AuthenticationToken; import org.elasticsearch.xpack.core.security.authc.AuthenticationToken;
import org.elasticsearch.xpack.core.security.authc.DefaultAuthenticationFailureHandler; import org.elasticsearch.xpack.core.security.authc.DefaultAuthenticationFailureHandler;
@ -31,7 +31,7 @@ public class CustomAuthenticationFailureHandler extends DefaultAuthenticationFai
@Override @Override
public ElasticsearchSecurityException failedAuthentication( public ElasticsearchSecurityException failedAuthentication(
TransportMessage message, TransportRequest message,
AuthenticationToken token, AuthenticationToken token,
String action, String action,
ThreadContext context ThreadContext context
@ -51,7 +51,7 @@ public class CustomAuthenticationFailureHandler extends DefaultAuthenticationFai
} }
@Override @Override
public ElasticsearchSecurityException missingToken(TransportMessage message, String action, ThreadContext context) { public ElasticsearchSecurityException missingToken(TransportRequest message, String action, ThreadContext context) {
ElasticsearchSecurityException e = super.missingToken(message, action, context); ElasticsearchSecurityException e = super.missingToken(message, action, context);
// set a custom header // set a custom header
e.addHeader("WWW-Authenticate", "custom-challenge"); e.addHeader("WWW-Authenticate", "custom-challenge");