Revert #126441 Add flow-control and remove auto-read in netty4 HTTP pipeline (#127030)

* Revert "Release buffers in netty test (#126744)"

This reverts commit f9f3defe92.

* Revert "Add flow-control and remove auto-read in netty4 HTTP pipeline (#126441)"

This reverts commit c8805b85d2.
This commit is contained in:
Brian Seeders 2025-04-17 15:37:26 -04:00 committed by GitHub
parent d19b525eb1
commit 2a243d8492
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1165 additions and 496 deletions

View file

@ -1,5 +0,0 @@
pr: 126441
summary: Add flow-control and remove auto-read in netty4 http pipeline
area: Network
type: enhancement
issues: []

View file

@ -94,64 +94,6 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);
private static long transportStatsRequestBytesSize(Ctx ctx) {
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
var stats = httpTransport.stats().clientStats();
var bytes = 0L;
for (var s : stats) {
bytes += s.requestSizeBytes();
}
return bytes;
}
static int MBytes(int m) {
return m * 1024 * 1024;
}
static <T> T safePoll(BlockingDeque<T> queue) {
try {
var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertNotNull("queue is empty", t);
return t;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}
private static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
req.headers().add(CONTENT_LENGTH, content.readableBytes());
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}
private static HttpRequest httpRequest(String opaqueId, int contentLength) {
return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
}
private static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
req.headers().add(CONTENT_LENGTH, contentLength);
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}
private static HttpContent randomContent(int size, boolean isLast) {
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
if (isLast) {
return new DefaultLastHttpContent(buf);
} else {
return new DefaultHttpContent(buf);
}
}
private static ByteBuf randomByteBuf(int size) {
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
}
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
@ -236,6 +178,8 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);
// terminate client connection
@ -246,7 +190,10 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
handler.stream.next();
// wait for resources to be released
assertBusy(() -> assertTrue(handler.streamClosed));
assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
}
@ -261,11 +208,15 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);
// terminate connection on server and wait resources are released
handler.channel.request().getHttpChannel().close();
assertBusy(() -> assertTrue(handler.streamClosed));
assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
}
@ -279,12 +230,16 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);
handler.shouldThrowInsideHandleChunk = true;
handler.stream.next();
assertBusy(() -> assertTrue(handler.streamClosed));
assertBusy(() -> {
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
}
@ -325,7 +280,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
});
handler.readBytes(partSize);
}
assertTrue(handler.recvLast);
assertTrue(handler.stream.hasLast());
}
}
@ -430,6 +385,16 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
}
}
private static long transportStatsRequestBytesSize(Ctx ctx) {
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
var stats = httpTransport.stats().clientStats();
var bytes = 0L;
for (var s : stats) {
bytes += s.requestSizeBytes();
}
return bytes;
}
/**
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
*/
@ -524,7 +489,55 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
return getTestName() + "-" + reqNo;
}
private Ctx setupClientCtx() throws Exception {
static int MBytes(int m) {
return m * 1024 * 1024;
}
static <T> T safePoll(BlockingDeque<T> queue) {
try {
var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertNotNull("queue is empty", t);
return t;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}
static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
req.headers().add(CONTENT_LENGTH, content.readableBytes());
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}
static HttpRequest httpRequest(String opaqueId, int contentLength) {
return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
}
static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
req.headers().add(CONTENT_LENGTH, contentLength);
req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
return req;
}
static HttpContent randomContent(int size, boolean isLast) {
var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
if (isLast) {
return new DefaultLastHttpContent(buf);
} else {
return new DefaultHttpContent(buf);
}
}
static ByteBuf randomByteBuf(int size) {
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
}
Ctx setupClientCtx() throws Exception {
var nodeName = internalCluster().getRandomNodeName();
var clientRespQueue = new LinkedBlockingDeque<>(16);
var bootstrap = bootstrapClient(nodeName, clientRespQueue);
@ -532,7 +545,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
return new Ctx(getTestName(), nodeName, bootstrap, channel, clientRespQueue);
}
private Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
var httpServer = internalCluster().getInstance(HttpServerTransport.class, node);
var remoteAddr = randomFrom(httpServer.boundAddress().boundAddresses());
return new Bootstrap().group(new NioEventLoopGroup(1))
@ -570,13 +583,9 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
return false; // enable http
}
private record Ctx(
String testName,
String nodeName,
Bootstrap clientBootstrap,
Channel clientChannel,
BlockingDeque<Object> clientRespQueue
) implements AutoCloseable {
record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel clientChannel, BlockingDeque<Object> clientRespQueue)
implements
AutoCloseable {
@Override
public void close() throws Exception {
@ -601,7 +610,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
}
}
private static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
final String opaqueId;
final BlockingDeque<Chunk> recvChunks = new LinkedBlockingDeque<>();

View file

@ -1,78 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.http.netty4;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.util.concurrent.TimeUnit;
/**
* When channel auto-read is disabled handlers are responsible to read from channel.
* But it's hard to detect when read is missing. This helper class print warnings
* when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough
* to avoid test hang for too long, but can be increased if needed.
*/
class MissingReadDetector extends ChannelDuplexHandler {
private static final Logger logger = LogManager.getLogger(MissingReadDetector.class);
private final long interval;
private final TimeProvider timer;
private boolean pendingRead;
private long lastRead;
private ScheduledFuture<?> checker;
MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {
this.interval = missingReadIntervalMillis;
this.timer = timer;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
if (pendingRead == false) {
long now = timer.absoluteTimeInMillis();
if (now >= lastRead + interval) {
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
}
}
}, interval, interval, TimeUnit.MILLISECONDS);
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (checker != null) {
FutureUtils.cancel(checker);
}
super.handlerRemoved(ctx);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
pendingRead = true;
ctx.read();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled";
pendingRead = false;
lastRead = timer.absoluteTimeInMillis();
ctx.fireChannelRead(msg);
}
}

View file

@ -15,7 +15,6 @@ import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.LastHttpContent;
import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
@ -49,9 +48,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
if (msg instanceof LastHttpContent == false) {
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
}
} else {
streamContentSizeHandler.channelRead(ctx, msg);
}

View file

@ -123,7 +123,6 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
isContinueExpected = true;
} else {
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
return;
}
}
@ -137,7 +136,6 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
decoder.reset();
}
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.read();
} else {
ignoreContent = false;
currentContentLength = 0;
@ -152,13 +150,11 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
if (ignoreContent) {
msg.release();
ctx.read();
} else {
currentContentLength += msg.content().readableBytes();
if (currentContentLength > maxContentLength) {
msg.release();
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
} else {
ctx.fireChannelRead(msg);
}

View file

@ -9,113 +9,249 @@
package org.elasticsearch.http.netty4;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.http.netty4.internal.HttpValidator;
import org.elasticsearch.transport.Transports;
public class Netty4HttpHeaderValidator extends ChannelDuplexHandler {
import java.util.ArrayDeque;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;
public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
private final HttpValidator validator;
private final ThreadContext threadContext;
private State state;
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
private State state = WAITING_TO_START;
public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
this.validator = validator;
this.threadContext = threadContext;
}
State getState() {
return state;
}
@SuppressWarnings("fallthrough")
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert msg instanceof HttpObject;
var httpObject = (HttpObject) msg;
if (httpObject.decoderResult().isFailure()) {
ctx.fireChannelRead(httpObject); // pass-through for decoding failures
} else {
if (msg instanceof HttpRequest request) {
validate(ctx, request);
} else {
assert msg instanceof HttpContent;
var content = (HttpContent) msg;
if (state == State.DROPPING) {
content.release();
ctx.read();
} else {
assert state == State.PASSING : "unexpected content before validation completed";
ctx.fireChannelRead(content);
final HttpObject httpObject = (HttpObject) msg;
switch (state) {
case WAITING_TO_START:
assert pending.isEmpty();
pending.add(ReferenceCountUtil.retain(httpObject));
requestStart(ctx);
assert state == QUEUEING_DATA;
assert ctx.channel().config().isAutoRead() == false;
break;
case QUEUEING_DATA:
pending.add(ReferenceCountUtil.retain(httpObject));
break;
case FORWARDING_DATA_UNTIL_NEXT_REQUEST:
assert pending.isEmpty();
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
}
ctx.fireChannelRead(httpObject);
break;
case DROPPING_DATA_UNTIL_NEXT_REQUEST:
assert pending.isEmpty();
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
ReferenceCountUtil.release(httpObject);
break;
case DROPPING_DATA_PERMANENTLY:
assert pending.isEmpty();
ReferenceCountUtil.release(httpObject); // consume without enqueuing
ctx.channel().config().setAutoRead(false);
break;
}
}
private void requestStart(ChannelHandlerContext ctx) {
assert state == WAITING_TO_START;
if (pending.isEmpty()) {
return;
}
final HttpObject httpObject = pending.getFirst();
final HttpRequest httpRequest;
if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) {
// a properly decoded HTTP start message is expected to begin validation
// anything else is probably an error that the downstream HTTP message aggregator will have to handle
httpRequest = (HttpRequest) httpObject;
} else {
httpRequest = null;
}
state = QUEUEING_DATA;
ctx.channel().config().setAutoRead(false);
if (httpRequest == null) {
// this looks like a malformed request and will forward without validation
ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
} else {
assert Transports.assertDefaultThreadContext(threadContext);
ActionListener.run(
// this prevents thread-context changes to propagate to the validation listener
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
ActionListener.assertOnce(
new ContextPreservingActionListener<Void>(
threadContext.wrapRestorable(threadContext.newStoredContext()),
// Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
assert Transports.assertDefaultThreadContext(threadContext);
ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
}
@Override
public void onFailure(Exception e) {
assert Transports.assertDefaultThreadContext(threadContext);
ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e));
}
}
)
),
listener -> {
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
validator.validate(httpRequest, ctx.channel(), listener);
}
}
);
}
}
private void forwardFullRequest(ChannelHandlerContext ctx) {
Transports.assertDefaultThreadContext(threadContext);
assert ctx.channel().eventLoop().inEventLoop();
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;
ctx.channel().config().setAutoRead(true);
boolean fullRequestForwarded = forwardData(ctx, pending);
assert fullRequestForwarded || pending.isEmpty();
if (fullRequestForwarded) {
state = WAITING_TO_START;
requestStart(ctx);
} else {
state = FORWARDING_DATA_UNTIL_NEXT_REQUEST;
}
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
}
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
Transports.assertDefaultThreadContext(threadContext);
assert ctx.channel().eventLoop().inEventLoop();
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;
HttpObject messageToForward = pending.getFirst();
boolean fullRequestDropped = dropData(pending);
if (messageToForward instanceof HttpContent toReplace) {
// if the request to forward contained data (which got dropped), replace with empty data
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
}
messageToForward.setDecoderResult(DecoderResult.failure(e));
ctx.channel().config().setAutoRead(true);
ctx.fireChannelRead(messageToForward);
assert fullRequestDropped || pending.isEmpty();
if (fullRequestDropped) {
state = WAITING_TO_START;
requestStart(ctx);
} else {
state = DROPPING_DATA_UNTIL_NEXT_REQUEST;
}
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
// until validation is completed we can ignore read calls,
// once validation is finished HttpRequest will be fired and downstream can read from there
if (state != State.VALIDATING) {
ctx.read();
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
state = DROPPING_DATA_PERMANENTLY;
while (true) {
if (dropData(pending) == false) {
break;
}
}
super.channelInactive(ctx);
}
private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque<HttpObject> pending) {
final int pendingMessages = pending.size();
try {
HttpObject toForward;
while ((toForward = pending.poll()) != null) {
ctx.fireChannelRead(toForward);
ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued
if (toForward instanceof LastHttpContent) {
return true;
}
}
return false;
} finally {
maybeResizePendingDown(pendingMessages, pending);
}
}
void validate(ChannelHandlerContext ctx, HttpRequest request) {
assert Transports.assertDefaultThreadContext(threadContext);
state = State.VALIDATING;
ActionListener.run(
// this prevents thread-context changes to propagate to the validation listener
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
ActionListener.assertOnce(
new ContextPreservingActionListener<Void>(
threadContext.wrapRestorable(threadContext.newStoredContext()),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
handleValidationResult(ctx, request, null);
}
@Override
public void onFailure(Exception e) {
handleValidationResult(ctx, request, e);
}
}
)
),
listener -> {
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
validator.validate(request, ctx.channel(), listener);
private static boolean dropData(ArrayDeque<HttpObject> pending) {
final int pendingMessages = pending.size();
try {
HttpObject toDrop;
while ((toDrop = pending.poll()) != null) {
ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming
if (toDrop instanceof LastHttpContent) {
return true;
}
}
);
return false;
} finally {
maybeResizePendingDown(pendingMessages, pending);
}
}
void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nullable Exception validationError) {
assert Transports.assertDefaultThreadContext(threadContext);
// Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
ctx.channel().eventLoop().execute(() -> {
if (validationError != null) {
request.setDecoderResult(DecoderResult.failure(validationError));
state = State.DROPPING;
} else {
state = State.PASSING;
}
ctx.fireChannelRead(request);
});
private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject> pending) {
if (pending.size() <= 4 && largeSize > 32) {
// Prevent the ArrayDeque from becoming forever large due to a single large message.
ArrayDeque<HttpObject> old = pending;
pending = new ArrayDeque<>(4);
pending.addAll(old);
}
}
private enum State {
PASSING,
VALIDATING,
DROPPING
enum State {
WAITING_TO_START,
QUEUEING_DATA,
FORWARDING_DATA_UNTIL_NEXT_REQUEST,
DROPPING_DATA_UNTIL_NEXT_REQUEST,
DROPPING_DATA_PERMANENTLY
}
}

View file

@ -118,7 +118,6 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
activityTracker.startActivity();
boolean shouldRead = true;
try {
if (msg instanceof HttpRequest request) {
final Netty4HttpRequest netty4HttpRequest;
@ -138,26 +137,25 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
currentRequestStream = null;
} else {
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
var contentStream = new Netty4HttpRequestBodyStream(
ctx.channel(),
serverTransport.getThreadPool().getThreadContext(),
activityTracker
);
currentRequestStream = contentStream;
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
shouldRead = false;
}
}
handlePipelinedRequest(ctx, netty4HttpRequest);
} else {
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
assert currentRequestStream != null : "current stream must exists before handling http content";
shouldRead = false;
currentRequestStream.handleNettyContent((HttpContent) msg);
if (msg instanceof LastHttpContent) {
currentRequestStream = null;
}
}
} finally {
if (shouldRead) {
ctx.channel().eventLoop().execute(ctx::read);
}
activityTracker.stopActivity();
}
}

View file

@ -9,11 +9,14 @@
package org.elasticsearch.http.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.elasticsearch.common.network.ThreadWatchdog;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.http.HttpBody;
@ -24,22 +27,34 @@ import java.util.List;
/**
* Netty based implementation of {@link HttpBody.Stream}.
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
* to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite
* autoRead=off. In this case chunks will be buffered until downstream calls {@link Stream#next()}
*/
public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private final Channel channel;
private final ChannelFutureListener closeListener = future -> doClose();
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
private final ThreadContext threadContext;
private final ChannelHandlerContext ctx;
private final ThreadWatchdog.ActivityTracker activityTracker;
private ByteBuf buf;
private boolean requested = false;
private boolean closing = false;
private HttpBody.ChunkHandler handler;
private ThreadContext.StoredContext requestContext;
private final ChannelFutureListener closeListener = future -> doClose();
public Netty4HttpRequestBodyStream(ChannelHandlerContext ctx, ThreadContext threadContext) {
this.ctx = ctx;
// used in tests
private volatile int bufSize = 0;
private volatile boolean hasLast = false;
public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) {
this.channel = channel;
this.threadContext = threadContext;
this.requestContext = threadContext.newStoredContext();
Netty4Utils.addListener(ctx.channel().closeFuture(), closeListener);
this.activityTracker = activityTracker;
Netty4Utils.addListener(channel.closeFuture(), closeListener);
channel.config().setAutoRead(false);
}
@Override
@ -58,43 +73,94 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
tracingHandlers.add(chunkHandler);
}
private void read() {
ctx.channel().eventLoop().execute(ctx::read);
}
@Override
public void next() {
assert handler != null : "handler must be set before requesting next chunk";
requestContext = threadContext.newStoredContext();
read();
channel.eventLoop().submit(() -> {
activityTracker.startActivity();
requested = true;
try {
if (closing) {
return;
}
if (buf == null) {
channel.read();
} else {
send();
}
} catch (Throwable e) {
channel.pipeline().fireExceptionCaught(e);
} finally {
activityTracker.stopActivity();
}
});
}
public void handleNettyContent(HttpContent httpContent) {
assert hasLast == false : "receive http content on completed stream";
hasLast = httpContent instanceof LastHttpContent;
if (closing) {
httpContent.release();
read();
} else {
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
var isLast = httpContent instanceof LastHttpContent;
var buf = Netty4Utils.toReleasableBytesReference(httpContent.content());
for (var tracer : tracingHandlers) {
tracer.onNext(buf, isLast);
}
handler.onNext(buf, isLast);
if (isLast) {
read();
ctx.channel().closeFuture().removeListener(closeListener);
}
addChunk(httpContent.content());
if (requested) {
send();
}
}
}
// adds chunk to current buffer, will allocate composite buffer when need to hold more than 1 chunk
private void addChunk(ByteBuf chunk) {
assert chunk != null;
if (buf == null) {
buf = chunk;
} else if (buf instanceof CompositeByteBuf comp) {
comp.addComponent(true, chunk);
} else {
var comp = channel.alloc().compositeBuffer();
comp.addComponent(true, buf);
comp.addComponent(true, chunk);
buf = comp;
}
bufSize = buf.readableBytes();
}
// visible for test
int bufSize() {
return bufSize;
}
// visible for test
boolean hasLast() {
return hasLast;
}
private void send() {
assert requested;
assert handler != null : "must set handler before receiving next chunk";
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
requested = false;
buf = null;
bufSize = 0;
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
for (var tracer : tracingHandlers) {
tracer.onNext(bytesRef, hasLast);
}
handler.onNext(bytesRef, hasLast);
}
if (hasLast) {
channel.config().setAutoRead(true);
channel.closeFuture().removeListener(closeListener);
}
}
@Override
public void close() {
if (ctx.channel().eventLoop().inEventLoop()) {
if (channel.eventLoop().inEventLoop()) {
doClose();
} else {
ctx.channel().eventLoop().submit(this::doClose);
channel.eventLoop().submit(this::doClose);
}
}
@ -108,6 +174,11 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
handler.close();
}
}
read();
if (buf != null) {
buf.release();
buf = null;
bufSize = 0;
}
channel.config().setAutoRead(true);
}
}

View file

@ -29,7 +29,6 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.flow.FlowControlHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
@ -47,7 +46,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.http.AbstractHttpServerTransport;
@ -319,9 +317,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
@Override
protected void initChannel(Channel ch) throws Exception {
// auto-read must be disabled all the time
ch.config().setAutoRead(false);
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
if (acceptChannelPredicate != null) {
@ -369,15 +364,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces
// from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part
ch.pipeline().addLast(new FlowControlHandler());
if (Assertions.ENABLED) {
// missing reads are hard to catch, but we can detect absence of reads within interval
long missingReadIntervalMs = 10_000;
ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs));
}
if (httpValidator != null) {
// runs a validation function on the first HTTP message piece which contains all the headers
// if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded
@ -435,9 +421,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker)
);
transport.serverAcceptedChannel(nettyHttpChannel);
// make very first read call, since auto-read is disabled; following reads must come from the handlers
ch.read();
}
@Override

View file

@ -12,7 +12,6 @@ package org.elasticsearch.http.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
@ -41,7 +40,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
private static final int REPS = 1000;
private EmbeddedChannel channel;
private EmbeddedChannel encoder; // channel to encode HTTP objects into bytes
private ReadSniffer readSniffer;
private static HttpContent httpContent(int size) {
return new DefaultHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(size)));
@ -70,20 +68,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
super.setUp();
var decoder = new HttpRequestDecoder();
encoder = new EmbeddedChannel(new HttpRequestEncoder());
readSniffer = new ReadSniffer();
channel = new EmbeddedChannel();
channel.config().setAutoRead(false);
channel.pipeline().addLast(decoder, readSniffer, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH));
}
public void testDecodingFailurePassThrough() {
for (var i = 0; i < REPS; i++) {
var sendReq = httpRequest();
sendReq.setDecoderResult(DecoderResult.failure(new Exception("bad")));
channel.writeInbound(sendReq);
assertEquals(sendReq, channel.readInbound());
}
assertEquals("should not read from channel, failures are handled downstream", 0, readSniffer.readCount);
channel = new EmbeddedChannel(decoder, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH));
}
/**
@ -100,7 +85,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
assertFalse(HttpUtil.is100ContinueExpected(recvRequest));
channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
assertEquals("must not read from channel", 0, readSniffer.readCount);
}
}
@ -115,7 +99,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
assertNotNull("request should pass", channel.readInbound());
channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
assertEquals("must not read from channel", 0, readSniffer.readCount);
}
}
@ -138,7 +121,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
assertNotNull(recvContent);
assertEquals(MAX_CONTENT_LENGTH, recvContent.content().readableBytes());
recvContent.release();
assertEquals("must not read from channel", 0, readSniffer.readCount);
}
}
@ -152,7 +134,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
channel.writeInbound(encode(sendRequest));
var resp = (FullHttpResponse) channel.readOutbound();
assertEquals(HttpResponseStatus.EXPECTATION_FAILED, resp.status());
assertEquals("expect 2 reads, one from size handler and HTTP decoder will emit LastHttpContent", 2, readSniffer.readCount);
assertFalse(channel.isOpen());
resp.release();
}
@ -171,7 +152,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
assertNull("request should not pass", channel.readInbound());
assertTrue("should not close channel", channel.isOpen());
assertEquals("must read from channel", i + 1, readSniffer.readCount);
resp.release();
}
}
@ -180,13 +160,11 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
* Mixed load of oversized and normal requests with Exepct:100-Continue.
*/
public void testMixedContent() {
var expectReadCnt = 0;
for (int i = 0; i < REPS; i++) {
var isOversized = randomBoolean();
var sendRequest = httpRequest();
HttpUtil.set100ContinueExpected(sendRequest, true);
if (isOversized) {
expectReadCnt++;
HttpUtil.setContentLength(sendRequest, OVERSIZED_LENGTH);
channel.writeInbound(encode(sendRequest));
var resp = (FullHttpResponse) channel.readOutbound();
@ -210,7 +188,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
assertEquals("actual content size should match", normalSize, recvContent.content().readableBytes());
recvContent.release();
}
assertEquals(expectReadCnt, readSniffer.readCount);
}
}
@ -228,7 +205,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
resp.release();
assertNull("request and content should not pass", channel.readInbound());
assertTrue("should not close channel", channel.isOpen());
assertEquals("expect two reads per loop, one for request and one for content", (i + 1) * 2, readSniffer.readCount);
}
}
@ -258,7 +234,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
var resp = (FullHttpResponse) channel.readOutbound();
assertEquals("should respond with 413", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
assertFalse("should close channel", channel.isOpen());
assertEquals("expect read after response", 1, readSniffer.readCount);
resp.release();
}
}

View file

@ -19,7 +19,6 @@ import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.flow.FlowControlHandler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -53,8 +52,7 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
channel = new EmbeddedChannel(new FlowControlHandler());
channel.config().setAutoRead(false);
channel = new EmbeddedChannel();
threadPool = new TestThreadPool(TEST_MOCK_TRANSPORT_THREAD_PREFIX);
}
@ -183,7 +181,6 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
threadPool.generic().submit(() -> {
DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
channel.writeInbound(request1);
channel.read();
DefaultHttpContent content1 = randomBoolean() ? new DefaultHttpContent(Unpooled.buffer(4)) : null;
if (content1 != null) {
channel.writeInbound(content1);
@ -199,11 +196,9 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
}
channel.runPendingTasks();
assertThat(channel.readInbound(), sameInstance(request1));
channel.read();
if (content1 != null && success) {
assertThat(channel.readInbound(), sameInstance(content1));
}
channel.read();
if (success) {
assertThat(channel.readInbound(), sameInstance(lastContent1));
}

View file

@ -9,158 +9,766 @@
package org.elasticsearch.http.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.flow.FlowControlHandler;
import io.netty.util.AsciiString;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.netty4.internal.HttpValidator;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class Netty4HttpHeaderValidatorTests extends ESTestCase {
private final AtomicReference<Object> header = new AtomicReference<>();
private final AtomicReference<ActionListener<Void>> listener = new AtomicReference<>();
private EmbeddedChannel channel;
private BlockingQueue<ValidationRequest> validatorRequestQueue;
private Netty4HttpHeaderValidator netty4HttpHeaderValidator;
private final AtomicReference<RuntimeException> validationException = new AtomicReference<>();
@Override
public void setUp() throws Exception {
super.setUp();
validatorRequestQueue = new LinkedBlockingQueue<>();
channel = new EmbeddedChannel(
new Netty4HttpHeaderValidator(
(httpRequest, channel, listener) -> validatorRequestQueue.add(new ValidationRequest(httpRequest, channel, listener)),
new ThreadContext(Settings.EMPTY)
)
);
reset();
}
private void reset() {
channel = new EmbeddedChannel();
header.set(null);
listener.set(null);
validationException.set(null);
HttpValidator validator = (httpRequest, channel, validationCompleteListener) -> {
header.set(httpRequest);
final var exception = validationException.get();
if (exception != null) {
throw exception;
}
listener.set(validationCompleteListener);
};
netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY));
channel.pipeline().addLast(netty4HttpHeaderValidator);
}
public void testValidationPausesAndResumesData() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(request);
channel.writeInbound(content);
assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
// channel is resumed
listener.get().onResponse(null);
channel.runPendingTasks();
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
assertThat(channel.readInbound(), sameInstance(request));
assertThat(channel.readInbound(), sameInstance(content));
assertThat(channel.readInbound(), nullValue());
assertThat(content.refCnt(), equalTo(1));
// channel continues in resumed state after request finishes
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(lastContent);
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertThat(channel.readInbound(), sameInstance(lastContent));
assertThat(lastContent.refCnt(), equalTo(1));
// channel is again paused while validating next request
channel.writeInbound(request);
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
}
public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(request);
channel.writeInbound(content);
assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
// channel is resumed
listener.get().onResponse(null);
channel.runPendingTasks();
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
assertThat(channel.readInbound(), sameInstance(request));
assertThat(channel.readInbound(), sameInstance(content));
assertThat(channel.readInbound(), nullValue());
assertThat(content.refCnt(), equalTo(1));
channel.config().setAutoRead(false);
channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4)));
assertFalse(channel.config().isAutoRead());
}
HttpRequest newHttpRequest() {
return new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "");
}
public void testContentForwardedAfterValidation() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
HttpContent newHttpContent() {
return new DefaultHttpContent(Unpooled.buffer());
}
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
channel.writeInbound(request);
LastHttpContent newLastHttpContent() {
return new DefaultLastHttpContent();
}
DefaultHttpContent content1 = null;
if (randomBoolean()) {
content1 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content1);
}
public void testValidatorReceiveHttpRequest() {
channel.writeInbound(newHttpRequest());
assertEquals(1, validatorRequestQueue.size());
assertNull(channel.readInbound());
}
assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
public void testDecoderFailurePassThrough() {
for (var i = 0; i < 1000; i++) {
var httpRequest = newHttpRequest();
httpRequest.setDecoderResult(DecoderResult.failure(new Exception("bad")));
channel.writeInbound(httpRequest);
assertEquals(httpRequest, channel.readInbound());
// channel is resumed
listener.get().onResponse(null);
channel.runPendingTasks();
// resumed channel after successful validation forwards data
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
// write more content to the channel after validation passed
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content2);
assertThat(channel.readInbound(), sameInstance(request));
DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content3);
if (content1 != null) {
assertThat(channel.readInbound(), sameInstance(content1));
assertThat(content1.refCnt(), equalTo(1));
}
assertThat(channel.readInbound(), sameInstance(content2));
assertThat(content2.refCnt(), equalTo(1));
DefaultHttpContent content4 = null;
if (randomBoolean()) {
content4 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content4);
}
assertThat(channel.readInbound(), sameInstance(content3));
assertThat(content3.refCnt(), equalTo(1));
if (content4 != null) {
assertThat(channel.readInbound(), sameInstance(content4));
assertThat(content4.refCnt(), equalTo(1));
}
// channel continues in resumed state after request finishes
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(lastContent);
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertThat(channel.readInbound(), sameInstance(lastContent));
assertThat(lastContent.refCnt(), equalTo(1));
if (randomBoolean()) {
channel.writeInbound(request);
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
}
}
/**
* Sends back-to-back http requests and randomly fail validation.
* Ensures that invalid requests drop content and valid pass through.
*/
public void testMixedValidationResults() {
for (var i = 0; i < 1000; i++) {
var shouldPassValidation = randomBoolean();
var request = newHttpRequest();
var content = newHttpContent();
var last = newLastHttpContent();
public void testContentDroppedAfterValidationFailure() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
channel.writeInbound(request);
DefaultHttpContent content1 = null;
if (randomBoolean()) {
content1 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content1);
}
assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
// channel is resumed
listener.get().onFailure(new ElasticsearchException("Boom"));
channel.runPendingTasks();
// resumed channel after failed validation drops data
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
// write more content to the channel after validation passed
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content2);
assertThat(channel.readInbound(), sameInstance(request));
DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content3);
if (content1 != null) {
assertThat(channel.readInbound(), nullValue());
assertThat(content1.refCnt(), equalTo(0));
}
assertThat(channel.readInbound(), nullValue()); // content2
assertThat(content2.refCnt(), equalTo(0));
DefaultHttpContent content4 = null;
if (randomBoolean()) {
content4 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content4);
}
assertThat(channel.readInbound(), nullValue()); // content3
assertThat(content3.refCnt(), equalTo(0));
if (content4 != null) {
assertThat(channel.readInbound(), nullValue());
assertThat(content4.refCnt(), equalTo(0));
}
assertThat(channel.readInbound(), nullValue()); // extra read still returns "null"
// channel continues in resumed state after request finishes
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(lastContent);
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertThat(channel.readInbound(), nullValue()); // lastContent
assertThat(lastContent.refCnt(), equalTo(0));
if (randomBoolean()) {
channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"));
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
}
}
public void testValidationErrorForwardsAsDecoderErrorMessage() {
for (Exception exception : List.of(
new Exception("Failure"),
new ElasticsearchException("Failure"),
new ElasticsearchSecurityException("Failure")
)) {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(request);
var validationRequest = validatorRequestQueue.poll();
assertNotNull(validationRequest);
if (shouldPassValidation) {
validationRequest.listener.onResponse(null);
} else {
validationRequest.listener.onFailure(new ValidationException());
}
channel.runPendingTasks();
var gotRequest = channel.readInbound();
assertEquals(
"should set decoder result failure for invalid request",
shouldPassValidation,
((HttpRequest) gotRequest).decoderResult().isSuccess()
);
assertEquals(request, gotRequest);
channel.writeInbound(content);
channel.writeInbound(last);
if (shouldPassValidation) {
assertEquals("should pass content for valid request", content, channel.readInbound());
content.release();
assertEquals(last, channel.readInbound());
last.release();
} else {
assertNull("should drop content for invalid request", channel.readInbound());
}
assertThat(header.get(), sameInstance(request));
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
listener.get().onFailure(exception);
channel.runPendingTasks();
assertTrue(channel.config().isAutoRead());
DefaultHttpRequest failed = channel.readInbound();
assertThat(failed, sameInstance(request));
assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue());
assertTrue(failed.decoderResult().isFailure());
Exception cause = (Exception) failed.decoderResult().cause();
assertThat(cause, equalTo(exception));
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
assertThat(channel.readInbound(), nullValue());
assertThat(content.refCnt(), equalTo(0));
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(lastContent);
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertThat(channel.readInbound(), nullValue());
assertThat(lastContent.refCnt(), equalTo(0));
reset();
}
}
public void testIgnoreReadWhenValidating() {
channel.pipeline().addFirst(new FlowControlHandler()); // catch all inbound messages
public void testValidationExceptionForwardsAsDecoderErrorMessage() {
final var exception = new ElasticsearchException("Failure");
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
channel.writeInbound(newHttpRequest());
channel.writeInbound(newLastHttpContent()); // should hold by flow-control-handler
assertNull("nothing should pass yet", channel.readInbound());
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
channel.read();
var validationRequest = validatorRequestQueue.poll();
assertNotNull(validationRequest);
validationException.set(exception);
channel.writeInbound(request);
channel.read();
assertNull("should ignore read while validating", channel.readInbound());
assertThat(header.get(), sameInstance(request));
assertThat(listener.get(), nullValue());
validationRequest.listener.onResponse(null);
channel.runPendingTasks();
assertTrue("http request should pass", channel.readInbound() instanceof HttpRequest);
assertNull("content should not pass yet, need explicit read", channel.readInbound());
assertTrue(channel.config().isAutoRead());
DefaultHttpRequest failed = channel.readInbound();
assertThat(failed, sameInstance(request));
assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue());
assertTrue(failed.decoderResult().isFailure());
Exception cause = (Exception) failed.decoderResult().cause();
assertThat(cause, equalTo(exception));
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
channel.read();
asInstanceOf(LastHttpContent.class, channel.readInbound()).release();
final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content);
assertThat(channel.readInbound(), nullValue());
assertThat(content.refCnt(), equalTo(0));
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(lastContent);
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertThat(channel.readInbound(), nullValue());
assertThat(lastContent.refCnt(), equalTo(0));
}
public void testWithFlowControlAndAggregator() {
channel.pipeline().addFirst(new FlowControlHandler());
channel.pipeline().addLast(new Netty4HttpAggregator(8192, (req) -> true, new HttpRequestDecoder()));
public void testValidationHandlesMultipleQueuedUpMessages() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
channel.writeInbound(newHttpRequest());
channel.writeInbound(newHttpContent());
channel.writeInbound(newLastHttpContent());
final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(request1);
channel.writeInbound(content1);
channel.writeInbound(lastContent1);
final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(request2);
channel.writeInbound(content2);
channel.writeInbound(lastContent2);
channel.read();
assertNull("should ignore read while validating", channel.readInbound());
assertThat(header.get(), sameInstance(request1));
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
var validationRequest = validatorRequestQueue.poll();
assertNotNull(validationRequest);
validationRequest.listener.onResponse(null);
listener.get().onResponse(null);
channel.runPendingTasks();
assertThat(channel.readInbound(), sameInstance(request1));
assertThat(channel.readInbound(), sameInstance(content1));
assertThat(channel.readInbound(), sameInstance(lastContent1));
assertThat(content1.refCnt(), equalTo(1));
assertThat(lastContent1.refCnt(), equalTo(1));
asInstanceOf(FullHttpRequest.class, channel.readInbound()).release();
assertThat(header.get(), sameInstance(request2));
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
assertThat(channel.readInbound(), nullValue());
listener.get().onResponse(null);
channel.runPendingTasks();
assertThat(channel.readInbound(), sameInstance(request2));
assertThat(channel.readInbound(), sameInstance(content2));
assertThat(channel.readInbound(), sameInstance(lastContent2));
assertThat(content2.refCnt(), equalTo(1));
assertThat(lastContent2.refCnt(), equalTo(1));
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertThat(channel.readInbound(), nullValue());
}
record ValidationRequest(HttpRequest request, Channel channel, ActionListener<Void> listener) {}
public void testValidationFailureRecoversForEnqueued() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
// write 2 requests before validation for the first one fails
final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(request1);
channel.writeInbound(content1);
channel.writeInbound(lastContent1);
final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(request2);
channel.writeInbound(content2);
boolean finishSecondRequest = randomBoolean();
if (finishSecondRequest) {
channel.writeInbound(lastContent2);
}
// channel is paused and both requests are queued
assertThat(header.get(), sameInstance(request1));
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
assertThat(content1.refCnt(), equalTo(2));
assertThat(lastContent1.refCnt(), equalTo(2));
assertThat(content2.refCnt(), equalTo(2));
if (finishSecondRequest) {
assertThat(lastContent2.refCnt(), equalTo(2));
}
// validation for the 1st request FAILS
Exception exception = new ElasticsearchException("Boom");
listener.get().onFailure(exception);
channel.runPendingTasks();
// request1 becomes a decoder exception and its content is dropped
assertThat(channel.readInbound(), sameInstance(request1));
assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue());
assertTrue(request1.decoderResult().isFailure());
Exception cause = (Exception) request1.decoderResult().cause();
assertThat(cause, equalTo(exception));
assertThat(content1.refCnt(), equalTo(0)); // content is dropped
assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped
assertThat(channel.readInbound(), nullValue());
// channel pauses for the validation of the 2nd request
assertThat(header.get(), sameInstance(request2));
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
assertThat(channel.readInbound(), nullValue());
// validation for the 2nd request SUCCEEDS
listener.get().onResponse(null);
channel.runPendingTasks();
// 2nd request is forwarded correctly
assertThat(channel.readInbound(), sameInstance(request2));
assertThat(channel.readInbound(), sameInstance(content2));
assertThat(content2.refCnt(), equalTo(1));
if (finishSecondRequest == false) {
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
assertTrue(channel.config().isAutoRead());
assertThat(channel.readInbound(), nullValue());
// while in forwarding state the request can continue
if (randomBoolean()) {
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content);
assertThat(channel.readInbound(), sameInstance(content));
assertThat(content.refCnt(), equalTo(1));
}
channel.writeInbound(lastContent2);
}
assertThat(channel.readInbound(), sameInstance(lastContent2));
assertThat(lastContent2.refCnt(), equalTo(1));
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertTrue(channel.config().isAutoRead());
}
public void testValidationFailureRecoversForInbound() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
// write a single request, but don't finish it yet, for which the validation fails
final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(request1);
channel.writeInbound(content1);
// channel is paused and the request is queued
assertThat(header.get(), sameInstance(request1));
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
assertThat(content1.refCnt(), equalTo(2));
// validation for the 1st request FAILS
Exception exception = new ElasticsearchException("Boom");
listener.get().onFailure(exception);
channel.runPendingTasks();
// request1 becomes a decoder exception and its content is dropped
assertThat(channel.readInbound(), sameInstance(request1));
assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue());
assertTrue(request1.decoderResult().isFailure());
Exception cause = (Exception) request1.decoderResult().cause();
assertThat(cause, equalTo(exception));
assertThat(content1.refCnt(), equalTo(0)); // content is dropped
assertThat(channel.readInbound(), nullValue());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
if (randomBoolean()) {
channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4)));
}
DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(lastContent1);
if (randomBoolean()) {
assertThat(channel.readInbound(), nullValue());
}
assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped
// write 2nd request after the 1st one failed validation
final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(request2);
channel.writeInbound(content2);
boolean finishSecondRequest = randomBoolean();
if (finishSecondRequest) {
channel.writeInbound(lastContent2);
}
// channel pauses for the validation of the 2nd request
assertThat(header.get(), sameInstance(request2));
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
assertThat(channel.readInbound(), nullValue());
// validation for the 2nd request SUCCEEDS
listener.get().onResponse(null);
channel.runPendingTasks();
// 2nd request is forwarded correctly
assertThat(channel.readInbound(), sameInstance(request2));
assertThat(channel.readInbound(), sameInstance(content2));
assertThat(content2.refCnt(), equalTo(1));
if (finishSecondRequest == false) {
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
assertTrue(channel.config().isAutoRead());
assertThat(channel.readInbound(), nullValue());
// while in forwarding state the request can continue
if (randomBoolean()) {
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content);
assertThat(channel.readInbound(), sameInstance(content));
assertThat(content.refCnt(), equalTo(1));
}
channel.writeInbound(lastContent2);
}
assertThat(channel.readInbound(), sameInstance(lastContent2));
assertThat(lastContent2.refCnt(), equalTo(1));
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertTrue(channel.config().isAutoRead());
}
public void testValidationSuccessForLargeMessage() {
assertTrue(channel.config().isAutoRead());
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
channel.writeInbound(request);
int messageLength = randomIntBetween(32, 128);
for (int i = 0; i < messageLength; ++i) {
channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4)));
}
channel.writeInbound(new DefaultLastHttpContent(Unpooled.buffer(4)));
boolean followupRequest = randomBoolean();
if (followupRequest) {
channel.writeInbound(request);
}
assertThat(header.get(), sameInstance(request));
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
listener.get().onResponse(null);
channel.runPendingTasks();
if (followupRequest) {
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
} else {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
}
assertThat(channel.readInbound(), sameInstance(request));
for (int i = 0; i < messageLength; ++i) {
Object content = channel.readInbound();
assertThat(content, instanceOf(DefaultHttpContent.class));
assertThat(((DefaultHttpContent) content).refCnt(), equalTo(1));
}
assertThat(channel.readInbound(), instanceOf(LastHttpContent.class));
assertThat(channel.readInbound(), nullValue());
}
public void testValidationFailureForLargeMessage() {
assertTrue(channel.config().isAutoRead());
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
channel.writeInbound(request);
int messageLength = randomIntBetween(32, 128);
DefaultHttpContent[] messageContents = new DefaultHttpContent[messageLength];
for (int i = 0; i < messageLength; ++i) {
messageContents[i] = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(messageContents[i]);
}
DefaultLastHttpContent lastHttpContent = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(lastHttpContent);
boolean followupRequest = randomBoolean();
if (followupRequest) {
channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"));
}
assertThat(header.get(), sameInstance(request));
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
Exception exception = new ElasticsearchException("Boom");
listener.get().onFailure(exception);
channel.runPendingTasks();
if (followupRequest) {
assertFalse(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
} else {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
}
assertThat(channel.readInbound(), sameInstance(request));
assertThat(request.headers().get(HttpHeaderNames.CONNECTION), nullValue());
assertTrue(request.decoderResult().isFailure());
Exception cause = (Exception) request.decoderResult().cause();
assertThat(cause, equalTo(exception));
for (int i = 0; i < messageLength; ++i) {
assertThat(channel.readInbound(), nullValue());
assertThat(messageContents[i].refCnt(), equalTo(0));
}
assertThat(channel.readInbound(), nullValue());
assertThat(lastHttpContent.refCnt(), equalTo(0));
assertThat(channel.readInbound(), nullValue());
}
public void testFullRequestValidationFailure() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
ByteBuf buf = channel.alloc().buffer();
ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
channel.writeInbound(request);
// request got through to validation
assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
// validation fails
Exception exception = new ElasticsearchException("Boom");
listener.get().onFailure(exception);
channel.runPendingTasks();
// channel is resumed and waiting for next request
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
DefaultFullHttpRequest throughRequest = channel.readInbound();
// "through request" contains a decoder exception
assertThat(throughRequest, not(sameInstance(request)));
assertTrue(throughRequest.decoderResult().isFailure());
// the content is cleared when validation fails
assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is(""));
assertThat(buf.refCnt(), is(0));
Exception cause = (Exception) throughRequest.decoderResult().cause();
assertThat(cause, equalTo(exception));
}
public void testFullRequestValidationSuccess() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
ByteBuf buf = channel.alloc().buffer();
try {
ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
channel.writeInbound(request);
// request got through to validation
assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
// validation succeeds
listener.get().onResponse(null);
channel.runPendingTasks();
// channel is resumed and waiting for next request
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
DefaultFullHttpRequest throughRequest = channel.readInbound();
// request goes through unaltered
assertThat(throughRequest, sameInstance(request));
assertFalse(throughRequest.decoderResult().isFailure());
// the content is unaltered
assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request"));
assertThat(buf.refCnt(), is(1));
assertThat(throughRequest.decoderResult().cause(), nullValue());
} finally {
buf.release();
}
}
public void testFullRequestWithDecoderException() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
ByteBuf buf = channel.alloc().buffer();
try {
ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
// a request with a decoder error prior to validation
final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
Exception cause = new ElasticsearchException("Boom");
request.setDecoderResult(DecoderResult.failure(cause));
channel.writeInbound(request);
// request goes through without invoking the validator
assertThat(header.get(), nullValue());
assertThat(listener.get(), nullValue());
// channel is NOT paused
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
DefaultFullHttpRequest throughRequest = channel.readInbound();
// request goes through unaltered
assertThat(throughRequest, sameInstance(request));
assertTrue(throughRequest.decoderResult().isFailure());
assertThat(throughRequest.decoderResult().cause(), equalTo(cause));
// the content is unaltered
assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request"));
assertThat(buf.refCnt(), is(1));
} finally {
buf.release();
}
}
}

View file

@ -21,6 +21,7 @@ import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.flow.FlowControlHandler;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.network.ThreadWatchdog;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpBody;
@ -42,24 +43,17 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
private EmbeddedChannel channel;
private ReadSniffer readSniffer;
private Netty4HttpRequestBodyStream stream;
private ThreadWatchdog.ActivityTracker activityTracker;
@Override
public void setUp() throws Exception {
super.setUp();
channel = new EmbeddedChannel();
readSniffer = new ReadSniffer();
channel.pipeline().addLast(new FlowControlHandler(), readSniffer);
channel.config().setAutoRead(false);
activityTracker = new ThreadWatchdog.ActivityTracker();
stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker);
stream.setHandler(discardHandler); // set default handler, each test might override one
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
stream = new Netty4HttpRequestBodyStream(ctx, threadContext);
stream.setHandler(discardHandler); // set default handler, each test might override one
super.handlerAdded(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
stream.handleNettyContent(msg);
@ -73,8 +67,17 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
stream.close();
}
// ensures all chunks are passed to downstream
public void testPassAllChunks() {
// ensures that no chunks are sent downstream without request
public void testEnqueueChunksBeforeRequest() {
var totalChunks = randomIntBetween(1, 100);
for (int i = 0; i < totalChunks; i++) {
channel.writeInbound(randomContent(1024));
}
assertEquals(totalChunks * 1024, stream.bufSize());
}
// ensures all received chunks can be flushed downstream
public void testFlushAllReceivedChunks() {
var chunks = new ArrayList<ReleasableBytesReference>();
var totalBytes = new AtomicInteger();
stream.setHandler((chunk, isLast) -> {
@ -82,35 +85,52 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
totalBytes.addAndGet(chunk.length());
chunk.close();
});
var chunkSize = 1024;
var totalChunks = randomIntBetween(1, 100);
for (int i = 0; i < totalChunks; i++) {
channel.writeInbound(randomContent(chunkSize));
stream.next();
channel.runPendingTasks();
}
assertEquals(totalChunks, chunks.size());
stream.next();
channel.runPendingTasks();
assertEquals("should receive all chunks as single composite", 1, chunks.size());
assertEquals(chunkSize * totalChunks, totalBytes.get());
}
// ensures that we read from channel after last chunk
public void testChannelReadAfterLastContent() {
// ensures that channel.setAutoRead(true) only when we flush last chunk
public void testSetAutoReadOnLastFlush() {
channel.writeInbound(randomLastContent(10));
assertFalse("should not auto-read on last content reception", channel.config().isAutoRead());
stream.next();
channel.runPendingTasks();
assertEquals("should have at least 2 reads, one for last content, and one after last", 2, readSniffer.readCount);
assertTrue("should set auto-read once last content is flushed", channel.config().isAutoRead());
}
// ensures when stream is closing we read and discard chunks
public void testReadAndReleaseOnClosing() {
var unexpectedChunk = new AtomicBoolean();
stream.setHandler((chunk, isLast) -> unexpectedChunk.set(true));
stream.close();
channel.writeInbound(randomContent(1024));
channel.writeInbound(randomLastContent(0));
assertFalse("chunk should be discarded", unexpectedChunk.get());
assertEquals("expect 3 reads, a first from stream.close, and other two after chunks", 3, readSniffer.readCount);
// ensures that we read from channel when no current chunks available
// and pass next chunk downstream without holding
public void testReadFromChannel() {
var gotChunks = new ArrayList<ReleasableBytesReference>();
var gotLast = new AtomicBoolean(false);
stream.setHandler((chunk, isLast) -> {
gotChunks.add(chunk);
gotLast.set(isLast);
chunk.close();
});
channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
var chunkSize = 1024;
var totalChunks = randomIntBetween(1, 32);
for (int i = 0; i < totalChunks - 1; i++) {
channel.writeInbound(randomContent(chunkSize));
}
channel.writeInbound(randomLastContent(chunkSize));
for (int i = 0; i < totalChunks; i++) {
assertEquals("should not enqueue chunks", 0, stream.bufSize());
stream.next();
channel.runPendingTasks();
assertEquals("each next() should produce single chunk", i + 1, gotChunks.size());
}
assertTrue("should receive last content", gotLast.get());
}
public void testReadFromHasCorrectThreadContext() throws InterruptedException {
@ -122,15 +142,9 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
try {
// activity tracker requires stream execution in the same thread, setting up stream inside event-loop
eventLoop.submit(() -> {
channel = new EmbeddedChannel(new FlowControlHandler());
channel.config().setAutoRead(false);
channel = new EmbeddedChannel();
stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker());
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
stream = new Netty4HttpRequestBodyStream(ctx, threadContext);
super.handlerAdded(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
stream.handleNettyContent(msg);
@ -184,6 +198,18 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
}
}
public void testStreamNextActivityTracker() {
var t0 = activityTracker.get();
var N = between(1, 10);
for (int i = 0; i < N; i++) {
channel.writeInbound(randomContent(1024));
stream.next();
channel.runPendingTasks();
}
var t1 = activityTracker.get();
assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L);
}
// ensure that we catch all exceptions and throw them into channel pipeline
public void testCatchExceptions() {
var gotExceptions = new CountDownLatch(3); // number of tests below

View file

@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.http.netty4;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
/**
* Sniffs channel reads, helps detect missing or unexpected ones.
* <pre>
* {@code
* chan = new EmbeddedChannel();
* chan.config().setAutoRead(false);
* readSniffer = new ReadSniffer();
* chan.pipeline().addLast(readSniffer, ...otherHandlers);
* ...
* // run test
* ...
* assertEquals("unexpected read", 0, readSniffer.readCnt)
* // or
* assertEquals("exact number of reads", 2, readSniffer.readCnt)
* }
* </pre>
*
*/
public class ReadSniffer extends ChannelOutboundHandlerAdapter {
int readCount;
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
readCount++;
super.read(ctx);
}
}

View file

@ -251,7 +251,7 @@ public class SecurityNetty4HttpServerTransportCloseNotifyTests extends AbstractH
server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release());
server.netty.stop();
server.threadPool.shutdownNow();
safeAwait(client.netty.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS));
safeAwait(client.netty.config().group().shutdownGracefully());
}
}