mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 17:34:17 -04:00
merge main
This commit is contained in:
parent
58e3a39392
commit
dce8a0bfd3
7 changed files with 213 additions and 27 deletions
|
@ -37,6 +37,7 @@ import io.netty.handler.codec.http.LastHttpContent;
|
||||||
import io.netty.handler.stream.ChunkedStream;
|
import io.netty.handler.stream.ChunkedStream;
|
||||||
import io.netty.handler.stream.ChunkedWriteHandler;
|
import io.netty.handler.stream.ChunkedWriteHandler;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.Level;
|
||||||
import org.elasticsearch.ESNetty4IntegTestCase;
|
import org.elasticsearch.ESNetty4IntegTestCase;
|
||||||
import org.elasticsearch.action.support.SubscribableListener;
|
import org.elasticsearch.action.support.SubscribableListener;
|
||||||
import org.elasticsearch.client.internal.node.NodeClient;
|
import org.elasticsearch.client.internal.node.NodeClient;
|
||||||
|
@ -52,6 +53,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.util.CollectionUtils;
|
import org.elasticsearch.common.util.CollectionUtils;
|
||||||
import org.elasticsearch.features.NodeFeature;
|
import org.elasticsearch.features.NodeFeature;
|
||||||
|
import org.elasticsearch.http.HttpBodyTracer;
|
||||||
import org.elasticsearch.http.HttpHandlingSettings;
|
import org.elasticsearch.http.HttpHandlingSettings;
|
||||||
import org.elasticsearch.http.HttpServerTransport;
|
import org.elasticsearch.http.HttpServerTransport;
|
||||||
import org.elasticsearch.http.HttpTransportSettings;
|
import org.elasticsearch.http.HttpTransportSettings;
|
||||||
|
@ -66,6 +68,8 @@ import org.elasticsearch.rest.RestResponse;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
import org.elasticsearch.test.MockLog;
|
||||||
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
import org.elasticsearch.transport.netty4.Netty4Utils;
|
import org.elasticsearch.transport.netty4.Netty4Utils;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -75,6 +79,7 @@ import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
@ -210,10 +215,12 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
public void testClientBackpressure() throws Exception {
|
public void testClientBackpressure() throws Exception {
|
||||||
try (var ctx = setupClientCtx()) {
|
try (var ctx = setupClientCtx()) {
|
||||||
var opaqueId = opaqueId(0);
|
var opaqueId = opaqueId(0);
|
||||||
var payloadSize = MBytes(50);
|
var payloadSize = maxContentLength();
|
||||||
|
var totalParts = 10;
|
||||||
|
var partSize = payloadSize / totalParts;
|
||||||
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
|
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < totalParts; i++) {
|
||||||
ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false));
|
ctx.clientChannel.writeAndFlush(randomContent(partSize, false));
|
||||||
}
|
}
|
||||||
assertFalse(
|
assertFalse(
|
||||||
"should not flush last content immediately",
|
"should not flush last content immediately",
|
||||||
|
@ -222,16 +229,15 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
|
|
||||||
var handler = ctx.awaitRestChannelAccepted(opaqueId);
|
var handler = ctx.awaitRestChannelAccepted(opaqueId);
|
||||||
|
|
||||||
// Read buffers for socket and channel usually within few MBytes range all together.
|
// some data flushes from channel into OS buffer and won't be visible here, usually 4-8Mb
|
||||||
// This test assumes that buffers will not exceed 10 MBytes, in other words there should
|
var osBufferOffset = MBytes(10);
|
||||||
// be less than 10 MBytes in fly between http client's socket and rest handler. This
|
|
||||||
// loop ensures that reading 10 MBytes of content on server side should free almost
|
// incrementally read data on server side and ensure client side buffer drains accordingly
|
||||||
// same size in client's channel write buffer.
|
for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) {
|
||||||
for (int mb = 0; mb <= 50; mb += 10) {
|
var minBufSize = Math.max(payloadSize - readBytes - osBufferOffset, 0);
|
||||||
var minBufSize = payloadSize - MBytes(10 + mb);
|
var maxBufSize = Math.max(payloadSize - readBytes, 0);
|
||||||
var maxBufSize = payloadSize - MBytes(mb);
|
|
||||||
// it is hard to tell that client's channel is no logger flushing data
|
// it is hard to tell that client's channel is no logger flushing data
|
||||||
// it might take a few busy-iterations before channel buffer flush to kernel
|
// it might take a few busy-iterations before channel buffer flush to OS
|
||||||
// and bytesBeforeWritable will stop changing
|
// and bytesBeforeWritable will stop changing
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
var bufSize = ctx.clientChannel.bytesBeforeWritable();
|
var bufSize = ctx.clientChannel.bytesBeforeWritable();
|
||||||
|
@ -240,7 +246,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
bufSize >= minBufSize && bufSize <= maxBufSize
|
bufSize >= minBufSize && bufSize <= maxBufSize
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
handler.readBytes(MBytes(10));
|
handler.readBytes(partSize);
|
||||||
}
|
}
|
||||||
assertTrue(handler.stream.hasLast());
|
assertTrue(handler.stream.hasLast());
|
||||||
}
|
}
|
||||||
|
@ -351,6 +357,107 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static long transportStatsRequestBytesSize(Ctx ctx) {
|
||||||
|
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
|
||||||
|
var stats = httpTransport.stats().clientStats();
|
||||||
|
var bytes = 0L;
|
||||||
|
for (var s : stats) {
|
||||||
|
bytes += s.requestSizeBytes();
|
||||||
|
}
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
|
||||||
|
*/
|
||||||
|
public void testHttpClientStats() throws Exception {
|
||||||
|
try (var ctx = setupClientCtx()) {
|
||||||
|
// need to offset starting point, since we reuse cluster and other tests already sent some data
|
||||||
|
var totalBytesSent = transportStatsRequestBytesSize(ctx);
|
||||||
|
|
||||||
|
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
|
||||||
|
var id = opaqueId(reqNo);
|
||||||
|
var contentSize = randomIntBetween(0, maxContentLength());
|
||||||
|
totalBytesSent += contentSize;
|
||||||
|
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
|
||||||
|
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
|
||||||
|
var handler = ctx.awaitRestChannelAccepted(id);
|
||||||
|
handler.readAllBytes();
|
||||||
|
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
|
||||||
|
assertEquals(totalBytesSent, transportStatsRequestBytesSize(ctx));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ensures that we log parts of http body and final line
|
||||||
|
*/
|
||||||
|
@TestLogging(
|
||||||
|
reason = "testing TRACE logging",
|
||||||
|
value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE"
|
||||||
|
)
|
||||||
|
public void testHttpBodyLogging() throws Exception {
|
||||||
|
assertHttpBodyLogging((ctx) -> () -> {
|
||||||
|
try {
|
||||||
|
var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024));
|
||||||
|
ctx.clientChannel.writeAndFlush(req);
|
||||||
|
var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
|
||||||
|
handler.readAllBytes();
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ensures that we log some parts of body and final line when connection is closed in the middle
|
||||||
|
*/
|
||||||
|
@TestLogging(
|
||||||
|
reason = "testing TRACE logging",
|
||||||
|
value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE"
|
||||||
|
)
|
||||||
|
public void testHttpBodyLoggingChannelClose() throws Exception {
|
||||||
|
assertHttpBodyLogging((ctx) -> () -> {
|
||||||
|
try {
|
||||||
|
var req = httpRequest(opaqueId(0), 2 * 8192);
|
||||||
|
var halfContent = randomContent(8192, false);
|
||||||
|
ctx.clientChannel.writeAndFlush(req);
|
||||||
|
ctx.clientChannel.writeAndFlush(halfContent);
|
||||||
|
var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
|
||||||
|
handler.readBytes(8192);
|
||||||
|
ctx.clientChannel.close();
|
||||||
|
handler.stream.next();
|
||||||
|
assertBusy(() -> assertTrue(handler.streamClosed));
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// asserts that we emit at least one logging event for a part and last line
|
||||||
|
// http body should be large enough to split across multiple lines, > 4kb
|
||||||
|
private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exception {
|
||||||
|
try (var ctx = setupClientCtx()) {
|
||||||
|
MockLog.assertThatLogger(
|
||||||
|
test.apply(ctx),
|
||||||
|
HttpBodyTracer.class,
|
||||||
|
new MockLog.SeenEventExpectation(
|
||||||
|
"request part",
|
||||||
|
HttpBodyTracer.class.getCanonicalName(),
|
||||||
|
Level.TRACE,
|
||||||
|
"* request body [part *]*"
|
||||||
|
),
|
||||||
|
new MockLog.SeenEventExpectation(
|
||||||
|
"request end",
|
||||||
|
HttpBodyTracer.class.getCanonicalName(),
|
||||||
|
Level.TRACE,
|
||||||
|
"* request body (gzip compressed, base64-encoded, and split into * parts on preceding log lines; for details see "
|
||||||
|
+ "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html#http-rest-request-tracer)"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private int maxContentLength() {
|
private int maxContentLength() {
|
||||||
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
|
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
|
||||||
}
|
}
|
||||||
|
@ -403,6 +510,10 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ByteBuf randomByteBuf(int size) {
|
||||||
|
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
|
||||||
|
}
|
||||||
|
|
||||||
Ctx setupClientCtx() throws Exception {
|
Ctx setupClientCtx() throws Exception {
|
||||||
var nodeName = internalCluster().getRandomNodeName();
|
var nodeName = internalCluster().getRandomNodeName();
|
||||||
var clientRespQueue = new LinkedBlockingDeque<>(16);
|
var clientRespQueue = new LinkedBlockingDeque<>(16);
|
||||||
|
|
|
@ -16,9 +16,13 @@ import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.handler.codec.http.HttpContent;
|
import io.netty.handler.codec.http.HttpContent;
|
||||||
import io.netty.handler.codec.http.LastHttpContent;
|
import io.netty.handler.codec.http.LastHttpContent;
|
||||||
|
|
||||||
|
import org.elasticsearch.core.Releasables;
|
||||||
import org.elasticsearch.http.HttpBody;
|
import org.elasticsearch.http.HttpBody;
|
||||||
import org.elasticsearch.transport.netty4.Netty4Utils;
|
import org.elasticsearch.transport.netty4.Netty4Utils;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Netty based implementation of {@link HttpBody.Stream}.
|
* Netty based implementation of {@link HttpBody.Stream}.
|
||||||
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
|
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
|
||||||
|
@ -29,6 +33,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
|
|
||||||
private final Channel channel;
|
private final Channel channel;
|
||||||
private final ChannelFutureListener closeListener = future -> doClose();
|
private final ChannelFutureListener closeListener = future -> doClose();
|
||||||
|
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
|
||||||
private ByteBuf buf;
|
private ByteBuf buf;
|
||||||
private boolean hasLast = false;
|
private boolean hasLast = false;
|
||||||
private boolean requested = false;
|
private boolean requested = false;
|
||||||
|
@ -52,6 +57,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
this.handler = chunkHandler;
|
this.handler = chunkHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addTracingHandler(ChunkHandler chunkHandler) {
|
||||||
|
assert tracingHandlers.contains(chunkHandler) == false;
|
||||||
|
tracingHandlers.add(chunkHandler);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void next() {
|
public void next() {
|
||||||
assert closing == false : "cannot request next chunk on closing stream";
|
assert closing == false : "cannot request next chunk on closing stream";
|
||||||
|
@ -119,6 +130,9 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
|
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
|
||||||
requested = false;
|
requested = false;
|
||||||
buf = null;
|
buf = null;
|
||||||
|
for (var tracer : tracingHandlers) {
|
||||||
|
tracer.onNext(bytesRef, hasLast);
|
||||||
|
}
|
||||||
handler.onNext(bytesRef, hasLast);
|
handler.onNext(bytesRef, hasLast);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,6 +147,9 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
|
||||||
|
|
||||||
private void doClose() {
|
private void doClose() {
|
||||||
closing = true;
|
closing = true;
|
||||||
|
for (var tracer : tracingHandlers) {
|
||||||
|
Releasables.closeExpectNoException(tracer);
|
||||||
|
}
|
||||||
if (handler != null) {
|
if (handler != null) {
|
||||||
handler.close();
|
handler.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
/*
|
/*
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
* or more contributor license agreements. Licensed under the Elastic License
|
* or more contributor license agreements. Licensed under the "Elastic License
|
||||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
* Public License v 1"; you may not use this file except in compliance with, at
|
||||||
* Side Public License, v 1.
|
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||||
|
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.http;
|
package org.elasticsearch.http;
|
||||||
|
@ -88,9 +89,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
|
||||||
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
|
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
|
||||||
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||||
|
|
||||||
clusterAdmin().prepareUpdateSettings()
|
updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false));
|
||||||
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false).build())
|
|
||||||
.get();
|
|
||||||
|
|
||||||
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(false));
|
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(false));
|
||||||
|
|
||||||
|
@ -98,9 +97,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
|
||||||
sendLargeBulk();
|
sendLargeBulk();
|
||||||
} finally {
|
} finally {
|
||||||
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true));
|
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true));
|
||||||
clusterAdmin().prepareUpdateSettings()
|
updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null));
|
||||||
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null).build())
|
|
||||||
.get();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,6 +73,13 @@ public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpB
|
||||||
@Nullable
|
@Nullable
|
||||||
ChunkHandler handler();
|
ChunkHandler handler();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds tracing chunk handler. Tracing handler will be invoked before main handler, and
|
||||||
|
* should never release or call for next chunk. It should be used for monitoring and
|
||||||
|
* logging purposes.
|
||||||
|
*/
|
||||||
|
void addTracingHandler(ChunkHandler chunkHandler);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets handler that can handle next chunk
|
* Sets handler that can handle next chunk
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -229,6 +229,8 @@ public class HttpClientStatsTracker {
|
||||||
requestCount += 1;
|
requestCount += 1;
|
||||||
if (httpRequest.body().isFull()) {
|
if (httpRequest.body().isFull()) {
|
||||||
requestSizeBytes += httpRequest.body().asFull().bytes().length();
|
requestSizeBytes += httpRequest.body().asFull().bytes().length();
|
||||||
|
} else {
|
||||||
|
httpRequest.body().asStream().addTracingHandler((chunk, last) -> requestSizeBytes += chunk.length());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ package org.elasticsearch.http;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
|
@ -21,6 +22,7 @@ import org.elasticsearch.rest.RestUtils;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -78,10 +80,10 @@ class HttpTracer {
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
if (isBodyTracerEnabled()) {
|
if (isBodyTracerEnabled()) {
|
||||||
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
|
if (restRequest.isFullContent()) {
|
||||||
restRequest.content().writeTo(stream);
|
logFullContent(restRequest);
|
||||||
} catch (Exception e2) {
|
} else {
|
||||||
assert false : e2; // no real IO here
|
logStreamContent(restRequest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,6 +92,53 @@ class HttpTracer {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void logFullContent(RestRequest restRequest) {
|
||||||
|
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
|
||||||
|
restRequest.content().writeTo(stream);
|
||||||
|
} catch (Exception e2) {
|
||||||
|
assert false : e2; // no real IO here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void logStreamContent(RestRequest restRequest) {
|
||||||
|
restRequest.contentStream().addTracingHandler(new LoggingChunkHandler(restRequest));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class LoggingChunkHandler implements HttpBody.ChunkHandler {
|
||||||
|
private final OutputStream stream;
|
||||||
|
private volatile boolean closed = false;
|
||||||
|
|
||||||
|
LoggingChunkHandler(RestRequest request) {
|
||||||
|
stream = HttpBodyTracer.getBodyOutputStream(request.getRequestId(), HttpBodyTracer.Type.REQUEST);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
|
||||||
|
try {
|
||||||
|
chunk.writeTo(stream);
|
||||||
|
} catch (IOException e) {
|
||||||
|
assert false : e; // no real IO
|
||||||
|
} finally {
|
||||||
|
if (isLast) {
|
||||||
|
this.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
closed = true;
|
||||||
|
stream.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
assert false : e; // no real IO
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
boolean isBodyTracerEnabled() {
|
boolean isBodyTracerEnabled() {
|
||||||
return HttpBodyTracer.isEnabled();
|
return HttpBodyTracer.isEnabled();
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,6 +211,9 @@ public class RestBulkActionTests extends ESTestCase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addTracingHandler(ChunkHandler chunkHandler) {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setHandler(ChunkHandler chunkHandler) {}
|
public void setHandler(ChunkHandler chunkHandler) {}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue