mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-07-21 05:03:25 -04:00
fix leaking listener (#112629)
This commit is contained in:
parent
ce2d648d8e
commit
0d55dc6de4
3 changed files with 69 additions and 63 deletions
|
@ -153,7 +153,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensures that all queued chunks are released when connection closed
|
// ensures that all received chunks are released when connection closed
|
||||||
public void testClientConnectionCloseMidStream() throws Exception {
|
public void testClientConnectionCloseMidStream() throws Exception {
|
||||||
try (var ctx = setupClientCtx()) {
|
try (var ctx = setupClientCtx()) {
|
||||||
var opaqueId = opaqueId(0);
|
var opaqueId = opaqueId(0);
|
||||||
|
@ -164,18 +164,18 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
|
|
||||||
// await stream handler is ready and request full content
|
// await stream handler is ready and request full content
|
||||||
var handler = ctx.awaitRestChannelAccepted(opaqueId);
|
var handler = ctx.awaitRestChannelAccepted(opaqueId);
|
||||||
assertBusy(() -> assertEquals(1, handler.stream.chunkQueue().size()));
|
assertBusy(() -> assertNotNull(handler.stream.buf()));
|
||||||
|
|
||||||
// enable auto-read to receive channel close event
|
// enable auto-read to receive channel close event
|
||||||
handler.stream.channel().config().setAutoRead(true);
|
handler.stream.channel().config().setAutoRead(true);
|
||||||
|
|
||||||
// terminate connection and wait resources are released
|
// terminate connection and wait resources are released
|
||||||
ctx.clientChannel.close();
|
ctx.clientChannel.close();
|
||||||
assertBusy(() -> assertEquals(0, handler.stream.chunkQueue().size()));
|
assertBusy(() -> assertNull(handler.stream.buf()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensures that all queued chunks are released when server decides to close connection
|
// ensures that all recieved chunks are released when server decides to close connection
|
||||||
public void testServerCloseConnectionMidStream() throws Exception {
|
public void testServerCloseConnectionMidStream() throws Exception {
|
||||||
try (var ctx = setupClientCtx()) {
|
try (var ctx = setupClientCtx()) {
|
||||||
var opaqueId = opaqueId(0);
|
var opaqueId = opaqueId(0);
|
||||||
|
@ -186,11 +186,11 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
|
|
||||||
// await stream handler is ready and request full content
|
// await stream handler is ready and request full content
|
||||||
var handler = ctx.awaitRestChannelAccepted(opaqueId);
|
var handler = ctx.awaitRestChannelAccepted(opaqueId);
|
||||||
assertBusy(() -> assertEquals(1, handler.stream.chunkQueue().size()));
|
assertBusy(() -> assertNotNull(handler.stream.buf()));
|
||||||
|
|
||||||
// terminate connection on server and wait resources are released
|
// terminate connection on server and wait resources are released
|
||||||
handler.channel.request().getHttpChannel().close();
|
handler.channel.request().getHttpChannel().close();
|
||||||
assertBusy(() -> assertEquals(0, handler.stream.chunkQueue().size()));
|
assertBusy(() -> assertNull(handler.stream.buf()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,7 +470,6 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
final BlockingDeque<Chunk> recvChunks = new LinkedBlockingDeque<>();
|
final BlockingDeque<Chunk> recvChunks = new LinkedBlockingDeque<>();
|
||||||
final Netty4HttpRequestBodyStream stream;
|
final Netty4HttpRequestBodyStream stream;
|
||||||
RestChannel channel;
|
RestChannel channel;
|
||||||
|
|
||||||
boolean recvLast = false;
|
boolean recvLast = false;
|
||||||
|
|
||||||
ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
|
ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
|
||||||
|
|
|
@ -9,34 +9,36 @@
|
||||||
|
|
||||||
package org.elasticsearch.http.netty4;
|
package org.elasticsearch.http.netty4;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.handler.codec.http.HttpContent;
|
import io.netty.handler.codec.http.HttpContent;
|
||||||
import io.netty.handler.codec.http.LastHttpContent;
|
import io.netty.handler.codec.http.LastHttpContent;
|
||||||
|
|
||||||
import org.elasticsearch.http.HttpBody;
|
import org.elasticsearch.http.HttpBody;
|
||||||
import org.elasticsearch.transport.netty4.Netty4Utils;
|
import org.elasticsearch.transport.netty4.Netty4Utils;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
|
||||||
import java.util.Queue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Netty based implementation of {@link HttpBody.Stream}.
|
* Netty based implementation of {@link HttpBody.Stream}.
|
||||||
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
|
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
|
||||||
* to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite
|
* to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite
|
||||||
* autoRead=off. In this case chunks will be queued until downstream calls {@link Stream#next()}
|
* autoRead=off. In this case chunks will be buffered until downstream calls {@link Stream#next()}
|
||||||
*/
|
*/
|
||||||
public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
|
|
||||||
private final Channel channel;
|
private final Channel channel;
|
||||||
private final Queue<HttpContent> chunkQueue = new ArrayDeque<>();
|
private final ChannelFutureListener closeListener = future -> doClose();
|
||||||
private boolean requested = false;
|
private ByteBuf buf;
|
||||||
private boolean hasLast = false;
|
private boolean hasLast = false;
|
||||||
|
private boolean requested = false;
|
||||||
private boolean closing = false;
|
private boolean closing = false;
|
||||||
private HttpBody.ChunkHandler handler;
|
private HttpBody.ChunkHandler handler;
|
||||||
|
|
||||||
|
|
||||||
public Netty4HttpRequestBodyStream(Channel channel) {
|
public Netty4HttpRequestBodyStream(Channel channel) {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
channel.closeFuture().addListener((f) -> doClose());
|
Netty4Utils.addListener(channel.closeFuture(), closeListener);
|
||||||
channel.config().setAutoRead(false);
|
channel.config().setAutoRead(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,41 +52,49 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
this.handler = chunkHandler;
|
this.handler = chunkHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendQueuedOrRead() {
|
|
||||||
assert channel.eventLoop().inEventLoop();
|
|
||||||
requested = true;
|
|
||||||
var chunk = chunkQueue.poll();
|
|
||||||
if (chunk == null) {
|
|
||||||
channel.read();
|
|
||||||
} else {
|
|
||||||
sendChunk(chunk);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void next() {
|
public void next() {
|
||||||
|
assert closing == false : "cannot request next chunk on closing stream";
|
||||||
assert handler != null : "handler must be set before requesting next chunk";
|
assert handler != null : "handler must be set before requesting next chunk";
|
||||||
if (channel.eventLoop().inEventLoop()) {
|
channel.eventLoop().submit(() -> {
|
||||||
sendQueuedOrRead();
|
requested = true;
|
||||||
|
if (buf == null) {
|
||||||
|
channel.read();
|
||||||
} else {
|
} else {
|
||||||
channel.eventLoop().submit(this::sendQueuedOrRead);
|
send();
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void handleNettyContent(HttpContent httpContent) {
|
public void handleNettyContent(HttpContent httpContent) {
|
||||||
|
assert hasLast == false : "receive http content on completed stream";
|
||||||
|
hasLast = httpContent instanceof LastHttpContent;
|
||||||
if (closing) {
|
if (closing) {
|
||||||
httpContent.release();
|
httpContent.release();
|
||||||
return;
|
|
||||||
}
|
|
||||||
assert handler != null : "handler must be set before processing http content";
|
|
||||||
if (requested && chunkQueue.isEmpty()) {
|
|
||||||
sendChunk(httpContent);
|
|
||||||
} else {
|
} else {
|
||||||
chunkQueue.add(httpContent);
|
addChunk(httpContent.content());
|
||||||
|
if (requested) {
|
||||||
|
send();
|
||||||
}
|
}
|
||||||
if (httpContent instanceof LastHttpContent) {
|
}
|
||||||
hasLast = true;
|
if (hasLast) {
|
||||||
channel.config().setAutoRead(true);
|
channel.config().setAutoRead(true);
|
||||||
|
channel.closeFuture().removeListener(closeListener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// adds chunk to current buffer, will allocate composite buffer when need to hold more than 1 chunk
|
||||||
|
private void addChunk(ByteBuf chunk) {
|
||||||
|
assert chunk != null;
|
||||||
|
if (buf == null) {
|
||||||
|
buf = chunk;
|
||||||
|
} else if (buf instanceof CompositeByteBuf comp) {
|
||||||
|
comp.addComponent(true, chunk);
|
||||||
|
} else {
|
||||||
|
var comp = channel.alloc().compositeBuffer();
|
||||||
|
comp.addComponent(true, buf);
|
||||||
|
comp.addComponent(true, chunk);
|
||||||
|
buf = comp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,8 +104,8 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
}
|
}
|
||||||
|
|
||||||
// visible for test
|
// visible for test
|
||||||
Queue<HttpContent> chunkQueue() {
|
ByteBuf buf() {
|
||||||
return chunkQueue;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
// visible for test
|
// visible for test
|
||||||
|
@ -103,18 +113,13 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
return hasLast;
|
return hasLast;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendChunk(HttpContent httpContent) {
|
private void send() {
|
||||||
assert requested;
|
assert requested;
|
||||||
|
assert handler != null : "must set handler before receiving next chunk";
|
||||||
|
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
|
||||||
requested = false;
|
requested = false;
|
||||||
var bytesRef = Netty4Utils.toReleasableBytesReference(httpContent.content());
|
buf = null;
|
||||||
var isLast = httpContent instanceof LastHttpContent;
|
handler.onNext(bytesRef, hasLast);
|
||||||
handler.onNext(bytesRef, isLast);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void releaseQueuedChunks() {
|
|
||||||
while (chunkQueue.isEmpty() == false) {
|
|
||||||
chunkQueue.poll().release();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -128,7 +133,10 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
|
|
||||||
private void doClose() {
|
private void doClose() {
|
||||||
closing = true;
|
closing = true;
|
||||||
releaseQueuedChunks();
|
if (buf != null) {
|
||||||
|
buf.release();
|
||||||
|
buf = null;
|
||||||
|
}
|
||||||
channel.config().setAutoRead(true);
|
channel.config().setAutoRead(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,33 +53,31 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
|
||||||
for (int i = 0; i < totalChunks; i++) {
|
for (int i = 0; i < totalChunks; i++) {
|
||||||
channel.writeInbound(randomContent(1024));
|
channel.writeInbound(randomContent(1024));
|
||||||
}
|
}
|
||||||
assertEquals(totalChunks, stream.chunkQueue().size());
|
assertEquals(totalChunks * 1024, stream.buf().readableBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensures all queued chunks can be flushed downstream
|
// ensures all received chunks can be flushed downstream
|
||||||
public void testFlushQueued() {
|
public void testFlushAllReceivedChunks() {
|
||||||
var chunks = new ArrayList<ReleasableBytesReference>();
|
var chunks = new ArrayList<ReleasableBytesReference>();
|
||||||
var totalBytes = new AtomicInteger();
|
var totalBytes = new AtomicInteger();
|
||||||
stream.setHandler((chunk, isLast) -> {
|
stream.setHandler((chunk, isLast) -> {
|
||||||
chunks.add(chunk);
|
chunks.add(chunk);
|
||||||
totalBytes.addAndGet(chunk.length());
|
totalBytes.addAndGet(chunk.length());
|
||||||
});
|
});
|
||||||
// enqueue chunks
|
|
||||||
var chunkSize = 1024;
|
var chunkSize = 1024;
|
||||||
var totalChunks = randomIntBetween(1, 100);
|
var totalChunks = randomIntBetween(1, 100);
|
||||||
for (int i = 0; i < totalChunks; i++) {
|
for (int i = 0; i < totalChunks; i++) {
|
||||||
channel.writeInbound(randomContent(chunkSize));
|
channel.writeInbound(randomContent(chunkSize));
|
||||||
}
|
}
|
||||||
// consume all chunks
|
|
||||||
for (var i = 0; i < totalChunks; i++) {
|
|
||||||
stream.next();
|
stream.next();
|
||||||
}
|
channel.runPendingTasks();
|
||||||
assertEquals(totalChunks, chunks.size());
|
assertEquals("should receive all chunks as single composite", 1, chunks.size());
|
||||||
assertEquals(chunkSize * totalChunks, totalBytes.get());
|
assertEquals(chunkSize * totalChunks, totalBytes.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensures that we read from channel when chunks queue is empty
|
// ensures that we read from channel when no current chunks available
|
||||||
// and pass next chunk downstream without queuing
|
// and pass next chunk downstream without holding
|
||||||
public void testReadFromChannel() {
|
public void testReadFromChannel() {
|
||||||
var gotChunks = new ArrayList<ReleasableBytesReference>();
|
var gotChunks = new ArrayList<ReleasableBytesReference>();
|
||||||
var gotLast = new AtomicBoolean(false);
|
var gotLast = new AtomicBoolean(false);
|
||||||
|
@ -96,8 +94,9 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
|
||||||
channel.writeInbound(randomLastContent(chunkSize));
|
channel.writeInbound(randomLastContent(chunkSize));
|
||||||
|
|
||||||
for (int i = 0; i < totalChunks; i++) {
|
for (int i = 0; i < totalChunks; i++) {
|
||||||
assertEquals("should not enqueue chunks", 0, stream.chunkQueue().size());
|
assertNull("should not enqueue chunks", stream.buf());
|
||||||
stream.next();
|
stream.next();
|
||||||
|
channel.runPendingTasks();
|
||||||
assertEquals("each next() should produce single chunk", i + 1, gotChunks.size());
|
assertEquals("each next() should produce single chunk", i + 1, gotChunks.size());
|
||||||
}
|
}
|
||||||
assertTrue("should receive last content", gotLast.get());
|
assertTrue("should receive last content", gotLast.get());
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue