From d6e2b575b7df5b1640e92c476e726729239dedb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lorenzo=20Dematt=C3=A9?= Date: Thu, 26 Jun 2025 18:37:32 +0200 Subject: [PATCH] Stop RecordingApmServer message processing before returning from tests (#130007) --- muted-tests.yml | 3 -- .../apmintegration/RecordingApmServer.java | 45 +++++++++---------- .../test/apmintegration/TracesApmIT.java | 4 +- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 1aef2f0d85b8..1a27ae5161a5 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -524,9 +524,6 @@ tests: - class: org.elasticsearch.search.query.VectorIT method: testFilteredQueryStrategy issue: https://github.com/elastic/elasticsearch/issues/129517 -- class: org.elasticsearch.test.apmintegration.TracesApmIT - method: testApmIntegration - issue: https://github.com/elastic/elasticsearch/issues/129651 - class: org.elasticsearch.snapshots.SnapshotShutdownIT method: testSnapshotShutdownProgressTracker issue: https://github.com/elastic/elasticsearch/issues/129752 diff --git a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java index db2ce9fb83a5..d20f2d08719e 100644 --- a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java +++ b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java @@ -15,7 +15,6 @@ import com.sun.net.httpserver.HttpServer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.xcontent.spi.XContentProvider; import org.junit.rules.ExternalResource; import java.io.BufferedReader; @@ -25,7 +24,6 @@ import java.io.InputStreamReader; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; @@ -35,14 +33,12 @@ import java.util.function.Consumer; public class RecordingApmServer extends ExternalResource { private static final Logger logger = LogManager.getLogger(RecordingApmServer.class); - private static final XContentProvider.FormatProvider XCONTENT = XContentProvider.provider().getJsonXContent(); - final ArrayBlockingQueue received = new ArrayBlockingQueue<>(1000); private static HttpServer server; private final Thread messageConsumerThread = consumerThread(); private volatile Consumer consumer; - private volatile boolean consumerRunning = true; + private volatile boolean running = true; @Override protected void before() throws Throwable { @@ -56,7 +52,7 @@ public class RecordingApmServer extends ExternalResource { private Thread consumerThread() { return new Thread(() -> { - while (consumerRunning) { + while (running) { if (consumer != null) { try { String msg = received.poll(1L, TimeUnit.SECONDS); @@ -74,28 +70,38 @@ public class RecordingApmServer extends ExternalResource { @Override protected void after() { + running = false; server.stop(1); - consumerRunning = false; + consumer = null; } private void handle(HttpExchange exchange) throws IOException { try (exchange) { - try { - try (InputStream requestBody = exchange.getRequestBody()) { - if (requestBody != null) { - var read = readJsonMessages(requestBody); - received.addAll(read); + if (running) { + try { + try (InputStream requestBody = exchange.getRequestBody()) { + if (requestBody != null) { + var read = readJsonMessages(requestBody); + received.addAll(read); + } } - } - } catch (RuntimeException e) { - logger.warn("failed to parse request", e); + } catch (Throwable t) { + // The lifetime of HttpServer makes message handling "brittle": we need to start handling and recording received + // messages before the test starts running. We should also stop handling them before the test ends (and the test + // cluster is torn down), or we may run into IOException as the communication channel is interrupted. + // Coordinating the lifecycle of the mock HttpServer and of the test ES cluster is difficult and error-prone, so + // we just handle Throwable and don't care (log, but don't care): if we have an error in communicating to/from + // the mock server while the test is running, the test would fail anyway as the expected messages will not arrive, and + // if we have an error outside the test scope (before or after) that is OK. + logger.warn("failed to parse request", t); + } } exchange.sendResponseHeaders(201, 0); } } - private List readJsonMessages(InputStream input) throws IOException { + private List readJsonMessages(InputStream input) { // parse NDJSON return new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)).lines().toList(); } @@ -104,14 +110,7 @@ public class RecordingApmServer extends ExternalResource { return server.getAddress().getPort(); } - public List getMessages() { - List list = new ArrayList<>(received.size()); - received.drainTo(list); - return list; - } - public void addMessageConsumer(Consumer messageConsumer) { this.consumer = messageConsumer; } - } diff --git a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/TracesApmIT.java b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/TracesApmIT.java index 6b10140bd80e..afb9243e0f3e 100644 --- a/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/TracesApmIT.java +++ b/test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/TracesApmIT.java @@ -91,7 +91,8 @@ public class TracesApmIT extends ESRestTestCase { client().performRequest(nodeStatsRequest); - finished.await(30, TimeUnit.SECONDS); + var completed = finished.await(30, TimeUnit.SECONDS); + assertTrue("Timeout when waiting for assertions to complete", completed); assertThat(assertions, equalTo(Collections.emptySet())); } @@ -143,5 +144,4 @@ public class TracesApmIT extends ESRestTestCase { return Collections.emptyMap(); } } - }