Dispatch TransportSingleShardAction response handling (#113630) (#113930)

We shouldn't be handling these responses on the transport worker.

Closes #110408
This commit is contained in:
David Turner 2024-10-03 15:52:38 +01:00 committed by GitHub
parent 8eca9f9ed3
commit ee2d35e2cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 114 additions and 72 deletions

View file

@ -23,6 +23,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;
import java.util.Arrays;
import java.util.Collection;
@ -121,8 +122,32 @@ public class KibanaThreadPoolIT extends ESIntegTestCase {
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get()
);
assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable"));
var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get());
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable"));
final var getFuture = client().prepareGet(USER_INDEX, "id").execute();
// response handling is force-executed on GET pool, so we must
// (a) wait for that task to be enqueued, expanding the queue beyond its configured limit, and
// (b) check for the exception in the background
try {
assertTrue(waitUntil(() -> {
if (getFuture.isDone()) {
return true;
}
for (ThreadPool threadPool : internalCluster().getInstances(ThreadPool.class)) {
for (ThreadPoolStats.Stats stats : threadPool.stats().stats()) {
if (stats.name().equals(ThreadPool.Names.GET) && stats.queue() > 1) {
return true;
}
}
}
return false;
}));
} catch (Exception e) {
fail(e);
}
new Thread(() -> expectThrows(EsRejectedExecutionException.class, () -> getFuture.actionGet(SAFE_AWAIT_TIMEOUT))).start();
// intentionally commented out this test until https://github.com/elastic/elasticsearch/issues/97916 is fixed
// var e3 = expectThrows(
// SearchPhaseExecutionException.class,

View file

@ -10,6 +10,8 @@
package org.elasticsearch.get;
import org.apache.lucene.index.DirectoryReader;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@ -37,6 +39,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Before;
@ -50,6 +53,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import static java.util.Collections.singleton;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -104,25 +109,27 @@ public class GetActionIT extends ESIntegTestCase {
);
ensureGreen();
GetResponse response = client().prepareGet(indexOrAlias(), "1").get();
final Function<UnaryOperator<GetRequestBuilder>, GetResponse> docGetter = op -> getDocument(indexOrAlias(), "1", op);
GetResponse response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(false));
logger.info("--> index doc 1");
prepareIndex("test").setId("1").setSource("field1", "value1", "field2", "value2").get();
logger.info("--> non realtime get 1");
response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get();
response = docGetter.apply(r -> r.setRealtime(false));
assertThat(response.isExists(), equalTo(false));
logger.info("--> realtime get 1");
response = client().prepareGet(indexOrAlias(), "1").get();
response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1"));
assertThat(response.getSourceAsMap().get("field2").toString(), equalTo("value2"));
logger.info("--> realtime get 1 (no source, implicit)");
response = client().prepareGet(indexOrAlias(), "1").setStoredFields(Strings.EMPTY_ARRAY).get();
response = docGetter.apply(r -> r.setStoredFields(Strings.EMPTY_ARRAY));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
Set<String> fields = new HashSet<>(response.getFields().keySet());
@ -130,7 +137,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getSourceAsBytesRef(), nullValue());
logger.info("--> realtime get 1 (no source, explicit)");
response = client().prepareGet(indexOrAlias(), "1").setFetchSource(false).get();
response = docGetter.apply(r -> r.setFetchSource(false));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
fields = new HashSet<>(response.getFields().keySet());
@ -138,14 +145,14 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getSourceAsBytesRef(), nullValue());
logger.info("--> realtime get 1 (no type)");
response = client().prepareGet(indexOrAlias(), "1").get();
response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1"));
assertThat(response.getSourceAsMap().get("field2").toString(), equalTo("value2"));
logger.info("--> realtime fetch of field");
response = client().prepareGet(indexOrAlias(), "1").setStoredFields("field1").get();
response = docGetter.apply(r -> r.setStoredFields("field1"));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsBytesRef(), nullValue());
@ -153,7 +160,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getField("field2"), nullValue());
logger.info("--> realtime fetch of field & source");
response = client().prepareGet(indexOrAlias(), "1").setStoredFields("field1").setFetchSource("field1", null).get();
response = docGetter.apply(r -> r.setStoredFields("field1").setFetchSource("field1", null));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsMap(), hasKey("field1"));
@ -162,7 +169,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getField("field2"), nullValue());
logger.info("--> realtime get 1");
response = client().prepareGet(indexOrAlias(), "1").get();
response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1"));
@ -172,14 +179,14 @@ public class GetActionIT extends ESIntegTestCase {
refresh();
logger.info("--> non realtime get 1 (loaded from index)");
response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get();
response = docGetter.apply(r -> r.setRealtime(false));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1"));
assertThat(response.getSourceAsMap().get("field2").toString(), equalTo("value2"));
logger.info("--> realtime fetch of field (loaded from index)");
response = client().prepareGet(indexOrAlias(), "1").setStoredFields("field1").get();
response = docGetter.apply(r -> r.setStoredFields("field1"));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsBytesRef(), nullValue());
@ -187,7 +194,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getField("field2"), nullValue());
logger.info("--> realtime fetch of field & source (loaded from index)");
response = client().prepareGet(indexOrAlias(), "1").setStoredFields("field1").setFetchSource(true).get();
response = docGetter.apply(r -> r.setStoredFields("field1").setFetchSource(true));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsBytesRef(), not(nullValue()));
@ -198,7 +205,7 @@ public class GetActionIT extends ESIntegTestCase {
prepareIndex("test").setId("1").setSource("field1", "value1_1", "field2", "value2_1").get();
logger.info("--> realtime get 1");
response = client().prepareGet(indexOrAlias(), "1").get();
response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1_1"));
@ -207,7 +214,7 @@ public class GetActionIT extends ESIntegTestCase {
logger.info("--> update doc 1 again");
prepareIndex("test").setId("1").setSource("field1", "value1_2", "field2", "value2_2").get();
response = client().prepareGet(indexOrAlias(), "1").get();
response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1_2"));
@ -216,7 +223,7 @@ public class GetActionIT extends ESIntegTestCase {
DeleteResponse deleteResponse = client().prepareDelete("test", "1").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
response = client().prepareGet(indexOrAlias(), "1").get();
response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(false));
}
@ -232,14 +239,34 @@ public class GetActionIT extends ESIntegTestCase {
DocWriteResponse indexResponse = prepareIndex("index1").setId("id").setSource(Collections.singletonMap("foo", "bar")).get();
assertThat(indexResponse.status().getStatus(), equalTo(RestStatus.CREATED.getStatus()));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, client().prepareGet("alias1", "_alias_id"));
assertThat(exception.getMessage(), endsWith("can't execute a single index op"));
assertThat(
asInstanceOf(IllegalArgumentException.class, getDocumentFailure("alias1", "_alias_id", r -> r)).getMessage(),
endsWith("can't execute a single index op")
);
}
static String indexOrAlias() {
return randomBoolean() ? "test" : "alias";
}
private static GetResponse getDocument(String index, String id, UnaryOperator<GetRequestBuilder> requestOperator) {
return safeAwait(l -> getDocumentAsync(index, id, requestOperator, l));
}
private static Throwable getDocumentFailure(String index, String id, UnaryOperator<GetRequestBuilder> requestOperator) {
return ExceptionsHelper.unwrapCause(safeAwaitFailure(GetResponse.class, l -> getDocumentAsync(index, id, requestOperator, l)));
}
private static void getDocumentAsync(
String index,
String id,
UnaryOperator<GetRequestBuilder> requestOperator,
ActionListener<GetResponse> listener
) {
requestOperator.apply(client().prepareGet(index, id))
.execute(ActionListener.runBefore(listener, () -> ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GET)));
}
public void testSimpleMultiGet() throws Exception {
assertAcked(
prepareCreate("test").addAlias(new Alias("alias").writeIndex(randomFrom(true, false, null)))
@ -311,13 +338,14 @@ public class GetActionIT extends ESIntegTestCase {
assertAcked(prepareCreate("test").setMapping(mapping1));
ensureGreen();
GetResponse response = client().prepareGet("test", "1").get();
assertThat(response.isExists(), equalTo(false));
final Function<UnaryOperator<GetRequestBuilder>, GetResponse> docGetter = op -> getDocument("test", "1", op);
GetResponse response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(false));
prepareIndex("test").setId("1").setSource(jsonBuilder().startObject().array("field", "1", "2").endObject()).get();
response = client().prepareGet("test", "1").setStoredFields("field").get();
response = docGetter.apply(r -> r.setStoredFields("field"));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
Set<String> fields = new HashSet<>(response.getFields().keySet());
@ -328,7 +356,7 @@ public class GetActionIT extends ESIntegTestCase {
// Now test values being fetched from stored fields.
refresh();
response = client().prepareGet("test", "1").setStoredFields("field").get();
response = docGetter.apply(r -> r.setStoredFields("field"));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
fields = new HashSet<>(response.getFields().keySet());
@ -342,7 +370,9 @@ public class GetActionIT extends ESIntegTestCase {
assertAcked(prepareCreate("test").addAlias(new Alias("alias")).setSettings(Settings.builder().put("index.refresh_interval", -1)));
ensureGreen();
GetResponse response = client().prepareGet("test", "1").get();
final Function<UnaryOperator<GetRequestBuilder>, GetResponse> docGetter = op -> getDocument(indexOrAlias(), "1", op);
GetResponse response = docGetter.apply(UnaryOperator.identity());
assertThat(response.isExists(), equalTo(false));
logger.info("--> index doc 1");
@ -350,64 +380,52 @@ public class GetActionIT extends ESIntegTestCase {
// From translog:
response = client().prepareGet(indexOrAlias(), "1").setVersion(Versions.MATCH_ANY).get();
response = docGetter.apply(r -> r.setVersion(Versions.MATCH_ANY));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(1L));
response = client().prepareGet(indexOrAlias(), "1").setVersion(1).get();
response = docGetter.apply(r -> r.setVersion(1));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(1L));
try {
client().prepareGet(indexOrAlias(), "1").setVersion(2).get();
fail();
} catch (VersionConflictEngineException e) {
// all good
}
assertThat(getDocumentFailure(indexOrAlias(), "1", r -> r.setVersion(2)), instanceOf(VersionConflictEngineException.class));
// From Lucene index:
refresh();
response = client().prepareGet(indexOrAlias(), "1").setVersion(Versions.MATCH_ANY).setRealtime(false).get();
response = docGetter.apply(r -> r.setVersion(Versions.MATCH_ANY).setRealtime(false));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getVersion(), equalTo(1L));
response = client().prepareGet(indexOrAlias(), "1").setVersion(1).setRealtime(false).get();
response = docGetter.apply(r -> r.setVersion(1).setRealtime(false));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getVersion(), equalTo(1L));
try {
client().prepareGet(indexOrAlias(), "1").setVersion(2).setRealtime(false).get();
fail();
} catch (VersionConflictEngineException e) {
// all good
}
assertThat(
getDocumentFailure(indexOrAlias(), "1", r -> r.setVersion(2).setRealtime(false)),
instanceOf(VersionConflictEngineException.class)
);
logger.info("--> index doc 1 again, so increasing the version");
prepareIndex("test").setId("1").setSource("field1", "value1", "field2", "value2").get();
// From translog:
response = client().prepareGet(indexOrAlias(), "1").setVersion(Versions.MATCH_ANY).get();
response = docGetter.apply(r -> r.setVersion(Versions.MATCH_ANY));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getVersion(), equalTo(2L));
try {
client().prepareGet(indexOrAlias(), "1").setVersion(1).get();
fail();
} catch (VersionConflictEngineException e) {
// all good
}
assertThat(getDocumentFailure(indexOrAlias(), "1", r -> r.setVersion(1)), instanceOf(VersionConflictEngineException.class));
response = client().prepareGet(indexOrAlias(), "1").setVersion(2).get();
response = docGetter.apply(r -> r.setVersion(2));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getIndex(), equalTo("test"));
@ -416,20 +434,18 @@ public class GetActionIT extends ESIntegTestCase {
// From Lucene index:
refresh();
response = client().prepareGet(indexOrAlias(), "1").setVersion(Versions.MATCH_ANY).setRealtime(false).get();
response = docGetter.apply(r -> r.setVersion(Versions.MATCH_ANY).setRealtime(false));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getIndex(), equalTo("test"));
assertThat(response.getVersion(), equalTo(2L));
try {
client().prepareGet(indexOrAlias(), "1").setVersion(1).setRealtime(false).get();
fail();
} catch (VersionConflictEngineException e) {
// all good
}
assertThat(
getDocumentFailure(indexOrAlias(), "1", r -> r.setVersion(1).setRealtime(false)),
instanceOf(VersionConflictEngineException.class)
);
response = client().prepareGet(indexOrAlias(), "1").setVersion(2).setRealtime(false).get();
response = docGetter.apply(r -> r.setVersion(2).setRealtime(false));
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getIndex(), equalTo("test"));
@ -572,15 +588,15 @@ public class GetActionIT extends ESIntegTestCase {
.setSource(jsonBuilder().startObject().startObject("field1").field("field2", "value1").endObject().endObject())
.get();
IllegalArgumentException exc = expectThrows(
IllegalArgumentException exc = asInstanceOf(
IllegalArgumentException.class,
client().prepareGet(indexOrAlias(), "1").setStoredFields("field1")
getDocumentFailure(indexOrAlias(), "1", r -> r.setStoredFields("field1"))
);
assertThat(exc.getMessage(), equalTo("field [field1] isn't a leaf field"));
flush();
exc = expectThrows(IllegalArgumentException.class, client().prepareGet(indexOrAlias(), "1").setStoredFields("field1"));
exc = asInstanceOf(IllegalArgumentException.class, getDocumentFailure(indexOrAlias(), "1", r -> r.setStoredFields("field1")));
assertThat(exc.getMessage(), equalTo("field [field1] isn't a leaf field"));
}
@ -650,13 +666,13 @@ public class GetActionIT extends ESIntegTestCase {
logger.info("checking real time retrieval");
String field = "field1.field2.field3.field4";
GetResponse getResponse = client().prepareGet("my-index", "1").setStoredFields(field).get();
GetResponse getResponse = getDocument("my-index", "1", r -> r.setStoredFields(field));
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getField(field).getValues().size(), equalTo(2));
assertThat(getResponse.getField(field).getValues().get(0).toString(), equalTo("value1"));
assertThat(getResponse.getField(field).getValues().get(1).toString(), equalTo("value2"));
getResponse = client().prepareGet("my-index", "1").setStoredFields(field).get();
getResponse = getDocument("my-index", "1", r -> r.setStoredFields(field));
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getField(field).getValues().size(), equalTo(2));
assertThat(getResponse.getField(field).getValues().get(0).toString(), equalTo("value1"));
@ -681,7 +697,7 @@ public class GetActionIT extends ESIntegTestCase {
logger.info("checking post-flush retrieval");
getResponse = client().prepareGet("my-index", "1").setStoredFields(field).get();
getResponse = getDocument("my-index", "1", r -> r.setStoredFields(field));
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getField(field).getValues().size(), equalTo(2));
assertThat(getResponse.getField(field).getValues().get(0).toString(), equalTo("value1"));
@ -854,7 +870,7 @@ public class GetActionIT extends ESIntegTestCase {
// start tracking translog locations in the live version map
{
index("test", "0", Map.of("f", "empty"));
client().prepareGet("test", "0").setRealtime(true).get();
getDocument("test", "0", r -> r.setRealtime(true));
refresh("test");
}
Map<String, String> indexedDocs = new HashMap<>();
@ -909,7 +925,7 @@ public class GetActionIT extends ESIntegTestCase {
}
public void testGetRemoteIndex() {
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, client().prepareGet("cluster:index", "id"));
IllegalArgumentException iae = asInstanceOf(IllegalArgumentException.class, getDocumentFailure("cluster:index", "id", r -> r));
assertEquals(
"Cross-cluster calls are not supported in this context but remote indices were requested: [cluster:index]",
iae.getMessage()
@ -984,10 +1000,12 @@ public class GetActionIT extends ESIntegTestCase {
}
private GetResponse getDocument(String index, String docId, String field, @Nullable String routing) {
GetRequestBuilder getRequestBuilder = client().prepareGet().setIndex(index).setId(docId).setStoredFields(field);
return getDocument(index, docId, r -> {
r.setStoredFields(field);
if (routing != null) {
getRequestBuilder.setRouting(routing);
r.setRouting(routing);
}
return getRequestBuilder.get();
return r;
});
}
}

View file

@ -37,7 +37,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -183,7 +182,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
clusterService.localNode(),
transportShardAction,
internalRequest.request(),
new ActionListenerResponseHandler<>(listener, reader, TransportResponseHandler.TRANSPORT_WORKER)
new ActionListenerResponseHandler<>(listener, reader, executor)
);
} else {
perform(null);
@ -236,7 +235,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
node,
transportShardAction,
internalRequest.request(),
new ActionListenerResponseHandler<>(listener, reader, TransportResponseHandler.TRANSPORT_WORKER) {
new ActionListenerResponseHandler<>(listener, reader, executor) {
@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);