release stream chunk queue on bad request (#112227)

This commit is contained in:
Mikhail Berezovskiy 2024-08-27 10:33:57 -07:00 committed by Tim Brooks
parent 1b77421cf8
commit cbcbc34863
4 changed files with 55 additions and 3 deletions

View file

@ -316,6 +316,29 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
}
}
// ensures that we dont leak buffers in stream on 400-bad-request
// some bad requests are dispatched from rest-controller before reaching rest handler
// test relies on netty's buffer leak detection
public void testBadRequestReleaseQueuedChunks() throws Exception {
try (var ctx = setupClientCtx()) {
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
var req = httpRequest(id, contentSize);
var content = randomContent(contentSize, true);
// set unacceptable content-type
req.headers().set(CONTENT_TYPE, "unknown");
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(content);
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.BAD_REQUEST, resp.status());
resp.release();
}
}
}
private int maxContentLength() {
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
}
@ -514,6 +537,11 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
Predicate<NodeFeature> clusterSupportsFeature
) {
return List.of(new BaseRestHandler() {
@Override
public boolean allowsUnsafeBuffers() {
return true;
}
@Override
public String getName() {
return ROUTE;

View file

@ -61,7 +61,7 @@ public class Netty4HttpRequest implements HttpRequest {
EmptyHttpHeaders.INSTANCE
),
new AtomicBoolean(false),
false,
true,
contentStream,
null
);
@ -116,6 +116,7 @@ public class Netty4HttpRequest implements HttpRequest {
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
content.close();
}
}

View file

@ -31,11 +31,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private final Queue<HttpContent> chunkQueue = new ArrayDeque<>();
private boolean requested = false;
private boolean hasLast = false;
private boolean closing = false;
private HttpBody.ChunkHandler handler;
public Netty4HttpRequestBodyStream(Channel channel) {
this.channel = channel;
channel.closeFuture().addListener((f) -> releaseQueuedChunks());
channel.closeFuture().addListener((f) -> doClose());
channel.config().setAutoRead(false);
}
@ -71,6 +72,10 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
}
public void handleNettyContent(HttpContent httpContent) {
if (closing) {
httpContent.release();
return;
}
assert handler != null : "handler must be set before processing http content";
if (requested && chunkQueue.isEmpty()) {
sendChunk(httpContent);
@ -112,4 +117,18 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
}
}
@Override
public void close() {
if (channel.eventLoop().inEventLoop()) {
doClose();
} else {
channel.eventLoop().submit(this::doClose);
}
}
private void doClose() {
closing = true;
releaseQueuedChunks();
channel.config().setAutoRead(true);
}
}

View file

@ -13,11 +13,12 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
/**
* A super-interface for different HTTP content implementations
*/
public sealed interface HttpBody permits HttpBody.Full, HttpBody.Stream {
public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpBody.Stream {
static Full fromBytesReference(BytesReference bytesRef) {
return new ByteRefHttpBody(bytesRef);
@ -56,6 +57,9 @@ public sealed interface HttpBody permits HttpBody.Full, HttpBody.Stream {
*/
non-sealed interface Full extends HttpBody {
BytesReference bytes();
@Override
default void close() {}
}
/**