Reduce autoread changes in header validator (#112608)

The header validator is very aggressive about adjusting autoread on the
belief it is the only place where autoread is tweaked. However, with
stream backpressure, we should only change it when we are starting or
finishing header validation.
This commit is contained in:
Tim Brooks 2024-09-06 12:34:50 -06:00
parent 95b42a7129
commit ce2d648d8e
2 changed files with 38 additions and 9 deletions

View file

@ -61,6 +61,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
pending.add(ReferenceCountUtil.retain(httpObject)); pending.add(ReferenceCountUtil.retain(httpObject));
requestStart(ctx); requestStart(ctx);
assert state == QUEUEING_DATA; assert state == QUEUEING_DATA;
assert ctx.channel().config().isAutoRead() == false;
break; break;
case QUEUEING_DATA: case QUEUEING_DATA:
pending.add(ReferenceCountUtil.retain(httpObject)); pending.add(ReferenceCountUtil.retain(httpObject));
@ -77,14 +78,14 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
if (httpObject instanceof LastHttpContent) { if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START; state = WAITING_TO_START;
} }
// fall-through ReferenceCountUtil.release(httpObject);
break;
case DROPPING_DATA_PERMANENTLY: case DROPPING_DATA_PERMANENTLY:
assert pending.isEmpty(); assert pending.isEmpty();
ReferenceCountUtil.release(httpObject); // consume without enqueuing ReferenceCountUtil.release(httpObject); // consume without enqueuing
ctx.channel().config().setAutoRead(false);
break; break;
} }
setAutoReadForState(ctx, state);
} }
private void requestStart(ChannelHandlerContext ctx) { private void requestStart(ChannelHandlerContext ctx) {
@ -105,6 +106,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
} }
state = QUEUEING_DATA; state = QUEUEING_DATA;
ctx.channel().config().setAutoRead(false);
if (httpRequest == null) { if (httpRequest == null) {
// this looks like a malformed request and will forward without validation // this looks like a malformed request and will forward without validation
@ -150,6 +152,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
assert ctx.channel().config().isAutoRead() == false; assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA; assert state == QUEUEING_DATA;
ctx.channel().config().setAutoRead(true);
boolean fullRequestForwarded = forwardData(ctx, pending); boolean fullRequestForwarded = forwardData(ctx, pending);
assert fullRequestForwarded || pending.isEmpty(); assert fullRequestForwarded || pending.isEmpty();
@ -161,7 +164,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
} }
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST; assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
} }
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) { private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
@ -177,6 +179,8 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER); messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
} }
messageToForward.setDecoderResult(DecoderResult.failure(e)); messageToForward.setDecoderResult(DecoderResult.failure(e));
ctx.channel().config().setAutoRead(true);
ctx.fireChannelRead(messageToForward); ctx.fireChannelRead(messageToForward);
assert fullRequestDropped || pending.isEmpty(); assert fullRequestDropped || pending.isEmpty();
@ -188,7 +192,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
} }
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST; assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
} }
@Override @Override
@ -244,10 +247,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
} }
} }
private static void setAutoReadForState(ChannelHandlerContext ctx, State state) {
ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false);
}
enum State { enum State {
WAITING_TO_START, WAITING_TO_START,
QUEUEING_DATA, QUEUEING_DATA,

View file

@ -117,6 +117,36 @@ public class Netty4HttpHeaderValidatorTests extends ESTestCase {
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA)); assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
} }
public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(request);
channel.writeInbound(content);
assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
// channel is resumed
listener.get().onResponse(null);
channel.runPendingTasks();
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
assertThat(channel.readInbound(), sameInstance(request));
assertThat(channel.readInbound(), sameInstance(content));
assertThat(channel.readInbound(), nullValue());
assertThat(content.refCnt(), equalTo(1));
channel.config().setAutoRead(false);
channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4)));
assertFalse(channel.config().isAutoRead());
}
public void testContentForwardedAfterValidation() { public void testContentForwardedAfterValidation() {
assertTrue(channel.config().isAutoRead()); assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));