Reduce autoread changes in header validator (#112608)

The header validator is very aggressive about adjusting autoread on the
belief it is the only place where autoread is tweaked. However, with
stream backpressure, we should only change it when we are starting or
finishing header validation.
This commit is contained in:
Tim Brooks 2024-09-06 12:34:50 -06:00
parent 95b42a7129
commit ce2d648d8e
2 changed files with 38 additions and 9 deletions

View file

@ -61,6 +61,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
pending.add(ReferenceCountUtil.retain(httpObject));
requestStart(ctx);
assert state == QUEUEING_DATA;
assert ctx.channel().config().isAutoRead() == false;
break;
case QUEUEING_DATA:
pending.add(ReferenceCountUtil.retain(httpObject));
@ -77,14 +78,14 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
// fall-through
ReferenceCountUtil.release(httpObject);
break;
case DROPPING_DATA_PERMANENTLY:
assert pending.isEmpty();
ReferenceCountUtil.release(httpObject); // consume without enqueuing
ctx.channel().config().setAutoRead(false);
break;
}
setAutoReadForState(ctx, state);
}
private void requestStart(ChannelHandlerContext ctx) {
@ -105,6 +106,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
}
state = QUEUEING_DATA;
ctx.channel().config().setAutoRead(false);
if (httpRequest == null) {
// this looks like a malformed request and will forward without validation
@ -150,6 +152,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;
ctx.channel().config().setAutoRead(true);
boolean fullRequestForwarded = forwardData(ctx, pending);
assert fullRequestForwarded || pending.isEmpty();
@ -161,7 +164,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
}
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
@ -177,6 +179,8 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
}
messageToForward.setDecoderResult(DecoderResult.failure(e));
ctx.channel().config().setAutoRead(true);
ctx.fireChannelRead(messageToForward);
assert fullRequestDropped || pending.isEmpty();
@ -188,7 +192,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
}
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}
@Override
@ -244,10 +247,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
}
}
private static void setAutoReadForState(ChannelHandlerContext ctx, State state) {
ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false);
}
enum State {
WAITING_TO_START,
QUEUEING_DATA,

View file

@ -117,6 +117,36 @@ public class Netty4HttpHeaderValidatorTests extends ESTestCase {
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
}
public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(request);
channel.writeInbound(content);
assertThat(header.get(), sameInstance(request));
// channel is paused
assertThat(channel.readInbound(), nullValue());
assertFalse(channel.config().isAutoRead());
// channel is resumed
listener.get().onResponse(null);
channel.runPendingTasks();
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
assertThat(channel.readInbound(), sameInstance(request));
assertThat(channel.readInbound(), sameInstance(content));
assertThat(channel.readInbound(), nullValue());
assertThat(content.refCnt(), equalTo(1));
channel.config().setAutoRead(false);
channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4)));
assertFalse(channel.config().isAutoRead());
}
public void testContentForwardedAfterValidation() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));