Always log data node failures (#127420)

Log search exceptions as they occur on the data node no matter the value 
of error_trace.
This commit is contained in:
Ben Chaplin 2025-04-29 09:40:31 -04:00 committed by GitHub
parent fd93fad994
commit 053895854d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 73 additions and 166 deletions

View file

@ -127,33 +127,7 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase {
assertFalse(hasStackTrace.getAsBoolean());
}
/**
 * Sends a {@code simple_query_string} search to {@code /_search} with {@code error_trace=true}
 * and asserts that {@link SearchService} does not log the per-shard failure message for the
 * error-triggering index.
 *
 * NOTE(review): why the query fails on "test2" is decided by {@code setupIndexWithDocs()},
 * which is not visible here — confirm against that helper.
 */
public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException {
setupIndexWithDocs();
Request searchRequest = new Request("POST", "/_search");
searchRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
"query": "foo",
"fields": ["field"]
}
}
}
""");
// Index whose shards are expected to fail the query; one expectation is registered per primary shard.
String errorTriggeringIndex = "test2";
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
// Capture SearchService logging for the duration of the request.
try (var mockLog = MockLog.capture(SearchService.class)) {
// Expectations must be registered before the request is sent so the capture covers the failure.
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
searchRequest.addParameter("error_trace", "true");
getRestClient().performRequest(searchRequest);
// Fails the test if any "unseen" failure message was in fact logged.
mockLog.assertAllExpectationsMatched();
}
}
public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException {
public void testDataNodeLogsStackTrace() throws IOException {
setupIndexWithDocs();
Request searchRequest = new Request("POST", "/_search");
@ -173,10 +147,14 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase {
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
// error_trace defaults to false so we can test both cases with some randomization
if (randomBoolean()) {
// No matter the value of error_trace (empty, true, or false) we should see stack traces logged
int errorTraceValue = randomIntBetween(0, 2);
if (errorTraceValue == 0) {
searchRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
searchRequest.addParameter("error_trace", "false");
}
} // else empty
getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
@ -233,7 +211,7 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase {
assertFalse(hasStackTrace.getAsBoolean());
}
public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrueMultiSearch() throws IOException {
public void testDataNodeLogsStackTraceMultiSearch() throws IOException {
setupIndexWithDocs();
XContentType contentType = XContentType.JSON;
@ -246,41 +224,19 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase {
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
);
searchRequest.addParameter("error_trace", "true");
String errorTriggeringIndex = "test2";
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}
}
public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmptyMultiSearch() throws IOException {
setupIndexWithDocs();
XContentType contentType = XContentType.JSON;
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
);
Request searchRequest = new Request("POST", "/_msearch");
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
searchRequest.setEntity(
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
);
// error_trace defaults to false so we can test both cases with some randomization
if (randomBoolean()) {
searchRequest.addParameter("error_trace", "false");
}
String errorTriggeringIndex = "test2";
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
// No matter the value of error_trace (empty, true, or false) we should see stack traces logged
int errorTraceValue = randomIntBetween(0, 2);
if (errorTraceValue == 0) {
searchRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
searchRequest.addParameter("error_trace", "false");
} // else empty
getRestClient().performRequest(searchRequest);
mockLog.assertAllExpectationsMatched();
}

View file

@ -574,10 +574,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
/**
* Wraps the listener to avoid sending StackTraces back to the coordinating
* node if the `error_trace` header is set to {@code false}. Upon reading we
* default to {@code true} to maintain the same behavior as before the change,
* due to older nodes not being able to specify whether it needs stack traces.
* Wraps the listener to ensure errors are logged and to avoid sending
* StackTraces back to the coordinating node if the `error_trace` header is
* set to {@code false}. Upon reading, we default to {@code true} to maintain
* the same behavior as before the change, due to older nodes not being able
* to specify whether they need stack traces.
*
* @param <T> the type of the response
* @param listener the action listener to be wrapped
@ -588,7 +589,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
* @param threadPool with context where to write the new header
* @return the wrapped action listener
*/
static <T> ActionListener<T> maybeWrapListenerForStackTrace(
static <T> ActionListener<T> wrapListenerForErrorHandling(
ActionListener<T> listener,
TransportVersion version,
String nodeId,
@ -596,36 +597,39 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
long taskId,
ThreadPool threadPool
) {
boolean header = true;
final boolean header;
if (version.onOrAfter(ERROR_TRACE_IN_TRANSPORT_HEADER) && threadPool.getThreadContext() != null) {
header = Boolean.parseBoolean(threadPool.getThreadContext().getHeaderOrDefault("error_trace", "false"));
} else {
header = true;
}
if (header == false) {
return listener.delegateResponse((l, e) -> {
org.apache.logging.log4j.util.Supplier<String> messageSupplier = () -> format(
"[%s]%s: failed to execute search request for task [%d]",
nodeId,
shardId,
taskId
);
// Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse
if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
logger.debug(messageSupplier, e);
} else {
logger.warn(messageSupplier, e);
}
return listener.delegateResponse((l, e) -> {
org.apache.logging.log4j.util.Supplier<String> messageSupplier = () -> format(
"[%s]%s: failed to execute search request for task [%d]",
nodeId,
shardId,
taskId
);
// Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse
if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
logger.debug(messageSupplier, e);
} else {
logger.warn(messageSupplier, e);
}
if (header == false) {
ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> {
err.setStackTrace(EMPTY_STACK_TRACE_ARRAY);
return false;
});
l.onFailure(e);
});
}
return listener;
}
l.onFailure(e);
});
}
public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
listener = maybeWrapListenerForStackTrace(
listener = wrapListenerForErrorHandling(
listener,
request.getChannelVersion(),
clusterService.localNode().getId(),
@ -676,7 +680,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
rewriteAndFetchShardRequest(
shard,
request,
maybeWrapListenerForStackTrace(
wrapListenerForErrorHandling(
listener,
request.getChannelVersion(),
clusterService.localNode().getId(),
@ -913,7 +917,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener<RankFeatureResult> listener) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
listener = maybeWrapListenerForStackTrace(
listener = wrapListenerForErrorHandling(
listener,
shardSearchRequest.getChannelVersion(),
clusterService.localNode().getId(),
@ -970,7 +974,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
TransportVersion version
) {
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
listener = maybeWrapListenerForStackTrace(
listener = wrapListenerForErrorHandling(
listener,
version,
clusterService.localNode().getId(),
@ -1032,7 +1036,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
listener = maybeWrapListenerForStackTrace(
listener = wrapListenerForErrorHandling(
listener,
version,
clusterService.localNode().getId(),

View file

@ -72,7 +72,7 @@ import java.util.function.Predicate;
import static org.elasticsearch.common.Strings.format;
import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
import static org.elasticsearch.search.SearchService.isExecutorQueuedBeyondPrewarmingFactor;
import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace;
import static org.elasticsearch.search.SearchService.wrapListenerForErrorHandling;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.not;
@ -137,7 +137,7 @@ public class SearchServiceTests extends IndexShardTestCase {
doTestCanMatch(searchRequest, sortField, true, null, false);
}
public void testMaybeWrapListenerForStackTrace() {
public void testWrapListenerForErrorHandling() {
ShardId shardId = new ShardId("index", "index", 0);
// Tests that the same listener has a stack trace if it is not wrapped, or does not have a stack trace if it is wrapped.
AtomicBoolean isWrapped = new AtomicBoolean(false);
@ -160,12 +160,12 @@ public class SearchServiceTests extends IndexShardTestCase {
e.fillInStackTrace();
assertThat(e.getStackTrace().length, is(not(0)));
listener.onFailure(e);
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), "node", shardId, 123L, threadPool);
listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), "node", shardId, 123L, threadPool);
isWrapped.set(true);
listener.onFailure(e);
}
public void testMaybeWrapListenerForStackTraceDebugLog() {
public void testWrapListenerForErrorHandlingDebugLog() {
final String nodeId = "node";
final String index = "index";
ShardId shardId = new ShardId(index, index, 0);
@ -198,12 +198,12 @@ public class SearchServiceTests extends IndexShardTestCase {
}
};
IllegalArgumentException e = new IllegalArgumentException(exceptionMessage); // 400-level exception
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
listener.onFailure(e);
}
}
public void testMaybeWrapListenerForStackTraceWarnLog() {
public void testWrapListenerForErrorHandlingWarnLog() {
final String nodeId = "node";
final String index = "index";
ShardId shardId = new ShardId(index, index, 0);
@ -235,7 +235,7 @@ public class SearchServiceTests extends IndexShardTestCase {
}
};
IllegalStateException e = new IllegalStateException(exceptionMessage); // 500-level exception
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
listener.onFailure(e);
}
}

View file

@ -26,7 +26,6 @@ import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import static org.elasticsearch.common.Strings.format;
import static org.elasticsearch.test.ESIntegTestCase.getNodeId;
import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
import static org.elasticsearch.test.ESTestCase.asInstanceOf;
@ -90,32 +89,4 @@ public enum ErrorTraceHelper {
);
}
}
/**
 * Registers expectations asserting that the data-node search-failure message is <em>not</em>
 * logged at debug level. One unseen expectation is registered for every pairing of a node in
 * the internal cluster with a shard number in {@code [0, numShards)}.
 *
 * @param numShards the number of shards in the index (one expectation per shard, per node)
 * @param mockLog the mock log that receives the expectations
 * @param errorTriggeringIndex the name of the index that will trigger the error
 */
public static void addUnseenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) {
    for (String node : internalCluster().getNodeNames()) {
        for (int shardNumber = 0; shardNumber < numShards; shardNumber++) {
            // Human-readable label used to identify this expectation.
            final String expectationName = format(
                "\"[%s][%s][%d]: failed to execute search request\" and an exception logged",
                getNodeId(node),
                errorTriggeringIndex,
                shardNumber
            );
            // The log message that must not show up for this node/shard combination.
            final String unexpectedMessage = format(
                "[%s][%s][%d]: failed to execute search request",
                getNodeId(node),
                errorTriggeringIndex,
                shardNumber
            );
            final String loggerName = SearchService.class.getCanonicalName();
            mockLog.addExpectation(new MockLog.UnseenEventExpectation(expectationName, loggerName, Level.DEBUG, unexpectedMessage));
        }
    }
}
}

View file

@ -154,42 +154,7 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
assertFalse(transportMessageHasStackTrace.getAsBoolean());
}
/**
 * Submits a {@code simple_query_string} search to {@code /_async_search} with
 * {@code error_trace=true}, polls the async search until it stops reporting
 * {@code is_running}, and asserts that {@link SearchService} did not log the per-shard
 * failure message for the error-triggering index.
 *
 * NOTE(review): why the query fails on "test2" is decided by {@code setupIndexWithDocs()},
 * which is not visible here — confirm against that helper.
 */
public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException, InterruptedException {
setupIndexWithDocs();
Request searchRequest = new Request("POST", "/_async_search");
searchRequest.setJsonEntity("""
{
"query": {
"simple_query_string" : {
"query": "foo",
"fields": ["field"]
}
}
}
""");
searchRequest.addParameter("error_trace", "true");
// Keep the async search stored and return immediately so the result is fetched by polling below.
searchRequest.addParameter("keep_on_completion", "true");
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
// Index whose shards are expected to fail the query; one expectation is registered per primary shard.
String errorTriggeringIndex = "test2";
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
// Capture SearchService logging for the whole submit-and-poll sequence.
try (var mockLog = MockLog.capture(SearchService.class)) {
// Expectations must be registered before the request is sent so the capture covers the failure.
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
// presumably the helper waits for the given delay before performing the request — verify
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
String asyncExecutionId = (String) responseEntity.get("id");
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
request.addParameter("error_trace", "true");
// Re-issue the GET, with a one-second delay argument, for as long as the response reports is_running == true.
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
}
// Fails the test if any "unseen" failure message was in fact logged.
mockLog.assertAllExpectationsMatched();
}
}
public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException, InterruptedException {
public void testDataNodeLogsStackTrace() throws IOException, InterruptedException {
setupIndexWithDocs();
// error_trace defaults to false so we can test both cases with some randomization
@ -206,9 +171,15 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
}
}
""");
if (defineErrorTraceFalse) {
// No matter the value of error_trace (empty, true, or false) we should see stack traces logged
int errorTraceValue = randomIntBetween(0, 2);
if (errorTraceValue == 0) {
searchRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
searchRequest.addParameter("error_trace", "false");
}
} // else empty
searchRequest.addParameter("keep_on_completion", "true");
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
@ -220,9 +191,14 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
String asyncExecutionId = (String) responseEntity.get("id");
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
if (defineErrorTraceFalse) {
// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
request.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
request.addParameter("error_trace", "false");
}
} // else empty
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
}