mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-20 13:17:31 -04:00
merge main
This commit is contained in:
parent
58e3a39392
commit
dce8a0bfd3
7 changed files with 213 additions and 27 deletions
|
@ -37,6 +37,7 @@ import io.netty.handler.codec.http.LastHttpContent;
|
|||
import io.netty.handler.stream.ChunkedStream;
|
||||
import io.netty.handler.stream.ChunkedWriteHandler;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.elasticsearch.ESNetty4IntegTestCase;
|
||||
import org.elasticsearch.action.support.SubscribableListener;
|
||||
import org.elasticsearch.client.internal.node.NodeClient;
|
||||
|
@ -52,6 +53,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.CollectionUtils;
|
||||
import org.elasticsearch.features.NodeFeature;
|
||||
import org.elasticsearch.http.HttpBodyTracer;
|
||||
import org.elasticsearch.http.HttpHandlingSettings;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.http.HttpTransportSettings;
|
||||
|
@ -66,6 +68,8 @@ import org.elasticsearch.rest.RestResponse;
|
|||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.MockLog;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.transport.netty4.Netty4Utils;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -75,6 +79,7 @@ import java.util.concurrent.BlockingQueue;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -210,10 +215,12 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
|||
public void testClientBackpressure() throws Exception {
|
||||
try (var ctx = setupClientCtx()) {
|
||||
var opaqueId = opaqueId(0);
|
||||
var payloadSize = MBytes(50);
|
||||
var payloadSize = maxContentLength();
|
||||
var totalParts = 10;
|
||||
var partSize = payloadSize / totalParts;
|
||||
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
|
||||
for (int i = 0; i < 5; i++) {
|
||||
ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false));
|
||||
for (int i = 0; i < totalParts; i++) {
|
||||
ctx.clientChannel.writeAndFlush(randomContent(partSize, false));
|
||||
}
|
||||
assertFalse(
|
||||
"should not flush last content immediately",
|
||||
|
@ -222,16 +229,15 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
|||
|
||||
var handler = ctx.awaitRestChannelAccepted(opaqueId);
|
||||
|
||||
// Read buffers for socket and channel usually within few MBytes range all together.
|
||||
// This test assumes that buffers will not exceed 10 MBytes, in other words there should
|
||||
// be less than 10 MBytes in fly between http client's socket and rest handler. This
|
||||
// loop ensures that reading 10 MBytes of content on server side should free almost
|
||||
// same size in client's channel write buffer.
|
||||
for (int mb = 0; mb <= 50; mb += 10) {
|
||||
var minBufSize = payloadSize - MBytes(10 + mb);
|
||||
var maxBufSize = payloadSize - MBytes(mb);
|
||||
// some data flushes from channel into OS buffer and won't be visible here, usually 4-8Mb
|
||||
var osBufferOffset = MBytes(10);
|
||||
|
||||
// incrementally read data on server side and ensure client side buffer drains accordingly
|
||||
for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) {
|
||||
var minBufSize = Math.max(payloadSize - readBytes - osBufferOffset, 0);
|
||||
var maxBufSize = Math.max(payloadSize - readBytes, 0);
|
||||
// it is hard to tell that client's channel is no logger flushing data
|
||||
// it might take a few busy-iterations before channel buffer flush to kernel
|
||||
// it might take a few busy-iterations before channel buffer flush to OS
|
||||
// and bytesBeforeWritable will stop changing
|
||||
assertBusy(() -> {
|
||||
var bufSize = ctx.clientChannel.bytesBeforeWritable();
|
||||
|
@ -240,7 +246,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
|||
bufSize >= minBufSize && bufSize <= maxBufSize
|
||||
);
|
||||
});
|
||||
handler.readBytes(MBytes(10));
|
||||
handler.readBytes(partSize);
|
||||
}
|
||||
assertTrue(handler.stream.hasLast());
|
||||
}
|
||||
|
@ -351,6 +357,107 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private static long transportStatsRequestBytesSize(Ctx ctx) {
|
||||
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
|
||||
var stats = httpTransport.stats().clientStats();
|
||||
var bytes = 0L;
|
||||
for (var s : stats) {
|
||||
bytes += s.requestSizeBytes();
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
|
||||
*/
|
||||
public void testHttpClientStats() throws Exception {
|
||||
try (var ctx = setupClientCtx()) {
|
||||
// need to offset starting point, since we reuse cluster and other tests already sent some data
|
||||
var totalBytesSent = transportStatsRequestBytesSize(ctx);
|
||||
|
||||
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
|
||||
var id = opaqueId(reqNo);
|
||||
var contentSize = randomIntBetween(0, maxContentLength());
|
||||
totalBytesSent += contentSize;
|
||||
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
|
||||
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
|
||||
var handler = ctx.awaitRestChannelAccepted(id);
|
||||
handler.readAllBytes();
|
||||
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
|
||||
assertEquals(totalBytesSent, transportStatsRequestBytesSize(ctx));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ensures that we log parts of http body and final line
|
||||
*/
|
||||
@TestLogging(
|
||||
reason = "testing TRACE logging",
|
||||
value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE"
|
||||
)
|
||||
public void testHttpBodyLogging() throws Exception {
|
||||
assertHttpBodyLogging((ctx) -> () -> {
|
||||
try {
|
||||
var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024));
|
||||
ctx.clientChannel.writeAndFlush(req);
|
||||
var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
|
||||
handler.readAllBytes();
|
||||
} catch (Exception e) {
|
||||
fail(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* ensures that we log some parts of body and final line when connection is closed in the middle
|
||||
*/
|
||||
@TestLogging(
|
||||
reason = "testing TRACE logging",
|
||||
value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE"
|
||||
)
|
||||
public void testHttpBodyLoggingChannelClose() throws Exception {
|
||||
assertHttpBodyLogging((ctx) -> () -> {
|
||||
try {
|
||||
var req = httpRequest(opaqueId(0), 2 * 8192);
|
||||
var halfContent = randomContent(8192, false);
|
||||
ctx.clientChannel.writeAndFlush(req);
|
||||
ctx.clientChannel.writeAndFlush(halfContent);
|
||||
var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
|
||||
handler.readBytes(8192);
|
||||
ctx.clientChannel.close();
|
||||
handler.stream.next();
|
||||
assertBusy(() -> assertTrue(handler.streamClosed));
|
||||
} catch (Exception e) {
|
||||
fail(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// asserts that we emit at least one logging event for a part and last line
|
||||
// http body should be large enough to split across multiple lines, > 4kb
|
||||
private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exception {
|
||||
try (var ctx = setupClientCtx()) {
|
||||
MockLog.assertThatLogger(
|
||||
test.apply(ctx),
|
||||
HttpBodyTracer.class,
|
||||
new MockLog.SeenEventExpectation(
|
||||
"request part",
|
||||
HttpBodyTracer.class.getCanonicalName(),
|
||||
Level.TRACE,
|
||||
"* request body [part *]*"
|
||||
),
|
||||
new MockLog.SeenEventExpectation(
|
||||
"request end",
|
||||
HttpBodyTracer.class.getCanonicalName(),
|
||||
Level.TRACE,
|
||||
"* request body (gzip compressed, base64-encoded, and split into * parts on preceding log lines; for details see "
|
||||
+ "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html#http-rest-request-tracer)"
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private int maxContentLength() {
|
||||
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
|
||||
}
|
||||
|
@ -403,6 +510,10 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
static ByteBuf randomByteBuf(int size) {
|
||||
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
|
||||
}
|
||||
|
||||
Ctx setupClientCtx() throws Exception {
|
||||
var nodeName = internalCluster().getRandomNodeName();
|
||||
var clientRespQueue = new LinkedBlockingDeque<>(16);
|
||||
|
|
|
@ -16,9 +16,13 @@ import io.netty.channel.ChannelFutureListener;
|
|||
import io.netty.handler.codec.http.HttpContent;
|
||||
import io.netty.handler.codec.http.LastHttpContent;
|
||||
|
||||
import org.elasticsearch.core.Releasables;
|
||||
import org.elasticsearch.http.HttpBody;
|
||||
import org.elasticsearch.transport.netty4.Netty4Utils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Netty based implementation of {@link HttpBody.Stream}.
|
||||
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
|
||||
|
@ -29,6 +33,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
|||
|
||||
private final Channel channel;
|
||||
private final ChannelFutureListener closeListener = future -> doClose();
|
||||
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
|
||||
private ByteBuf buf;
|
||||
private boolean hasLast = false;
|
||||
private boolean requested = false;
|
||||
|
@ -52,6 +57,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
|||
this.handler = chunkHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addTracingHandler(ChunkHandler chunkHandler) {
|
||||
assert tracingHandlers.contains(chunkHandler) == false;
|
||||
tracingHandlers.add(chunkHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void next() {
|
||||
assert closing == false : "cannot request next chunk on closing stream";
|
||||
|
@ -119,6 +130,9 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
|||
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
|
||||
requested = false;
|
||||
buf = null;
|
||||
for (var tracer : tracingHandlers) {
|
||||
tracer.onNext(bytesRef, hasLast);
|
||||
}
|
||||
handler.onNext(bytesRef, hasLast);
|
||||
}
|
||||
|
||||
|
@ -133,6 +147,9 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
|||
|
||||
private void doClose() {
|
||||
closing = true;
|
||||
for (var tracer : tracingHandlers) {
|
||||
Releasables.closeExpectNoException(tracer);
|
||||
}
|
||||
if (handler != null) {
|
||||
handler.close();
|
||||
}
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
* or more contributor license agreements. Licensed under the "Elastic License
|
||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||
* Public License v 1"; you may not use this file except in compliance with, at
|
||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http;
|
||||
|
@ -88,9 +89,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
|
|||
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
|
||||
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||
|
||||
clusterAdmin().prepareUpdateSettings()
|
||||
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false).build())
|
||||
.get();
|
||||
updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false));
|
||||
|
||||
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(false));
|
||||
|
||||
|
@ -98,9 +97,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
|
|||
sendLargeBulk();
|
||||
} finally {
|
||||
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true));
|
||||
clusterAdmin().prepareUpdateSettings()
|
||||
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null).build())
|
||||
.get();
|
||||
updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -73,6 +73,13 @@ public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpB
|
|||
@Nullable
|
||||
ChunkHandler handler();
|
||||
|
||||
/**
|
||||
* Adds tracing chunk handler. Tracing handler will be invoked before main handler, and
|
||||
* should never release or call for next chunk. It should be used for monitoring and
|
||||
* logging purposes.
|
||||
*/
|
||||
void addTracingHandler(ChunkHandler chunkHandler);
|
||||
|
||||
/**
|
||||
* Sets handler that can handle next chunk
|
||||
*/
|
||||
|
|
|
@ -229,6 +229,8 @@ public class HttpClientStatsTracker {
|
|||
requestCount += 1;
|
||||
if (httpRequest.body().isFull()) {
|
||||
requestSizeBytes += httpRequest.body().asFull().bytes().length();
|
||||
} else {
|
||||
httpRequest.body().asStream().addTracingHandler((chunk, last) -> requestSizeBytes += chunk.length());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ package org.elasticsearch.http;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
|
@ -21,6 +22,7 @@ import org.elasticsearch.rest.RestUtils;
|
|||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -78,10 +80,10 @@ class HttpTracer {
|
|||
e
|
||||
);
|
||||
if (isBodyTracerEnabled()) {
|
||||
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
|
||||
restRequest.content().writeTo(stream);
|
||||
} catch (Exception e2) {
|
||||
assert false : e2; // no real IO here
|
||||
if (restRequest.isFullContent()) {
|
||||
logFullContent(restRequest);
|
||||
} else {
|
||||
logStreamContent(restRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,6 +92,53 @@ class HttpTracer {
|
|||
return null;
|
||||
}
|
||||
|
||||
private void logFullContent(RestRequest restRequest) {
|
||||
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
|
||||
restRequest.content().writeTo(stream);
|
||||
} catch (Exception e2) {
|
||||
assert false : e2; // no real IO here
|
||||
}
|
||||
}
|
||||
|
||||
private void logStreamContent(RestRequest restRequest) {
|
||||
restRequest.contentStream().addTracingHandler(new LoggingChunkHandler(restRequest));
|
||||
}
|
||||
|
||||
private static class LoggingChunkHandler implements HttpBody.ChunkHandler {
|
||||
private final OutputStream stream;
|
||||
private volatile boolean closed = false;
|
||||
|
||||
LoggingChunkHandler(RestRequest request) {
|
||||
stream = HttpBodyTracer.getBodyOutputStream(request.getRequestId(), HttpBodyTracer.Type.REQUEST);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
|
||||
try {
|
||||
chunk.writeTo(stream);
|
||||
} catch (IOException e) {
|
||||
assert false : e; // no real IO
|
||||
} finally {
|
||||
if (isLast) {
|
||||
this.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
closed = true;
|
||||
stream.close();
|
||||
} catch (IOException e) {
|
||||
assert false : e; // no real IO
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean isBodyTracerEnabled() {
|
||||
return HttpBodyTracer.isEnabled();
|
||||
}
|
||||
|
|
|
@ -211,6 +211,9 @@ public class RestBulkActionTests extends ESTestCase {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addTracingHandler(ChunkHandler chunkHandler) {}
|
||||
|
||||
@Override
|
||||
public void setHandler(ChunkHandler chunkHandler) {}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue