diff --git a/docs/changelog/105293.yaml b/docs/changelog/105293.yaml new file mode 100644 index 000000000000..33eb3884a7e5 --- /dev/null +++ b/docs/changelog/105293.yaml @@ -0,0 +1,6 @@ +pr: 105293 +summary: Fix leaked HTTP response sent after close +area: Network +type: bug +issues: + - 104651 diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index f7e3809e295d..0ccd50770a05 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -18,6 +18,7 @@ import org.elasticsearch.transport.netty4.Netty4TcpChannel; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; public class Netty4HttpChannel implements HttpChannel { @@ -31,7 +32,13 @@ public class Netty4HttpChannel implements HttpChannel { @Override public void sendResponse(HttpResponse response, ActionListener listener) { - channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel)); + if (isOpen()) { + channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel)); + } else { + // No need to dispatch to the event loop just to fail this listener; moreover the channel might be closed because the whole + // node is shutting down, in which case the event loop might not exist any more so the channel promise cannot be completed. + listener.onFailure(new ClosedChannelException()); + } } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 1215d54e9ace..4d44c37ac094 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -39,10 +39,16 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; +import org.apache.http.HttpHost; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchWrapperException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RestClient; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.network.NetworkAddress; @@ -956,6 +962,58 @@ public class Netty4HttpServerTransportTests extends AbstractHttpServerTransportT } } + public void testRespondAfterClose() throws Exception { + final String url = "/thing"; + final CountDownLatch responseReleasedLatch = new CountDownLatch(1); + final SubscribableListener transportClosedFuture = new SubscribableListener<>(); + final CountDownLatch handlingRequestLatch = new CountDownLatch(1); + + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + assertEquals(request.uri(), url); + final var response = RestResponse.chunked( + OK, + ChunkedRestResponseBody.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()), + responseReleasedLatch::countDown + ); + transportClosedFuture.addListener(ActionListener.running(() -> channel.sendResponse(response))); + handlingRequestLatch.countDown(); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + fail(cause, "--> Unexpected bad request [%s]", FakeRestRequest.requestToString(channel.request())); + } + }; + + try ( + Netty4HttpServerTransport transport = new Netty4HttpServerTransport( + Settings.EMPTY, + networkService, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + Tracer.NOOP, + TLSConfig.noTLS(), + null, + randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null) + ) + ) { + transport.start(); + final var address = randomFrom(transport.boundAddress().boundAddresses()).address(); + try (var client = RestClient.builder(new HttpHost(address.getAddress(), address.getPort())).build()) { + client.performRequestAsync(new Request("GET", url), ActionTestUtils.wrapAsRestResponseListener(ActionListener.noop())); + safeAwait(handlingRequestLatch); + transport.close(); + transportClosedFuture.onResponse(null); + safeAwait(responseReleasedLatch); + } + } + } + private Netty4HttpServerTransport getTestNetty4HttpServerTransport( HttpServerTransport.Dispatcher dispatcher, HttpValidator httpValidator,