diff --git a/docs/changelog/127259.yaml b/docs/changelog/127259.yaml deleted file mode 100644 index e90ad926d7e9..000000000000 --- a/docs/changelog/127259.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 127259 -summary: Replace auto-read with proper flow-control in HTTP pipeline -area: Network -type: enhancement -issues: [] diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 3a788ba24879..3072178eaed4 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -205,6 +205,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { // await stream handler is ready and request full content var handler = clientContext.awaitRestChannelAccepted(opaqueId); + assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.isClosed()); @@ -214,6 +215,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { assertEquals(requestTransmittedLength, handler.readUntilClose()); assertTrue(handler.isClosed()); + assertEquals(0, handler.stream.bufSize()); } } @@ -230,6 +232,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { // await stream handler is ready and request full content var handler = clientContext.awaitRestChannelAccepted(opaqueId); + assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.isClosed()); // terminate connection on server and wait resources are released @@ -238,6 +241,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { handler.channel.request().getHttpChannel().close(); assertThat(safeGet(exceptionFuture), instanceOf(ClosedChannelException.class)); assertTrue(handler.isClosed()); + assertBusy(() -> assertEquals(0, handler.stream.bufSize())); } } @@ -253,6 +257,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { // await stream handler is ready and request full content var handler = clientContext.awaitRestChannelAccepted(opaqueId); + assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.isClosed()); // terminate connection on server and wait resources are released @@ -264,6 +269,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { final var exception = asInstanceOf(RuntimeException.class, safeGet(exceptionFuture)); assertEquals(ServerRequestHandler.SIMULATED_EXCEPTION_MESSAGE, exception.getMessage()); safeAwait(handler.closedLatch); + assertBusy(() -> assertEquals(0, handler.stream.bufSize())); } } @@ -304,7 +310,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { }); handler.readBytes(partSize); } - assertTrue(handler.receivedLastChunk); + assertTrue(handler.stream.hasLast()); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java deleted file mode 100644 index f456bba8064b..000000000000 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * 
Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.http.netty4; - -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.util.concurrent.ScheduledFuture; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.time.TimeProvider; -import org.elasticsearch.common.util.concurrent.FutureUtils; - -import java.util.concurrent.TimeUnit; - -/** - * When channel auto-read is disabled handlers are responsible to read from channel. - * But it's hard to detect when read is missing. This helper class print warnings - * when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough - * to avoid test hang for too long, but can be increased if needed. - */ -class MissingReadDetector extends ChannelDuplexHandler { - - private static final Logger logger = LogManager.getLogger(MissingReadDetector.class); - - private final long interval; - private final TimeProvider timer; - private boolean pendingRead; - private long lastRead; - private ScheduledFuture checker; - - MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) { - this.interval = missingReadIntervalMillis; - this.timer = timer; - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> { - if (pendingRead == false) { - long now = timer.absoluteTimeInMillis(); - if (now >= lastRead + interval) { - logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead)); - } - } - }, interval, interval, TimeUnit.MILLISECONDS); - super.handlerAdded(ctx); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - if (checker != null) { - FutureUtils.cancel(checker); - } - super.handlerRemoved(ctx); - } - - @Override - public void read(ChannelHandlerContext ctx) throws Exception { - pendingRead = true; - ctx.read(); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled"; - pendingRead = false; - lastRead = timer.absoluteTimeInMillis(); - ctx.fireChannelRead(msg); - } -} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java index 479d053937e1..0294b4626496 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java @@ -15,7 +15,6 @@ import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequestDecoder; -import io.netty.handler.codec.http.LastHttpContent; import org.elasticsearch.http.HttpPreRequest; import 
org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils; @@ -49,9 +48,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator { } if (aggregating || msg instanceof FullHttpRequest) { super.channelRead(ctx, msg); - if (msg instanceof LastHttpContent == false) { - ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf - } } else { streamContentSizeHandler.channelRead(ctx, msg); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java index e121d091b1bd..fee9d227d831 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java @@ -123,7 +123,6 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter { isContinueExpected = true; } else { ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); - ctx.read(); return; } } @@ -137,7 +136,6 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter { decoder.reset(); } ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - ctx.read(); } else { ignoreContent = false; currentContentLength = 0; @@ -152,13 +150,11 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter { private void handleContent(ChannelHandlerContext ctx, HttpContent msg) { if (ignoreContent) { msg.release(); - ctx.read(); } else { currentContentLength += msg.content().readableBytes(); if (currentContentLength > maxContentLength) { msg.release(); ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); - ctx.read(); } else { ctx.fireChannelRead(msg); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java index 668780fc9066..95a68cb52bbd 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java @@ -9,113 +9,249 @@ package org.elasticsearch.http.netty4; -import io.netty.channel.ChannelDuplexHandler; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.ReferenceCountUtil; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.Nullable; import org.elasticsearch.http.netty4.internal.HttpValidator; import org.elasticsearch.transport.Transports; -public class Netty4HttpHeaderValidator extends ChannelDuplexHandler { +import java.util.ArrayDeque; + +import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY; +import 
static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST; +import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST; +import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA; +import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START; + +public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter { private final HttpValidator validator; private final ThreadContext threadContext; - private State state; + private ArrayDeque pending = new ArrayDeque<>(4); + private State state = WAITING_TO_START; public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) { this.validator = validator; this.threadContext = threadContext; } + State getState() { + return state; + } + + @SuppressWarnings("fallthrough") @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { assert msg instanceof HttpObject; - var httpObject = (HttpObject) msg; - if (httpObject.decoderResult().isFailure()) { - ctx.fireChannelRead(httpObject); // pass-through for decoding failures - } else { - if (msg instanceof HttpRequest request) { - validate(ctx, request); - } else { - assert msg instanceof HttpContent; - var content = (HttpContent) msg; - if (state == State.DROPPING) { - content.release(); - ctx.read(); - } else { - assert state == State.PASSING : "unexpected content before validation completed"; - ctx.fireChannelRead(content); + final HttpObject httpObject = (HttpObject) msg; + + switch (state) { + case WAITING_TO_START: + assert pending.isEmpty(); + pending.add(ReferenceCountUtil.retain(httpObject)); + requestStart(ctx); + assert state == QUEUEING_DATA; + assert ctx.channel().config().isAutoRead() == false; + break; + case QUEUEING_DATA: + pending.add(ReferenceCountUtil.retain(httpObject)); + break; + case FORWARDING_DATA_UNTIL_NEXT_REQUEST: + assert pending.isEmpty(); + if (httpObject instanceof LastHttpContent) { + state = WAITING_TO_START; } - } + ctx.fireChannelRead(httpObject); + break; + case DROPPING_DATA_UNTIL_NEXT_REQUEST: + assert pending.isEmpty(); + if (httpObject instanceof LastHttpContent) { + state = WAITING_TO_START; + } + ReferenceCountUtil.release(httpObject); + break; + case DROPPING_DATA_PERMANENTLY: + assert pending.isEmpty(); + ReferenceCountUtil.release(httpObject); // consume without enqueuing + ctx.channel().config().setAutoRead(false); + break; } } + private void requestStart(ChannelHandlerContext ctx) { + assert state == WAITING_TO_START; + + if (pending.isEmpty()) { + return; + } + + final HttpObject httpObject = pending.getFirst(); + final HttpRequest httpRequest; + if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) { + // a properly decoded HTTP start message is expected to begin validation + // anything else is probably an error that the downstream HTTP message aggregator will have to handle + httpRequest = (HttpRequest) httpObject; + } else { + httpRequest = null; + } + + state = QUEUEING_DATA; + ctx.channel().config().setAutoRead(false); + + if (httpRequest == null) { + // this looks like a malformed request and will forward without validation + ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx)); + } else { + assert Transports.assertDefaultThreadContext(threadContext); + ActionListener.run( + // this prevents thread-context changes to propagate to the validation listener + // atm, the validation listener 
submits to the event loop executor, which doesn't know about the ES thread-context, + // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor + ActionListener.assertOnce( + new ContextPreservingActionListener( + threadContext.wrapRestorable(threadContext.newStoredContext()), + // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + assert Transports.assertDefaultThreadContext(threadContext); + ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx)); + } + + @Override + public void onFailure(Exception e) { + assert Transports.assertDefaultThreadContext(threadContext); + ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e)); + } + } + ) + ), + listener -> { + // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused + try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { + validator.validate(httpRequest, ctx.channel(), listener); + } + } + ); + } + } + + private void forwardFullRequest(ChannelHandlerContext ctx) { + Transports.assertDefaultThreadContext(threadContext); + assert ctx.channel().eventLoop().inEventLoop(); + assert ctx.channel().config().isAutoRead() == false; + assert state == QUEUEING_DATA; + + ctx.channel().config().setAutoRead(true); + boolean fullRequestForwarded = forwardData(ctx, pending); + + assert fullRequestForwarded || pending.isEmpty(); + if (fullRequestForwarded) { + state = WAITING_TO_START; + requestStart(ctx); + } else { + state = FORWARDING_DATA_UNTIL_NEXT_REQUEST; + } + + assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST; + } + + private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) { + Transports.assertDefaultThreadContext(threadContext); + assert ctx.channel().eventLoop().inEventLoop(); + assert ctx.channel().config().isAutoRead() == false; + assert state == QUEUEING_DATA; + + HttpObject messageToForward = pending.getFirst(); + boolean fullRequestDropped = dropData(pending); + if (messageToForward instanceof HttpContent toReplace) { + // if the request to forward contained data (which got dropped), replace with empty data + messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER); + } + messageToForward.setDecoderResult(DecoderResult.failure(e)); + + ctx.channel().config().setAutoRead(true); + ctx.fireChannelRead(messageToForward); + + assert fullRequestDropped || pending.isEmpty(); + if (fullRequestDropped) { + state = WAITING_TO_START; + requestStart(ctx); + } else { + state = DROPPING_DATA_UNTIL_NEXT_REQUEST; + } + + assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST; + } + @Override - public void read(ChannelHandlerContext ctx) throws Exception { - // until validation is completed we can ignore read calls, - // once validation is finished HttpRequest will be fired and downstream can read from there - if (state != State.VALIDATING) { - ctx.read(); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + state = DROPPING_DATA_PERMANENTLY; + while (true) { + if (dropData(pending) == false) { + break; + } + } + super.channelInactive(ctx); + } + + private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque pending) { + final int pendingMessages = 
pending.size(); + try { + HttpObject toForward; + while ((toForward = pending.poll()) != null) { + ctx.fireChannelRead(toForward); + ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued + if (toForward instanceof LastHttpContent) { + return true; + } + } + return false; + } finally { + maybeResizePendingDown(pendingMessages, pending); } } - void validate(ChannelHandlerContext ctx, HttpRequest request) { - assert Transports.assertDefaultThreadContext(threadContext); - state = State.VALIDATING; - ActionListener.run( - // this prevents thread-context changes to propagate to the validation listener - // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context, - // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor - ActionListener.assertOnce( - new ContextPreservingActionListener( - threadContext.wrapRestorable(threadContext.newStoredContext()), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - handleValidationResult(ctx, request, null); - } - - @Override - public void onFailure(Exception e) { - handleValidationResult(ctx, request, e); - } - } - ) - ), - listener -> { - // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused - try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { - validator.validate(request, ctx.channel(), listener); + private static boolean dropData(ArrayDeque pending) { + final int pendingMessages = pending.size(); + try { + HttpObject toDrop; + while ((toDrop = pending.poll()) != null) { + ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming + if (toDrop instanceof LastHttpContent) { + return true; } } - ); + return false; + } finally { + maybeResizePendingDown(pendingMessages, pending); + } } - void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nullable Exception validationError) { - assert Transports.assertDefaultThreadContext(threadContext); - // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop - ctx.channel().eventLoop().execute(() -> { - if (validationError != null) { - request.setDecoderResult(DecoderResult.failure(validationError)); - state = State.DROPPING; - } else { - state = State.PASSING; - } - ctx.fireChannelRead(request); - }); + private static void maybeResizePendingDown(int largeSize, ArrayDeque pending) { + if (pending.size() <= 4 && largeSize > 32) { + // Prevent the ArrayDeque from becoming forever large due to a single large message. 
+ ArrayDeque old = pending; + pending = new ArrayDeque<>(4); + pending.addAll(old); + } } - private enum State { - PASSING, - VALIDATING, - DROPPING + enum State { + WAITING_TO_START, + QUEUEING_DATA, + FORWARDING_DATA_UNTIL_NEXT_REQUEST, + DROPPING_DATA_UNTIL_NEXT_REQUEST, + DROPPING_DATA_PERMANENTLY } - } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 227b925b7927..4809f1a1a275 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -118,7 +118,6 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { activityTracker.startActivity(); - boolean shouldRead = true; try { if (msg instanceof HttpRequest request) { final Netty4HttpRequest netty4HttpRequest; @@ -138,26 +137,25 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); currentRequestStream = null; } else { - var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext()); + var contentStream = new Netty4HttpRequestBodyStream( + ctx.channel(), + serverTransport.getThreadPool().getThreadContext(), + activityTracker + ); currentRequestStream = contentStream; netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream); - shouldRead = false; } } handlePipelinedRequest(ctx, netty4HttpRequest); } else { assert msg instanceof HttpContent : "expect HttpContent got " + msg; assert currentRequestStream != null : "current stream must exists before handling http content"; - shouldRead = false; currentRequestStream.handleNettyContent((HttpContent) msg); if (msg instanceof LastHttpContent) { currentRequestStream = null; } } } finally { - if (shouldRead) { - ctx.channel().eventLoop().execute(ctx::read); - } activityTracker.stopActivity(); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index b4396569ae35..88b4518c8de8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -9,11 +9,14 @@ package org.elasticsearch.http.netty4; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; +import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasables; import org.elasticsearch.http.HttpBody; @@ -24,22 +27,34 @@ import java.util.List; /** * Netty based implementation of {@link HttpBody.Stream}. + * This implementation utilizes {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} + * to prevent entire payload buffering.
But sometimes upstream can send a few chunks of data despite + autoRead=off. In this case, chunks will be buffered until downstream calls {@link Stream#next()} */ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { + private final Channel channel; + private final ChannelFutureListener closeListener = future -> doClose(); private final List tracingHandlers = new ArrayList<>(4); private final ThreadContext threadContext; - private final ChannelHandlerContext ctx; + private final ThreadWatchdog.ActivityTracker activityTracker; + private ByteBuf buf; + private boolean requested = false; private boolean closing = false; private HttpBody.ChunkHandler handler; private ThreadContext.StoredContext requestContext; - private final ChannelFutureListener closeListener = future -> doClose(); - public Netty4HttpRequestBodyStream(ChannelHandlerContext ctx, ThreadContext threadContext) { - this.ctx = ctx; + // used in tests + private volatile int bufSize = 0; + private volatile boolean hasLast = false; + + public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) { + this.channel = channel; this.threadContext = threadContext; this.requestContext = threadContext.newStoredContext(); - Netty4Utils.addListener(ctx.channel().closeFuture(), closeListener); + this.activityTracker = activityTracker; + Netty4Utils.addListener(channel.closeFuture(), closeListener); + channel.config().setAutoRead(false); } @Override @@ -58,43 +73,94 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { tracingHandlers.add(chunkHandler); } - private void read() { - ctx.channel().eventLoop().execute(ctx::read); - } - @Override public void next() { assert handler != null : "handler must be set before requesting next chunk"; requestContext = threadContext.newStoredContext(); - read(); + channel.eventLoop().submit(() -> { + activityTracker.startActivity(); + requested = true; + try { + if (closing) { + return; + } + if (buf == null) { + channel.read(); + } else { + send(); + } + } catch (Throwable e) { + channel.pipeline().fireExceptionCaught(e); + } finally { + activityTracker.stopActivity(); + } + }); } public void handleNettyContent(HttpContent httpContent) { + assert hasLast == false : "receive http content on completed stream"; + hasLast = httpContent instanceof LastHttpContent; if (closing) { httpContent.release(); - read(); } else { - try (var ignored = threadContext.restoreExistingContext(requestContext)) { - var isLast = httpContent instanceof LastHttpContent; - var buf = Netty4Utils.toReleasableBytesReference(httpContent.content()); - for (var tracer : tracingHandlers) { - tracer.onNext(buf, isLast); - } - handler.onNext(buf, isLast); - if (isLast) { - read(); - ctx.channel().closeFuture().removeListener(closeListener); - } + addChunk(httpContent.content()); + if (requested) { + send(); } } } + // adds chunk to the current buffer, allocating a composite buffer when it needs to hold more than one chunk + private void addChunk(ByteBuf chunk) { + assert chunk != null; + if (buf == null) { + buf = chunk; + } else if (buf instanceof CompositeByteBuf comp) { + comp.addComponent(true, chunk); + } else { + var comp = channel.alloc().compositeBuffer(); + comp.addComponent(true, buf); + comp.addComponent(true, chunk); + buf = comp; + } + bufSize = buf.readableBytes(); + } + + // visible for test + int bufSize() { + return bufSize; + } + + // visible for test + boolean hasLast() { + return hasLast; + } + + private void send() { + assert
requested; + assert handler != null : "must set handler before receiving next chunk"; + var bytesRef = Netty4Utils.toReleasableBytesReference(buf); + requested = false; + buf = null; + bufSize = 0; + try (var ignored = threadContext.restoreExistingContext(requestContext)) { + for (var tracer : tracingHandlers) { + tracer.onNext(bytesRef, hasLast); + } + handler.onNext(bytesRef, hasLast); + } + if (hasLast) { + channel.config().setAutoRead(true); + channel.closeFuture().removeListener(closeListener); + } + } + @Override public void close() { - if (ctx.channel().eventLoop().inEventLoop()) { + if (channel.eventLoop().inEventLoop()) { doClose(); } else { - ctx.channel().eventLoop().submit(this::doClose); + channel.eventLoop().submit(this::doClose); } } @@ -108,6 +174,11 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { handler.close(); } } - read(); + if (buf != null) { + buf.release(); + buf = null; + bufSize = 0; + } + channel.config().setAutoRead(true); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index c8f2d75d18a6..9ffa4b479be1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -29,7 +29,6 @@ import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.flow.FlowControlHandler; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutHandler; @@ -47,7 +46,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.core.Assertions; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.http.AbstractHttpServerTransport; @@ -319,9 +317,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { @Override protected void initChannel(Channel ch) throws Exception { - // auto-read must be disabled all the time - ch.config().setAutoRead(false); - Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch); ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel); if (acceptChannelPredicate != null) { @@ -369,15 +364,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { } decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces - - // from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part - ch.pipeline().addLast(new FlowControlHandler()); - if (Assertions.ENABLED) { - // missing reads are hard to catch, but we can detect absence of reads within interval - long missingReadIntervalMs = 10_000; - ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs)); - } - if (httpValidator != null) { // runs a validation function on the first HTTP message piece which contains all the headers // if validation passes, the pieces of 
that particular request are forwarded, otherwise they are discarded @@ -429,19 +415,12 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { if (ResourceLeakDetector.isEnabled()) { ch.pipeline().addLast(new Netty4LeakDetectionHandler()); } - // See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above - // can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is - // resolved we must add another flow controller here: - ch.pipeline().addLast(new FlowControlHandler()); ch.pipeline() .addLast( "pipelining", new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker) ); transport.serverAcceptedChannel(nettyHttpChannel); - - // make very first read call, since auto-read is disabled; following reads must come from the handlers - ch.read(); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java index edbaa3b62d18..36399c8d6d7a 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java @@ -12,7 +12,6 @@ package org.elasticsearch.http.netty4; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; @@ -41,7 +40,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { private static final int REPS = 1000; private EmbeddedChannel channel; private EmbeddedChannel encoder; // channel to encode HTTP objects into bytes - private ReadSniffer readSniffer; private static HttpContent httpContent(int size) { return new DefaultHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(size))); @@ -70,20 +68,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { super.setUp(); var decoder = new HttpRequestDecoder(); encoder = new EmbeddedChannel(new HttpRequestEncoder()); - readSniffer = new ReadSniffer(); - channel = new EmbeddedChannel(); - channel.config().setAutoRead(false); - channel.pipeline().addLast(decoder, readSniffer, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH)); - } - - public void testDecodingFailurePassThrough() { - for (var i = 0; i < REPS; i++) { - var sendReq = httpRequest(); - sendReq.setDecoderResult(DecoderResult.failure(new Exception("bad"))); - channel.writeInbound(sendReq); - assertEquals(sendReq, channel.readInbound()); - } - assertEquals("should not read from channel, failures are handled downstream", 0, readSniffer.readCount); + channel = new EmbeddedChannel(decoder, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH)); } /** @@ -100,7 +85,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { assertFalse(HttpUtil.is100ContinueExpected(recvRequest)); channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT)); assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound()); - assertEquals("must not read from channel", 0, readSniffer.readCount); } } @@ -115,7 +99,6 @@ public 
class Netty4HttpContentSizeHandlerTests extends ESTestCase { assertNotNull("request should pass", channel.readInbound()); channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT)); assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound()); - assertEquals("must not read from channel", 0, readSniffer.readCount); } } @@ -138,7 +121,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { assertNotNull(recvContent); assertEquals(MAX_CONTENT_LENGTH, recvContent.content().readableBytes()); recvContent.release(); - assertEquals("must not read from channel", 0, readSniffer.readCount); } } @@ -152,7 +134,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { channel.writeInbound(encode(sendRequest)); var resp = (FullHttpResponse) channel.readOutbound(); assertEquals(HttpResponseStatus.EXPECTATION_FAILED, resp.status()); - assertEquals("expect 2 reads, one from size handler and HTTP decoder will emit LastHttpContent", 2, readSniffer.readCount); assertFalse(channel.isOpen()); resp.release(); } @@ -171,7 +152,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status()); assertNull("request should not pass", channel.readInbound()); assertTrue("should not close channel", channel.isOpen()); - assertEquals("must read from channel", i + 1, readSniffer.readCount); resp.release(); } } @@ -180,13 +160,11 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { * Mixed load of oversized and normal requests with Exepct:100-Continue. */ public void testMixedContent() { - var expectReadCnt = 0; for (int i = 0; i < REPS; i++) { var isOversized = randomBoolean(); var sendRequest = httpRequest(); HttpUtil.set100ContinueExpected(sendRequest, true); if (isOversized) { - expectReadCnt++; HttpUtil.setContentLength(sendRequest, OVERSIZED_LENGTH); channel.writeInbound(encode(sendRequest)); var resp = (FullHttpResponse) channel.readOutbound(); @@ -210,7 +188,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { assertEquals("actual content size should match", normalSize, recvContent.content().readableBytes()); recvContent.release(); } - assertEquals(expectReadCnt, readSniffer.readCount); } } @@ -228,7 +205,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { resp.release(); assertNull("request and content should not pass", channel.readInbound()); assertTrue("should not close channel", channel.isOpen()); - assertEquals("expect two reads per loop, one for request and one for content", (i + 1) * 2, readSniffer.readCount); } } @@ -258,7 +234,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase { var resp = (FullHttpResponse) channel.readOutbound(); assertEquals("should respond with 413", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status()); assertFalse("should close channel", channel.isOpen()); - assertEquals("expect read after response", 1, readSniffer.readCount); resp.release(); } + } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java index 957b5fd066ff..9a12ba75d774 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java @@ -19,7 +19,6 @@ import 
io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -53,8 +52,7 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - channel = new EmbeddedChannel(new FlowControlHandler()); - channel.config().setAutoRead(false); + channel = new EmbeddedChannel(); threadPool = new TestThreadPool(TEST_MOCK_TRANSPORT_THREAD_PREFIX); } @@ -183,7 +181,6 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase { threadPool.generic().submit(() -> { DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); channel.writeInbound(request1); - channel.read(); DefaultHttpContent content1 = randomBoolean() ? new DefaultHttpContent(Unpooled.buffer(4)) : null; if (content1 != null) { channel.writeInbound(content1); @@ -199,11 +196,9 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase { } channel.runPendingTasks(); assertThat(channel.readInbound(), sameInstance(request1)); - channel.read(); if (content1 != null && success) { assertThat(channel.readInbound(), sameInstance(content1)); } - channel.read(); if (success) { assertThat(channel.readInbound(), sameInstance(lastContent1)); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java index d29894a149a4..1c0b434105f2 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java @@ -9,158 +9,766 @@ package org.elasticsearch.http.netty4; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.flow.FlowControlHandler; +import io.netty.util.AsciiString; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.http.netty4.internal.HttpValidator; import org.elasticsearch.test.ESTestCase; -import java.util.concurrent.BlockingQueue; -import 
java.util.concurrent.LinkedBlockingQueue; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST; +import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST; +import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA; +import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; public class Netty4HttpHeaderValidatorTests extends ESTestCase { + + private final AtomicReference header = new AtomicReference<>(); + private final AtomicReference> listener = new AtomicReference<>(); private EmbeddedChannel channel; - private BlockingQueue validatorRequestQueue; + private Netty4HttpHeaderValidator netty4HttpHeaderValidator; + private final AtomicReference validationException = new AtomicReference<>(); @Override public void setUp() throws Exception { super.setUp(); - validatorRequestQueue = new LinkedBlockingQueue<>(); - channel = new EmbeddedChannel( - new Netty4HttpHeaderValidator( - (httpRequest, channel, listener) -> validatorRequestQueue.add(new ValidationRequest(httpRequest, channel, listener)), - new ThreadContext(Settings.EMPTY) - ) - ); + reset(); + } + + private void reset() { + channel = new EmbeddedChannel(); + header.set(null); + listener.set(null); + validationException.set(null); + HttpValidator validator = (httpRequest, channel, validationCompleteListener) -> { + header.set(httpRequest); + final var exception = validationException.get(); + if (exception != null) { + throw exception; + } + listener.set(validationCompleteListener); + }; + netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY)); + channel.pipeline().addLast(netty4HttpHeaderValidator); + } + + public void testValidationPausesAndResumesData() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request); + channel.writeInbound(content); + + assertThat(header.get(), sameInstance(request)); + // channel is paused + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + // channel is resumed + listener.get().onResponse(null); + channel.runPendingTasks(); + + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); + assertThat(channel.readInbound(), sameInstance(request)); + assertThat(channel.readInbound(), sameInstance(content)); + assertThat(channel.readInbound(), nullValue()); + assertThat(content.refCnt(), equalTo(1)); + + // channel continues in resumed state after request finishes + DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(lastContent); + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), 
equalTo(WAITING_TO_START)); + assertThat(channel.readInbound(), sameInstance(lastContent)); + assertThat(lastContent.refCnt(), equalTo(1)); + + // channel is again paused while validating next request + channel.writeInbound(request); + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + } + + public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request); + channel.writeInbound(content); + + assertThat(header.get(), sameInstance(request)); + // channel is paused + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + // channel is resumed + listener.get().onResponse(null); + channel.runPendingTasks(); + + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); + assertThat(channel.readInbound(), sameInstance(request)); + assertThat(channel.readInbound(), sameInstance(content)); + assertThat(channel.readInbound(), nullValue()); + assertThat(content.refCnt(), equalTo(1)); channel.config().setAutoRead(false); + + channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4))); + assertFalse(channel.config().isAutoRead()); } - HttpRequest newHttpRequest() { - return new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, ""); - } + public void testContentForwardedAfterValidation() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - HttpContent newHttpContent() { - return new DefaultHttpContent(Unpooled.buffer()); - } + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + channel.writeInbound(request); - LastHttpContent newLastHttpContent() { - return new DefaultLastHttpContent(); - } + DefaultHttpContent content1 = null; + if (randomBoolean()) { + content1 = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content1); + } - public void testValidatorReceiveHttpRequest() { - channel.writeInbound(newHttpRequest()); - assertEquals(1, validatorRequestQueue.size()); - assertNull(channel.readInbound()); - } + assertThat(header.get(), sameInstance(request)); + // channel is paused + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); - public void testDecoderFailurePassThrough() { - for (var i = 0; i < 1000; i++) { - var httpRequest = newHttpRequest(); - httpRequest.setDecoderResult(DecoderResult.failure(new Exception("bad"))); - channel.writeInbound(httpRequest); - assertEquals(httpRequest, channel.readInbound()); + // channel is resumed + listener.get().onResponse(null); + channel.runPendingTasks(); + + // resumed channel after successful validation forwards data + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); + // write more content to the channel after validation passed + DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content2); + assertThat(channel.readInbound(), sameInstance(request)); + DefaultHttpContent content3 = new 
DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content3); + if (content1 != null) { + assertThat(channel.readInbound(), sameInstance(content1)); + assertThat(content1.refCnt(), equalTo(1)); + } + assertThat(channel.readInbound(), sameInstance(content2)); + assertThat(content2.refCnt(), equalTo(1)); + DefaultHttpContent content4 = null; + if (randomBoolean()) { + content4 = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content4); + } + assertThat(channel.readInbound(), sameInstance(content3)); + assertThat(content3.refCnt(), equalTo(1)); + if (content4 != null) { + assertThat(channel.readInbound(), sameInstance(content4)); + assertThat(content4.refCnt(), equalTo(1)); + } + + // channel continues in resumed state after request finishes + DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(lastContent); + + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + assertThat(channel.readInbound(), sameInstance(lastContent)); + assertThat(lastContent.refCnt(), equalTo(1)); + + if (randomBoolean()) { + channel.writeInbound(request); + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); } } - /** - * Sends back-to-back http requests and randomly fail validation. - * Ensures that invalid requests drop content and valid pass through. - */ - public void testMixedValidationResults() { - for (var i = 0; i < 1000; i++) { - var shouldPassValidation = randomBoolean(); - var request = newHttpRequest(); - var content = newHttpContent(); - var last = newLastHttpContent(); + public void testContentDroppedAfterValidationFailure() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + channel.writeInbound(request); + + DefaultHttpContent content1 = null; + if (randomBoolean()) { + content1 = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content1); + } + + assertThat(header.get(), sameInstance(request)); + // channel is paused + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + // channel is resumed + listener.get().onFailure(new ElasticsearchException("Boom")); + channel.runPendingTasks(); + + // resumed channel after failed validation drops data + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); + // write more content to the channel after validation passed + DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content2); + assertThat(channel.readInbound(), sameInstance(request)); + DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content3); + if (content1 != null) { + assertThat(channel.readInbound(), nullValue()); + assertThat(content1.refCnt(), equalTo(0)); + } + assertThat(channel.readInbound(), nullValue()); // content2 + assertThat(content2.refCnt(), equalTo(0)); + DefaultHttpContent content4 = null; + if (randomBoolean()) { + content4 = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content4); + } + assertThat(channel.readInbound(), nullValue()); // content3 + assertThat(content3.refCnt(), equalTo(0)); + if (content4 != null) { + 
assertThat(channel.readInbound(), nullValue()); + assertThat(content4.refCnt(), equalTo(0)); + } + + assertThat(channel.readInbound(), nullValue()); // extra read still returns "null" + + // channel continues in resumed state after request finishes + DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(lastContent); + + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + assertThat(channel.readInbound(), nullValue()); // lastContent + assertThat(lastContent.refCnt(), equalTo(0)); + + if (randomBoolean()) { + channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri")); + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + } + } + + public void testValidationErrorForwardsAsDecoderErrorMessage() { + for (Exception exception : List.of( + new Exception("Failure"), + new ElasticsearchException("Failure"), + new ElasticsearchSecurityException("Failure") + )) { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); channel.writeInbound(request); - var validationRequest = validatorRequestQueue.poll(); - assertNotNull(validationRequest); - if (shouldPassValidation) { - validationRequest.listener.onResponse(null); - } else { - validationRequest.listener.onFailure(new ValidationException()); - } - channel.runPendingTasks(); - - var gotRequest = channel.readInbound(); - assertEquals( - "should set decoder result failure for invalid request", - shouldPassValidation, - ((HttpRequest) gotRequest).decoderResult().isSuccess() - ); - assertEquals(request, gotRequest); - channel.writeInbound(content); - channel.writeInbound(last); - if (shouldPassValidation) { - assertEquals("should pass content for valid request", content, channel.readInbound()); - content.release(); - assertEquals(last, channel.readInbound()); - last.release(); - } else { - assertNull("should drop content for invalid request", channel.readInbound()); - } + + assertThat(header.get(), sameInstance(request)); + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + listener.get().onFailure(exception); + channel.runPendingTasks(); + assertTrue(channel.config().isAutoRead()); + DefaultHttpRequest failed = channel.readInbound(); + assertThat(failed, sameInstance(request)); + assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue()); + assertTrue(failed.decoderResult().isFailure()); + Exception cause = (Exception) failed.decoderResult().cause(); + assertThat(cause, equalTo(exception)); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); + + assertThat(channel.readInbound(), nullValue()); + assertThat(content.refCnt(), equalTo(0)); + + DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(lastContent); + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + assertThat(channel.readInbound(), nullValue()); + assertThat(lastContent.refCnt(), equalTo(0)); + + reset(); } } - public void testIgnoreReadWhenValidating() { - channel.pipeline().addFirst(new 
FlowControlHandler()); // catch all inbound messages + public void testValidationExceptionForwardsAsDecoderErrorMessage() { + final var exception = new ElasticsearchException("Failure"); + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - channel.writeInbound(newHttpRequest()); - channel.writeInbound(newLastHttpContent()); // should hold by flow-control-handler - assertNull("nothing should pass yet", channel.readInbound()); + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); - channel.read(); - var validationRequest = validatorRequestQueue.poll(); - assertNotNull(validationRequest); + validationException.set(exception); + channel.writeInbound(request); - channel.read(); - assertNull("should ignore read while validating", channel.readInbound()); + assertThat(header.get(), sameInstance(request)); + assertThat(listener.get(), nullValue()); - validationRequest.listener.onResponse(null); channel.runPendingTasks(); - assertTrue("http request should pass", channel.readInbound() instanceof HttpRequest); - assertNull("content should not pass yet, need explicit read", channel.readInbound()); + assertTrue(channel.config().isAutoRead()); + DefaultHttpRequest failed = channel.readInbound(); + assertThat(failed, sameInstance(request)); + assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue()); + assertTrue(failed.decoderResult().isFailure()); + Exception cause = (Exception) failed.decoderResult().cause(); + assertThat(cause, equalTo(exception)); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); - channel.read(); - asInstanceOf(LastHttpContent.class, channel.readInbound()).release(); + final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content); + + assertThat(channel.readInbound(), nullValue()); + assertThat(content.refCnt(), equalTo(0)); + + DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(lastContent); + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + assertThat(channel.readInbound(), nullValue()); + assertThat(lastContent.refCnt(), equalTo(0)); } - public void testWithFlowControlAndAggregator() { - channel.pipeline().addFirst(new FlowControlHandler()); - channel.pipeline().addLast(new Netty4HttpAggregator(8192, (req) -> true, new HttpRequestDecoder())); + public void testValidationHandlesMultipleQueuedUpMessages() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); - channel.writeInbound(newHttpRequest()); - channel.writeInbound(newHttpContent()); - channel.writeInbound(newLastHttpContent()); + final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); + DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request1); + channel.writeInbound(content1); + channel.writeInbound(lastContent1); + final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); + DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); + 
channel.writeInbound(request2); + channel.writeInbound(content2); + channel.writeInbound(lastContent2); - channel.read(); - assertNull("should ignore read while validating", channel.readInbound()); + assertThat(header.get(), sameInstance(request1)); + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); - var validationRequest = validatorRequestQueue.poll(); - assertNotNull(validationRequest); - validationRequest.listener.onResponse(null); + listener.get().onResponse(null); channel.runPendingTasks(); + assertThat(channel.readInbound(), sameInstance(request1)); + assertThat(channel.readInbound(), sameInstance(content1)); + assertThat(channel.readInbound(), sameInstance(lastContent1)); + assertThat(content1.refCnt(), equalTo(1)); + assertThat(lastContent1.refCnt(), equalTo(1)); - asInstanceOf(FullHttpRequest.class, channel.readInbound()).release(); + assertThat(header.get(), sameInstance(request2)); + + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + assertThat(channel.readInbound(), nullValue()); + + listener.get().onResponse(null); + channel.runPendingTasks(); + assertThat(channel.readInbound(), sameInstance(request2)); + assertThat(channel.readInbound(), sameInstance(content2)); + assertThat(channel.readInbound(), sameInstance(lastContent2)); + assertThat(content2.refCnt(), equalTo(1)); + assertThat(lastContent2.refCnt(), equalTo(1)); + + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + assertThat(channel.readInbound(), nullValue()); } - record ValidationRequest(HttpRequest request, Channel channel, ActionListener listener) {} + public void testValidationFailureRecoversForEnqueued() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + // write 2 requests before validation for the first one fails + final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); + DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request1); + channel.writeInbound(content1); + channel.writeInbound(lastContent1); + final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); + DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request2); + channel.writeInbound(content2); + + boolean finishSecondRequest = randomBoolean(); + if (finishSecondRequest) { + channel.writeInbound(lastContent2); + } + + // channel is paused and both requests are queued + assertThat(header.get(), sameInstance(request1)); + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + assertThat(content1.refCnt(), equalTo(2)); + assertThat(lastContent1.refCnt(), equalTo(2)); + assertThat(content2.refCnt(), equalTo(2)); + if (finishSecondRequest) { + assertThat(lastContent2.refCnt(), equalTo(2)); + } + + // validation for the 1st request FAILS + Exception exception = new ElasticsearchException("Boom"); + listener.get().onFailure(exception); + 
channel.runPendingTasks(); + + // request1 becomes a decoder exception and its content is dropped + assertThat(channel.readInbound(), sameInstance(request1)); + assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue()); + assertTrue(request1.decoderResult().isFailure()); + Exception cause = (Exception) request1.decoderResult().cause(); + assertThat(cause, equalTo(exception)); + assertThat(content1.refCnt(), equalTo(0)); // content is dropped + assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped + assertThat(channel.readInbound(), nullValue()); + + // channel pauses for the validation of the 2nd request + assertThat(header.get(), sameInstance(request2)); + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + assertThat(channel.readInbound(), nullValue()); + + // validation for the 2nd request SUCCEEDS + listener.get().onResponse(null); + channel.runPendingTasks(); + + // 2nd request is forwarded correctly + assertThat(channel.readInbound(), sameInstance(request2)); + assertThat(channel.readInbound(), sameInstance(content2)); + assertThat(content2.refCnt(), equalTo(1)); + + if (finishSecondRequest == false) { + assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); + assertTrue(channel.config().isAutoRead()); + assertThat(channel.readInbound(), nullValue()); + // while in forwarding state the request can continue + if (randomBoolean()) { + DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content); + assertThat(channel.readInbound(), sameInstance(content)); + assertThat(content.refCnt(), equalTo(1)); + } + channel.writeInbound(lastContent2); + } + + assertThat(channel.readInbound(), sameInstance(lastContent2)); + assertThat(lastContent2.refCnt(), equalTo(1)); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + assertTrue(channel.config().isAutoRead()); + } + + public void testValidationFailureRecoversForInbound() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + // write a single request, but don't finish it yet, for which the validation fails + final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request1); + channel.writeInbound(content1); + + // channel is paused and the request is queued + assertThat(header.get(), sameInstance(request1)); + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + assertThat(content1.refCnt(), equalTo(2)); + + // validation for the 1st request FAILS + Exception exception = new ElasticsearchException("Boom"); + listener.get().onFailure(exception); + channel.runPendingTasks(); + + // request1 becomes a decoder exception and its content is dropped + assertThat(channel.readInbound(), sameInstance(request1)); + assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue()); + assertTrue(request1.decoderResult().isFailure()); + Exception cause = (Exception) request1.decoderResult().cause(); + assertThat(cause, equalTo(exception)); + assertThat(content1.refCnt(), equalTo(0)); // content is dropped + assertThat(channel.readInbound(), nullValue()); + 
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); + + if (randomBoolean()) { + channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4))); + } + DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(lastContent1); + if (randomBoolean()) { + assertThat(channel.readInbound(), nullValue()); + } + assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped + + // write 2nd request after the 1st one failed validation + final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4)); + DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request2); + channel.writeInbound(content2); + boolean finishSecondRequest = randomBoolean(); + if (finishSecondRequest) { + channel.writeInbound(lastContent2); + } + + // channel pauses for the validation of the 2nd request + assertThat(header.get(), sameInstance(request2)); + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + assertThat(channel.readInbound(), nullValue()); + + // validation for the 2nd request SUCCEEDS + listener.get().onResponse(null); + channel.runPendingTasks(); + + // 2nd request is forwarded correctly + assertThat(channel.readInbound(), sameInstance(request2)); + assertThat(channel.readInbound(), sameInstance(content2)); + assertThat(content2.refCnt(), equalTo(1)); + + if (finishSecondRequest == false) { + assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST)); + assertTrue(channel.config().isAutoRead()); + assertThat(channel.readInbound(), nullValue()); + // while in forwarding state the request can continue + if (randomBoolean()) { + DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content); + assertThat(channel.readInbound(), sameInstance(content)); + assertThat(content.refCnt(), equalTo(1)); + } + channel.writeInbound(lastContent2); + } + + assertThat(channel.readInbound(), sameInstance(lastContent2)); + assertThat(lastContent2.refCnt(), equalTo(1)); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + assertTrue(channel.config().isAutoRead()); + } + + public void testValidationSuccessForLargeMessage() { + assertTrue(channel.config().isAutoRead()); + + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + channel.writeInbound(request); + + int messageLength = randomIntBetween(32, 128); + for (int i = 0; i < messageLength; ++i) { + channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4))); + } + channel.writeInbound(new DefaultLastHttpContent(Unpooled.buffer(4))); + boolean followupRequest = randomBoolean(); + if (followupRequest) { + channel.writeInbound(request); + } + + assertThat(header.get(), sameInstance(request)); + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + listener.get().onResponse(null); + channel.runPendingTasks(); + if (followupRequest) { + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + } else { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + } + assertThat(channel.readInbound(), 
sameInstance(request)); + for (int i = 0; i < messageLength; ++i) { + Object content = channel.readInbound(); + assertThat(content, instanceOf(DefaultHttpContent.class)); + assertThat(((DefaultHttpContent) content).refCnt(), equalTo(1)); + } + assertThat(channel.readInbound(), instanceOf(LastHttpContent.class)); + assertThat(channel.readInbound(), nullValue()); + } + + public void testValidationFailureForLargeMessage() { + assertTrue(channel.config().isAutoRead()); + + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + channel.writeInbound(request); + + int messageLength = randomIntBetween(32, 128); + DefaultHttpContent[] messageContents = new DefaultHttpContent[messageLength]; + for (int i = 0; i < messageLength; ++i) { + messageContents[i] = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(messageContents[i]); + } + DefaultLastHttpContent lastHttpContent = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(lastHttpContent); + boolean followupRequest = randomBoolean(); + if (followupRequest) { + channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri")); + } + + assertThat(header.get(), sameInstance(request)); + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + Exception exception = new ElasticsearchException("Boom"); + listener.get().onFailure(exception); + channel.runPendingTasks(); + if (followupRequest) { + assertFalse(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); + } else { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + } + assertThat(channel.readInbound(), sameInstance(request)); + assertThat(request.headers().get(HttpHeaderNames.CONNECTION), nullValue()); + assertTrue(request.decoderResult().isFailure()); + Exception cause = (Exception) request.decoderResult().cause(); + assertThat(cause, equalTo(exception)); + for (int i = 0; i < messageLength; ++i) { + assertThat(channel.readInbound(), nullValue()); + assertThat(messageContents[i].refCnt(), equalTo(0)); + } + assertThat(channel.readInbound(), nullValue()); + assertThat(lastHttpContent.refCnt(), equalTo(0)); + assertThat(channel.readInbound(), nullValue()); + } + + public void testFullRequestValidationFailure() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + ByteBuf buf = channel.alloc().buffer(); + ByteBufUtil.copy(AsciiString.of("test full http request"), buf); + final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); + channel.writeInbound(request); + + // request got through to validation + assertThat(header.get(), sameInstance(request)); + // channel is paused + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + // validation fails + Exception exception = new ElasticsearchException("Boom"); + listener.get().onFailure(exception); + channel.runPendingTasks(); + + // channel is resumed and waiting for next request + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + DefaultFullHttpRequest throughRequest = channel.readInbound(); + // "through request" contains a decoder exception + assertThat(throughRequest, not(sameInstance(request))); + 
assertTrue(throughRequest.decoderResult().isFailure()); + // the content is cleared when validation fails + assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("")); + assertThat(buf.refCnt(), is(0)); + Exception cause = (Exception) throughRequest.decoderResult().cause(); + assertThat(cause, equalTo(exception)); + } + + public void testFullRequestValidationSuccess() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + ByteBuf buf = channel.alloc().buffer(); + try { + ByteBufUtil.copy(AsciiString.of("test full http request"), buf); + final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); + channel.writeInbound(request); + + // request got through to validation + assertThat(header.get(), sameInstance(request)); + // channel is paused + assertThat(channel.readInbound(), nullValue()); + assertFalse(channel.config().isAutoRead()); + + // validation succeeds + listener.get().onResponse(null); + channel.runPendingTasks(); + + // channel is resumed and waiting for next request + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + DefaultFullHttpRequest throughRequest = channel.readInbound(); + // request goes through unaltered + assertThat(throughRequest, sameInstance(request)); + assertFalse(throughRequest.decoderResult().isFailure()); + // the content is unaltered + assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request")); + assertThat(buf.refCnt(), is(1)); + assertThat(throughRequest.decoderResult().cause(), nullValue()); + } finally { + buf.release(); + } + } + + public void testFullRequestWithDecoderException() { + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + ByteBuf buf = channel.alloc().buffer(); + try { + ByteBufUtil.copy(AsciiString.of("test full http request"), buf); + // a request with a decoder error prior to validation + final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf); + Exception cause = new ElasticsearchException("Boom"); + request.setDecoderResult(DecoderResult.failure(cause)); + channel.writeInbound(request); + + // request goes through without invoking the validator + assertThat(header.get(), nullValue()); + assertThat(listener.get(), nullValue()); + // channel is NOT paused + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + DefaultFullHttpRequest throughRequest = channel.readInbound(); + // request goes through unaltered + assertThat(throughRequest, sameInstance(request)); + assertTrue(throughRequest.decoderResult().isFailure()); + assertThat(throughRequest.decoderResult().cause(), equalTo(cause)); + // the content is unaltered + assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request")); + assertThat(buf.refCnt(), is(1)); + } finally { + buf.release(); + } + } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index b2e91e49a074..7492737d4f87 100644 --- 
a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -21,6 +21,7 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.HttpBody; @@ -42,24 +43,17 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close(); private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); private EmbeddedChannel channel; - private ReadSniffer readSniffer; private Netty4HttpRequestBodyStream stream; + private ThreadWatchdog.ActivityTracker activityTracker; @Override public void setUp() throws Exception { super.setUp(); channel = new EmbeddedChannel(); - readSniffer = new ReadSniffer(); - channel.pipeline().addLast(new FlowControlHandler(), readSniffer); - channel.config().setAutoRead(false); + activityTracker = new ThreadWatchdog.ActivityTracker(); + stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker); + stream.setHandler(discardHandler); // set default handler, each test might override one channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - stream = new Netty4HttpRequestBodyStream(ctx, threadContext); - stream.setHandler(discardHandler); // set default handler, each test might override one - super.handlerAdded(ctx); - } - @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { stream.handleNettyContent(msg); @@ -73,8 +67,17 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { stream.close(); } - // ensures all chunks are passed to downstream - public void testPassAllChunks() { + // ensures that no chunks are sent downstream without request + public void testEnqueueChunksBeforeRequest() { + var totalChunks = randomIntBetween(1, 100); + for (int i = 0; i < totalChunks; i++) { + channel.writeInbound(randomContent(1024)); + } + assertEquals(totalChunks * 1024, stream.bufSize()); + } + + // ensures all received chunks can be flushed downstream + public void testFlushAllReceivedChunks() { var chunks = new ArrayList(); var totalBytes = new AtomicInteger(); stream.setHandler((chunk, isLast) -> { @@ -82,35 +85,52 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { totalBytes.addAndGet(chunk.length()); chunk.close(); }); + var chunkSize = 1024; var totalChunks = randomIntBetween(1, 100); for (int i = 0; i < totalChunks; i++) { channel.writeInbound(randomContent(chunkSize)); - stream.next(); - channel.runPendingTasks(); - } - assertEquals(totalChunks, chunks.size()); + stream.next(); + channel.runPendingTasks(); + assertEquals("should receive all chunks as single composite", 1, chunks.size()); assertEquals(chunkSize * totalChunks, totalBytes.get()); } - // ensures that we read from channel after last chunk - public void testChannelReadAfterLastContent() { + // ensures that channel.setAutoRead(true) only when we flush last chunk + public void testSetAutoReadOnLastFlush() { channel.writeInbound(randomLastContent(10)); + assertFalse("should not auto-read 
on last content reception", channel.config().isAutoRead()); stream.next(); channel.runPendingTasks(); - assertEquals("should have at least 2 reads, one for last content, and one after last", 2, readSniffer.readCount); + assertTrue("should set auto-read once last content is flushed", channel.config().isAutoRead()); } - // ensures when stream is closing we read and discard chunks - public void testReadAndReleaseOnClosing() { - var unexpectedChunk = new AtomicBoolean(); - stream.setHandler((chunk, isLast) -> unexpectedChunk.set(true)); - stream.close(); - channel.writeInbound(randomContent(1024)); - channel.writeInbound(randomLastContent(0)); - assertFalse("chunk should be discarded", unexpectedChunk.get()); - assertEquals("expect 3 reads, a first from stream.close, and other two after chunks", 3, readSniffer.readCount); + // ensures that we read from channel when no current chunks available + // and pass next chunk downstream without holding + public void testReadFromChannel() { + var gotChunks = new ArrayList(); + var gotLast = new AtomicBoolean(false); + stream.setHandler((chunk, isLast) -> { + gotChunks.add(chunk); + gotLast.set(isLast); + chunk.close(); + }); + channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() + var chunkSize = 1024; + var totalChunks = randomIntBetween(1, 32); + for (int i = 0; i < totalChunks - 1; i++) { + channel.writeInbound(randomContent(chunkSize)); + } + channel.writeInbound(randomLastContent(chunkSize)); + + for (int i = 0; i < totalChunks; i++) { + assertEquals("should not enqueue chunks", 0, stream.bufSize()); + stream.next(); + channel.runPendingTasks(); + assertEquals("each next() should produce single chunk", i + 1, gotChunks.size()); + } + assertTrue("should receive last content", gotLast.get()); } public void testReadFromHasCorrectThreadContext() throws InterruptedException { @@ -122,15 +142,9 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { try { // activity tracker requires stream execution in the same thread, setting up stream inside event-loop eventLoop.submit(() -> { - channel = new EmbeddedChannel(new FlowControlHandler()); - channel.config().setAutoRead(false); + channel = new EmbeddedChannel(); + stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker()); channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - stream = new Netty4HttpRequestBodyStream(ctx, threadContext); - super.handlerAdded(ctx); - } - @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { stream.handleNettyContent(msg); @@ -184,6 +198,18 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { } } + public void testStreamNextActivityTracker() { + var t0 = activityTracker.get(); + var N = between(1, 10); + for (int i = 0; i < N; i++) { + channel.writeInbound(randomContent(1024)); + stream.next(); + channel.runPendingTasks(); + } + var t1 = activityTracker.get(); + assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L); + } + // ensure that we catch all exceptions and throw them into channel pipeline public void testCatchExceptions() { var gotExceptions = new CountDownLatch(3); // number of tests below diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java 
b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java deleted file mode 100644 index af6844883b68..000000000000 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.http.netty4; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; - -/** - * Sniffs channel reads, helps detect missing or unexpected ones. - *
- *     {@code
- *     chan = new EmbeddedChannel();
- *     chan.config().setAutoRead(false);
- *     readSniffer = new ReadSniffer();
- *     chan.pipeline().addLast(readSniffer, ...otherHandlers);
- *     ...
- *     // run test
- *     ...
- *     assertEquals("unexpected read", 0, readSniffer.readCnt)
- *     // or
- *     assertEquals("exact number of reads", 2, readSniffer.readCnt)
- *     }
- * </pre>
- * - */ -public class ReadSniffer extends ChannelOutboundHandlerAdapter { - - int readCount; - - @Override - public void read(ChannelHandlerContext ctx) throws Exception { - readCount++; - super.read(ctx); - } -} diff --git a/muted-tests.yml b/muted-tests.yml index ec710646e29c..864b15fadee7 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -438,12 +438,6 @@ tests: - class: org.elasticsearch.xpack.remotecluster.CrossClusterEsqlRCS2EnrichUnavailableRemotesIT method: testEsqlEnrichWithSkipUnavailable issue: https://github.com/elastic/elasticsearch/issues/127368 -- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT - method: testHttpClientStats - issue: https://github.com/elastic/elasticsearch/issues/127391 -- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT - method: testBulkIndexingRequestSplitting - issue: https://github.com/elastic/elasticsearch/issues/127392 - class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT method: testLookupExplosionNoFetch issue: https://github.com/elastic/elasticsearch/issues/127365 diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java index 4ab0e53ce2bd..ec2881b989d0 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java @@ -251,7 +251,7 @@ public class SecurityNetty4HttpServerTransportCloseNotifyTests extends AbstractH server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release()); server.netty.stop(); server.threadPool.shutdownNow(); - safeAwait(client.netty.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS)); + safeAwait(client.netty.config().group().shutdownGracefully()); } }