mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-21 13:47:25 -04:00
Reduce autoread changes in header validator (#112608)
The header validator is very aggressive about adjusting autoread on the belief it is the only place where autoread is tweaked. However, with stream backpressure, we should only change it when we are starting or finishing header validation.
This commit is contained in:
parent
95b42a7129
commit
ce2d648d8e
2 changed files with 38 additions and 9 deletions
|
@ -61,6 +61,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
|||
pending.add(ReferenceCountUtil.retain(httpObject));
|
||||
requestStart(ctx);
|
||||
assert state == QUEUEING_DATA;
|
||||
assert ctx.channel().config().isAutoRead() == false;
|
||||
break;
|
||||
case QUEUEING_DATA:
|
||||
pending.add(ReferenceCountUtil.retain(httpObject));
|
||||
|
@ -77,14 +78,14 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
|||
if (httpObject instanceof LastHttpContent) {
|
||||
state = WAITING_TO_START;
|
||||
}
|
||||
// fall-through
|
||||
ReferenceCountUtil.release(httpObject);
|
||||
break;
|
||||
case DROPPING_DATA_PERMANENTLY:
|
||||
assert pending.isEmpty();
|
||||
ReferenceCountUtil.release(httpObject); // consume without enqueuing
|
||||
ctx.channel().config().setAutoRead(false);
|
||||
break;
|
||||
}
|
||||
|
||||
setAutoReadForState(ctx, state);
|
||||
}
|
||||
|
||||
private void requestStart(ChannelHandlerContext ctx) {
|
||||
|
@ -105,6 +106,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
|||
}
|
||||
|
||||
state = QUEUEING_DATA;
|
||||
ctx.channel().config().setAutoRead(false);
|
||||
|
||||
if (httpRequest == null) {
|
||||
// this looks like a malformed request and will forward without validation
|
||||
|
@ -150,6 +152,7 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
|||
assert ctx.channel().config().isAutoRead() == false;
|
||||
assert state == QUEUEING_DATA;
|
||||
|
||||
ctx.channel().config().setAutoRead(true);
|
||||
boolean fullRequestForwarded = forwardData(ctx, pending);
|
||||
|
||||
assert fullRequestForwarded || pending.isEmpty();
|
||||
|
@ -161,7 +164,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
|||
}
|
||||
|
||||
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
|
||||
setAutoReadForState(ctx, state);
|
||||
}
|
||||
|
||||
private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
|
||||
|
@ -177,6 +179,8 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
|||
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
|
||||
}
|
||||
messageToForward.setDecoderResult(DecoderResult.failure(e));
|
||||
|
||||
ctx.channel().config().setAutoRead(true);
|
||||
ctx.fireChannelRead(messageToForward);
|
||||
|
||||
assert fullRequestDropped || pending.isEmpty();
|
||||
|
@ -188,7 +192,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
|||
}
|
||||
|
||||
assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
|
||||
setAutoReadForState(ctx, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -244,10 +247,6 @@ public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
|
|||
}
|
||||
}
|
||||
|
||||
private static void setAutoReadForState(ChannelHandlerContext ctx, State state) {
|
||||
ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false);
|
||||
}
|
||||
|
||||
enum State {
|
||||
WAITING_TO_START,
|
||||
QUEUEING_DATA,
|
||||
|
|
|
@ -117,6 +117,36 @@ public class Netty4HttpHeaderValidatorTests extends ESTestCase {
|
|||
assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
|
||||
}
|
||||
|
||||
public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() {
|
||||
assertTrue(channel.config().isAutoRead());
|
||||
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||
|
||||
final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
|
||||
DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
|
||||
channel.writeInbound(request);
|
||||
channel.writeInbound(content);
|
||||
|
||||
assertThat(header.get(), sameInstance(request));
|
||||
// channel is paused
|
||||
assertThat(channel.readInbound(), nullValue());
|
||||
assertFalse(channel.config().isAutoRead());
|
||||
|
||||
// channel is resumed
|
||||
listener.get().onResponse(null);
|
||||
channel.runPendingTasks();
|
||||
|
||||
assertTrue(channel.config().isAutoRead());
|
||||
assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
|
||||
assertThat(channel.readInbound(), sameInstance(request));
|
||||
assertThat(channel.readInbound(), sameInstance(content));
|
||||
assertThat(channel.readInbound(), nullValue());
|
||||
assertThat(content.refCnt(), equalTo(1));
|
||||
channel.config().setAutoRead(false);
|
||||
|
||||
channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4)));
|
||||
assertFalse(channel.config().isAutoRead());
|
||||
}
|
||||
|
||||
public void testContentForwardedAfterValidation() {
|
||||
assertTrue(channel.config().isAutoRead());
|
||||
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue