mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-20 13:17:31 -04:00
release stream chunk queue on bad request (#112227)
This commit is contained in:
parent
1b77421cf8
commit
cbcbc34863
4 changed files with 55 additions and 3 deletions
|
@ -316,6 +316,29 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
// ensures that we dont leak buffers in stream on 400-bad-request
|
||||
// some bad requests are dispatched from rest-controller before reaching rest handler
|
||||
// test relies on netty's buffer leak detection
|
||||
public void testBadRequestReleaseQueuedChunks() throws Exception {
|
||||
try (var ctx = setupClientCtx()) {
|
||||
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
|
||||
var id = opaqueId(reqNo);
|
||||
var contentSize = randomIntBetween(0, maxContentLength());
|
||||
var req = httpRequest(id, contentSize);
|
||||
var content = randomContent(contentSize, true);
|
||||
|
||||
// set unacceptable content-type
|
||||
req.headers().set(CONTENT_TYPE, "unknown");
|
||||
ctx.clientChannel.writeAndFlush(req);
|
||||
ctx.clientChannel.writeAndFlush(content);
|
||||
|
||||
var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
|
||||
assertEquals(HttpResponseStatus.BAD_REQUEST, resp.status());
|
||||
resp.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int maxContentLength() {
|
||||
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
|
||||
}
|
||||
|
@ -514,6 +537,11 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
|||
Predicate<NodeFeature> clusterSupportsFeature
|
||||
) {
|
||||
return List.of(new BaseRestHandler() {
|
||||
@Override
|
||||
public boolean allowsUnsafeBuffers() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return ROUTE;
|
||||
|
|
|
@ -61,7 +61,7 @@ public class Netty4HttpRequest implements HttpRequest {
|
|||
EmptyHttpHeaders.INSTANCE
|
||||
),
|
||||
new AtomicBoolean(false),
|
||||
false,
|
||||
true,
|
||||
contentStream,
|
||||
null
|
||||
);
|
||||
|
@ -116,6 +116,7 @@ public class Netty4HttpRequest implements HttpRequest {
|
|||
public void release() {
|
||||
if (pooled && released.compareAndSet(false, true)) {
|
||||
request.release();
|
||||
content.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,11 +31,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
|||
private final Queue<HttpContent> chunkQueue = new ArrayDeque<>();
|
||||
private boolean requested = false;
|
||||
private boolean hasLast = false;
|
||||
private boolean closing = false;
|
||||
private HttpBody.ChunkHandler handler;
|
||||
|
||||
public Netty4HttpRequestBodyStream(Channel channel) {
|
||||
this.channel = channel;
|
||||
channel.closeFuture().addListener((f) -> releaseQueuedChunks());
|
||||
channel.closeFuture().addListener((f) -> doClose());
|
||||
channel.config().setAutoRead(false);
|
||||
}
|
||||
|
||||
|
@ -71,6 +72,10 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
|||
}
|
||||
|
||||
public void handleNettyContent(HttpContent httpContent) {
|
||||
if (closing) {
|
||||
httpContent.release();
|
||||
return;
|
||||
}
|
||||
assert handler != null : "handler must be set before processing http content";
|
||||
if (requested && chunkQueue.isEmpty()) {
|
||||
sendChunk(httpContent);
|
||||
|
@ -112,4 +117,18 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (channel.eventLoop().inEventLoop()) {
|
||||
doClose();
|
||||
} else {
|
||||
channel.eventLoop().submit(this::doClose);
|
||||
}
|
||||
}
|
||||
|
||||
private void doClose() {
|
||||
closing = true;
|
||||
releaseQueuedChunks();
|
||||
channel.config().setAutoRead(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,11 +13,12 @@ import org.elasticsearch.common.bytes.BytesArray;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.core.Releasable;
|
||||
|
||||
/**
|
||||
* A super-interface for different HTTP content implementations
|
||||
*/
|
||||
public sealed interface HttpBody permits HttpBody.Full, HttpBody.Stream {
|
||||
public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpBody.Stream {
|
||||
|
||||
static Full fromBytesReference(BytesReference bytesRef) {
|
||||
return new ByteRefHttpBody(bytesRef);
|
||||
|
@ -56,6 +57,9 @@ public sealed interface HttpBody permits HttpBody.Full, HttpBody.Stream {
|
|||
*/
|
||||
non-sealed interface Full extends HttpBody {
|
||||
BytesReference bytes();
|
||||
|
||||
@Override
|
||||
default void close() {}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue