Fix leaked HTTP response sent after close (#105293)

Today a `HttpResponse` is always released via a `ChannelPromise` which
means the release happens on a network thread. However, it's possible we
try and send a `HttpResponse` after the node has got far enough through
shutdown that it doesn't have any running network threads left, which
means the response just leaks.

This is no big deal in production, it becomes irrelevant when the
process exits, but in tests we start and stop many nodes within the same
process so mustn't leak anything.

At this point in shutdown, all HTTP channels are now closed, so it's
sufficient to check whether the channel is open first, and to fail the
listener on the calling thread if not. That's what this commit does.

Closes #104651
This commit is contained in:
David Turner 2024-02-08 19:57:02 +00:00 committed by GitHub
parent 0cbc745f57
commit 97dbb2a27e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 72 additions and 1 deletions

View file

@ -0,0 +1,6 @@
pr: 105293
summary: Fix leaked HTTP response sent after close
area: Network
type: bug
issues:
- 104651

View file

@ -18,6 +18,7 @@ import org.elasticsearch.transport.netty4.Netty4TcpChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
public class Netty4HttpChannel implements HttpChannel {
@ -31,7 +32,13 @@ public class Netty4HttpChannel implements HttpChannel {
@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
if (isOpen()) {
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
} else {
// No need to dispatch to the event loop just to fail this listener; moreover the channel might be closed because the whole
// node is shutting down, in which case the event loop might not exist any more so the channel promise cannot be completed.
listener.onFailure(new ClosedChannelException());
}
}
@Override

View file

@ -39,10 +39,16 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import org.apache.http.HttpHost;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.network.NetworkAddress;
@ -956,6 +962,58 @@ public class Netty4HttpServerTransportTests extends AbstractHttpServerTransportT
}
}
public void testRespondAfterClose() throws Exception {
final String url = "/thing";
final CountDownLatch responseReleasedLatch = new CountDownLatch(1);
final SubscribableListener<Void> transportClosedFuture = new SubscribableListener<>();
final CountDownLatch handlingRequestLatch = new CountDownLatch(1);
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
assertEquals(request.uri(), url);
final var response = RestResponse.chunked(
OK,
ChunkedRestResponseBody.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()),
responseReleasedLatch::countDown
);
transportClosedFuture.addListener(ActionListener.running(() -> channel.sendResponse(response)));
handlingRequestLatch.countDown();
}
@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
fail(cause, "--> Unexpected bad request [%s]", FakeRestRequest.requestToString(channel.request()));
}
};
try (
Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
Settings.EMPTY,
networkService,
threadPool,
xContentRegistry(),
dispatcher,
clusterSettings,
new SharedGroupFactory(Settings.EMPTY),
Tracer.NOOP,
TLSConfig.noTLS(),
null,
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
)
) {
transport.start();
final var address = randomFrom(transport.boundAddress().boundAddresses()).address();
try (var client = RestClient.builder(new HttpHost(address.getAddress(), address.getPort())).build()) {
client.performRequestAsync(new Request("GET", url), ActionTestUtils.wrapAsRestResponseListener(ActionListener.noop()));
safeAwait(handlingRequestLatch);
transport.close();
transportClosedFuture.onResponse(null);
safeAwait(responseReleasedLatch);
}
}
}
private Netty4HttpServerTransport getTestNetty4HttpServerTransport(
HttpServerTransport.Dispatcher dispatcher,
HttpValidator httpValidator,