diff --git a/docs/changelog/88946.yaml b/docs/changelog/88946.yaml new file mode 100644 index 000000000000..ae853f2e5ffa --- /dev/null +++ b/docs/changelog/88946.yaml @@ -0,0 +1,6 @@ +pr: 88946 +summary: "Graph: fix race condition in timeout" +area: Graph +type: bug +issues: + - 55396 diff --git a/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java b/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java index 7623fffa777f..e9178675bd1a 100644 --- a/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java +++ b/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java @@ -222,19 +222,18 @@ public class GraphTests extends ESSingleNodeTestCase { assertNull("Elvis is a 3rd tier connection so should not be returned here", response.getVertex(Vertex.createId("people", "elvis"))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55396") public void testTimedoutQueryCrawl() { GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); grb.setTimeout(TimeValue.timeValueMillis(400)); Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles - // 00s friends of beatles - grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1); // A query that should cause a timeout ScriptQueryBuilder timeoutQuery = QueryBuilders.scriptQuery( new Script(ScriptType.INLINE, "mockscript", "graph_timeout", Collections.emptyMap()) ); grb.createNextHop(timeoutQuery).addVertexRequest("people").size(100).minDocCount(1); + // 00s friends of beatles + grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1); GraphExploreResponse response = grb.get(); assertTrue(response.isTimedOut()); diff --git a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java index b50cd54bbbe4..93c28f63b0e8 100644 --- a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java +++ b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java @@ -6,6 +6,8 @@ */ package org.elasticsearch.xpack.graph.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PriorityQueue; @@ -61,13 +63,13 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; /** * Performs a series of elasticsearch queries and aggregations to explore * connected terms in a single index. */ public class TransportGraphExploreAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportGraphExploreAction.class); private final ThreadPool threadPool; private final NodeClient client; @@ -115,7 +117,6 @@ public class TransportGraphExploreAction extends HandledTransportAction listener; private final long startTime; - private final AtomicBoolean timedOut; private volatile ShardOperationFailedException[] shardFailures; private Map vertices = new HashMap<>(); private Map connections = new HashMap<>(); @@ -128,7 +129,6 @@ public class TransportGraphExploreAction extends HandledTransportAction> lastHopFindings = hopFindings.get(currentHopNumber); if ((currentHopNumber >= (request.getHopNumbers() - 1)) || (lastHopFindings == null) || (lastHopFindings.size() == 0)) { // Either we gathered no leads from the last hop or we have // reached the final hop - listener.onResponse(buildResponse()); + listener.onResponse(buildResponse(false)); return; } Hop lastHop = request.getHop(currentHopNumber); @@ -318,16 +313,22 @@ public class TransportGraphExploreAction extends HandledTransportAction(listener) { @Override public void onResponse(SearchResponse searchResponse) { - // System.out.println(searchResponse); addShardFailures(searchResponse.getShardFailures()); ArrayList newConnections = new ArrayList(); @@ -676,7 +677,6 @@ public class TransportGraphExploreAction extends HandledTransportAction(listener) { @Override @@ -774,16 +774,6 @@ public class TransportGraphExploreAction extends HandledTransportAction