merge main

This commit is contained in:
Mikhail Berezovskiy 2024-09-12 10:13:52 -07:00 committed by Tim Brooks
parent 58e3a39392
commit dce8a0bfd3
7 changed files with 213 additions and 27 deletions

View file

@ -37,6 +37,7 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.logging.log4j.Level;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.node.NodeClient;
@ -52,6 +53,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpBodyTracer;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
@ -66,6 +68,8 @@ import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.util.Collection;
@ -75,6 +79,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -210,10 +215,12 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
public void testClientBackpressure() throws Exception {
try (var ctx = setupClientCtx()) {
var opaqueId = opaqueId(0);
var payloadSize = MBytes(50);
var payloadSize = maxContentLength();
var totalParts = 10;
var partSize = payloadSize / totalParts;
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
for (int i = 0; i < 5; i++) {
ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false));
for (int i = 0; i < totalParts; i++) {
ctx.clientChannel.writeAndFlush(randomContent(partSize, false));
}
assertFalse(
"should not flush last content immediately",
@ -222,16 +229,15 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
var handler = ctx.awaitRestChannelAccepted(opaqueId);
// Read buffers for socket and channel usually within few MBytes range all together.
// This test assumes that buffers will not exceed 10 MBytes, in other words there should
// be less than 10 MBytes in fly between http client's socket and rest handler. This
// loop ensures that reading 10 MBytes of content on server side should free almost
// same size in client's channel write buffer.
for (int mb = 0; mb <= 50; mb += 10) {
var minBufSize = payloadSize - MBytes(10 + mb);
var maxBufSize = payloadSize - MBytes(mb);
// some data flushes from channel into OS buffer and won't be visible here, usually 4-8Mb
var osBufferOffset = MBytes(10);
// incrementally read data on server side and ensure client side buffer drains accordingly
for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) {
var minBufSize = Math.max(payloadSize - readBytes - osBufferOffset, 0);
var maxBufSize = Math.max(payloadSize - readBytes, 0);
// it is hard to tell that client's channel is no logger flushing data
// it might take a few busy-iterations before channel buffer flush to kernel
// it might take a few busy-iterations before channel buffer flush to OS
// and bytesBeforeWritable will stop changing
assertBusy(() -> {
var bufSize = ctx.clientChannel.bytesBeforeWritable();
@ -240,7 +246,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
bufSize >= minBufSize && bufSize <= maxBufSize
);
});
handler.readBytes(MBytes(10));
handler.readBytes(partSize);
}
assertTrue(handler.stream.hasLast());
}
@ -351,6 +357,107 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
}
}
private static long transportStatsRequestBytesSize(Ctx ctx) {
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
var stats = httpTransport.stats().clientStats();
var bytes = 0L;
for (var s : stats) {
bytes += s.requestSizeBytes();
}
return bytes;
}
/**
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
*/
public void testHttpClientStats() throws Exception {
try (var ctx = setupClientCtx()) {
// need to offset starting point, since we reuse cluster and other tests already sent some data
var totalBytesSent = transportStatsRequestBytesSize(ctx);
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
totalBytesSent += contentSize;
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
var handler = ctx.awaitRestChannelAccepted(id);
handler.readAllBytes();
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
assertEquals(totalBytesSent, transportStatsRequestBytesSize(ctx));
}
}
}
/**
* ensures that we log parts of http body and final line
*/
@TestLogging(
reason = "testing TRACE logging",
value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE"
)
public void testHttpBodyLogging() throws Exception {
assertHttpBodyLogging((ctx) -> () -> {
try {
var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024));
ctx.clientChannel.writeAndFlush(req);
var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
handler.readAllBytes();
} catch (Exception e) {
fail(e);
}
});
}
/**
* ensures that we log some parts of body and final line when connection is closed in the middle
*/
@TestLogging(
reason = "testing TRACE logging",
value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE"
)
public void testHttpBodyLoggingChannelClose() throws Exception {
assertHttpBodyLogging((ctx) -> () -> {
try {
var req = httpRequest(opaqueId(0), 2 * 8192);
var halfContent = randomContent(8192, false);
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(halfContent);
var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
handler.readBytes(8192);
ctx.clientChannel.close();
handler.stream.next();
assertBusy(() -> assertTrue(handler.streamClosed));
} catch (Exception e) {
fail(e);
}
});
}
// asserts that we emit at least one logging event for a part and last line
// http body should be large enough to split across multiple lines, > 4kb
private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exception {
try (var ctx = setupClientCtx()) {
MockLog.assertThatLogger(
test.apply(ctx),
HttpBodyTracer.class,
new MockLog.SeenEventExpectation(
"request part",
HttpBodyTracer.class.getCanonicalName(),
Level.TRACE,
"* request body [part *]*"
),
new MockLog.SeenEventExpectation(
"request end",
HttpBodyTracer.class.getCanonicalName(),
Level.TRACE,
"* request body (gzip compressed, base64-encoded, and split into * parts on preceding log lines; for details see "
+ "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html#http-rest-request-tracer)"
)
);
}
}
private int maxContentLength() {
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
}
@ -403,6 +510,10 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
}
}
static ByteBuf randomByteBuf(int size) {
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
}
Ctx setupClientCtx() throws Exception {
var nodeName = internalCluster().getRandomNodeName();
var clientRespQueue = new LinkedBlockingDeque<>(16);

View file

@ -16,9 +16,13 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.util.ArrayList;
import java.util.List;
/**
* Netty based implementation of {@link HttpBody.Stream}.
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
@ -29,6 +33,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private final Channel channel;
private final ChannelFutureListener closeListener = future -> doClose();
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
private ByteBuf buf;
private boolean hasLast = false;
private boolean requested = false;
@ -52,6 +57,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
this.handler = chunkHandler;
}
@Override
public void addTracingHandler(ChunkHandler chunkHandler) {
assert tracingHandlers.contains(chunkHandler) == false;
tracingHandlers.add(chunkHandler);
}
@Override
public void next() {
assert closing == false : "cannot request next chunk on closing stream";
@ -119,6 +130,9 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
requested = false;
buf = null;
for (var tracer : tracingHandlers) {
tracer.onNext(bytesRef, hasLast);
}
handler.onNext(bytesRef, hasLast);
}
@ -133,6 +147,9 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private void doClose() {
closing = true;
for (var tracer : tracingHandlers) {
Releasables.closeExpectNoException(tracer);
}
if (handler != null) {
handler.close();
}

View file

@ -1,9 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.http;
@ -88,9 +89,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
clusterAdmin().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false).build())
.get();
updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false));
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(false));
@ -98,9 +97,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
sendLargeBulk();
} finally {
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true));
clusterAdmin().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null).build())
.get();
updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null));
}
}

View file

@ -73,6 +73,13 @@ public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpB
@Nullable
ChunkHandler handler();
/**
* Adds tracing chunk handler. Tracing handler will be invoked before main handler, and
* should never release or call for next chunk. It should be used for monitoring and
* logging purposes.
*/
void addTracingHandler(ChunkHandler chunkHandler);
/**
* Sets handler that can handle next chunk
*/

View file

@ -229,6 +229,8 @@ public class HttpClientStatsTracker {
requestCount += 1;
if (httpRequest.body().isFull()) {
requestSizeBytes += httpRequest.body().asFull().bytes().length();
} else {
httpRequest.body().asStream().addTracingHandler((chunk, last) -> requestSizeBytes += chunk.length());
}
}

View file

@ -12,6 +12,7 @@ package org.elasticsearch.http;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
@ -21,6 +22,7 @@ import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
@ -78,10 +80,10 @@ class HttpTracer {
e
);
if (isBodyTracerEnabled()) {
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
restRequest.content().writeTo(stream);
} catch (Exception e2) {
assert false : e2; // no real IO here
if (restRequest.isFullContent()) {
logFullContent(restRequest);
} else {
logStreamContent(restRequest);
}
}
@ -90,6 +92,53 @@ class HttpTracer {
return null;
}
private void logFullContent(RestRequest restRequest) {
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
restRequest.content().writeTo(stream);
} catch (Exception e2) {
assert false : e2; // no real IO here
}
}
private void logStreamContent(RestRequest restRequest) {
restRequest.contentStream().addTracingHandler(new LoggingChunkHandler(restRequest));
}
private static class LoggingChunkHandler implements HttpBody.ChunkHandler {
private final OutputStream stream;
private volatile boolean closed = false;
LoggingChunkHandler(RestRequest request) {
stream = HttpBodyTracer.getBodyOutputStream(request.getRequestId(), HttpBodyTracer.Type.REQUEST);
}
@Override
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
try {
chunk.writeTo(stream);
} catch (IOException e) {
assert false : e; // no real IO
} finally {
if (isLast) {
this.close();
}
}
}
@Override
public void close() {
if (closed) {
return;
}
try {
closed = true;
stream.close();
} catch (IOException e) {
assert false : e; // no real IO
}
}
}
boolean isBodyTracerEnabled() {
return HttpBodyTracer.isEnabled();
}

View file

@ -211,6 +211,9 @@ public class RestBulkActionTests extends ESTestCase {
return null;
}
@Override
public void addTracingHandler(ChunkHandler chunkHandler) {}
@Override
public void setHandler(ChunkHandler chunkHandler) {}