mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-27 17:10:22 -04:00
Log stack traces on data nodes before they are cleared for transport (#125732)
We recently cleared stack traces on data nodes before transport back to the coordinating node when error_trace=false to reduce unnecessary data transfer and memory on the coordinating node (#118266). However, all logging of exceptions happens on the coordinating node, so stack traces disappeared from any logs. This change logs stack traces directly on the data node when error_trace=false.
This commit is contained in:
parent
e4ce993c16
commit
9f6eb1d4e3
7 changed files with 458 additions and 7 deletions
5
docs/changelog/125732.yaml
Normal file
5
docs/changelog/125732.yaml
Normal file
|
@ -0,0 +1,5 @@
|
|||
pr: 125732
|
||||
summary: Log stack traces on data nodes before they are cleared for transport
|
||||
area: Search
|
||||
type: bug
|
||||
issues: []
|
|
@ -11,6 +11,8 @@ package org.elasticsearch.http;
|
|||
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.nio.entity.NByteArrayEntity;
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.core.config.Configurator;
|
||||
import org.elasticsearch.action.search.MultiSearchRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.client.Request;
|
||||
|
@ -20,10 +22,12 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.search.ErrorTraceHelper;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.test.MockLog;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.xcontent.XContentType;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
|
@ -40,6 +44,11 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase {
|
|||
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setDebugLogLevel() {
|
||||
Configurator.setLevel(SearchService.class, Level.DEBUG);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupMessageListener() {
|
||||
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
|
||||
|
@ -118,6 +127,61 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase {
|
|||
assertFalse(hasStackTrace.getAsBoolean());
|
||||
}
|
||||
|
||||
public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException {
|
||||
setupIndexWithDocs();
|
||||
|
||||
Request searchRequest = new Request("POST", "/_search");
|
||||
searchRequest.setJsonEntity("""
|
||||
{
|
||||
"query": {
|
||||
"simple_query_string" : {
|
||||
"query": "foo",
|
||||
"fields": ["field"]
|
||||
}
|
||||
}
|
||||
}
|
||||
""");
|
||||
|
||||
String errorTriggeringIndex = "test2";
|
||||
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
|
||||
try (var mockLog = MockLog.capture(SearchService.class)) {
|
||||
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
|
||||
|
||||
searchRequest.addParameter("error_trace", "true");
|
||||
getRestClient().performRequest(searchRequest);
|
||||
mockLog.assertAllExpectationsMatched();
|
||||
}
|
||||
}
|
||||
|
||||
public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException {
|
||||
setupIndexWithDocs();
|
||||
|
||||
Request searchRequest = new Request("POST", "/_search");
|
||||
searchRequest.setJsonEntity("""
|
||||
{
|
||||
"query": {
|
||||
"simple_query_string" : {
|
||||
"query": "foo",
|
||||
"fields": ["field"]
|
||||
}
|
||||
}
|
||||
}
|
||||
""");
|
||||
|
||||
String errorTriggeringIndex = "test2";
|
||||
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
|
||||
try (var mockLog = MockLog.capture(SearchService.class)) {
|
||||
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
|
||||
|
||||
// error_trace defaults to false so we can test both cases with some randomization
|
||||
if (randomBoolean()) {
|
||||
searchRequest.addParameter("error_trace", "false");
|
||||
}
|
||||
getRestClient().performRequest(searchRequest);
|
||||
mockLog.assertAllExpectationsMatched();
|
||||
}
|
||||
}
|
||||
|
||||
public void testMultiSearchFailingQueryErrorTraceDefault() throws IOException {
|
||||
setupIndexWithDocs();
|
||||
|
||||
|
@ -168,4 +232,57 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase {
|
|||
|
||||
assertFalse(hasStackTrace.getAsBoolean());
|
||||
}
|
||||
|
||||
public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrueMultiSearch() throws IOException {
|
||||
setupIndexWithDocs();
|
||||
|
||||
XContentType contentType = XContentType.JSON;
|
||||
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
|
||||
new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
|
||||
);
|
||||
Request searchRequest = new Request("POST", "/_msearch");
|
||||
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
|
||||
searchRequest.setEntity(
|
||||
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
|
||||
);
|
||||
|
||||
searchRequest.addParameter("error_trace", "true");
|
||||
|
||||
String errorTriggeringIndex = "test2";
|
||||
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
|
||||
try (var mockLog = MockLog.capture(SearchService.class)) {
|
||||
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
|
||||
|
||||
getRestClient().performRequest(searchRequest);
|
||||
mockLog.assertAllExpectationsMatched();
|
||||
}
|
||||
}
|
||||
|
||||
public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmptyMultiSearch() throws IOException {
|
||||
setupIndexWithDocs();
|
||||
|
||||
XContentType contentType = XContentType.JSON;
|
||||
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(
|
||||
new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field")))
|
||||
);
|
||||
Request searchRequest = new Request("POST", "/_msearch");
|
||||
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
|
||||
searchRequest.setEntity(
|
||||
new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null))
|
||||
);
|
||||
|
||||
// error_trace defaults to false so we can test both cases with some randomization
|
||||
if (randomBoolean()) {
|
||||
searchRequest.addParameter("error_trace", "false");
|
||||
}
|
||||
|
||||
String errorTriggeringIndex = "test2";
|
||||
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
|
||||
try (var mockLog = MockLog.capture(SearchService.class)) {
|
||||
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
|
||||
|
||||
getRestClient().performRequest(searchRequest);
|
||||
mockLog.assertAllExpectationsMatched();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -156,6 +156,7 @@ import java.util.function.LongSupplier;
|
|||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.TransportVersions.ERROR_TRACE_IN_TRANSPORT_HEADER;
|
||||
import static org.elasticsearch.common.Strings.format;
|
||||
import static org.elasticsearch.core.TimeValue.timeValueHours;
|
||||
import static org.elasticsearch.core.TimeValue.timeValueMillis;
|
||||
import static org.elasticsearch.core.TimeValue.timeValueMinutes;
|
||||
|
@ -563,12 +564,18 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
* @param <T> the type of the response
|
||||
* @param listener the action listener to be wrapped
|
||||
* @param version channel version of the request
|
||||
* @param nodeId id of the current node
|
||||
* @param shardId id of the shard being searched
|
||||
* @param taskId id of the task being executed
|
||||
* @param threadPool with context where to write the new header
|
||||
* @return the wrapped action listener
|
||||
*/
|
||||
static <T> ActionListener<T> maybeWrapListenerForStackTrace(
|
||||
ActionListener<T> listener,
|
||||
TransportVersion version,
|
||||
String nodeId,
|
||||
ShardId shardId,
|
||||
long taskId,
|
||||
ThreadPool threadPool
|
||||
) {
|
||||
boolean header = true;
|
||||
|
@ -577,6 +584,18 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
}
|
||||
if (header == false) {
|
||||
return listener.delegateResponse((l, e) -> {
|
||||
org.apache.logging.log4j.util.Supplier<String> messageSupplier = () -> format(
|
||||
"[%s]%s: failed to execute search request for task [%d]",
|
||||
nodeId,
|
||||
shardId,
|
||||
taskId
|
||||
);
|
||||
// Keep this logic aligned with that of SUPPRESSED_ERROR_LOGGER in RestResponse
|
||||
if (ExceptionsHelper.status(e).getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
|
||||
logger.debug(messageSupplier, e);
|
||||
} else {
|
||||
logger.warn(messageSupplier, e);
|
||||
}
|
||||
ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> {
|
||||
err.setStackTrace(EMPTY_STACK_TRACE_ARRAY);
|
||||
return false;
|
||||
|
@ -588,7 +607,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
}
|
||||
|
||||
public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
|
||||
listener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool);
|
||||
listener = maybeWrapListenerForStackTrace(
|
||||
listener,
|
||||
request.getChannelVersion(),
|
||||
clusterService.localNode().getId(),
|
||||
request.shardId(),
|
||||
task.getId(),
|
||||
threadPool
|
||||
);
|
||||
final IndexShard shard = getShard(request);
|
||||
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> {
|
||||
// fork the execution in the search thread pool
|
||||
|
@ -632,7 +658,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
rewriteAndFetchShardRequest(
|
||||
shard,
|
||||
request,
|
||||
maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool).delegateFailure((l, orig) -> {
|
||||
maybeWrapListenerForStackTrace(
|
||||
listener,
|
||||
request.getChannelVersion(),
|
||||
clusterService.localNode().getId(),
|
||||
request.shardId(),
|
||||
task.getId(),
|
||||
threadPool
|
||||
).delegateFailure((l, orig) -> {
|
||||
// check if we can shortcut the query phase entirely.
|
||||
if (orig.canReturnNullResponseIfMatchNoDocs()) {
|
||||
assert orig.scroll() == null;
|
||||
|
@ -830,9 +863,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
}
|
||||
|
||||
public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener<RankFeatureResult> listener) {
|
||||
listener = maybeWrapListenerForStackTrace(listener, request.getShardSearchRequest().getChannelVersion(), threadPool);
|
||||
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
|
||||
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
|
||||
listener = maybeWrapListenerForStackTrace(
|
||||
listener,
|
||||
shardSearchRequest.getChannelVersion(),
|
||||
clusterService.localNode().getId(),
|
||||
shardSearchRequest.shardId(),
|
||||
task.getId(),
|
||||
threadPool
|
||||
);
|
||||
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
|
||||
runAsync(getExecutor(readerContext.indexShard()), () -> {
|
||||
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.RANK_FEATURE, false)) {
|
||||
|
@ -881,8 +921,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
ActionListener<ScrollQuerySearchResult> listener,
|
||||
TransportVersion version
|
||||
) {
|
||||
listener = maybeWrapListenerForStackTrace(listener, version, threadPool);
|
||||
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
|
||||
listener = maybeWrapListenerForStackTrace(
|
||||
listener,
|
||||
version,
|
||||
clusterService.localNode().getId(),
|
||||
readerContext.indexShard().shardId(),
|
||||
task.getId(),
|
||||
threadPool
|
||||
);
|
||||
final Releasable markAsUsed;
|
||||
try {
|
||||
markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll()));
|
||||
|
@ -930,9 +977,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
ActionListener<QuerySearchResult> listener,
|
||||
TransportVersion version
|
||||
) {
|
||||
listener = maybeWrapListenerForStackTrace(listener, version, threadPool);
|
||||
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
|
||||
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
|
||||
listener = maybeWrapListenerForStackTrace(
|
||||
listener,
|
||||
version,
|
||||
clusterService.localNode().getId(),
|
||||
shardSearchRequest.shardId(),
|
||||
task.getId(),
|
||||
threadPool
|
||||
);
|
||||
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
|
||||
rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> {
|
||||
// fork the execution in the search thread pool
|
||||
|
|
|
@ -9,6 +9,8 @@
|
|||
|
||||
package org.elasticsearch.search;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.core.config.Configurator;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.SortField;
|
||||
|
@ -51,6 +53,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest;
|
|||
import org.elasticsearch.search.sort.BucketedSort;
|
||||
import org.elasticsearch.search.sort.MinAndMax;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.test.MockLog;
|
||||
import org.elasticsearch.xcontent.XContentParserConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -59,6 +62,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.function.BiFunction;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import static org.elasticsearch.common.Strings.format;
|
||||
import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
@ -125,6 +129,7 @@ public class SearchServiceTests extends IndexShardTestCase {
|
|||
}
|
||||
|
||||
public void testMaybeWrapListenerForStackTrace() {
|
||||
ShardId shardId = new ShardId("index", "index", 0);
|
||||
// Tests that the same listener has stack trace if is not wrapped or does not have stack trace if it is wrapped.
|
||||
AtomicBoolean isWrapped = new AtomicBoolean(false);
|
||||
ActionListener<SearchPhaseResult> listener = new ActionListener<>() {
|
||||
|
@ -146,11 +151,86 @@ public class SearchServiceTests extends IndexShardTestCase {
|
|||
e.fillInStackTrace();
|
||||
assertThat(e.getStackTrace().length, is(not(0)));
|
||||
listener.onFailure(e);
|
||||
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), threadPool);
|
||||
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), "node", shardId, 123L, threadPool);
|
||||
isWrapped.set(true);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
public void testMaybeWrapListenerForStackTraceDebugLog() {
|
||||
final String nodeId = "node";
|
||||
final String index = "index";
|
||||
ShardId shardId = new ShardId(index, index, 0);
|
||||
final long taskId = 123L;
|
||||
|
||||
try (var mockLog = MockLog.capture(SearchService.class)) {
|
||||
Configurator.setLevel(SearchService.class, Level.DEBUG);
|
||||
final String exceptionMessage = "test exception message";
|
||||
mockLog.addExpectation(
|
||||
new MockLog.ExceptionSeenEventExpectation(
|
||||
format("\"[%s]%s: failed to execute search request for task [%d]\" and an exception logged", nodeId, shardId, taskId),
|
||||
SearchService.class.getCanonicalName(),
|
||||
Level.DEBUG, // We will throw a 400-level exception, so it should only be logged at the debug level
|
||||
format("[%s]%s: failed to execute search request for task [%d]", nodeId, shardId, taskId),
|
||||
IllegalArgumentException.class,
|
||||
exceptionMessage
|
||||
)
|
||||
);
|
||||
|
||||
// Tests the listener has logged if it is wrapped
|
||||
ActionListener<SearchPhaseResult> listener = new ActionListener<>() {
|
||||
@Override
|
||||
public void onResponse(SearchPhaseResult searchPhaseResult) {
|
||||
// noop - we only care about failure scenarios
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
mockLog.assertAllExpectationsMatched();
|
||||
}
|
||||
};
|
||||
IllegalArgumentException e = new IllegalArgumentException(exceptionMessage); // 400-level exception
|
||||
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testMaybeWrapListenerForStackTraceWarnLog() {
|
||||
final String nodeId = "node";
|
||||
final String index = "index";
|
||||
ShardId shardId = new ShardId(index, index, 0);
|
||||
final long taskId = 123L;
|
||||
|
||||
try (var mockLog = MockLog.capture(SearchService.class)) {
|
||||
final String exceptionMessage = "test exception message";
|
||||
mockLog.addExpectation(
|
||||
new MockLog.ExceptionSeenEventExpectation(
|
||||
format("\"[%s]%s: failed to execute search request for task [%d]\" and an exception logged", nodeId, shardId, taskId),
|
||||
SearchService.class.getCanonicalName(),
|
||||
Level.WARN, // We will throw a 500-level exception, so it should be logged at the warn level
|
||||
format("[%s]%s: failed to execute search request for task [%d]", nodeId, shardId, taskId),
|
||||
IllegalStateException.class,
|
||||
exceptionMessage
|
||||
)
|
||||
);
|
||||
|
||||
// Tests the listener has logged if it is wrapped
|
||||
ActionListener<SearchPhaseResult> listener = new ActionListener<>() {
|
||||
@Override
|
||||
public void onResponse(SearchPhaseResult searchPhaseResult) {
|
||||
// noop - we only care about failure scenarios
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
mockLog.assertAllExpectationsMatched();
|
||||
}
|
||||
};
|
||||
IllegalStateException e = new IllegalStateException(exceptionMessage); // 500-level exception
|
||||
listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), nodeId, shardId, taskId, threadPool);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void doTestCanMatch(
|
||||
SearchRequest searchRequest,
|
||||
SortField sortField,
|
||||
|
|
|
@ -9,16 +9,25 @@
|
|||
|
||||
package org.elasticsearch.search;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.index.query.QueryShardException;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.MockLog;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportMessageListener;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.Strings.format;
|
||||
import static org.elasticsearch.test.ESIntegTestCase.getNodeId;
|
||||
import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
|
||||
import static org.elasticsearch.test.ESTestCase.asInstanceOf;
|
||||
|
||||
/**
|
||||
|
@ -45,4 +54,68 @@ public enum ErrorTraceHelper {
|
|||
}));
|
||||
return transportMessageHasStackTrace::get;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds expectations for debug logging of a message and exception on each shard of the given index.
|
||||
*
|
||||
* @param numShards the number of shards in the index (an expectation will be added for each shard)
|
||||
* @param mockLog the mock log
|
||||
* @param errorTriggeringIndex the name of the index that will trigger the error
|
||||
*/
|
||||
public static void addSeenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) {
|
||||
String nodesDisjunction = format(
|
||||
"(%s)",
|
||||
Arrays.stream(internalCluster().getNodeNames()).map(ESIntegTestCase::getNodeId).collect(Collectors.joining("|"))
|
||||
);
|
||||
for (int shard = 0; shard < numShards; shard++) {
|
||||
mockLog.addExpectation(
|
||||
new MockLog.PatternAndExceptionSeenEventExpectation(
|
||||
format(
|
||||
"\"[%s][%s][%d]: failed to execute search request for task [\\d+]\" and an exception logged",
|
||||
nodesDisjunction,
|
||||
errorTriggeringIndex,
|
||||
shard
|
||||
),
|
||||
SearchService.class.getCanonicalName(),
|
||||
Level.DEBUG,
|
||||
format(
|
||||
"\\[%s\\]\\[%s\\]\\[%d\\]: failed to execute search request for task \\[\\d+\\]",
|
||||
nodesDisjunction,
|
||||
errorTriggeringIndex,
|
||||
shard
|
||||
),
|
||||
QueryShardException.class,
|
||||
"failed to create query: For input string: \"foo\""
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds expectations for the _absence_ of debug logging of a message. An unseen expectation is added for each
|
||||
* combination of node in the internal cluster and shard in the index.
|
||||
*
|
||||
* @param numShards the number of shards in the index (an expectation will be added for each shard)
|
||||
* @param mockLog the mock log
|
||||
* @param errorTriggeringIndex the name of the index that will trigger the error
|
||||
*/
|
||||
public static void addUnseenLoggingExpectations(int numShards, MockLog mockLog, String errorTriggeringIndex) {
|
||||
for (String nodeName : internalCluster().getNodeNames()) {
|
||||
for (int shard = 0; shard < numShards; shard++) {
|
||||
mockLog.addExpectation(
|
||||
new MockLog.UnseenEventExpectation(
|
||||
format(
|
||||
"\"[%s][%s][%d]: failed to execute search request\" and an exception logged",
|
||||
getNodeId(nodeName),
|
||||
errorTriggeringIndex,
|
||||
shard
|
||||
),
|
||||
SearchService.class.getCanonicalName(),
|
||||
Level.DEBUG,
|
||||
format("[%s][%s][%d]: failed to execute search request", getNodeId(nodeName), errorTriggeringIndex, shard)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -316,6 +316,41 @@ public class MockLog implements Releasable {
|
|||
}
|
||||
}
|
||||
|
||||
public static class PatternAndExceptionSeenEventExpectation extends SeenEventExpectation {
|
||||
|
||||
private final Pattern pattern;
|
||||
private final Class<? extends Exception> clazz;
|
||||
private final String exceptionMessage;
|
||||
|
||||
public PatternAndExceptionSeenEventExpectation(
|
||||
String name,
|
||||
String logger,
|
||||
Level level,
|
||||
String pattern,
|
||||
Class<? extends Exception> clazz,
|
||||
String exceptionMessage
|
||||
) {
|
||||
super(name, logger, level, pattern);
|
||||
this.pattern = Pattern.compile(pattern);
|
||||
this.clazz = clazz;
|
||||
this.exceptionMessage = exceptionMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void match(LogEvent event) {
|
||||
if (event.getLevel().equals(level) && event.getLoggerName().equals(logger)) {
|
||||
boolean patternMatches = pattern.matcher(event.getMessage().getFormattedMessage()).matches();
|
||||
boolean exceptionMatches = event.getThrown() != null
|
||||
&& event.getThrown().getClass() == clazz
|
||||
&& event.getThrown().getMessage().equals(exceptionMessage);
|
||||
|
||||
if (patternMatches && exceptionMatches) {
|
||||
seenLatch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A wrapper around {@link LoggingExpectation} to detect if the assertMatched method has been called
|
||||
*/
|
||||
|
|
|
@ -7,6 +7,8 @@
|
|||
|
||||
package org.elasticsearch.xpack.search;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.core.config.Configurator;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -17,10 +19,12 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.search.ErrorTraceHelper;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.MockLog;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.xcontent.XContentType;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
@ -28,6 +32,7 @@ import java.util.Map;
|
|||
import java.util.function.BooleanSupplier;
|
||||
|
||||
public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
|
||||
private BooleanSupplier transportMessageHasStackTrace;
|
||||
|
||||
@Override
|
||||
protected boolean addMockHttpTransport() {
|
||||
|
@ -40,7 +45,10 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
|
|||
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), AsyncSearch.class, MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
private BooleanSupplier transportMessageHasStackTrace;
|
||||
@BeforeClass
|
||||
public static void setDebugLogLevel() {
|
||||
Configurator.setLevel(SearchService.class, Level.DEBUG);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupMessageListener() {
|
||||
|
@ -146,6 +154,85 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
|
|||
assertFalse(transportMessageHasStackTrace.getAsBoolean());
|
||||
}
|
||||
|
||||
public void testDataNodeDoesNotLogStackTraceWhenErrorTraceTrue() throws IOException, InterruptedException {
|
||||
setupIndexWithDocs();
|
||||
|
||||
Request searchRequest = new Request("POST", "/_async_search");
|
||||
searchRequest.setJsonEntity("""
|
||||
{
|
||||
"query": {
|
||||
"simple_query_string" : {
|
||||
"query": "foo",
|
||||
"fields": ["field"]
|
||||
}
|
||||
}
|
||||
}
|
||||
""");
|
||||
searchRequest.addParameter("error_trace", "true");
|
||||
searchRequest.addParameter("keep_on_completion", "true");
|
||||
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
|
||||
|
||||
String errorTriggeringIndex = "test2";
|
||||
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
|
||||
try (var mockLog = MockLog.capture(SearchService.class)) {
|
||||
ErrorTraceHelper.addUnseenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
|
||||
|
||||
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
|
||||
String asyncExecutionId = (String) responseEntity.get("id");
|
||||
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
|
||||
request.addParameter("error_trace", "true");
|
||||
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
|
||||
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
|
||||
}
|
||||
|
||||
getRestClient().performRequest(searchRequest);
|
||||
mockLog.assertAllExpectationsMatched();
|
||||
}
|
||||
}
|
||||
|
||||
public void testDataNodeLogsStackTraceWhenErrorTraceFalseOrEmpty() throws IOException, InterruptedException {
|
||||
setupIndexWithDocs();
|
||||
|
||||
// error_trace defaults to false so we can test both cases with some randomization
|
||||
final boolean defineErrorTraceFalse = randomBoolean();
|
||||
|
||||
Request searchRequest = new Request("POST", "/_async_search");
|
||||
searchRequest.setJsonEntity("""
|
||||
{
|
||||
"query": {
|
||||
"simple_query_string" : {
|
||||
"query": "foo",
|
||||
"fields": ["field"]
|
||||
}
|
||||
}
|
||||
}
|
||||
""");
|
||||
if (defineErrorTraceFalse) {
|
||||
searchRequest.addParameter("error_trace", "false");
|
||||
}
|
||||
searchRequest.addParameter("keep_on_completion", "true");
|
||||
searchRequest.addParameter("wait_for_completion_timeout", "0ms");
|
||||
|
||||
String errorTriggeringIndex = "test2";
|
||||
int numShards = getNumShards(errorTriggeringIndex).numPrimaries;
|
||||
try (var mockLog = MockLog.capture(SearchService.class)) {
|
||||
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
|
||||
|
||||
Map<String, Object> responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO);
|
||||
String asyncExecutionId = (String) responseEntity.get("id");
|
||||
Request request = new Request("GET", "/_async_search/" + asyncExecutionId);
|
||||
if (defineErrorTraceFalse) {
|
||||
request.addParameter("error_trace", "false");
|
||||
}
|
||||
while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) {
|
||||
responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L));
|
||||
}
|
||||
|
||||
getRestClient().performRequest(searchRequest);
|
||||
mockLog.assertAllExpectationsMatched();
|
||||
}
|
||||
}
|
||||
|
||||
public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() throws IOException, InterruptedException {
|
||||
setupIndexWithDocs();
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue