mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 17:34:17 -04:00
This reverts commit 3cf70614b8
and unmutes
the associated tests
Closes #127391
Closes #127392
This commit is contained in:
parent
79e9dfd7b4
commit
6f622e813c
16 changed files with 1091 additions and 435 deletions
|
@ -1,5 +0,0 @@
|
||||||
pr: 127259
|
|
||||||
summary: Replace auto-read with proper flow-control in HTTP pipeline
|
|
||||||
area: Network
|
|
||||||
type: enhancement
|
|
||||||
issues: []
|
|
|
@ -205,6 +205,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
|
|
||||||
// await stream handler is ready and request full content
|
// await stream handler is ready and request full content
|
||||||
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
|
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
|
||||||
|
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
|
||||||
|
|
||||||
assertFalse(handler.isClosed());
|
assertFalse(handler.isClosed());
|
||||||
|
|
||||||
|
@ -214,6 +215,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
assertEquals(requestTransmittedLength, handler.readUntilClose());
|
assertEquals(requestTransmittedLength, handler.readUntilClose());
|
||||||
|
|
||||||
assertTrue(handler.isClosed());
|
assertTrue(handler.isClosed());
|
||||||
|
assertEquals(0, handler.stream.bufSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,6 +232,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
|
|
||||||
// await stream handler is ready and request full content
|
// await stream handler is ready and request full content
|
||||||
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
|
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
|
||||||
|
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
|
||||||
assertFalse(handler.isClosed());
|
assertFalse(handler.isClosed());
|
||||||
|
|
||||||
// terminate connection on server and wait resources are released
|
// terminate connection on server and wait resources are released
|
||||||
|
@ -238,6 +241,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
handler.channel.request().getHttpChannel().close();
|
handler.channel.request().getHttpChannel().close();
|
||||||
assertThat(safeGet(exceptionFuture), instanceOf(ClosedChannelException.class));
|
assertThat(safeGet(exceptionFuture), instanceOf(ClosedChannelException.class));
|
||||||
assertTrue(handler.isClosed());
|
assertTrue(handler.isClosed());
|
||||||
|
assertBusy(() -> assertEquals(0, handler.stream.bufSize()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,6 +257,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
|
|
||||||
// await stream handler is ready and request full content
|
// await stream handler is ready and request full content
|
||||||
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
|
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
|
||||||
|
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
|
||||||
assertFalse(handler.isClosed());
|
assertFalse(handler.isClosed());
|
||||||
|
|
||||||
// terminate connection on server and wait resources are released
|
// terminate connection on server and wait resources are released
|
||||||
|
@ -264,6 +269,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
final var exception = asInstanceOf(RuntimeException.class, safeGet(exceptionFuture));
|
final var exception = asInstanceOf(RuntimeException.class, safeGet(exceptionFuture));
|
||||||
assertEquals(ServerRequestHandler.SIMULATED_EXCEPTION_MESSAGE, exception.getMessage());
|
assertEquals(ServerRequestHandler.SIMULATED_EXCEPTION_MESSAGE, exception.getMessage());
|
||||||
safeAwait(handler.closedLatch);
|
safeAwait(handler.closedLatch);
|
||||||
|
assertBusy(() -> assertEquals(0, handler.stream.bufSize()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,7 +310,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
});
|
});
|
||||||
handler.readBytes(partSize);
|
handler.readBytes(partSize);
|
||||||
}
|
}
|
||||||
assertTrue(handler.receivedLastChunk);
|
assertTrue(handler.stream.hasLast());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,78 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the "Elastic License
|
|
||||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
|
||||||
* Public License v 1"; you may not use this file except in compliance with, at
|
|
||||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
|
||||||
* License v3.0 only", or the "Server Side Public License, v 1".
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.http.netty4;
|
|
||||||
|
|
||||||
import io.netty.channel.ChannelDuplexHandler;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.util.concurrent.ScheduledFuture;
|
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
|
||||||
import org.apache.logging.log4j.Logger;
|
|
||||||
import org.elasticsearch.common.time.TimeProvider;
|
|
||||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* When channel auto-read is disabled handlers are responsible to read from channel.
|
|
||||||
* But it's hard to detect when read is missing. This helper class print warnings
|
|
||||||
* when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough
|
|
||||||
* to avoid test hang for too long, but can be increased if needed.
|
|
||||||
*/
|
|
||||||
class MissingReadDetector extends ChannelDuplexHandler {
|
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(MissingReadDetector.class);
|
|
||||||
|
|
||||||
private final long interval;
|
|
||||||
private final TimeProvider timer;
|
|
||||||
private boolean pendingRead;
|
|
||||||
private long lastRead;
|
|
||||||
private ScheduledFuture<?> checker;
|
|
||||||
|
|
||||||
MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {
|
|
||||||
this.interval = missingReadIntervalMillis;
|
|
||||||
this.timer = timer;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
|
|
||||||
if (pendingRead == false) {
|
|
||||||
long now = timer.absoluteTimeInMillis();
|
|
||||||
if (now >= lastRead + interval) {
|
|
||||||
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, interval, interval, TimeUnit.MILLISECONDS);
|
|
||||||
super.handlerAdded(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
if (checker != null) {
|
|
||||||
FutureUtils.cancel(checker);
|
|
||||||
}
|
|
||||||
super.handlerRemoved(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
pendingRead = true;
|
|
||||||
ctx.read();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled";
|
|
||||||
pendingRead = false;
|
|
||||||
lastRead = timer.absoluteTimeInMillis();
|
|
||||||
ctx.fireChannelRead(msg);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -15,7 +15,6 @@ import io.netty.handler.codec.http.HttpObject;
|
||||||
import io.netty.handler.codec.http.HttpObjectAggregator;
|
import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||||
import io.netty.handler.codec.http.HttpRequest;
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
import io.netty.handler.codec.http.HttpRequestDecoder;
|
import io.netty.handler.codec.http.HttpRequestDecoder;
|
||||||
import io.netty.handler.codec.http.LastHttpContent;
|
|
||||||
|
|
||||||
import org.elasticsearch.http.HttpPreRequest;
|
import org.elasticsearch.http.HttpPreRequest;
|
||||||
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
|
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
|
||||||
|
@ -49,9 +48,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
|
||||||
}
|
}
|
||||||
if (aggregating || msg instanceof FullHttpRequest) {
|
if (aggregating || msg instanceof FullHttpRequest) {
|
||||||
super.channelRead(ctx, msg);
|
super.channelRead(ctx, msg);
|
||||||
if (msg instanceof LastHttpContent == false) {
|
|
||||||
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
streamContentSizeHandler.channelRead(ctx, msg);
|
streamContentSizeHandler.channelRead(ctx, msg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,7 +123,6 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
|
||||||
isContinueExpected = true;
|
isContinueExpected = true;
|
||||||
} else {
|
} else {
|
||||||
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
|
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
|
||||||
ctx.read();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,7 +136,6 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
|
||||||
decoder.reset();
|
decoder.reset();
|
||||||
}
|
}
|
||||||
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||||
ctx.read();
|
|
||||||
} else {
|
} else {
|
||||||
ignoreContent = false;
|
ignoreContent = false;
|
||||||
currentContentLength = 0;
|
currentContentLength = 0;
|
||||||
|
@ -152,13 +150,11 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
|
||||||
private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
|
private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
|
||||||
if (ignoreContent) {
|
if (ignoreContent) {
|
||||||
msg.release();
|
msg.release();
|
||||||
ctx.read();
|
|
||||||
} else {
|
} else {
|
||||||
currentContentLength += msg.content().readableBytes();
|
currentContentLength += msg.content().readableBytes();
|
||||||
if (currentContentLength > maxContentLength) {
|
if (currentContentLength > maxContentLength) {
|
||||||
msg.release();
|
msg.release();
|
||||||
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
|
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
|
||||||
ctx.read();
|
|
||||||
} else {
|
} else {
|
||||||
ctx.fireChannelRead(msg);
|
ctx.fireChannelRead(msg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,113 +9,249 @@
|
||||||
|
|
||||||
package org.elasticsearch.http.netty4;
|
package org.elasticsearch.http.netty4;
|
||||||
|
|
||||||
import io.netty.channel.ChannelDuplexHandler;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.handler.codec.DecoderResult;
|
import io.netty.handler.codec.DecoderResult;
|
||||||
import io.netty.handler.codec.http.HttpContent;
|
import io.netty.handler.codec.http.HttpContent;
|
||||||
import io.netty.handler.codec.http.HttpObject;
|
import io.netty.handler.codec.http.HttpObject;
|
||||||
import io.netty.handler.codec.http.HttpRequest;
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
|
import io.netty.handler.codec.http.LastHttpContent;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.core.Nullable;
|
|
||||||
import org.elasticsearch.http.netty4.internal.HttpValidator;
|
import org.elasticsearch.http.netty4.internal.HttpValidator;
|
||||||
import org.elasticsearch.transport.Transports;
|
import org.elasticsearch.transport.Transports;
|
||||||
|
|
||||||
public class Netty4HttpHeaderValidator extends ChannelDuplexHandler {
|
import java.util.ArrayDeque;
|
||||||
|
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY;
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;
|
||||||
|
|
||||||
|
public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
private final HttpValidator validator;
|
private final HttpValidator validator;
|
||||||
private final ThreadContext threadContext;
|
private final ThreadContext threadContext;
|
||||||
private State state;
|
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
|
||||||
|
private State state = WAITING_TO_START;
|
||||||
|
|
||||||
public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
|
public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
|
||||||
this.validator = validator;
|
this.validator = validator;
|
||||||
this.threadContext = threadContext;
|
this.threadContext = threadContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
State getState() {
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("fallthrough")
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
assert msg instanceof HttpObject;
|
assert msg instanceof HttpObject;
|
||||||
var httpObject = (HttpObject) msg;
|
final HttpObject httpObject = (HttpObject) msg;
|
||||||
if (httpObject.decoderResult().isFailure()) {
|
|
||||||
ctx.fireChannelRead(httpObject); // pass-through for decoding failures
|
switch (state) {
|
||||||
} else {
|
case WAITING_TO_START:
|
||||||
if (msg instanceof HttpRequest request) {
|
assert pending.isEmpty();
|
||||||
validate(ctx, request);
|
pending.add(ReferenceCountUtil.retain(httpObject));
|
||||||
} else {
|
requestStart(ctx);
|
||||||
assert msg instanceof HttpContent;
|
assert state == QUEUEING_DATA;
|
||||||
var content = (HttpContent) msg;
|
assert ctx.channel().config().isAutoRead() == false;
|
||||||
if (state == State.DROPPING) {
|
break;
|
||||||
content.release();
|
case QUEUEING_DATA:
|
||||||
ctx.read();
|
pending.add(ReferenceCountUtil.retain(httpObject));
|
||||||
} else {
|
break;
|
||||||
assert state == State.PASSING : "unexpected content before validation completed";
|
case FORWARDING_DATA_UNTIL_NEXT_REQUEST:
|
||||||
ctx.fireChannelRead(content);
|
assert pending.isEmpty();
|
||||||
|
if (httpObject instanceof LastHttpContent) {
|
||||||
|
state = WAITING_TO_START;
|
||||||
}
|
}
|
||||||
}
|
ctx.fireChannelRead(httpObject);
|
||||||
|
break;
|
||||||
|
case DROPPING_DATA_UNTIL_NEXT_REQUEST:
|
||||||
|
assert pending.isEmpty();
|
||||||
|
if (httpObject instanceof LastHttpContent) {
|
||||||
|
state = WAITING_TO_START;
|
||||||
|
}
|
||||||
|
ReferenceCountUtil.release(httpObject);
|
||||||
|
break;
|
||||||
|
case DROPPING_DATA_PERMANENTLY:
|
||||||
|
assert pending.isEmpty();
|
||||||
|
ReferenceCountUtil.release(httpObject); // consume without enqueuing
|
||||||
|
ctx.channel().config().setAutoRead(false);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void requestStart(ChannelHandlerContext ctx) {
|
||||||
|
assert state == WAITING_TO_START;
|
||||||
|
|
||||||
|
if (pending.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final HttpObject httpObject = pending.getFirst();
|
||||||
|
final HttpRequest httpRequest;
|
||||||
|
if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) {
|
||||||
|
// a properly decoded HTTP start message is expected to begin validation
|
||||||
|
// anything else is probably an error that the downstream HTTP message aggregator will have to handle
|
||||||
|
httpRequest = (HttpRequest) httpObject;
|
||||||
|
} else {
|
||||||
|
httpRequest = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
state = QUEUEING_DATA;
|
||||||
|
ctx.channel().config().setAutoRead(false);
|
||||||
|
|
||||||
|
if (httpRequest == null) {
|
||||||
|
// this looks like a malformed request and will forward without validation
|
||||||
|
ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
|
||||||
|
} else {
|
||||||
|
assert Transports.assertDefaultThreadContext(threadContext);
|
||||||
|
ActionListener.run(
|
||||||
|
// this prevents thread-context changes to propagate to the validation listener
|
||||||
|
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
|
||||||
|
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
|
||||||
|
ActionListener.assertOnce(
|
||||||
|
new ContextPreservingActionListener<Void>(
|
||||||
|
threadContext.wrapRestorable(threadContext.newStoredContext()),
|
||||||
|
// Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
|
||||||
|
new ActionListener<>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(Void unused) {
|
||||||
|
assert Transports.assertDefaultThreadContext(threadContext);
|
||||||
|
ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
assert Transports.assertDefaultThreadContext(threadContext);
|
||||||
|
ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
listener -> {
|
||||||
|
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
|
||||||
|
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
|
||||||
|
validator.validate(httpRequest, ctx.channel(), listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void forwardFullRequest(ChannelHandlerContext ctx) {
|
||||||
|
Transports.assertDefaultThreadContext(threadContext);
|
||||||
|
assert ctx.channel().eventLoop().inEventLoop();
|
||||||
|
assert ctx.channel().config().isAutoRead() == false;
|
||||||
|
assert state == QUEUEING_DATA;
|
||||||
|
|
||||||
|
ctx.channel().config().setAutoRead(true);
|
||||||
|
boolean fullRequestForwarded = forwardData(ctx, pending);
|
||||||
|
|
||||||
|
assert fullRequestForwarded || pending.isEmpty();
|
||||||
|
if (fullRequestForwarded) {
|
||||||
|
state = WAITING_TO_START;
|
||||||
|
requestStart(ctx);
|
||||||
|
} else {
|
||||||
|
state = FORWARDING_DATA_UNTIL_NEXT_REQUEST;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
|
||||||
|
Transports.assertDefaultThreadContext(threadContext);
|
||||||
|
assert ctx.channel().eventLoop().inEventLoop();
|
||||||
|
assert ctx.channel().config().isAutoRead() == false;
|
||||||
|
assert state == QUEUEING_DATA;
|
||||||
|
|
||||||
|
HttpObject messageToForward = pending.getFirst();
|
||||||
|
boolean fullRequestDropped = dropData(pending);
|
||||||
|
if (messageToForward instanceof HttpContent toReplace) {
|
||||||
|
// if the request to forward contained data (which got dropped), replace with empty data
|
||||||
|
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
|
||||||
|
}
|
||||||
|
messageToForward.setDecoderResult(DecoderResult.failure(e));
|
||||||
|
|
||||||
|
ctx.channel().config().setAutoRead(true);
|
||||||
|
ctx.fireChannelRead(messageToForward);
|
||||||
|
|
||||||
|
assert fullRequestDropped || pending.isEmpty();
|
||||||
|
if (fullRequestDropped) {
|
||||||
|
state = WAITING_TO_START;
|
||||||
|
requestStart(ctx);
|
||||||
|
} else {
|
||||||
|
state = DROPPING_DATA_UNTIL_NEXT_REQUEST;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
// until validation is completed we can ignore read calls,
|
state = DROPPING_DATA_PERMANENTLY;
|
||||||
// once validation is finished HttpRequest will be fired and downstream can read from there
|
while (true) {
|
||||||
if (state != State.VALIDATING) {
|
if (dropData(pending) == false) {
|
||||||
ctx.read();
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
super.channelInactive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque<HttpObject> pending) {
|
||||||
|
final int pendingMessages = pending.size();
|
||||||
|
try {
|
||||||
|
HttpObject toForward;
|
||||||
|
while ((toForward = pending.poll()) != null) {
|
||||||
|
ctx.fireChannelRead(toForward);
|
||||||
|
ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued
|
||||||
|
if (toForward instanceof LastHttpContent) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
maybeResizePendingDown(pendingMessages, pending);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void validate(ChannelHandlerContext ctx, HttpRequest request) {
|
private static boolean dropData(ArrayDeque<HttpObject> pending) {
|
||||||
assert Transports.assertDefaultThreadContext(threadContext);
|
final int pendingMessages = pending.size();
|
||||||
state = State.VALIDATING;
|
try {
|
||||||
ActionListener.run(
|
HttpObject toDrop;
|
||||||
// this prevents thread-context changes to propagate to the validation listener
|
while ((toDrop = pending.poll()) != null) {
|
||||||
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
|
ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming
|
||||||
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
|
if (toDrop instanceof LastHttpContent) {
|
||||||
ActionListener.assertOnce(
|
return true;
|
||||||
new ContextPreservingActionListener<Void>(
|
|
||||||
threadContext.wrapRestorable(threadContext.newStoredContext()),
|
|
||||||
new ActionListener<>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(Void unused) {
|
|
||||||
handleValidationResult(ctx, request, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
handleValidationResult(ctx, request, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
),
|
|
||||||
listener -> {
|
|
||||||
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
|
|
||||||
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
|
|
||||||
validator.validate(request, ctx.channel(), listener);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
return false;
|
||||||
|
} finally {
|
||||||
|
maybeResizePendingDown(pendingMessages, pending);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nullable Exception validationError) {
|
private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject> pending) {
|
||||||
assert Transports.assertDefaultThreadContext(threadContext);
|
if (pending.size() <= 4 && largeSize > 32) {
|
||||||
// Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
|
// Prevent the ArrayDeque from becoming forever large due to a single large message.
|
||||||
ctx.channel().eventLoop().execute(() -> {
|
ArrayDeque<HttpObject> old = pending;
|
||||||
if (validationError != null) {
|
pending = new ArrayDeque<>(4);
|
||||||
request.setDecoderResult(DecoderResult.failure(validationError));
|
pending.addAll(old);
|
||||||
state = State.DROPPING;
|
}
|
||||||
} else {
|
|
||||||
state = State.PASSING;
|
|
||||||
}
|
|
||||||
ctx.fireChannelRead(request);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private enum State {
|
enum State {
|
||||||
PASSING,
|
WAITING_TO_START,
|
||||||
VALIDATING,
|
QUEUEING_DATA,
|
||||||
DROPPING
|
FORWARDING_DATA_UNTIL_NEXT_REQUEST,
|
||||||
|
DROPPING_DATA_UNTIL_NEXT_REQUEST,
|
||||||
|
DROPPING_DATA_PERMANENTLY
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,6 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
|
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
|
||||||
activityTracker.startActivity();
|
activityTracker.startActivity();
|
||||||
boolean shouldRead = true;
|
|
||||||
try {
|
try {
|
||||||
if (msg instanceof HttpRequest request) {
|
if (msg instanceof HttpRequest request) {
|
||||||
final Netty4HttpRequest netty4HttpRequest;
|
final Netty4HttpRequest netty4HttpRequest;
|
||||||
|
@ -138,26 +137,25 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
|
||||||
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
|
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
|
||||||
currentRequestStream = null;
|
currentRequestStream = null;
|
||||||
} else {
|
} else {
|
||||||
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
|
var contentStream = new Netty4HttpRequestBodyStream(
|
||||||
|
ctx.channel(),
|
||||||
|
serverTransport.getThreadPool().getThreadContext(),
|
||||||
|
activityTracker
|
||||||
|
);
|
||||||
currentRequestStream = contentStream;
|
currentRequestStream = contentStream;
|
||||||
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
|
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
|
||||||
shouldRead = false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
handlePipelinedRequest(ctx, netty4HttpRequest);
|
handlePipelinedRequest(ctx, netty4HttpRequest);
|
||||||
} else {
|
} else {
|
||||||
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
|
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
|
||||||
assert currentRequestStream != null : "current stream must exists before handling http content";
|
assert currentRequestStream != null : "current stream must exists before handling http content";
|
||||||
shouldRead = false;
|
|
||||||
currentRequestStream.handleNettyContent((HttpContent) msg);
|
currentRequestStream.handleNettyContent((HttpContent) msg);
|
||||||
if (msg instanceof LastHttpContent) {
|
if (msg instanceof LastHttpContent) {
|
||||||
currentRequestStream = null;
|
currentRequestStream = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (shouldRead) {
|
|
||||||
ctx.channel().eventLoop().execute(ctx::read);
|
|
||||||
}
|
|
||||||
activityTracker.stopActivity();
|
activityTracker.stopActivity();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,11 +9,14 @@
|
||||||
|
|
||||||
package org.elasticsearch.http.netty4;
|
package org.elasticsearch.http.netty4;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.handler.codec.http.HttpContent;
|
import io.netty.handler.codec.http.HttpContent;
|
||||||
import io.netty.handler.codec.http.LastHttpContent;
|
import io.netty.handler.codec.http.LastHttpContent;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.network.ThreadWatchdog;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.core.Releasables;
|
import org.elasticsearch.core.Releasables;
|
||||||
import org.elasticsearch.http.HttpBody;
|
import org.elasticsearch.http.HttpBody;
|
||||||
|
@ -24,22 +27,34 @@ import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Netty based implementation of {@link HttpBody.Stream}.
|
* Netty based implementation of {@link HttpBody.Stream}.
|
||||||
|
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
|
||||||
|
* to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite
|
||||||
|
* autoRead=off. In this case chunks will be buffered until downstream calls {@link Stream#next()}
|
||||||
*/
|
*/
|
||||||
public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
|
|
||||||
|
private final Channel channel;
|
||||||
|
private final ChannelFutureListener closeListener = future -> doClose();
|
||||||
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
|
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
|
||||||
private final ThreadContext threadContext;
|
private final ThreadContext threadContext;
|
||||||
private final ChannelHandlerContext ctx;
|
private final ThreadWatchdog.ActivityTracker activityTracker;
|
||||||
|
private ByteBuf buf;
|
||||||
|
private boolean requested = false;
|
||||||
private boolean closing = false;
|
private boolean closing = false;
|
||||||
private HttpBody.ChunkHandler handler;
|
private HttpBody.ChunkHandler handler;
|
||||||
private ThreadContext.StoredContext requestContext;
|
private ThreadContext.StoredContext requestContext;
|
||||||
private final ChannelFutureListener closeListener = future -> doClose();
|
|
||||||
|
|
||||||
public Netty4HttpRequestBodyStream(ChannelHandlerContext ctx, ThreadContext threadContext) {
|
// used in tests
|
||||||
this.ctx = ctx;
|
private volatile int bufSize = 0;
|
||||||
|
private volatile boolean hasLast = false;
|
||||||
|
|
||||||
|
public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) {
|
||||||
|
this.channel = channel;
|
||||||
this.threadContext = threadContext;
|
this.threadContext = threadContext;
|
||||||
this.requestContext = threadContext.newStoredContext();
|
this.requestContext = threadContext.newStoredContext();
|
||||||
Netty4Utils.addListener(ctx.channel().closeFuture(), closeListener);
|
this.activityTracker = activityTracker;
|
||||||
|
Netty4Utils.addListener(channel.closeFuture(), closeListener);
|
||||||
|
channel.config().setAutoRead(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -58,43 +73,94 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
tracingHandlers.add(chunkHandler);
|
tracingHandlers.add(chunkHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void read() {
|
|
||||||
ctx.channel().eventLoop().execute(ctx::read);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void next() {
|
public void next() {
|
||||||
assert handler != null : "handler must be set before requesting next chunk";
|
assert handler != null : "handler must be set before requesting next chunk";
|
||||||
requestContext = threadContext.newStoredContext();
|
requestContext = threadContext.newStoredContext();
|
||||||
read();
|
channel.eventLoop().submit(() -> {
|
||||||
|
activityTracker.startActivity();
|
||||||
|
requested = true;
|
||||||
|
try {
|
||||||
|
if (closing) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (buf == null) {
|
||||||
|
channel.read();
|
||||||
|
} else {
|
||||||
|
send();
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
channel.pipeline().fireExceptionCaught(e);
|
||||||
|
} finally {
|
||||||
|
activityTracker.stopActivity();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void handleNettyContent(HttpContent httpContent) {
|
public void handleNettyContent(HttpContent httpContent) {
|
||||||
|
assert hasLast == false : "receive http content on completed stream";
|
||||||
|
hasLast = httpContent instanceof LastHttpContent;
|
||||||
if (closing) {
|
if (closing) {
|
||||||
httpContent.release();
|
httpContent.release();
|
||||||
read();
|
|
||||||
} else {
|
} else {
|
||||||
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
|
addChunk(httpContent.content());
|
||||||
var isLast = httpContent instanceof LastHttpContent;
|
if (requested) {
|
||||||
var buf = Netty4Utils.toReleasableBytesReference(httpContent.content());
|
send();
|
||||||
for (var tracer : tracingHandlers) {
|
|
||||||
tracer.onNext(buf, isLast);
|
|
||||||
}
|
|
||||||
handler.onNext(buf, isLast);
|
|
||||||
if (isLast) {
|
|
||||||
read();
|
|
||||||
ctx.channel().closeFuture().removeListener(closeListener);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// adds chunk to current buffer, will allocate composite buffer when need to hold more than 1 chunk
|
||||||
|
private void addChunk(ByteBuf chunk) {
|
||||||
|
assert chunk != null;
|
||||||
|
if (buf == null) {
|
||||||
|
buf = chunk;
|
||||||
|
} else if (buf instanceof CompositeByteBuf comp) {
|
||||||
|
comp.addComponent(true, chunk);
|
||||||
|
} else {
|
||||||
|
var comp = channel.alloc().compositeBuffer();
|
||||||
|
comp.addComponent(true, buf);
|
||||||
|
comp.addComponent(true, chunk);
|
||||||
|
buf = comp;
|
||||||
|
}
|
||||||
|
bufSize = buf.readableBytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
// visible for test
|
||||||
|
int bufSize() {
|
||||||
|
return bufSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
// visible for test
|
||||||
|
boolean hasLast() {
|
||||||
|
return hasLast;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void send() {
|
||||||
|
assert requested;
|
||||||
|
assert handler != null : "must set handler before receiving next chunk";
|
||||||
|
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
|
||||||
|
requested = false;
|
||||||
|
buf = null;
|
||||||
|
bufSize = 0;
|
||||||
|
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
|
||||||
|
for (var tracer : tracingHandlers) {
|
||||||
|
tracer.onNext(bytesRef, hasLast);
|
||||||
|
}
|
||||||
|
handler.onNext(bytesRef, hasLast);
|
||||||
|
}
|
||||||
|
if (hasLast) {
|
||||||
|
channel.config().setAutoRead(true);
|
||||||
|
channel.closeFuture().removeListener(closeListener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
if (ctx.channel().eventLoop().inEventLoop()) {
|
if (channel.eventLoop().inEventLoop()) {
|
||||||
doClose();
|
doClose();
|
||||||
} else {
|
} else {
|
||||||
ctx.channel().eventLoop().submit(this::doClose);
|
channel.eventLoop().submit(this::doClose);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,6 +174,11 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
handler.close();
|
handler.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
read();
|
if (buf != null) {
|
||||||
|
buf.release();
|
||||||
|
buf = null;
|
||||||
|
bufSize = 0;
|
||||||
|
}
|
||||||
|
channel.config().setAutoRead(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,6 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
|
||||||
import io.netty.handler.codec.http.HttpResponse;
|
import io.netty.handler.codec.http.HttpResponse;
|
||||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
import io.netty.handler.codec.http.HttpUtil;
|
import io.netty.handler.codec.http.HttpUtil;
|
||||||
import io.netty.handler.flow.FlowControlHandler;
|
|
||||||
import io.netty.handler.ssl.SslHandler;
|
import io.netty.handler.ssl.SslHandler;
|
||||||
import io.netty.handler.timeout.ReadTimeoutException;
|
import io.netty.handler.timeout.ReadTimeoutException;
|
||||||
import io.netty.handler.timeout.ReadTimeoutHandler;
|
import io.netty.handler.timeout.ReadTimeoutHandler;
|
||||||
|
@ -47,7 +46,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.core.Assertions;
|
|
||||||
import org.elasticsearch.core.IOUtils;
|
import org.elasticsearch.core.IOUtils;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.http.AbstractHttpServerTransport;
|
import org.elasticsearch.http.AbstractHttpServerTransport;
|
||||||
|
@ -319,9 +317,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
// auto-read must be disabled all the time
|
|
||||||
ch.config().setAutoRead(false);
|
|
||||||
|
|
||||||
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
|
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
|
||||||
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
|
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
|
||||||
if (acceptChannelPredicate != null) {
|
if (acceptChannelPredicate != null) {
|
||||||
|
@ -369,15 +364,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||||
}
|
}
|
||||||
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
|
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
|
||||||
ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces
|
ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces
|
||||||
|
|
||||||
// from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part
|
|
||||||
ch.pipeline().addLast(new FlowControlHandler());
|
|
||||||
if (Assertions.ENABLED) {
|
|
||||||
// missing reads are hard to catch, but we can detect absence of reads within interval
|
|
||||||
long missingReadIntervalMs = 10_000;
|
|
||||||
ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (httpValidator != null) {
|
if (httpValidator != null) {
|
||||||
// runs a validation function on the first HTTP message piece which contains all the headers
|
// runs a validation function on the first HTTP message piece which contains all the headers
|
||||||
// if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded
|
// if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded
|
||||||
|
@ -429,19 +415,12 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||||
if (ResourceLeakDetector.isEnabled()) {
|
if (ResourceLeakDetector.isEnabled()) {
|
||||||
ch.pipeline().addLast(new Netty4LeakDetectionHandler());
|
ch.pipeline().addLast(new Netty4LeakDetectionHandler());
|
||||||
}
|
}
|
||||||
// See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above
|
|
||||||
// can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is
|
|
||||||
// resolved we must add another flow controller here:
|
|
||||||
ch.pipeline().addLast(new FlowControlHandler());
|
|
||||||
ch.pipeline()
|
ch.pipeline()
|
||||||
.addLast(
|
.addLast(
|
||||||
"pipelining",
|
"pipelining",
|
||||||
new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker)
|
new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker)
|
||||||
);
|
);
|
||||||
transport.serverAcceptedChannel(nettyHttpChannel);
|
transport.serverAcceptedChannel(nettyHttpChannel);
|
||||||
|
|
||||||
// make very first read call, since auto-read is disabled; following reads must come from the handlers
|
|
||||||
ch.read();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -12,7 +12,6 @@ package org.elasticsearch.http.netty4;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.handler.codec.DecoderResult;
|
|
||||||
import io.netty.handler.codec.http.DefaultHttpContent;
|
import io.netty.handler.codec.http.DefaultHttpContent;
|
||||||
import io.netty.handler.codec.http.DefaultHttpRequest;
|
import io.netty.handler.codec.http.DefaultHttpRequest;
|
||||||
import io.netty.handler.codec.http.DefaultLastHttpContent;
|
import io.netty.handler.codec.http.DefaultLastHttpContent;
|
||||||
|
@ -41,7 +40,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
private static final int REPS = 1000;
|
private static final int REPS = 1000;
|
||||||
private EmbeddedChannel channel;
|
private EmbeddedChannel channel;
|
||||||
private EmbeddedChannel encoder; // channel to encode HTTP objects into bytes
|
private EmbeddedChannel encoder; // channel to encode HTTP objects into bytes
|
||||||
private ReadSniffer readSniffer;
|
|
||||||
|
|
||||||
private static HttpContent httpContent(int size) {
|
private static HttpContent httpContent(int size) {
|
||||||
return new DefaultHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(size)));
|
return new DefaultHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(size)));
|
||||||
|
@ -70,20 +68,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
var decoder = new HttpRequestDecoder();
|
var decoder = new HttpRequestDecoder();
|
||||||
encoder = new EmbeddedChannel(new HttpRequestEncoder());
|
encoder = new EmbeddedChannel(new HttpRequestEncoder());
|
||||||
readSniffer = new ReadSniffer();
|
channel = new EmbeddedChannel(decoder, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH));
|
||||||
channel = new EmbeddedChannel();
|
|
||||||
channel.config().setAutoRead(false);
|
|
||||||
channel.pipeline().addLast(decoder, readSniffer, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testDecodingFailurePassThrough() {
|
|
||||||
for (var i = 0; i < REPS; i++) {
|
|
||||||
var sendReq = httpRequest();
|
|
||||||
sendReq.setDecoderResult(DecoderResult.failure(new Exception("bad")));
|
|
||||||
channel.writeInbound(sendReq);
|
|
||||||
assertEquals(sendReq, channel.readInbound());
|
|
||||||
}
|
|
||||||
assertEquals("should not read from channel, failures are handled downstream", 0, readSniffer.readCount);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -100,7 +85,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
assertFalse(HttpUtil.is100ContinueExpected(recvRequest));
|
assertFalse(HttpUtil.is100ContinueExpected(recvRequest));
|
||||||
channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
|
channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
|
||||||
assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
|
assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
|
||||||
assertEquals("must not read from channel", 0, readSniffer.readCount);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,7 +99,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
assertNotNull("request should pass", channel.readInbound());
|
assertNotNull("request should pass", channel.readInbound());
|
||||||
channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
|
channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
|
||||||
assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
|
assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
|
||||||
assertEquals("must not read from channel", 0, readSniffer.readCount);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,7 +121,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
assertNotNull(recvContent);
|
assertNotNull(recvContent);
|
||||||
assertEquals(MAX_CONTENT_LENGTH, recvContent.content().readableBytes());
|
assertEquals(MAX_CONTENT_LENGTH, recvContent.content().readableBytes());
|
||||||
recvContent.release();
|
recvContent.release();
|
||||||
assertEquals("must not read from channel", 0, readSniffer.readCount);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +134,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
channel.writeInbound(encode(sendRequest));
|
channel.writeInbound(encode(sendRequest));
|
||||||
var resp = (FullHttpResponse) channel.readOutbound();
|
var resp = (FullHttpResponse) channel.readOutbound();
|
||||||
assertEquals(HttpResponseStatus.EXPECTATION_FAILED, resp.status());
|
assertEquals(HttpResponseStatus.EXPECTATION_FAILED, resp.status());
|
||||||
assertEquals("expect 2 reads, one from size handler and HTTP decoder will emit LastHttpContent", 2, readSniffer.readCount);
|
|
||||||
assertFalse(channel.isOpen());
|
assertFalse(channel.isOpen());
|
||||||
resp.release();
|
resp.release();
|
||||||
}
|
}
|
||||||
|
@ -171,7 +152,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
|
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
|
||||||
assertNull("request should not pass", channel.readInbound());
|
assertNull("request should not pass", channel.readInbound());
|
||||||
assertTrue("should not close channel", channel.isOpen());
|
assertTrue("should not close channel", channel.isOpen());
|
||||||
assertEquals("must read from channel", i + 1, readSniffer.readCount);
|
|
||||||
resp.release();
|
resp.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -180,13 +160,11 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
* Mixed load of oversized and normal requests with Exepct:100-Continue.
|
* Mixed load of oversized and normal requests with Exepct:100-Continue.
|
||||||
*/
|
*/
|
||||||
public void testMixedContent() {
|
public void testMixedContent() {
|
||||||
var expectReadCnt = 0;
|
|
||||||
for (int i = 0; i < REPS; i++) {
|
for (int i = 0; i < REPS; i++) {
|
||||||
var isOversized = randomBoolean();
|
var isOversized = randomBoolean();
|
||||||
var sendRequest = httpRequest();
|
var sendRequest = httpRequest();
|
||||||
HttpUtil.set100ContinueExpected(sendRequest, true);
|
HttpUtil.set100ContinueExpected(sendRequest, true);
|
||||||
if (isOversized) {
|
if (isOversized) {
|
||||||
expectReadCnt++;
|
|
||||||
HttpUtil.setContentLength(sendRequest, OVERSIZED_LENGTH);
|
HttpUtil.setContentLength(sendRequest, OVERSIZED_LENGTH);
|
||||||
channel.writeInbound(encode(sendRequest));
|
channel.writeInbound(encode(sendRequest));
|
||||||
var resp = (FullHttpResponse) channel.readOutbound();
|
var resp = (FullHttpResponse) channel.readOutbound();
|
||||||
|
@ -210,7 +188,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
assertEquals("actual content size should match", normalSize, recvContent.content().readableBytes());
|
assertEquals("actual content size should match", normalSize, recvContent.content().readableBytes());
|
||||||
recvContent.release();
|
recvContent.release();
|
||||||
}
|
}
|
||||||
assertEquals(expectReadCnt, readSniffer.readCount);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,7 +205,6 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
resp.release();
|
resp.release();
|
||||||
assertNull("request and content should not pass", channel.readInbound());
|
assertNull("request and content should not pass", channel.readInbound());
|
||||||
assertTrue("should not close channel", channel.isOpen());
|
assertTrue("should not close channel", channel.isOpen());
|
||||||
assertEquals("expect two reads per loop, one for request and one for content", (i + 1) * 2, readSniffer.readCount);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,7 +234,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
|
||||||
var resp = (FullHttpResponse) channel.readOutbound();
|
var resp = (FullHttpResponse) channel.readOutbound();
|
||||||
assertEquals("should respond with 413", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
|
assertEquals("should respond with 413", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
|
||||||
assertFalse("should close channel", channel.isOpen());
|
assertFalse("should close channel", channel.isOpen());
|
||||||
assertEquals("expect read after response", 1, readSniffer.readCount);
|
|
||||||
resp.release();
|
resp.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@ import io.netty.handler.codec.http.DefaultHttpRequest;
|
||||||
import io.netty.handler.codec.http.DefaultLastHttpContent;
|
import io.netty.handler.codec.http.DefaultLastHttpContent;
|
||||||
import io.netty.handler.codec.http.HttpMethod;
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
import io.netty.handler.codec.http.HttpVersion;
|
import io.netty.handler.codec.http.HttpVersion;
|
||||||
import io.netty.handler.flow.FlowControlHandler;
|
|
||||||
|
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
|
@ -53,8 +52,7 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
|
||||||
@Override
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
channel = new EmbeddedChannel(new FlowControlHandler());
|
channel = new EmbeddedChannel();
|
||||||
channel.config().setAutoRead(false);
|
|
||||||
threadPool = new TestThreadPool(TEST_MOCK_TRANSPORT_THREAD_PREFIX);
|
threadPool = new TestThreadPool(TEST_MOCK_TRANSPORT_THREAD_PREFIX);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,7 +181,6 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
|
||||||
threadPool.generic().submit(() -> {
|
threadPool.generic().submit(() -> {
|
||||||
DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
channel.writeInbound(request1);
|
channel.writeInbound(request1);
|
||||||
channel.read();
|
|
||||||
DefaultHttpContent content1 = randomBoolean() ? new DefaultHttpContent(Unpooled.buffer(4)) : null;
|
DefaultHttpContent content1 = randomBoolean() ? new DefaultHttpContent(Unpooled.buffer(4)) : null;
|
||||||
if (content1 != null) {
|
if (content1 != null) {
|
||||||
channel.writeInbound(content1);
|
channel.writeInbound(content1);
|
||||||
|
@ -199,11 +196,9 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
channel.runPendingTasks();
|
channel.runPendingTasks();
|
||||||
assertThat(channel.readInbound(), sameInstance(request1));
|
assertThat(channel.readInbound(), sameInstance(request1));
|
||||||
channel.read();
|
|
||||||
if (content1 != null && success) {
|
if (content1 != null && success) {
|
||||||
assertThat(channel.readInbound(), sameInstance(content1));
|
assertThat(channel.readInbound(), sameInstance(content1));
|
||||||
}
|
}
|
||||||
channel.read();
|
|
||||||
if (success) {
|
if (success) {
|
||||||
assertThat(channel.readInbound(), sameInstance(lastContent1));
|
assertThat(channel.readInbound(), sameInstance(lastContent1));
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,158 +9,766 @@
|
||||||
|
|
||||||
package org.elasticsearch.http.netty4;
|
package org.elasticsearch.http.netty4;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufUtil;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.handler.codec.DecoderResult;
|
import io.netty.handler.codec.DecoderResult;
|
||||||
|
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||||
import io.netty.handler.codec.http.DefaultHttpContent;
|
import io.netty.handler.codec.http.DefaultHttpContent;
|
||||||
import io.netty.handler.codec.http.DefaultHttpRequest;
|
import io.netty.handler.codec.http.DefaultHttpRequest;
|
||||||
import io.netty.handler.codec.http.DefaultLastHttpContent;
|
import io.netty.handler.codec.http.DefaultLastHttpContent;
|
||||||
import io.netty.handler.codec.http.FullHttpRequest;
|
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||||
import io.netty.handler.codec.http.HttpContent;
|
|
||||||
import io.netty.handler.codec.http.HttpMethod;
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
import io.netty.handler.codec.http.HttpRequest;
|
|
||||||
import io.netty.handler.codec.http.HttpRequestDecoder;
|
|
||||||
import io.netty.handler.codec.http.HttpVersion;
|
import io.netty.handler.codec.http.HttpVersion;
|
||||||
import io.netty.handler.codec.http.LastHttpContent;
|
import io.netty.handler.codec.http.LastHttpContent;
|
||||||
import io.netty.handler.flow.FlowControlHandler;
|
import io.netty.util.AsciiString;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.ElasticsearchSecurityException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.common.ValidationException;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
|
import org.elasticsearch.http.netty4.internal.HttpValidator;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
|
||||||
|
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.hamcrest.Matchers.not;
|
||||||
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
import static org.hamcrest.Matchers.sameInstance;
|
||||||
|
|
||||||
public class Netty4HttpHeaderValidatorTests extends ESTestCase {
|
public class Netty4HttpHeaderValidatorTests extends ESTestCase {
|
||||||
|
|
||||||
|
private final AtomicReference<Object> header = new AtomicReference<>();
|
||||||
|
private final AtomicReference<ActionListener<Void>> listener = new AtomicReference<>();
|
||||||
private EmbeddedChannel channel;
|
private EmbeddedChannel channel;
|
||||||
private BlockingQueue<ValidationRequest> validatorRequestQueue;
|
private Netty4HttpHeaderValidator netty4HttpHeaderValidator;
|
||||||
|
private final AtomicReference<RuntimeException> validationException = new AtomicReference<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
validatorRequestQueue = new LinkedBlockingQueue<>();
|
reset();
|
||||||
channel = new EmbeddedChannel(
|
}
|
||||||
new Netty4HttpHeaderValidator(
|
|
||||||
(httpRequest, channel, listener) -> validatorRequestQueue.add(new ValidationRequest(httpRequest, channel, listener)),
|
private void reset() {
|
||||||
new ThreadContext(Settings.EMPTY)
|
channel = new EmbeddedChannel();
|
||||||
)
|
header.set(null);
|
||||||
);
|
listener.set(null);
|
||||||
|
validationException.set(null);
|
||||||
|
HttpValidator validator = (httpRequest, channel, validationCompleteListener) -> {
|
||||||
|
header.set(httpRequest);
|
||||||
|
final var exception = validationException.get();
|
||||||
|
if (exception != null) {
|
||||||
|
throw exception;
|
||||||
|
}
|
||||||
|
listener.set(validationCompleteListener);
|
||||||
|
};
|
||||||
|
netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY));
|
||||||
|
channel.pipeline().addLast(netty4HttpHeaderValidator);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidationPausesAndResumesData() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(request);
|
||||||
|
channel.writeInbound(content);
|
||||||
|
|
||||||
|
assertThat(header.get(), sameInstance(request));
|
||||||
|
// channel is paused
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
// channel is resumed
|
||||||
|
listener.get().onResponse(null);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(content.refCnt(), equalTo(1));
|
||||||
|
|
||||||
|
// channel continues in resumed state after request finishes
|
||||||
|
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(lastContent);
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(lastContent));
|
||||||
|
assertThat(lastContent.refCnt(), equalTo(1));
|
||||||
|
|
||||||
|
// channel is again paused while validating next request
|
||||||
|
channel.writeInbound(request);
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(request);
|
||||||
|
channel.writeInbound(content);
|
||||||
|
|
||||||
|
assertThat(header.get(), sameInstance(request));
|
||||||
|
// channel is paused
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
// channel is resumed
|
||||||
|
listener.get().onResponse(null);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(content.refCnt(), equalTo(1));
|
||||||
channel.config().setAutoRead(false);
|
channel.config().setAutoRead(false);
|
||||||
|
|
||||||
|
channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4)));
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
}
|
}
|
||||||
|
|
||||||
HttpRequest newHttpRequest() {
|
public void testContentForwardedAfterValidation() {
|
||||||
return new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "");
|
assertTrue(channel.config().isAutoRead());
|
||||||
}
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
HttpContent newHttpContent() {
|
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
return new DefaultHttpContent(Unpooled.buffer());
|
channel.writeInbound(request);
|
||||||
}
|
|
||||||
|
|
||||||
LastHttpContent newLastHttpContent() {
|
DefaultHttpContent content1 = null;
|
||||||
return new DefaultLastHttpContent();
|
if (randomBoolean()) {
|
||||||
}
|
content1 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content1);
|
||||||
|
}
|
||||||
|
|
||||||
public void testValidatorReceiveHttpRequest() {
|
assertThat(header.get(), sameInstance(request));
|
||||||
channel.writeInbound(newHttpRequest());
|
// channel is paused
|
||||||
assertEquals(1, validatorRequestQueue.size());
|
assertThat(channel.readInbound(), nullValue());
|
||||||
assertNull(channel.readInbound());
|
assertFalse(channel.config().isAutoRead());
|
||||||
}
|
|
||||||
|
|
||||||
public void testDecoderFailurePassThrough() {
|
// channel is resumed
|
||||||
for (var i = 0; i < 1000; i++) {
|
listener.get().onResponse(null);
|
||||||
var httpRequest = newHttpRequest();
|
channel.runPendingTasks();
|
||||||
httpRequest.setDecoderResult(DecoderResult.failure(new Exception("bad")));
|
|
||||||
channel.writeInbound(httpRequest);
|
// resumed channel after successful validation forwards data
|
||||||
assertEquals(httpRequest, channel.readInbound());
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
// write more content to the channel after validation passed
|
||||||
|
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content2);
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request));
|
||||||
|
DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content3);
|
||||||
|
if (content1 != null) {
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content1));
|
||||||
|
assertThat(content1.refCnt(), equalTo(1));
|
||||||
|
}
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content2));
|
||||||
|
assertThat(content2.refCnt(), equalTo(1));
|
||||||
|
DefaultHttpContent content4 = null;
|
||||||
|
if (randomBoolean()) {
|
||||||
|
content4 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content4);
|
||||||
|
}
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content3));
|
||||||
|
assertThat(content3.refCnt(), equalTo(1));
|
||||||
|
if (content4 != null) {
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content4));
|
||||||
|
assertThat(content4.refCnt(), equalTo(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
// channel continues in resumed state after request finishes
|
||||||
|
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(lastContent);
|
||||||
|
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(lastContent));
|
||||||
|
assertThat(lastContent.refCnt(), equalTo(1));
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
channel.writeInbound(request);
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public void testContentDroppedAfterValidationFailure() {
|
||||||
* Sends back-to-back http requests and randomly fail validation.
|
assertTrue(channel.config().isAutoRead());
|
||||||
* Ensures that invalid requests drop content and valid pass through.
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
*/
|
|
||||||
public void testMixedValidationResults() {
|
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
for (var i = 0; i < 1000; i++) {
|
channel.writeInbound(request);
|
||||||
var shouldPassValidation = randomBoolean();
|
|
||||||
var request = newHttpRequest();
|
DefaultHttpContent content1 = null;
|
||||||
var content = newHttpContent();
|
if (randomBoolean()) {
|
||||||
var last = newLastHttpContent();
|
content1 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content1);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(header.get(), sameInstance(request));
|
||||||
|
// channel is paused
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
// channel is resumed
|
||||||
|
listener.get().onFailure(new ElasticsearchException("Boom"));
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
// resumed channel after failed validation drops data
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
// write more content to the channel after validation passed
|
||||||
|
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content2);
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request));
|
||||||
|
DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content3);
|
||||||
|
if (content1 != null) {
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(content1.refCnt(), equalTo(0));
|
||||||
|
}
|
||||||
|
assertThat(channel.readInbound(), nullValue()); // content2
|
||||||
|
assertThat(content2.refCnt(), equalTo(0));
|
||||||
|
DefaultHttpContent content4 = null;
|
||||||
|
if (randomBoolean()) {
|
||||||
|
content4 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content4);
|
||||||
|
}
|
||||||
|
assertThat(channel.readInbound(), nullValue()); // content3
|
||||||
|
assertThat(content3.refCnt(), equalTo(0));
|
||||||
|
if (content4 != null) {
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(content4.refCnt(), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(channel.readInbound(), nullValue()); // extra read still returns "null"
|
||||||
|
|
||||||
|
// channel continues in resumed state after request finishes
|
||||||
|
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(lastContent);
|
||||||
|
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
assertThat(channel.readInbound(), nullValue()); // lastContent
|
||||||
|
assertThat(lastContent.refCnt(), equalTo(0));
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"));
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidationErrorForwardsAsDecoderErrorMessage() {
|
||||||
|
for (Exception exception : List.of(
|
||||||
|
new Exception("Failure"),
|
||||||
|
new ElasticsearchException("Failure"),
|
||||||
|
new ElasticsearchSecurityException("Failure")
|
||||||
|
)) {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
|
||||||
channel.writeInbound(request);
|
channel.writeInbound(request);
|
||||||
var validationRequest = validatorRequestQueue.poll();
|
|
||||||
assertNotNull(validationRequest);
|
|
||||||
if (shouldPassValidation) {
|
|
||||||
validationRequest.listener.onResponse(null);
|
|
||||||
} else {
|
|
||||||
validationRequest.listener.onFailure(new ValidationException());
|
|
||||||
}
|
|
||||||
channel.runPendingTasks();
|
|
||||||
|
|
||||||
var gotRequest = channel.readInbound();
|
|
||||||
assertEquals(
|
|
||||||
"should set decoder result failure for invalid request",
|
|
||||||
shouldPassValidation,
|
|
||||||
((HttpRequest) gotRequest).decoderResult().isSuccess()
|
|
||||||
);
|
|
||||||
assertEquals(request, gotRequest);
|
|
||||||
|
|
||||||
channel.writeInbound(content);
|
channel.writeInbound(content);
|
||||||
channel.writeInbound(last);
|
|
||||||
if (shouldPassValidation) {
|
assertThat(header.get(), sameInstance(request));
|
||||||
assertEquals("should pass content for valid request", content, channel.readInbound());
|
assertThat(channel.readInbound(), nullValue());
|
||||||
content.release();
|
assertFalse(channel.config().isAutoRead());
|
||||||
assertEquals(last, channel.readInbound());
|
|
||||||
last.release();
|
listener.get().onFailure(exception);
|
||||||
} else {
|
channel.runPendingTasks();
|
||||||
assertNull("should drop content for invalid request", channel.readInbound());
|
assertTrue(channel.config().isAutoRead());
|
||||||
}
|
DefaultHttpRequest failed = channel.readInbound();
|
||||||
|
assertThat(failed, sameInstance(request));
|
||||||
|
assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue());
|
||||||
|
assertTrue(failed.decoderResult().isFailure());
|
||||||
|
Exception cause = (Exception) failed.decoderResult().cause();
|
||||||
|
assertThat(cause, equalTo(exception));
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(content.refCnt(), equalTo(0));
|
||||||
|
|
||||||
|
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(lastContent);
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(lastContent.refCnt(), equalTo(0));
|
||||||
|
|
||||||
|
reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testIgnoreReadWhenValidating() {
|
public void testValidationExceptionForwardsAsDecoderErrorMessage() {
|
||||||
channel.pipeline().addFirst(new FlowControlHandler()); // catch all inbound messages
|
final var exception = new ElasticsearchException("Failure");
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
channel.writeInbound(newHttpRequest());
|
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
channel.writeInbound(newLastHttpContent()); // should hold by flow-control-handler
|
|
||||||
assertNull("nothing should pass yet", channel.readInbound());
|
|
||||||
|
|
||||||
channel.read();
|
validationException.set(exception);
|
||||||
var validationRequest = validatorRequestQueue.poll();
|
channel.writeInbound(request);
|
||||||
assertNotNull(validationRequest);
|
|
||||||
|
|
||||||
channel.read();
|
assertThat(header.get(), sameInstance(request));
|
||||||
assertNull("should ignore read while validating", channel.readInbound());
|
assertThat(listener.get(), nullValue());
|
||||||
|
|
||||||
validationRequest.listener.onResponse(null);
|
|
||||||
channel.runPendingTasks();
|
channel.runPendingTasks();
|
||||||
assertTrue("http request should pass", channel.readInbound() instanceof HttpRequest);
|
assertTrue(channel.config().isAutoRead());
|
||||||
assertNull("content should not pass yet, need explicit read", channel.readInbound());
|
DefaultHttpRequest failed = channel.readInbound();
|
||||||
|
assertThat(failed, sameInstance(request));
|
||||||
|
assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue());
|
||||||
|
assertTrue(failed.decoderResult().isFailure());
|
||||||
|
Exception cause = (Exception) failed.decoderResult().cause();
|
||||||
|
assertThat(cause, equalTo(exception));
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
|
||||||
channel.read();
|
final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
asInstanceOf(LastHttpContent.class, channel.readInbound()).release();
|
channel.writeInbound(content);
|
||||||
|
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(content.refCnt(), equalTo(0));
|
||||||
|
|
||||||
|
DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(lastContent);
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(lastContent.refCnt(), equalTo(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWithFlowControlAndAggregator() {
|
public void testValidationHandlesMultipleQueuedUpMessages() {
|
||||||
channel.pipeline().addFirst(new FlowControlHandler());
|
assertTrue(channel.config().isAutoRead());
|
||||||
channel.pipeline().addLast(new Netty4HttpAggregator(8192, (req) -> true, new HttpRequestDecoder()));
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
channel.writeInbound(newHttpRequest());
|
final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
channel.writeInbound(newHttpContent());
|
DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
channel.writeInbound(newLastHttpContent());
|
DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(request1);
|
||||||
|
channel.writeInbound(content1);
|
||||||
|
channel.writeInbound(lastContent1);
|
||||||
|
final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(request2);
|
||||||
|
channel.writeInbound(content2);
|
||||||
|
channel.writeInbound(lastContent2);
|
||||||
|
|
||||||
channel.read();
|
assertThat(header.get(), sameInstance(request1));
|
||||||
assertNull("should ignore read while validating", channel.readInbound());
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
|
||||||
var validationRequest = validatorRequestQueue.poll();
|
listener.get().onResponse(null);
|
||||||
assertNotNull(validationRequest);
|
|
||||||
validationRequest.listener.onResponse(null);
|
|
||||||
channel.runPendingTasks();
|
channel.runPendingTasks();
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request1));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content1));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(lastContent1));
|
||||||
|
assertThat(content1.refCnt(), equalTo(1));
|
||||||
|
assertThat(lastContent1.refCnt(), equalTo(1));
|
||||||
|
|
||||||
asInstanceOf(FullHttpRequest.class, channel.readInbound()).release();
|
assertThat(header.get(), sameInstance(request2));
|
||||||
|
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
|
||||||
|
listener.get().onResponse(null);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request2));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content2));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(lastContent2));
|
||||||
|
assertThat(content2.refCnt(), equalTo(1));
|
||||||
|
assertThat(lastContent2.refCnt(), equalTo(1));
|
||||||
|
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
record ValidationRequest(HttpRequest request, Channel channel, ActionListener<Void> listener) {}
|
public void testValidationFailureRecoversForEnqueued() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
// write 2 requests before validation for the first one fails
|
||||||
|
final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(request1);
|
||||||
|
channel.writeInbound(content1);
|
||||||
|
channel.writeInbound(lastContent1);
|
||||||
|
final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(request2);
|
||||||
|
channel.writeInbound(content2);
|
||||||
|
|
||||||
|
boolean finishSecondRequest = randomBoolean();
|
||||||
|
if (finishSecondRequest) {
|
||||||
|
channel.writeInbound(lastContent2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// channel is paused and both requests are queued
|
||||||
|
assertThat(header.get(), sameInstance(request1));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
assertThat(content1.refCnt(), equalTo(2));
|
||||||
|
assertThat(lastContent1.refCnt(), equalTo(2));
|
||||||
|
assertThat(content2.refCnt(), equalTo(2));
|
||||||
|
if (finishSecondRequest) {
|
||||||
|
assertThat(lastContent2.refCnt(), equalTo(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
// validation for the 1st request FAILS
|
||||||
|
Exception exception = new ElasticsearchException("Boom");
|
||||||
|
listener.get().onFailure(exception);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
// request1 becomes a decoder exception and its content is dropped
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request1));
|
||||||
|
assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue());
|
||||||
|
assertTrue(request1.decoderResult().isFailure());
|
||||||
|
Exception cause = (Exception) request1.decoderResult().cause();
|
||||||
|
assertThat(cause, equalTo(exception));
|
||||||
|
assertThat(content1.refCnt(), equalTo(0)); // content is dropped
|
||||||
|
assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
|
||||||
|
// channel pauses for the validation of the 2nd request
|
||||||
|
assertThat(header.get(), sameInstance(request2));
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
|
||||||
|
// validation for the 2nd request SUCCEEDS
|
||||||
|
listener.get().onResponse(null);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
// 2nd request is forwarded correctly
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request2));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content2));
|
||||||
|
assertThat(content2.refCnt(), equalTo(1));
|
||||||
|
|
||||||
|
if (finishSecondRequest == false) {
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
// while in forwarding state the request can continue
|
||||||
|
if (randomBoolean()) {
|
||||||
|
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content);
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content));
|
||||||
|
assertThat(content.refCnt(), equalTo(1));
|
||||||
|
}
|
||||||
|
channel.writeInbound(lastContent2);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(channel.readInbound(), sameInstance(lastContent2));
|
||||||
|
assertThat(lastContent2.refCnt(), equalTo(1));
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidationFailureRecoversForInbound() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
// write a single request, but don't finish it yet, for which the validation fails
|
||||||
|
final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(request1);
|
||||||
|
channel.writeInbound(content1);
|
||||||
|
|
||||||
|
// channel is paused and the request is queued
|
||||||
|
assertThat(header.get(), sameInstance(request1));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
assertThat(content1.refCnt(), equalTo(2));
|
||||||
|
|
||||||
|
// validation for the 1st request FAILS
|
||||||
|
Exception exception = new ElasticsearchException("Boom");
|
||||||
|
listener.get().onFailure(exception);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
// request1 becomes a decoder exception and its content is dropped
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request1));
|
||||||
|
assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue());
|
||||||
|
assertTrue(request1.decoderResult().isFailure());
|
||||||
|
Exception cause = (Exception) request1.decoderResult().cause();
|
||||||
|
assertThat(cause, equalTo(exception));
|
||||||
|
assertThat(content1.refCnt(), equalTo(0)); // content is dropped
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4)));
|
||||||
|
}
|
||||||
|
DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(lastContent1);
|
||||||
|
if (randomBoolean()) {
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
}
|
||||||
|
assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped
|
||||||
|
|
||||||
|
// write 2nd request after the 1st one failed validation
|
||||||
|
final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(request2);
|
||||||
|
channel.writeInbound(content2);
|
||||||
|
boolean finishSecondRequest = randomBoolean();
|
||||||
|
if (finishSecondRequest) {
|
||||||
|
channel.writeInbound(lastContent2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// channel pauses for the validation of the 2nd request
|
||||||
|
assertThat(header.get(), sameInstance(request2));
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
|
||||||
|
// validation for the 2nd request SUCCEEDS
|
||||||
|
listener.get().onResponse(null);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
// 2nd request is forwarded correctly
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request2));
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content2));
|
||||||
|
assertThat(content2.refCnt(), equalTo(1));
|
||||||
|
|
||||||
|
if (finishSecondRequest == false) {
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
// while in forwarding state the request can continue
|
||||||
|
if (randomBoolean()) {
|
||||||
|
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(content);
|
||||||
|
assertThat(channel.readInbound(), sameInstance(content));
|
||||||
|
assertThat(content.refCnt(), equalTo(1));
|
||||||
|
}
|
||||||
|
channel.writeInbound(lastContent2);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(channel.readInbound(), sameInstance(lastContent2));
|
||||||
|
assertThat(lastContent2.refCnt(), equalTo(1));
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidationSuccessForLargeMessage() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
channel.writeInbound(request);
|
||||||
|
|
||||||
|
int messageLength = randomIntBetween(32, 128);
|
||||||
|
for (int i = 0; i < messageLength; ++i) {
|
||||||
|
channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4)));
|
||||||
|
}
|
||||||
|
channel.writeInbound(new DefaultLastHttpContent(Unpooled.buffer(4)));
|
||||||
|
boolean followupRequest = randomBoolean();
|
||||||
|
if (followupRequest) {
|
||||||
|
channel.writeInbound(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(header.get(), sameInstance(request));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
listener.get().onResponse(null);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
if (followupRequest) {
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
} else {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
}
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request));
|
||||||
|
for (int i = 0; i < messageLength; ++i) {
|
||||||
|
Object content = channel.readInbound();
|
||||||
|
assertThat(content, instanceOf(DefaultHttpContent.class));
|
||||||
|
assertThat(((DefaultHttpContent) content).refCnt(), equalTo(1));
|
||||||
|
}
|
||||||
|
assertThat(channel.readInbound(), instanceOf(LastHttpContent.class));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidationFailureForLargeMessage() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||||
|
channel.writeInbound(request);
|
||||||
|
|
||||||
|
int messageLength = randomIntBetween(32, 128);
|
||||||
|
DefaultHttpContent[] messageContents = new DefaultHttpContent[messageLength];
|
||||||
|
for (int i = 0; i < messageLength; ++i) {
|
||||||
|
messageContents[i] = new DefaultHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(messageContents[i]);
|
||||||
|
}
|
||||||
|
DefaultLastHttpContent lastHttpContent = new DefaultLastHttpContent(Unpooled.buffer(4));
|
||||||
|
channel.writeInbound(lastHttpContent);
|
||||||
|
boolean followupRequest = randomBoolean();
|
||||||
|
if (followupRequest) {
|
||||||
|
channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(header.get(), sameInstance(request));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
Exception exception = new ElasticsearchException("Boom");
|
||||||
|
listener.get().onFailure(exception);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
if (followupRequest) {
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||||
|
} else {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
}
|
||||||
|
assertThat(channel.readInbound(), sameInstance(request));
|
||||||
|
assertThat(request.headers().get(HttpHeaderNames.CONNECTION), nullValue());
|
||||||
|
assertTrue(request.decoderResult().isFailure());
|
||||||
|
Exception cause = (Exception) request.decoderResult().cause();
|
||||||
|
assertThat(cause, equalTo(exception));
|
||||||
|
for (int i = 0; i < messageLength; ++i) {
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(messageContents[i].refCnt(), equalTo(0));
|
||||||
|
}
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertThat(lastHttpContent.refCnt(), equalTo(0));
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testFullRequestValidationFailure() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
ByteBuf buf = channel.alloc().buffer();
|
||||||
|
ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
|
||||||
|
final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
|
||||||
|
channel.writeInbound(request);
|
||||||
|
|
||||||
|
// request got through to validation
|
||||||
|
assertThat(header.get(), sameInstance(request));
|
||||||
|
// channel is paused
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
// validation fails
|
||||||
|
Exception exception = new ElasticsearchException("Boom");
|
||||||
|
listener.get().onFailure(exception);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
// channel is resumed and waiting for next request
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
DefaultFullHttpRequest throughRequest = channel.readInbound();
|
||||||
|
// "through request" contains a decoder exception
|
||||||
|
assertThat(throughRequest, not(sameInstance(request)));
|
||||||
|
assertTrue(throughRequest.decoderResult().isFailure());
|
||||||
|
// the content is cleared when validation fails
|
||||||
|
assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is(""));
|
||||||
|
assertThat(buf.refCnt(), is(0));
|
||||||
|
Exception cause = (Exception) throughRequest.decoderResult().cause();
|
||||||
|
assertThat(cause, equalTo(exception));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testFullRequestValidationSuccess() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
ByteBuf buf = channel.alloc().buffer();
|
||||||
|
try {
|
||||||
|
ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
|
||||||
|
final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
|
||||||
|
channel.writeInbound(request);
|
||||||
|
|
||||||
|
// request got through to validation
|
||||||
|
assertThat(header.get(), sameInstance(request));
|
||||||
|
// channel is paused
|
||||||
|
assertThat(channel.readInbound(), nullValue());
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
// validation succeeds
|
||||||
|
listener.get().onResponse(null);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
|
||||||
|
// channel is resumed and waiting for next request
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
DefaultFullHttpRequest throughRequest = channel.readInbound();
|
||||||
|
// request goes through unaltered
|
||||||
|
assertThat(throughRequest, sameInstance(request));
|
||||||
|
assertFalse(throughRequest.decoderResult().isFailure());
|
||||||
|
// the content is unaltered
|
||||||
|
assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request"));
|
||||||
|
assertThat(buf.refCnt(), is(1));
|
||||||
|
assertThat(throughRequest.decoderResult().cause(), nullValue());
|
||||||
|
} finally {
|
||||||
|
buf.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testFullRequestWithDecoderException() {
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
ByteBuf buf = channel.alloc().buffer();
|
||||||
|
try {
|
||||||
|
ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
|
||||||
|
// a request with a decoder error prior to validation
|
||||||
|
final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
|
||||||
|
Exception cause = new ElasticsearchException("Boom");
|
||||||
|
request.setDecoderResult(DecoderResult.failure(cause));
|
||||||
|
channel.writeInbound(request);
|
||||||
|
|
||||||
|
// request goes through without invoking the validator
|
||||||
|
assertThat(header.get(), nullValue());
|
||||||
|
assertThat(listener.get(), nullValue());
|
||||||
|
// channel is NOT paused
|
||||||
|
assertTrue(channel.config().isAutoRead());
|
||||||
|
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||||
|
|
||||||
|
DefaultFullHttpRequest throughRequest = channel.readInbound();
|
||||||
|
// request goes through unaltered
|
||||||
|
assertThat(throughRequest, sameInstance(request));
|
||||||
|
assertTrue(throughRequest.decoderResult().isFailure());
|
||||||
|
assertThat(throughRequest.decoderResult().cause(), equalTo(cause));
|
||||||
|
// the content is unaltered
|
||||||
|
assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request"));
|
||||||
|
assertThat(buf.refCnt(), is(1));
|
||||||
|
} finally {
|
||||||
|
buf.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import io.netty.handler.codec.http.HttpContent;
|
||||||
import io.netty.handler.flow.FlowControlHandler;
|
import io.netty.handler.flow.FlowControlHandler;
|
||||||
|
|
||||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||||
|
import org.elasticsearch.common.network.ThreadWatchdog;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.http.HttpBody;
|
import org.elasticsearch.http.HttpBody;
|
||||||
|
@ -42,24 +43,17 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
|
||||||
static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
|
static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
|
||||||
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
private EmbeddedChannel channel;
|
private EmbeddedChannel channel;
|
||||||
private ReadSniffer readSniffer;
|
|
||||||
private Netty4HttpRequestBodyStream stream;
|
private Netty4HttpRequestBodyStream stream;
|
||||||
|
private ThreadWatchdog.ActivityTracker activityTracker;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
channel = new EmbeddedChannel();
|
channel = new EmbeddedChannel();
|
||||||
readSniffer = new ReadSniffer();
|
activityTracker = new ThreadWatchdog.ActivityTracker();
|
||||||
channel.pipeline().addLast(new FlowControlHandler(), readSniffer);
|
stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker);
|
||||||
channel.config().setAutoRead(false);
|
stream.setHandler(discardHandler); // set default handler, each test might override one
|
||||||
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
|
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
|
||||||
@Override
|
|
||||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
stream = new Netty4HttpRequestBodyStream(ctx, threadContext);
|
|
||||||
stream.setHandler(discardHandler); // set default handler, each test might override one
|
|
||||||
super.handlerAdded(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
|
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
|
||||||
stream.handleNettyContent(msg);
|
stream.handleNettyContent(msg);
|
||||||
|
@ -73,8 +67,17 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
|
||||||
stream.close();
|
stream.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensures all chunks are passed to downstream
|
// ensures that no chunks are sent downstream without request
|
||||||
public void testPassAllChunks() {
|
public void testEnqueueChunksBeforeRequest() {
|
||||||
|
var totalChunks = randomIntBetween(1, 100);
|
||||||
|
for (int i = 0; i < totalChunks; i++) {
|
||||||
|
channel.writeInbound(randomContent(1024));
|
||||||
|
}
|
||||||
|
assertEquals(totalChunks * 1024, stream.bufSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensures all received chunks can be flushed downstream
|
||||||
|
public void testFlushAllReceivedChunks() {
|
||||||
var chunks = new ArrayList<ReleasableBytesReference>();
|
var chunks = new ArrayList<ReleasableBytesReference>();
|
||||||
var totalBytes = new AtomicInteger();
|
var totalBytes = new AtomicInteger();
|
||||||
stream.setHandler((chunk, isLast) -> {
|
stream.setHandler((chunk, isLast) -> {
|
||||||
|
@ -82,35 +85,52 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
|
||||||
totalBytes.addAndGet(chunk.length());
|
totalBytes.addAndGet(chunk.length());
|
||||||
chunk.close();
|
chunk.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
var chunkSize = 1024;
|
var chunkSize = 1024;
|
||||||
var totalChunks = randomIntBetween(1, 100);
|
var totalChunks = randomIntBetween(1, 100);
|
||||||
for (int i = 0; i < totalChunks; i++) {
|
for (int i = 0; i < totalChunks; i++) {
|
||||||
channel.writeInbound(randomContent(chunkSize));
|
channel.writeInbound(randomContent(chunkSize));
|
||||||
stream.next();
|
|
||||||
channel.runPendingTasks();
|
|
||||||
|
|
||||||
}
|
}
|
||||||
assertEquals(totalChunks, chunks.size());
|
stream.next();
|
||||||
|
channel.runPendingTasks();
|
||||||
|
assertEquals("should receive all chunks as single composite", 1, chunks.size());
|
||||||
assertEquals(chunkSize * totalChunks, totalBytes.get());
|
assertEquals(chunkSize * totalChunks, totalBytes.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensures that we read from channel after last chunk
|
// ensures that channel.setAutoRead(true) only when we flush last chunk
|
||||||
public void testChannelReadAfterLastContent() {
|
public void testSetAutoReadOnLastFlush() {
|
||||||
channel.writeInbound(randomLastContent(10));
|
channel.writeInbound(randomLastContent(10));
|
||||||
|
assertFalse("should not auto-read on last content reception", channel.config().isAutoRead());
|
||||||
stream.next();
|
stream.next();
|
||||||
channel.runPendingTasks();
|
channel.runPendingTasks();
|
||||||
assertEquals("should have at least 2 reads, one for last content, and one after last", 2, readSniffer.readCount);
|
assertTrue("should set auto-read once last content is flushed", channel.config().isAutoRead());
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensures when stream is closing we read and discard chunks
|
// ensures that we read from channel when no current chunks available
|
||||||
public void testReadAndReleaseOnClosing() {
|
// and pass next chunk downstream without holding
|
||||||
var unexpectedChunk = new AtomicBoolean();
|
public void testReadFromChannel() {
|
||||||
stream.setHandler((chunk, isLast) -> unexpectedChunk.set(true));
|
var gotChunks = new ArrayList<ReleasableBytesReference>();
|
||||||
stream.close();
|
var gotLast = new AtomicBoolean(false);
|
||||||
channel.writeInbound(randomContent(1024));
|
stream.setHandler((chunk, isLast) -> {
|
||||||
channel.writeInbound(randomLastContent(0));
|
gotChunks.add(chunk);
|
||||||
assertFalse("chunk should be discarded", unexpectedChunk.get());
|
gotLast.set(isLast);
|
||||||
assertEquals("expect 3 reads, a first from stream.close, and other two after chunks", 3, readSniffer.readCount);
|
chunk.close();
|
||||||
|
});
|
||||||
|
channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
|
||||||
|
var chunkSize = 1024;
|
||||||
|
var totalChunks = randomIntBetween(1, 32);
|
||||||
|
for (int i = 0; i < totalChunks - 1; i++) {
|
||||||
|
channel.writeInbound(randomContent(chunkSize));
|
||||||
|
}
|
||||||
|
channel.writeInbound(randomLastContent(chunkSize));
|
||||||
|
|
||||||
|
for (int i = 0; i < totalChunks; i++) {
|
||||||
|
assertEquals("should not enqueue chunks", 0, stream.bufSize());
|
||||||
|
stream.next();
|
||||||
|
channel.runPendingTasks();
|
||||||
|
assertEquals("each next() should produce single chunk", i + 1, gotChunks.size());
|
||||||
|
}
|
||||||
|
assertTrue("should receive last content", gotLast.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReadFromHasCorrectThreadContext() throws InterruptedException {
|
public void testReadFromHasCorrectThreadContext() throws InterruptedException {
|
||||||
|
@ -122,15 +142,9 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
|
||||||
try {
|
try {
|
||||||
// activity tracker requires stream execution in the same thread, setting up stream inside event-loop
|
// activity tracker requires stream execution in the same thread, setting up stream inside event-loop
|
||||||
eventLoop.submit(() -> {
|
eventLoop.submit(() -> {
|
||||||
channel = new EmbeddedChannel(new FlowControlHandler());
|
channel = new EmbeddedChannel();
|
||||||
channel.config().setAutoRead(false);
|
stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker());
|
||||||
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
|
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
|
||||||
@Override
|
|
||||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
stream = new Netty4HttpRequestBodyStream(ctx, threadContext);
|
|
||||||
super.handlerAdded(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
|
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
|
||||||
stream.handleNettyContent(msg);
|
stream.handleNettyContent(msg);
|
||||||
|
@ -184,6 +198,18 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testStreamNextActivityTracker() {
|
||||||
|
var t0 = activityTracker.get();
|
||||||
|
var N = between(1, 10);
|
||||||
|
for (int i = 0; i < N; i++) {
|
||||||
|
channel.writeInbound(randomContent(1024));
|
||||||
|
stream.next();
|
||||||
|
channel.runPendingTasks();
|
||||||
|
}
|
||||||
|
var t1 = activityTracker.get();
|
||||||
|
assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L);
|
||||||
|
}
|
||||||
|
|
||||||
// ensure that we catch all exceptions and throw them into channel pipeline
|
// ensure that we catch all exceptions and throw them into channel pipeline
|
||||||
public void testCatchExceptions() {
|
public void testCatchExceptions() {
|
||||||
var gotExceptions = new CountDownLatch(3); // number of tests below
|
var gotExceptions = new CountDownLatch(3); // number of tests below
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the "Elastic License
|
|
||||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
|
||||||
* Public License v 1"; you may not use this file except in compliance with, at
|
|
||||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
|
||||||
* License v3.0 only", or the "Server Side Public License, v 1".
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.http.netty4;
|
|
||||||
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sniffs channel reads, helps detect missing or unexpected ones.
|
|
||||||
* <pre>
|
|
||||||
* {@code
|
|
||||||
* chan = new EmbeddedChannel();
|
|
||||||
* chan.config().setAutoRead(false);
|
|
||||||
* readSniffer = new ReadSniffer();
|
|
||||||
* chan.pipeline().addLast(readSniffer, ...otherHandlers);
|
|
||||||
* ...
|
|
||||||
* // run test
|
|
||||||
* ...
|
|
||||||
* assertEquals("unexpected read", 0, readSniffer.readCnt)
|
|
||||||
* // or
|
|
||||||
* assertEquals("exact number of reads", 2, readSniffer.readCnt)
|
|
||||||
* }
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class ReadSniffer extends ChannelOutboundHandlerAdapter {
|
|
||||||
|
|
||||||
int readCount;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
readCount++;
|
|
||||||
super.read(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -438,12 +438,6 @@ tests:
|
||||||
- class: org.elasticsearch.xpack.remotecluster.CrossClusterEsqlRCS2EnrichUnavailableRemotesIT
|
- class: org.elasticsearch.xpack.remotecluster.CrossClusterEsqlRCS2EnrichUnavailableRemotesIT
|
||||||
method: testEsqlEnrichWithSkipUnavailable
|
method: testEsqlEnrichWithSkipUnavailable
|
||||||
issue: https://github.com/elastic/elasticsearch/issues/127368
|
issue: https://github.com/elastic/elasticsearch/issues/127368
|
||||||
- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT
|
|
||||||
method: testHttpClientStats
|
|
||||||
issue: https://github.com/elastic/elasticsearch/issues/127391
|
|
||||||
- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT
|
|
||||||
method: testBulkIndexingRequestSplitting
|
|
||||||
issue: https://github.com/elastic/elasticsearch/issues/127392
|
|
||||||
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT
|
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT
|
||||||
method: testLookupExplosionNoFetch
|
method: testLookupExplosionNoFetch
|
||||||
issue: https://github.com/elastic/elasticsearch/issues/127365
|
issue: https://github.com/elastic/elasticsearch/issues/127365
|
||||||
|
|
|
@ -251,7 +251,7 @@ public class SecurityNetty4HttpServerTransportCloseNotifyTests extends AbstractH
|
||||||
server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release());
|
server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release());
|
||||||
server.netty.stop();
|
server.netty.stop();
|
||||||
server.threadPool.shutdownNow();
|
server.threadPool.shutdownNow();
|
||||||
safeAwait(client.netty.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS));
|
safeAwait(client.netty.config().group().shutdownGracefully());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue