From 053895854d3fa778a7eb066a6b44f63c002dba03 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 29 Apr 2025 09:40:31 -0400 Subject: [PATCH] Always log data node failures (#127420) Log search exceptions as they occur on the data node no matter the value of error_trace. --- .../http/SearchErrorTraceIT.java | 78 ++++--------------- .../elasticsearch/search/SearchService.java | 62 ++++++++------- .../search/SearchServiceTests.java | 14 ++-- .../search/ErrorTraceHelper.java | 29 ------- .../xpack/search/AsyncSearchErrorTraceIT.java | 56 ++++--------- 5 files changed, 73 insertions(+), 166 deletions(-) diff --git a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java index baf7cc183afd..398e6d5c9477 100644 --- a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java +++ b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java @@ -127,33 +127,7 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase { assertFalse(hasStackTrace.getAsBoolean()); } - public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException { - setupIndexWithDocs(); - - Request searchRequest = new Request("POST", "/_search"); - searchRequest.setJsonEntity(""" - { - "query": { - "simple_query_string" : { - "query": "foo", - "fields": ["field"] - } - } - } - """); - - String errorTriggeringIndex = "test2"; - int numShards = getNumShards(errorTriggeringIndex).numPrimaries; - try (var mockLog = MockLog.capture(SearchService.class)) { - ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex); - - searchRequest.addParameter("error_trace", "true"); - getRestClient().performRequest(searchRequest); - mockLog.assertAllExpectationsMatched(); - } - } - - public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException { + public void testDataNodeLogsStackTrace() throws IOException { setupIndexWithDocs(); Request searchRequest = new Request("POST", "/_search"); @@ -173,10 +147,14 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase { try (var mockLog = MockLog.capture(SearchService.class)) { ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex); - // error_trace defaults to false so we can test both cases with some randomization - if (randomBoolean()) { + // No matter the value of error_trace (empty, true, or false) we should see stack traces logged + int errorTraceValue = randomIntBetween(0, 2); + if (errorTraceValue == 0) { + searchRequest.addParameter("error_trace", "true"); + } else if (errorTraceValue == 1) { searchRequest.addParameter("error_trace", "false"); - } + } // else empty + getRestClient().performRequest(searchRequest); mockLog.assertAllExpectationsMatched(); } @@ -233,7 +211,7 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase { assertFalse(hasStackTrace.getAsBoolean()); } - public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrueMultiSearch() throws IOException { + public void testDataNodeLogsStackTraceMultiSearch() throws IOException { setupIndexWithDocs(); XContentType contentType = XContentType.JSON; @@ -246,41 +224,19 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase { new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null)) ); - searchRequest.addParameter("error_trace", "true"); - - String errorTriggeringIndex = "test2"; - int numShards = getNumShards(errorTriggeringIndex).numPrimaries; - try (var mockLog = MockLog.capture(SearchService.class)) { - ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex); - - getRestClient().performRequest(searchRequest); - mockLog.assertAllExpectationsMatched(); - } - } - - public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmptyMultiSearch() throws IOException { - setupIndexWithDocs(); - - XContentType contentType = XContentType.JSON; - MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add( - new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field"))) - ); - Request searchRequest = new Request("POST", "/_msearch"); - byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent()); - searchRequest.setEntity( - new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null)) - ); - - // error_trace defaults to false so we can test both cases with some randomization - if (randomBoolean()) { - searchRequest.addParameter("error_trace", "false"); - } - String errorTriggeringIndex = "test2"; int numShards = getNumShards(errorTriggeringIndex).numPrimaries; try (var mockLog = MockLog.capture(SearchService.class)) { ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex); + // No matter the value of error_trace (empty, true, or false) we should see stack traces logged + int errorTraceValue = randomIntBetween(0, 2); + if (errorTraceValue == 0) { + searchRequest.addParameter("error_trace", "true"); + } else if (errorTraceValue == 1) { + searchRequest.addParameter("error_trace", "false"); + } // else empty + getRestClient().performRequest(searchRequest); mockLog.assertAllExpectationsMatched(); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 79bfb4056df5..2a814a1a3648 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -574,10 +574,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } /** - * Wraps the listener to avoid sending StackTraces back to the coordinating - * node if the `error_trace` header is set to {@code false}. Upon reading we - * default to {@code true} to maintain the same behavior as before the change, - * due to older nodes not being able to specify whether it needs stack traces. + * Wraps the listener to ensure errors are logged and to avoid sending + * StackTraces back to the coordinating node if the `error_trace` header is + * set to {@code false}. Upon reading, we default to {@code true} to maintain + * the same behavior as before the change, due to older nodes not being able + * to specify whether they need stack traces. * * @param the type of the response * @param listener the action listener to be wrapped @@ -588,7 +589,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv * @param threadPool with context where to write the new header * @return the wrapped action listener */ - static ActionListener maybeWrapListenerForStackTrace( + static ActionListener wrapListenerForErrorHandling( ActionListener listener, TransportVersion version, String nodeId, @@ -596,36 +597,39 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv long taskId, ThreadPool threadPool ) { - boolean header = true; + final boolean header; if (version.onOrAfter(ERROR_TRACE_IN_TRANSPORT_HEADER) && threadPool.getThreadContext() != null) { header = Boolean.parseBoolean(threadPool.getThreadContext().getHeaderOrDefault("error_trace", "false")); + } else { + header = true; } - if (header == false) { - return listener.delegateResponse((l, e) -> { - org.apache.logging.log4j.util.Supplier messageSupplier = () -> format( - "[%s]%s: failed to execute search request for task [%d]", - nodeId, - shardId, - taskId - ); - // Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse - if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) { - logger.debug(messageSupplier, e); - } else { - logger.warn(messageSupplier, e); - } + return listener.delegateResponse((l, e) -> { + org.apache.logging.log4j.util.Supplier messageSupplier = () -> format( + "[%s]%s: failed to execute search request for task [%d]", + nodeId, + shardId, + taskId + ); + // Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse + if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) { + logger.debug(messageSupplier, e); + } else { + logger.warn(messageSupplier, e); + } + + if (header == false) { ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> { err.setStackTrace(EMPTY_STACK_TRACE_ARRAY); return false; }); - l.onFailure(e); - }); - } - return listener; + } + + l.onFailure(e); + }); } public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { - listener = maybeWrapListenerForStackTrace( + listener = wrapListenerForErrorHandling( listener, request.getChannelVersion(), clusterService.localNode().getId(), @@ -676,7 +680,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv rewriteAndFetchShardRequest( shard, request, - maybeWrapListenerForStackTrace( + wrapListenerForErrorHandling( listener, request.getChannelVersion(), clusterService.localNode().getId(), @@ -913,7 +917,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener listener) { final ReaderContext readerContext = findReaderContext(request.contextId(), request); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); - listener = maybeWrapListenerForStackTrace( + listener = wrapListenerForErrorHandling( listener, shardSearchRequest.getChannelVersion(), clusterService.localNode().getId(), @@ -970,7 +974,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv TransportVersion version ) { final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); - listener = maybeWrapListenerForStackTrace( + listener = wrapListenerForErrorHandling( listener, version, clusterService.localNode().getId(), @@ -1032,7 +1036,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv ) { final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest()); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); - listener = maybeWrapListenerForStackTrace( + listener = wrapListenerForErrorHandling( listener, version, clusterService.localNode().getId(), diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index c26dfb630a4b..29380a059133 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -72,7 +72,7 @@ import java.util.function.Predicate; import static org.elasticsearch.common.Strings.format; import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE; import static org.elasticsearch.search.SearchService.isExecutorQueuedBeyondPrewarmingFactor; -import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace; +import static org.elasticsearch.search.SearchService.wrapListenerForErrorHandling; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.not; @@ -137,7 +137,7 @@ public class SearchServiceTests extends IndexShardTestCase { doTestCanMatch(searchRequest, sortField, true, null, false); } - public void testMaybeWrapListenerForStackTrace() { + public void testWrapListenerForErrorHandling() { ShardId shardId = new ShardId("index", "index", 0); // Tests that the same listener has stack trace if is not wrapped or does not have stack trace if it is wrapped. AtomicBoolean isWrapped = new AtomicBoolean(false); @@ -160,12 +160,12 @@ public class SearchServiceTests extends IndexShardTestCase { e.fillInStackTrace(); assertThat(e.getStackTrace().length, is(not(0))); listener.onFailure(e); - listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), "node", shardId, 123L, threadPool); + listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), "node", shardId, 123L, threadPool); isWrapped.set(true); listener.onFailure(e); } - public void testMaybeWrapListenerForStackTraceDebugLog() { + public void testWrapListenerForErrorHandlingDebugLog() { final String nodeId = "node"; final String index = "index"; ShardId shardId = new ShardId(index, index, 0); @@ -198,12 +198,12 @@ public class SearchServiceTests extends IndexShardTestCase { } }; IllegalArgumentException e = new IllegalArgumentException(exceptionMessage); // 400-level exception - listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool); + listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool); listener.onFailure(e); } } - public void testMaybeWrapListenerForStackTraceWarnLog() { + public void testWrapListenerForErrorHandlingWarnLog() { final String nodeId = "node"; final String index = "index"; ShardId shardId = new ShardId(index, index, 0); @@ -235,7 +235,7 @@ public class SearchServiceTests extends IndexShardTestCase { } }; IllegalStateException e = new IllegalStateException(exceptionMessage); // 500-level exception - listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool); + listener = wrapListenerForErrorHandling(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool); listener.onFailure(e); } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java index 90f2ddd00ace..b33fb852c52d 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java @@ -26,7 +26,6 @@ import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import static org.elasticsearch.common.Strings.format; -import static org.elasticsearch.test.ESIntegTestCase.getNodeId; import static org.elasticsearch.test.ESIntegTestCase.internalCluster; import static org.elasticsearch.test.ESTestCase.asInstanceOf; @@ -90,32 +89,4 @@ public enum ErrorTraceHelper { ); } } - - /** - * Adds expectations for the _absence_ of debug logging of a message. An unseen expectation is added for each - * combination of node in the internal cluster and shard in the index. - * - * @param numShards the number of shards in the index (an expectation will be added for each shard) - * @param mockLog the mock log - * @param errorTriggeringIndex the name of the index that will trigger the error - */ - public static void addUnseenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) { - for (String nodeName : internalCluster().getNodeNames()) { - for (int shard = 0; shard < numShards; shard++) { - mockLog.addExpectation( - new MockLog.UnseenEventExpectation( - format( - "\"[%s][%s][%d]: failed to execute search request\" and an exception logged", - getNodeId(nodeName), - errorTriggeringIndex, - shard - ), - SearchService.class.getCanonicalName(), - Level.DEBUG, - format("[%s][%s][%d]: failed to execute search request", getNodeId(nodeName), errorTriggeringIndex, shard) - ) - ); - } - } - } } diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index ef05307743a4..bf8576afc5d7 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -154,42 +154,7 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase { assertFalse(transportMessageHasStackTrace.getAsBoolean()); } - public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException, InterruptedException { - setupIndexWithDocs(); - - Request searchRequest = new Request("POST", "/_async_search"); - searchRequest.setJsonEntity(""" - { - "query": { - "simple_query_string" : { - "query": "foo", - "fields": ["field"] - } - } - } - """); - searchRequest.addParameter("error_trace", "true"); - searchRequest.addParameter("keep_on_completion", "true"); - searchRequest.addParameter("wait_for_completion_timeout", "0ms"); - - String errorTriggeringIndex = "test2"; - int numShards = getNumShards(errorTriggeringIndex).numPrimaries; - try (var mockLog = MockLog.capture(SearchService.class)) { - ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex); - - Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); - String asyncExecutionId = (String) responseEntity.get("id"); - Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - request.addParameter("error_trace", "true"); - while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { - responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); - } - - mockLog.assertAllExpectationsMatched(); - } - } - - public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException, InterruptedException { + public void testDataNodeLogsStackTrace() throws IOException, InterruptedException { setupIndexWithDocs(); // error_trace defaults to false so we can test both cases with some randomization @@ -206,9 +171,15 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase { } } """); - if (defineErrorTraceFalse) { + + // No matter the value of error_trace (empty, true, or false) we should see stack traces logged + int errorTraceValue = randomIntBetween(0, 2); + if (errorTraceValue == 0) { + searchRequest.addParameter("error_trace", "true"); + } else if (errorTraceValue == 1) { searchRequest.addParameter("error_trace", "false"); - } + } // else empty + searchRequest.addParameter("keep_on_completion", "true"); searchRequest.addParameter("wait_for_completion_timeout", "0ms"); @@ -220,9 +191,14 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase { Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); String asyncExecutionId = (String) responseEntity.get("id"); Request request = new Request("GET", "/_async_search/" + asyncExecutionId); - if (defineErrorTraceFalse) { + + // Use the same value of error_trace as the search request + if (errorTraceValue == 0) { + request.addParameter("error_trace", "true"); + } else if (errorTraceValue == 1) { request.addParameter("error_trace", "false"); - } + } // else empty + while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); }