Stop RecordingApmServer message processing before returning from tests (#130007)

This commit is contained in:
Lorenzo Dematté 2025-06-26 18:37:32 +02:00 committed by GitHub
parent e2dfd5d707
commit d6e2b575b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 24 additions and 28 deletions

View file

@ -524,9 +524,6 @@ tests:
- class: org.elasticsearch.search.query.VectorIT
method: testFilteredQueryStrategy
issue: https://github.com/elastic/elasticsearch/issues/129517
- class: org.elasticsearch.test.apmintegration.TracesApmIT
method: testApmIntegration
issue: https://github.com/elastic/elasticsearch/issues/129651
- class: org.elasticsearch.snapshots.SnapshotShutdownIT
method: testSnapshotShutdownProgressTracker
issue: https://github.com/elastic/elasticsearch/issues/129752

View file

@ -15,7 +15,6 @@ import com.sun.net.httpserver.HttpServer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.xcontent.spi.XContentProvider;
import org.junit.rules.ExternalResource;
import java.io.BufferedReader;
@ -25,7 +24,6 @@ import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -35,14 +33,12 @@ import java.util.function.Consumer;
public class RecordingApmServer extends ExternalResource {
private static final Logger logger = LogManager.getLogger(RecordingApmServer.class);
private static final XContentProvider.FormatProvider XCONTENT = XContentProvider.provider().getJsonXContent();
final ArrayBlockingQueue<String> received = new ArrayBlockingQueue<>(1000);
private static HttpServer server;
private final Thread messageConsumerThread = consumerThread();
private volatile Consumer<String> consumer;
private volatile boolean consumerRunning = true;
private volatile boolean running = true;
@Override
protected void before() throws Throwable {
@ -56,7 +52,7 @@ public class RecordingApmServer extends ExternalResource {
private Thread consumerThread() {
return new Thread(() -> {
while (consumerRunning) {
while (running) {
if (consumer != null) {
try {
String msg = received.poll(1L, TimeUnit.SECONDS);
@ -74,28 +70,38 @@ public class RecordingApmServer extends ExternalResource {
@Override
protected void after() {
running = false;
server.stop(1);
consumerRunning = false;
consumer = null;
}
private void handle(HttpExchange exchange) throws IOException {
try (exchange) {
try {
try (InputStream requestBody = exchange.getRequestBody()) {
if (requestBody != null) {
var read = readJsonMessages(requestBody);
received.addAll(read);
if (running) {
try {
try (InputStream requestBody = exchange.getRequestBody()) {
if (requestBody != null) {
var read = readJsonMessages(requestBody);
received.addAll(read);
}
}
}
} catch (RuntimeException e) {
logger.warn("failed to parse request", e);
} catch (Throwable t) {
// The lifetime of HttpServer makes message handling "brittle": we need to start handling and recording received
// messages before the test starts running. We should also stop handling them before the test ends (and the test
// cluster is torn down), or we may run into IOException as the communication channel is interrupted.
// Coordinating the lifecycle of the mock HttpServer and of the test ES cluster is difficult and error-prone, so
// we just handle Throwable and don't care (log, but don't care): if we have an error in communicating to/from
// the mock server while the test is running, the test would fail anyway as the expected messages will not arrive, and
// if we have an error outside the test scope (before or after) that is OK.
logger.warn("failed to parse request", t);
}
}
exchange.sendResponseHeaders(201, 0);
}
}
private List<String> readJsonMessages(InputStream input) throws IOException {
private List<String> readJsonMessages(InputStream input) {
// parse NDJSON
return new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)).lines().toList();
}
@ -104,14 +110,7 @@ public class RecordingApmServer extends ExternalResource {
return server.getAddress().getPort();
}
public List<String> getMessages() {
List<String> list = new ArrayList<>(received.size());
received.drainTo(list);
return list;
}
public void addMessageConsumer(Consumer<String> messageConsumer) {
this.consumer = messageConsumer;
}
}

View file

@ -91,7 +91,8 @@ public class TracesApmIT extends ESRestTestCase {
client().performRequest(nodeStatsRequest);
finished.await(30, TimeUnit.SECONDS);
var completed = finished.await(30, TimeUnit.SECONDS);
assertTrue("Timeout when waiting for assertions to complete", completed);
assertThat(assertions, equalTo(Collections.emptySet()));
}
@ -143,5 +144,4 @@ public class TracesApmIT extends ESRestTestCase {
return Collections.emptyMap();
}
}
}