mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-07-06 05:43:52 -04:00
Incremental bulk integration with rest layer (#112154)
Integrate the incremental bulks into RestBulkAction
This commit is contained in:
parent
c00768a116
commit
a03fb12b09
17 changed files with 675 additions and 100 deletions
|
@ -26,6 +26,7 @@ public class RequestsWithoutContentIT extends ESRestTestCase {
|
||||||
assertResponseException(responseException, "request body is required");
|
assertResponseException(responseException, "request body is required");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
|
||||||
public void testBulkMissingBody() throws IOException {
|
public void testBulkMissingBody() throws IOException {
|
||||||
ResponseException responseException = expectThrows(
|
ResponseException responseException = expectThrows(
|
||||||
ResponseException.class,
|
ResponseException.class,
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.core.Tuple;
|
import org.elasticsearch.core.Tuple;
|
||||||
import org.elasticsearch.http.HttpServerTransport;
|
import org.elasticsearch.http.HttpServerTransport;
|
||||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||||
|
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||||
|
|
||||||
|
@ -52,6 +53,8 @@ public class Netty4HttpRequestSizeLimitIT extends ESNetty4IntegTestCase {
|
||||||
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
|
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
|
||||||
return Settings.builder()
|
return Settings.builder()
|
||||||
.put(super.nodeSettings(nodeOrdinal, otherSettings))
|
.put(super.nodeSettings(nodeOrdinal, otherSettings))
|
||||||
|
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
|
||||||
|
.put(RestBulkAction.INCREMENTAL_BULK.getKey(), false)
|
||||||
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
|
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,10 +36,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
|
||||||
private boolean aggregating = true;
|
private boolean aggregating = true;
|
||||||
private boolean ignoreContentAfterContinueResponse = false;
|
private boolean ignoreContentAfterContinueResponse = false;
|
||||||
|
|
||||||
public Netty4HttpAggregator(int maxContentLength) {
|
|
||||||
this(maxContentLength, IGNORE_TEST);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
|
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
|
||||||
super(maxContentLength);
|
super(maxContentLength);
|
||||||
this.decider = decider;
|
this.decider = decider;
|
||||||
|
@ -50,7 +46,7 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
|
||||||
assert msg instanceof HttpObject;
|
assert msg instanceof HttpObject;
|
||||||
if (msg instanceof HttpRequest request) {
|
if (msg instanceof HttpRequest request) {
|
||||||
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
|
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
|
||||||
aggregating = decider.test(preReq);
|
aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq);
|
||||||
}
|
}
|
||||||
if (aggregating || msg instanceof FullHttpRequest) {
|
if (aggregating || msg instanceof FullHttpRequest) {
|
||||||
super.channelRead(ctx, msg);
|
super.channelRead(ctx, msg);
|
||||||
|
|
|
@ -37,6 +37,7 @@ import io.netty.util.AttributeKey;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||||
import org.elasticsearch.common.network.CloseableChannel;
|
import org.elasticsearch.common.network.CloseableChannel;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.network.ThreadWatchdog;
|
import org.elasticsearch.common.network.ThreadWatchdog;
|
||||||
|
@ -97,6 +98,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||||
private final TLSConfig tlsConfig;
|
private final TLSConfig tlsConfig;
|
||||||
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
|
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
|
||||||
private final HttpValidator httpValidator;
|
private final HttpValidator httpValidator;
|
||||||
|
private final IncrementalBulkService.Enabled enabled;
|
||||||
private final ThreadWatchdog threadWatchdog;
|
private final ThreadWatchdog threadWatchdog;
|
||||||
private final int readTimeoutMillis;
|
private final int readTimeoutMillis;
|
||||||
|
|
||||||
|
@ -135,6 +137,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||||
this.acceptChannelPredicate = acceptChannelPredicate;
|
this.acceptChannelPredicate = acceptChannelPredicate;
|
||||||
this.httpValidator = httpValidator;
|
this.httpValidator = httpValidator;
|
||||||
this.threadWatchdog = networkService.getThreadWatchdog();
|
this.threadWatchdog = networkService.getThreadWatchdog();
|
||||||
|
this.enabled = new IncrementalBulkService.Enabled(clusterSettings);
|
||||||
|
|
||||||
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
|
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
|
||||||
|
|
||||||
|
@ -280,7 +283,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ChannelHandler configureServerChannelHandler() {
|
public ChannelHandler configureServerChannelHandler() {
|
||||||
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator);
|
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
|
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
|
||||||
|
@ -293,19 +296,22 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||||
private final TLSConfig tlsConfig;
|
private final TLSConfig tlsConfig;
|
||||||
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
|
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
|
||||||
private final HttpValidator httpValidator;
|
private final HttpValidator httpValidator;
|
||||||
|
private final IncrementalBulkService.Enabled enabled;
|
||||||
|
|
||||||
protected HttpChannelHandler(
|
protected HttpChannelHandler(
|
||||||
final Netty4HttpServerTransport transport,
|
final Netty4HttpServerTransport transport,
|
||||||
final HttpHandlingSettings handlingSettings,
|
final HttpHandlingSettings handlingSettings,
|
||||||
final TLSConfig tlsConfig,
|
final TLSConfig tlsConfig,
|
||||||
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
|
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
|
||||||
@Nullable final HttpValidator httpValidator
|
@Nullable final HttpValidator httpValidator,
|
||||||
|
IncrementalBulkService.Enabled enabled
|
||||||
) {
|
) {
|
||||||
this.transport = transport;
|
this.transport = transport;
|
||||||
this.handlingSettings = handlingSettings;
|
this.handlingSettings = handlingSettings;
|
||||||
this.tlsConfig = tlsConfig;
|
this.tlsConfig = tlsConfig;
|
||||||
this.acceptChannelPredicate = acceptChannelPredicate;
|
this.acceptChannelPredicate = acceptChannelPredicate;
|
||||||
this.httpValidator = httpValidator;
|
this.httpValidator = httpValidator;
|
||||||
|
this.enabled = enabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -366,7 +372,13 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
|
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
|
||||||
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength());
|
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(
|
||||||
|
handlingSettings.maxContentLength(),
|
||||||
|
httpPreRequest -> enabled.get() == false
|
||||||
|
|| (httpPreRequest.uri().contains("_bulk") == false
|
||||||
|
|| httpPreRequest.uri().contains("_bulk_update")
|
||||||
|
|| httpPreRequest.uri().contains("/_xpack/monitoring/_bulk"))
|
||||||
|
);
|
||||||
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
|
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
|
||||||
ch.pipeline()
|
ch.pipeline()
|
||||||
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression
|
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchSecurityException;
|
import org.elasticsearch.ElasticsearchSecurityException;
|
||||||
import org.elasticsearch.ElasticsearchWrapperException;
|
import org.elasticsearch.ElasticsearchWrapperException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||||
import org.elasticsearch.action.support.ActionTestUtils;
|
import org.elasticsearch.action.support.ActionTestUtils;
|
||||||
import org.elasticsearch.action.support.SubscribableListener;
|
import org.elasticsearch.action.support.SubscribableListener;
|
||||||
import org.elasticsearch.client.Request;
|
import org.elasticsearch.client.Request;
|
||||||
|
@ -419,7 +420,8 @@ public class Netty4HttpServerTransportTests extends AbstractHttpServerTransportT
|
||||||
handlingSettings,
|
handlingSettings,
|
||||||
TLSConfig.noTLS(),
|
TLSConfig.noTLS(),
|
||||||
null,
|
null,
|
||||||
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
|
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null),
|
||||||
|
new IncrementalBulkService.Enabled(clusterSettings)
|
||||||
) {
|
) {
|
||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.http;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.Request;
|
||||||
|
import org.elasticsearch.client.Response;
|
||||||
|
import org.elasticsearch.client.ResponseException;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
import org.elasticsearch.xcontent.json.JsonXContent;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.elasticsearch.rest.RestStatus.OK;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
|
||||||
|
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testIncrementalBulk() throws IOException {
|
||||||
|
Request createRequest = new Request("PUT", "/index_name");
|
||||||
|
createRequest.setJsonEntity("""
|
||||||
|
{
|
||||||
|
"settings": {
|
||||||
|
"index": {
|
||||||
|
"number_of_shards": 1,
|
||||||
|
"number_of_replicas": 1,
|
||||||
|
"write.wait_for_active_shards": 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}""");
|
||||||
|
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
|
||||||
|
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||||
|
|
||||||
|
Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
|
||||||
|
|
||||||
|
// index documents for the rollup job
|
||||||
|
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
|
||||||
|
+ "{\"field\":1}\n"
|
||||||
|
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
|
||||||
|
+ "{\"field\":1}\n"
|
||||||
|
+ "\r\n";
|
||||||
|
|
||||||
|
firstBulkRequest.setJsonEntity(bulkBody);
|
||||||
|
|
||||||
|
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
|
||||||
|
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||||
|
|
||||||
|
Request bulkRequest = new Request("POST", "/index_name/_bulk");
|
||||||
|
|
||||||
|
// index documents for the rollup job
|
||||||
|
final StringBuilder bulk = new StringBuilder();
|
||||||
|
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
|
||||||
|
int updates = 0;
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
|
||||||
|
bulk.append("{\"field\":").append(i).append("}\n");
|
||||||
|
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
|
||||||
|
++updates;
|
||||||
|
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
|
||||||
|
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bulk.append("\r\n");
|
||||||
|
|
||||||
|
bulkRequest.setJsonEntity(bulk.toString());
|
||||||
|
|
||||||
|
final Response bulkResponse = getRestClient().performRequest(bulkRequest);
|
||||||
|
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||||
|
Map<String, Object> responseMap = XContentHelper.convertToMap(
|
||||||
|
JsonXContent.jsonXContent,
|
||||||
|
bulkResponse.getEntity().getContent(),
|
||||||
|
true
|
||||||
|
);
|
||||||
|
|
||||||
|
assertFalse((Boolean) responseMap.get("errors"));
|
||||||
|
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testIncrementalMalformed() throws IOException {
|
||||||
|
Request createRequest = new Request("PUT", "/index_name");
|
||||||
|
createRequest.setJsonEntity("""
|
||||||
|
{
|
||||||
|
"settings": {
|
||||||
|
"index": {
|
||||||
|
"number_of_shards": 1,
|
||||||
|
"number_of_replicas": 1,
|
||||||
|
"write.wait_for_active_shards": 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}""");
|
||||||
|
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
|
||||||
|
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||||
|
|
||||||
|
Request bulkRequest = new Request("POST", "/index_name/_bulk");
|
||||||
|
|
||||||
|
// index documents for the rollup job
|
||||||
|
final StringBuilder bulk = new StringBuilder();
|
||||||
|
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
|
||||||
|
bulk.append("{\"field\":1}\n");
|
||||||
|
bulk.append("{}\n");
|
||||||
|
bulk.append("\r\n");
|
||||||
|
|
||||||
|
bulkRequest.setJsonEntity(bulk.toString());
|
||||||
|
|
||||||
|
expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
|
||||||
|
}
|
||||||
|
}
|
|
@ -160,6 +160,7 @@ import org.elasticsearch.action.admin.indices.template.put.TransportPutComposabl
|
||||||
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
|
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
|
||||||
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
|
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
|
||||||
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
|
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
|
||||||
|
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||||
import org.elasticsearch.action.bulk.SimulateBulkAction;
|
import org.elasticsearch.action.bulk.SimulateBulkAction;
|
||||||
import org.elasticsearch.action.bulk.TransportBulkAction;
|
import org.elasticsearch.action.bulk.TransportBulkAction;
|
||||||
import org.elasticsearch.action.bulk.TransportShardBulkAction;
|
import org.elasticsearch.action.bulk.TransportShardBulkAction;
|
||||||
|
@ -448,6 +449,7 @@ public class ActionModule extends AbstractModule {
|
||||||
private final List<ActionPlugin> actionPlugins;
|
private final List<ActionPlugin> actionPlugins;
|
||||||
private final Map<String, ActionHandler<?, ?>> actions;
|
private final Map<String, ActionHandler<?, ?>> actions;
|
||||||
private final ActionFilters actionFilters;
|
private final ActionFilters actionFilters;
|
||||||
|
private final IncrementalBulkService bulkService;
|
||||||
private final AutoCreateIndex autoCreateIndex;
|
private final AutoCreateIndex autoCreateIndex;
|
||||||
private final DestructiveOperations destructiveOperations;
|
private final DestructiveOperations destructiveOperations;
|
||||||
private final RestController restController;
|
private final RestController restController;
|
||||||
|
@ -476,7 +478,8 @@ public class ActionModule extends AbstractModule {
|
||||||
ClusterService clusterService,
|
ClusterService clusterService,
|
||||||
RerouteService rerouteService,
|
RerouteService rerouteService,
|
||||||
List<ReservedClusterStateHandler<?>> reservedStateHandlers,
|
List<ReservedClusterStateHandler<?>> reservedStateHandlers,
|
||||||
RestExtension restExtension
|
RestExtension restExtension,
|
||||||
|
IncrementalBulkService bulkService
|
||||||
) {
|
) {
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||||
|
@ -488,6 +491,7 @@ public class ActionModule extends AbstractModule {
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
actions = setupActions(actionPlugins);
|
actions = setupActions(actionPlugins);
|
||||||
actionFilters = setupActionFilters(actionPlugins);
|
actionFilters = setupActionFilters(actionPlugins);
|
||||||
|
this.bulkService = bulkService;
|
||||||
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
|
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
|
||||||
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
|
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
|
||||||
Set<RestHeaderDefinition> headers = Stream.concat(
|
Set<RestHeaderDefinition> headers = Stream.concat(
|
||||||
|
@ -928,7 +932,7 @@ public class ActionModule extends AbstractModule {
|
||||||
registerHandler.accept(new RestCountAction());
|
registerHandler.accept(new RestCountAction());
|
||||||
registerHandler.accept(new RestTermVectorsAction());
|
registerHandler.accept(new RestTermVectorsAction());
|
||||||
registerHandler.accept(new RestMultiTermVectorsAction());
|
registerHandler.accept(new RestMultiTermVectorsAction());
|
||||||
registerHandler.accept(new RestBulkAction(settings));
|
registerHandler.accept(new RestBulkAction(settings, bulkService));
|
||||||
registerHandler.accept(new RestUpdateAction());
|
registerHandler.accept(new RestUpdateAction());
|
||||||
|
|
||||||
registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature));
|
registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature));
|
||||||
|
|
|
@ -86,13 +86,13 @@ public final class BulkRequestParser {
|
||||||
.withRestApiVersion(restApiVersion);
|
.withRestApiVersion(restApiVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int findNextMarker(byte marker, int from, BytesReference data) {
|
private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
|
||||||
final int res = data.indexOf(marker, from);
|
final int res = data.indexOf(marker, from);
|
||||||
if (res != -1) {
|
if (res != -1) {
|
||||||
assert res >= 0;
|
assert res >= 0;
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
if (from != data.length()) {
|
if (from != data.length() && isIncremental == false) {
|
||||||
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
|
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
|
@ -137,18 +137,57 @@ public final class BulkRequestParser {
|
||||||
Consumer<UpdateRequest> updateRequestConsumer,
|
Consumer<UpdateRequest> updateRequestConsumer,
|
||||||
Consumer<DeleteRequest> deleteRequestConsumer
|
Consumer<DeleteRequest> deleteRequestConsumer
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
XContent xContent = xContentType.xContent();
|
|
||||||
int line = 0;
|
|
||||||
int from = 0;
|
|
||||||
byte marker = xContent.bulkSeparator();
|
|
||||||
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
|
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
|
||||||
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
|
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
|
||||||
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
|
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
|
||||||
final Map<String, String> stringDeduplicator = new HashMap<>();
|
final Map<String, String> stringDeduplicator = new HashMap<>();
|
||||||
|
|
||||||
|
incrementalParse(
|
||||||
|
data,
|
||||||
|
defaultIndex,
|
||||||
|
defaultRouting,
|
||||||
|
defaultFetchSourceContext,
|
||||||
|
defaultPipeline,
|
||||||
|
defaultRequireAlias,
|
||||||
|
defaultRequireDataStream,
|
||||||
|
defaultListExecutedPipelines,
|
||||||
|
allowExplicitIndex,
|
||||||
|
xContentType,
|
||||||
|
indexRequestConsumer,
|
||||||
|
updateRequestConsumer,
|
||||||
|
deleteRequestConsumer,
|
||||||
|
false,
|
||||||
|
stringDeduplicator
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int incrementalParse(
|
||||||
|
BytesReference data,
|
||||||
|
String defaultIndex,
|
||||||
|
String defaultRouting,
|
||||||
|
FetchSourceContext defaultFetchSourceContext,
|
||||||
|
String defaultPipeline,
|
||||||
|
Boolean defaultRequireAlias,
|
||||||
|
Boolean defaultRequireDataStream,
|
||||||
|
Boolean defaultListExecutedPipelines,
|
||||||
|
boolean allowExplicitIndex,
|
||||||
|
XContentType xContentType,
|
||||||
|
BiConsumer<IndexRequest, String> indexRequestConsumer,
|
||||||
|
Consumer<UpdateRequest> updateRequestConsumer,
|
||||||
|
Consumer<DeleteRequest> deleteRequestConsumer,
|
||||||
|
boolean isIncremental,
|
||||||
|
Map<String, String> stringDeduplicator
|
||||||
|
) throws IOException {
|
||||||
|
XContent xContent = xContentType.xContent();
|
||||||
|
byte marker = xContent.bulkSeparator();
|
||||||
boolean typesDeprecationLogged = false;
|
boolean typesDeprecationLogged = false;
|
||||||
|
|
||||||
|
int line = 0;
|
||||||
|
int from = 0;
|
||||||
|
int consumed = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
int nextMarker = findNextMarker(marker, from, data);
|
int nextMarker = findNextMarker(marker, from, data, isIncremental);
|
||||||
if (nextMarker == -1) {
|
if (nextMarker == -1) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -333,8 +372,9 @@ public final class BulkRequestParser {
|
||||||
.setIfSeqNo(ifSeqNo)
|
.setIfSeqNo(ifSeqNo)
|
||||||
.setIfPrimaryTerm(ifPrimaryTerm)
|
.setIfPrimaryTerm(ifPrimaryTerm)
|
||||||
);
|
);
|
||||||
|
consumed = from;
|
||||||
} else {
|
} else {
|
||||||
nextMarker = findNextMarker(marker, from, data);
|
nextMarker = findNextMarker(marker, from, data, isIncremental);
|
||||||
if (nextMarker == -1) {
|
if (nextMarker == -1) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -407,9 +447,11 @@ public final class BulkRequestParser {
|
||||||
}
|
}
|
||||||
// move pointers
|
// move pointers
|
||||||
from = nextMarker + 1;
|
from = nextMarker + 1;
|
||||||
|
consumed = from;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return isIncremental ? consumed : from;
|
||||||
}
|
}
|
||||||
|
|
||||||
@UpdateForV9
|
@UpdateForV9
|
||||||
|
|
|
@ -14,25 +14,56 @@ import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
||||||
import org.elasticsearch.client.internal.Client;
|
import org.elasticsearch.client.internal.Client;
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.core.Releasable;
|
import org.elasticsearch.core.Releasable;
|
||||||
import org.elasticsearch.core.Releasables;
|
import org.elasticsearch.core.Releasables;
|
||||||
import org.elasticsearch.core.TimeValue;
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.index.IndexingPressure;
|
import org.elasticsearch.index.IndexingPressure;
|
||||||
|
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public class IncrementalBulkService {
|
public class IncrementalBulkService {
|
||||||
|
|
||||||
private final Client client;
|
private final Client client;
|
||||||
private final IndexingPressure indexingPressure;
|
private final IndexingPressure indexingPressure;
|
||||||
|
private final ThreadContext threadContext;
|
||||||
|
private final Supplier<Boolean> enabled;
|
||||||
|
|
||||||
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
|
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) {
|
||||||
|
this(client, indexingPressure, threadContext, new Enabled());
|
||||||
|
}
|
||||||
|
|
||||||
|
public IncrementalBulkService(
|
||||||
|
Client client,
|
||||||
|
IndexingPressure indexingPressure,
|
||||||
|
ThreadContext threadContext,
|
||||||
|
ClusterSettings clusterSettings
|
||||||
|
) {
|
||||||
|
this(client, indexingPressure, threadContext, new Enabled(clusterSettings));
|
||||||
|
}
|
||||||
|
|
||||||
|
public IncrementalBulkService(
|
||||||
|
Client client,
|
||||||
|
IndexingPressure indexingPressure,
|
||||||
|
ThreadContext threadContext,
|
||||||
|
Supplier<Boolean> enabled
|
||||||
|
) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.indexingPressure = indexingPressure;
|
this.indexingPressure = indexingPressure;
|
||||||
|
this.threadContext = threadContext;
|
||||||
|
this.enabled = enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean incrementalBulkEnabled() {
|
||||||
|
return enabled.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Handler newBulkRequest() {
|
public Handler newBulkRequest() {
|
||||||
|
@ -40,14 +71,32 @@ public class IncrementalBulkService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
|
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
|
||||||
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
|
return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Handler {
|
public static class Enabled implements Supplier<Boolean> {
|
||||||
|
|
||||||
|
private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true);
|
||||||
|
|
||||||
|
public Enabled() {}
|
||||||
|
|
||||||
|
public Enabled(ClusterSettings clusterSettings) {
|
||||||
|
incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK));
|
||||||
|
clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return incrementalBulksEnabled.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Handler implements Releasable {
|
||||||
|
|
||||||
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
|
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
|
||||||
|
|
||||||
private final Client client;
|
private final Client client;
|
||||||
|
private final ThreadContext threadContext;
|
||||||
private final IndexingPressure indexingPressure;
|
private final IndexingPressure indexingPressure;
|
||||||
private final ActiveShardCount waitForActiveShards;
|
private final ActiveShardCount waitForActiveShards;
|
||||||
private final TimeValue timeout;
|
private final TimeValue timeout;
|
||||||
|
@ -57,17 +106,21 @@ public class IncrementalBulkService {
|
||||||
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
|
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
|
||||||
private boolean globalFailure = false;
|
private boolean globalFailure = false;
|
||||||
private boolean incrementalRequestSubmitted = false;
|
private boolean incrementalRequestSubmitted = false;
|
||||||
|
private ThreadContext.StoredContext requestContext;
|
||||||
private Exception bulkActionLevelFailure = null;
|
private Exception bulkActionLevelFailure = null;
|
||||||
private BulkRequest bulkRequest = null;
|
private BulkRequest bulkRequest = null;
|
||||||
|
|
||||||
private Handler(
|
protected Handler(
|
||||||
Client client,
|
Client client,
|
||||||
|
ThreadContext threadContext,
|
||||||
IndexingPressure indexingPressure,
|
IndexingPressure indexingPressure,
|
||||||
@Nullable String waitForActiveShards,
|
@Nullable String waitForActiveShards,
|
||||||
@Nullable TimeValue timeout,
|
@Nullable TimeValue timeout,
|
||||||
@Nullable String refresh
|
@Nullable String refresh
|
||||||
) {
|
) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
|
this.threadContext = threadContext;
|
||||||
|
this.requestContext = threadContext.newStoredContext();
|
||||||
this.indexingPressure = indexingPressure;
|
this.indexingPressure = indexingPressure;
|
||||||
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
|
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
|
@ -85,30 +138,34 @@ public class IncrementalBulkService {
|
||||||
if (shouldBackOff()) {
|
if (shouldBackOff()) {
|
||||||
final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
||||||
incrementalRequestSubmitted = true;
|
incrementalRequestSubmitted = true;
|
||||||
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
|
requestContext.restore();
|
||||||
|
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
|
||||||
|
|
||||||
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
|
@Override
|
||||||
|
public void onResponse(BulkResponse bulkResponse) {
|
||||||
|
responses.add(bulkResponse);
|
||||||
|
releaseCurrentReferences();
|
||||||
|
createNewBulkRequest(
|
||||||
|
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(BulkResponse bulkResponse) {
|
public void onFailure(Exception e) {
|
||||||
responses.add(bulkResponse);
|
handleBulkFailure(isFirstRequest, e);
|
||||||
releaseCurrentReferences();
|
}
|
||||||
createNewBulkRequest(
|
}, () -> {
|
||||||
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
|
requestContext = threadContext.newStoredContext();
|
||||||
);
|
nextItems.run();
|
||||||
}
|
}));
|
||||||
|
}
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
handleBulkFailure(isFirstRequest, e);
|
|
||||||
}
|
|
||||||
}, nextItems));
|
|
||||||
} else {
|
} else {
|
||||||
nextItems.run();
|
nextItems.run();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
nextItems.run();
|
nextItems.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,23 +180,26 @@ public class IncrementalBulkService {
|
||||||
} else {
|
} else {
|
||||||
assert bulkRequest != null;
|
assert bulkRequest != null;
|
||||||
if (internalAddItems(items, releasable)) {
|
if (internalAddItems(items, releasable)) {
|
||||||
client.bulk(bulkRequest, new ActionListener<>() {
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
|
requestContext.restore();
|
||||||
|
client.bulk(bulkRequest, new ActionListener<>() {
|
||||||
|
|
||||||
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(BulkResponse bulkResponse) {
|
public void onResponse(BulkResponse bulkResponse) {
|
||||||
responses.add(bulkResponse);
|
responses.add(bulkResponse);
|
||||||
releaseCurrentReferences();
|
releaseCurrentReferences();
|
||||||
listener.onResponse(combineResponses());
|
listener.onResponse(combineResponses());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
handleBulkFailure(isFirstRequest, e);
|
handleBulkFailure(isFirstRequest, e);
|
||||||
errorResponse(listener);
|
errorResponse(listener);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
errorResponse(listener);
|
errorResponse(listener);
|
||||||
}
|
}
|
||||||
|
@ -240,5 +300,10 @@ public class IncrementalBulkService {
|
||||||
|
|
||||||
return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
|
return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
// TODO: Implement
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,6 +113,7 @@ import org.elasticsearch.plugins.PluginsService;
|
||||||
import org.elasticsearch.readiness.ReadinessService;
|
import org.elasticsearch.readiness.ReadinessService;
|
||||||
import org.elasticsearch.repositories.fs.FsRepository;
|
import org.elasticsearch.repositories.fs.FsRepository;
|
||||||
import org.elasticsearch.rest.BaseRestHandler;
|
import org.elasticsearch.rest.BaseRestHandler;
|
||||||
|
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.search.SearchModule;
|
import org.elasticsearch.search.SearchModule;
|
||||||
import org.elasticsearch.search.SearchService;
|
import org.elasticsearch.search.SearchService;
|
||||||
|
@ -242,6 +243,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
Metadata.SETTING_READ_ONLY_SETTING,
|
Metadata.SETTING_READ_ONLY_SETTING,
|
||||||
Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
|
Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
|
||||||
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
|
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
|
||||||
|
RestBulkAction.INCREMENTAL_BULK,
|
||||||
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
|
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
|
||||||
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
|
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
|
||||||
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
|
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
|
||||||
|
|
|
@ -891,6 +891,14 @@ class NodeConstruction {
|
||||||
.map(TerminationHandlerProvider::handler);
|
.map(TerminationHandlerProvider::handler);
|
||||||
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
|
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
|
||||||
|
|
||||||
|
final IndexingPressure indexingLimits = new IndexingPressure(settings);
|
||||||
|
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
|
||||||
|
client,
|
||||||
|
indexingLimits,
|
||||||
|
threadPool.getThreadContext(),
|
||||||
|
clusterService.getClusterSettings()
|
||||||
|
);
|
||||||
|
|
||||||
ActionModule actionModule = new ActionModule(
|
ActionModule actionModule = new ActionModule(
|
||||||
settings,
|
settings,
|
||||||
clusterModule.getIndexNameExpressionResolver(),
|
clusterModule.getIndexNameExpressionResolver(),
|
||||||
|
@ -916,7 +924,8 @@ class NodeConstruction {
|
||||||
metadataCreateIndexService,
|
metadataCreateIndexService,
|
||||||
dataStreamGlobalRetentionSettings
|
dataStreamGlobalRetentionSettings
|
||||||
),
|
),
|
||||||
pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll)
|
pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll),
|
||||||
|
incrementalBulkService
|
||||||
);
|
);
|
||||||
modules.add(actionModule);
|
modules.add(actionModule);
|
||||||
|
|
||||||
|
@ -979,8 +988,6 @@ class NodeConstruction {
|
||||||
SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
|
SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
|
||||||
);
|
);
|
||||||
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
|
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
|
||||||
final IndexingPressure indexingLimits = new IndexingPressure(settings);
|
|
||||||
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
|
|
||||||
|
|
||||||
SnapshotsService snapshotsService = new SnapshotsService(
|
SnapshotsService snapshotsService = new SnapshotsService(
|
||||||
settings,
|
settings,
|
||||||
|
|
|
@ -11,21 +11,38 @@ package org.elasticsearch.rest.action.document;
|
||||||
|
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequestParser;
|
||||||
import org.elasticsearch.action.bulk.BulkShardRequest;
|
import org.elasticsearch.action.bulk.BulkShardRequest;
|
||||||
|
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||||
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
||||||
import org.elasticsearch.client.internal.node.NodeClient;
|
import org.elasticsearch.client.internal.node.NodeClient;
|
||||||
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.bytes.CompositeBytesReference;
|
||||||
|
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.core.Releasable;
|
||||||
|
import org.elasticsearch.core.Releasables;
|
||||||
import org.elasticsearch.core.RestApiVersion;
|
import org.elasticsearch.core.RestApiVersion;
|
||||||
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.rest.BaseRestHandler;
|
import org.elasticsearch.rest.BaseRestHandler;
|
||||||
|
import org.elasticsearch.rest.RestChannel;
|
||||||
import org.elasticsearch.rest.RestRequest;
|
import org.elasticsearch.rest.RestRequest;
|
||||||
import org.elasticsearch.rest.Scope;
|
import org.elasticsearch.rest.Scope;
|
||||||
import org.elasticsearch.rest.ServerlessScope;
|
import org.elasticsearch.rest.ServerlessScope;
|
||||||
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
|
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
|
||||||
|
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import static org.elasticsearch.common.settings.Setting.boolSetting;
|
||||||
import static org.elasticsearch.rest.RestRequest.Method.POST;
|
import static org.elasticsearch.rest.RestRequest.Method.POST;
|
||||||
import static org.elasticsearch.rest.RestRequest.Method.PUT;
|
import static org.elasticsearch.rest.RestRequest.Method.PUT;
|
||||||
|
|
||||||
|
@ -40,12 +57,21 @@ import static org.elasticsearch.rest.RestRequest.Method.PUT;
|
||||||
*/
|
*/
|
||||||
@ServerlessScope(Scope.PUBLIC)
|
@ServerlessScope(Scope.PUBLIC)
|
||||||
public class RestBulkAction extends BaseRestHandler {
|
public class RestBulkAction extends BaseRestHandler {
|
||||||
|
|
||||||
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
|
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
|
||||||
|
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
|
||||||
|
"rest.incremental_bulk",
|
||||||
|
true,
|
||||||
|
Setting.Property.NodeScope,
|
||||||
|
Setting.Property.Dynamic
|
||||||
|
);
|
||||||
|
|
||||||
private final boolean allowExplicitIndex;
|
private final boolean allowExplicitIndex;
|
||||||
|
private final IncrementalBulkService bulkHandler;
|
||||||
|
|
||||||
public RestBulkAction(Settings settings) {
|
public RestBulkAction(Settings settings, IncrementalBulkService bulkHandler) {
|
||||||
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
|
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
|
||||||
|
this.bulkHandler = bulkHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -67,38 +93,181 @@ public class RestBulkAction extends BaseRestHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
|
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
|
||||||
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
|
if (bulkHandler.incrementalBulkEnabled() == false) {
|
||||||
request.param("type");
|
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
|
||||||
}
|
request.param("type");
|
||||||
BulkRequest bulkRequest = new BulkRequest();
|
}
|
||||||
String defaultIndex = request.param("index");
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
String defaultRouting = request.param("routing");
|
String defaultIndex = request.param("index");
|
||||||
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
|
String defaultRouting = request.param("routing");
|
||||||
String defaultPipeline = request.param("pipeline");
|
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
|
||||||
boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
|
String defaultPipeline = request.param("pipeline");
|
||||||
String waitForActiveShards = request.param("wait_for_active_shards");
|
boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
|
||||||
if (waitForActiveShards != null) {
|
String waitForActiveShards = request.param("wait_for_active_shards");
|
||||||
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
|
if (waitForActiveShards != null) {
|
||||||
}
|
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
|
||||||
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
|
}
|
||||||
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
|
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
|
||||||
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
|
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
|
||||||
bulkRequest.setRefreshPolicy(request.param("refresh"));
|
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
|
||||||
bulkRequest.add(
|
bulkRequest.setRefreshPolicy(request.param("refresh"));
|
||||||
request.requiredContent(),
|
bulkRequest.add(
|
||||||
defaultIndex,
|
request.requiredContent(),
|
||||||
defaultRouting,
|
defaultIndex,
|
||||||
defaultFetchSourceContext,
|
defaultRouting,
|
||||||
defaultPipeline,
|
defaultFetchSourceContext,
|
||||||
defaultRequireAlias,
|
defaultPipeline,
|
||||||
defaultRequireDataStream,
|
defaultRequireAlias,
|
||||||
defaultListExecutedPipelines,
|
defaultRequireDataStream,
|
||||||
allowExplicitIndex,
|
defaultListExecutedPipelines,
|
||||||
request.getXContentType(),
|
allowExplicitIndex,
|
||||||
request.getRestApiVersion()
|
request.getXContentType(),
|
||||||
);
|
request.getRestApiVersion()
|
||||||
|
);
|
||||||
|
|
||||||
return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
|
return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
|
||||||
|
} else {
|
||||||
|
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
|
||||||
|
request.param("type");
|
||||||
|
}
|
||||||
|
|
||||||
|
String waitForActiveShards = request.param("wait_for_active_shards");
|
||||||
|
TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
|
||||||
|
String refresh = request.param("refresh");
|
||||||
|
return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
|
||||||
|
|
||||||
|
private final boolean allowExplicitIndex;
|
||||||
|
private final RestRequest request;
|
||||||
|
|
||||||
|
private final Map<String, String> stringDeduplicator = new HashMap<>();
|
||||||
|
private final String defaultIndex;
|
||||||
|
private final String defaultRouting;
|
||||||
|
private final FetchSourceContext defaultFetchSourceContext;
|
||||||
|
private final String defaultPipeline;
|
||||||
|
private final boolean defaultListExecutedPipelines;
|
||||||
|
private final Boolean defaultRequireAlias;
|
||||||
|
private final boolean defaultRequireDataStream;
|
||||||
|
private final BulkRequestParser parser;
|
||||||
|
private final Supplier<IncrementalBulkService.Handler> handlerSupplier;
|
||||||
|
private IncrementalBulkService.Handler handler;
|
||||||
|
|
||||||
|
private volatile RestChannel restChannel;
|
||||||
|
private boolean isException;
|
||||||
|
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
|
||||||
|
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
|
||||||
|
|
||||||
|
ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
|
||||||
|
this.allowExplicitIndex = allowExplicitIndex;
|
||||||
|
this.request = request;
|
||||||
|
this.defaultIndex = request.param("index");
|
||||||
|
this.defaultRouting = request.param("routing");
|
||||||
|
this.defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
|
||||||
|
this.defaultPipeline = request.param("pipeline");
|
||||||
|
this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
|
||||||
|
this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
|
||||||
|
this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
|
||||||
|
// TODO: Fix type deprecation logging
|
||||||
|
this.parser = new BulkRequestParser(false, request.getRestApiVersion());
|
||||||
|
this.handlerSupplier = handlerSupplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(RestChannel restChannel) {
|
||||||
|
this.restChannel = restChannel;
|
||||||
|
this.handler = handlerSupplier.get();
|
||||||
|
request.contentStream().next();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
|
||||||
|
assert handler != null;
|
||||||
|
assert channel == restChannel;
|
||||||
|
if (isException) {
|
||||||
|
chunk.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final BytesReference data;
|
||||||
|
int bytesConsumed;
|
||||||
|
try {
|
||||||
|
unParsedChunks.add(chunk);
|
||||||
|
|
||||||
|
if (unParsedChunks.size() > 1) {
|
||||||
|
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
|
||||||
|
} else {
|
||||||
|
data = chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
|
||||||
|
// BulkRequest#add is fine
|
||||||
|
bytesConsumed = parser.incrementalParse(
|
||||||
|
data,
|
||||||
|
defaultIndex,
|
||||||
|
defaultRouting,
|
||||||
|
defaultFetchSourceContext,
|
||||||
|
defaultPipeline,
|
||||||
|
defaultRequireAlias,
|
||||||
|
defaultRequireDataStream,
|
||||||
|
defaultListExecutedPipelines,
|
||||||
|
allowExplicitIndex,
|
||||||
|
request.getXContentType(),
|
||||||
|
(request, type) -> items.add(request),
|
||||||
|
items::add,
|
||||||
|
items::add,
|
||||||
|
isLast == false,
|
||||||
|
stringDeduplicator
|
||||||
|
);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// TODO: This needs to be better
|
||||||
|
Releasables.close(handler);
|
||||||
|
Releasables.close(unParsedChunks);
|
||||||
|
unParsedChunks.clear();
|
||||||
|
new RestToXContentListener<>(channel).onFailure(e);
|
||||||
|
isException = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
|
||||||
|
if (isLast) {
|
||||||
|
assert unParsedChunks.isEmpty();
|
||||||
|
assert channel != null;
|
||||||
|
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
|
||||||
|
items.clear();
|
||||||
|
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
|
||||||
|
} else if (items.isEmpty() == false) {
|
||||||
|
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
|
||||||
|
items.clear();
|
||||||
|
handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
|
||||||
|
} else {
|
||||||
|
assert releasables.isEmpty();
|
||||||
|
request.contentStream().next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
RequestBodyChunkConsumer.super.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ArrayList<Releasable> accountParsing(int bytesConsumed) {
|
||||||
|
ArrayList<Releasable> releasables = new ArrayList<>(unParsedChunks.size());
|
||||||
|
while (bytesConsumed > 0) {
|
||||||
|
ReleasableBytesReference reference = unParsedChunks.removeFirst();
|
||||||
|
releasables.add(reference);
|
||||||
|
if (bytesConsumed >= reference.length()) {
|
||||||
|
bytesConsumed -= reference.length();
|
||||||
|
} else {
|
||||||
|
unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed));
|
||||||
|
bytesConsumed = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return releasables;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
package org.elasticsearch.action;
|
package org.elasticsearch.action;
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
|
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
|
||||||
|
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.TransportAction;
|
import org.elasticsearch.action.support.TransportAction;
|
||||||
import org.elasticsearch.client.internal.node.NodeClient;
|
import org.elasticsearch.client.internal.node.NodeClient;
|
||||||
|
@ -129,7 +130,8 @@ public class ActionModuleTests extends ESTestCase {
|
||||||
mock(ClusterService.class),
|
mock(ClusterService.class),
|
||||||
null,
|
null,
|
||||||
List.of(),
|
List.of(),
|
||||||
RestExtension.allowAll()
|
RestExtension.allowAll(),
|
||||||
|
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||||
);
|
);
|
||||||
actionModule.initRestHandlers(null, null);
|
actionModule.initRestHandlers(null, null);
|
||||||
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
||||||
|
@ -193,7 +195,8 @@ public class ActionModuleTests extends ESTestCase {
|
||||||
mock(ClusterService.class),
|
mock(ClusterService.class),
|
||||||
null,
|
null,
|
||||||
List.of(),
|
List.of(),
|
||||||
RestExtension.allowAll()
|
RestExtension.allowAll(),
|
||||||
|
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||||
);
|
);
|
||||||
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
|
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
|
||||||
assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET"));
|
assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET"));
|
||||||
|
@ -250,7 +253,8 @@ public class ActionModuleTests extends ESTestCase {
|
||||||
mock(ClusterService.class),
|
mock(ClusterService.class),
|
||||||
null,
|
null,
|
||||||
List.of(),
|
List.of(),
|
||||||
RestExtension.allowAll()
|
RestExtension.allowAll(),
|
||||||
|
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||||
);
|
);
|
||||||
actionModule.initRestHandlers(null, null);
|
actionModule.initRestHandlers(null, null);
|
||||||
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
||||||
|
@ -300,7 +304,8 @@ public class ActionModuleTests extends ESTestCase {
|
||||||
mock(ClusterService.class),
|
mock(ClusterService.class),
|
||||||
null,
|
null,
|
||||||
List.of(),
|
List.of(),
|
||||||
RestExtension.allowAll()
|
RestExtension.allowAll(),
|
||||||
|
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
assertThat(
|
assertThat(
|
||||||
|
@ -341,7 +346,8 @@ public class ActionModuleTests extends ESTestCase {
|
||||||
mock(ClusterService.class),
|
mock(ClusterService.class),
|
||||||
null,
|
null,
|
||||||
List.of(),
|
List.of(),
|
||||||
RestExtension.allowAll()
|
RestExtension.allowAll(),
|
||||||
|
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
assertThat(
|
assertThat(
|
||||||
|
|
|
@ -15,6 +15,7 @@ import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionModule;
|
import org.elasticsearch.action.ActionModule;
|
||||||
|
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
@ -1177,7 +1178,8 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
|
||||||
mock(ClusterService.class),
|
mock(ClusterService.class),
|
||||||
null,
|
null,
|
||||||
List.of(),
|
List.of(),
|
||||||
RestExtension.allowAll()
|
RestExtension.allowAll(),
|
||||||
|
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,23 +11,37 @@ package org.elasticsearch.rest.action.document;
|
||||||
|
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
|
import org.elasticsearch.client.internal.Client;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
|
import org.elasticsearch.core.Releasable;
|
||||||
|
import org.elasticsearch.http.HttpBody;
|
||||||
import org.elasticsearch.index.IndexVersion;
|
import org.elasticsearch.index.IndexVersion;
|
||||||
|
import org.elasticsearch.index.IndexingPressure;
|
||||||
import org.elasticsearch.rest.RestChannel;
|
import org.elasticsearch.rest.RestChannel;
|
||||||
import org.elasticsearch.rest.RestRequest;
|
import org.elasticsearch.rest.RestRequest;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.client.NoOpNodeClient;
|
import org.elasticsearch.test.client.NoOpNodeClient;
|
||||||
|
import org.elasticsearch.test.rest.FakeRestChannel;
|
||||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
import org.elasticsearch.test.rest.FakeRestRequest;
|
||||||
import org.elasticsearch.xcontent.XContentType;
|
import org.elasticsearch.xcontent.XContentType;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.hasSize;
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -51,7 +65,10 @@ public class RestBulkActionTests extends ESTestCase {
|
||||||
};
|
};
|
||||||
final Map<String, String> params = new HashMap<>();
|
final Map<String, String> params = new HashMap<>();
|
||||||
params.put("pipeline", "timestamps");
|
params.put("pipeline", "timestamps");
|
||||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
new RestBulkAction(
|
||||||
|
settings(IndexVersion.current()).build(),
|
||||||
|
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY), () -> false)
|
||||||
|
).handleRequest(
|
||||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
|
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
|
||||||
{"index":{"_id":"1"}}
|
{"index":{"_id":"1"}}
|
||||||
{"field1":"val1"}
|
{"field1":"val1"}
|
||||||
|
@ -83,7 +100,15 @@ public class RestBulkActionTests extends ESTestCase {
|
||||||
};
|
};
|
||||||
Map<String, String> params = new HashMap<>();
|
Map<String, String> params = new HashMap<>();
|
||||||
{
|
{
|
||||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
new RestBulkAction(
|
||||||
|
settings(IndexVersion.current()).build(),
|
||||||
|
new IncrementalBulkService(
|
||||||
|
mock(Client.class),
|
||||||
|
mock(IndexingPressure.class),
|
||||||
|
new ThreadContext(Settings.EMPTY),
|
||||||
|
() -> false
|
||||||
|
)
|
||||||
|
).handleRequest(
|
||||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||||
.withParams(params)
|
.withParams(params)
|
||||||
.withContent(new BytesArray("""
|
.withContent(new BytesArray("""
|
||||||
|
@ -104,7 +129,15 @@ public class RestBulkActionTests extends ESTestCase {
|
||||||
{
|
{
|
||||||
params.put("list_executed_pipelines", "true");
|
params.put("list_executed_pipelines", "true");
|
||||||
bulkCalled.set(false);
|
bulkCalled.set(false);
|
||||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
new RestBulkAction(
|
||||||
|
settings(IndexVersion.current()).build(),
|
||||||
|
new IncrementalBulkService(
|
||||||
|
mock(Client.class),
|
||||||
|
mock(IndexingPressure.class),
|
||||||
|
new ThreadContext(Settings.EMPTY),
|
||||||
|
() -> false
|
||||||
|
)
|
||||||
|
).handleRequest(
|
||||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||||
.withParams(params)
|
.withParams(params)
|
||||||
.withContent(new BytesArray("""
|
.withContent(new BytesArray("""
|
||||||
|
@ -124,7 +157,15 @@ public class RestBulkActionTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
bulkCalled.set(false);
|
bulkCalled.set(false);
|
||||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
new RestBulkAction(
|
||||||
|
settings(IndexVersion.current()).build(),
|
||||||
|
new IncrementalBulkService(
|
||||||
|
mock(Client.class),
|
||||||
|
mock(IndexingPressure.class),
|
||||||
|
new ThreadContext(Settings.EMPTY),
|
||||||
|
() -> false
|
||||||
|
)
|
||||||
|
).handleRequest(
|
||||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||||
.withParams(params)
|
.withParams(params)
|
||||||
.withContent(new BytesArray("""
|
.withContent(new BytesArray("""
|
||||||
|
@ -145,7 +186,15 @@ public class RestBulkActionTests extends ESTestCase {
|
||||||
{
|
{
|
||||||
params.remove("list_executed_pipelines");
|
params.remove("list_executed_pipelines");
|
||||||
bulkCalled.set(false);
|
bulkCalled.set(false);
|
||||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
new RestBulkAction(
|
||||||
|
settings(IndexVersion.current()).build(),
|
||||||
|
new IncrementalBulkService(
|
||||||
|
mock(Client.class),
|
||||||
|
mock(IndexingPressure.class),
|
||||||
|
new ThreadContext(Settings.EMPTY),
|
||||||
|
() -> false
|
||||||
|
)
|
||||||
|
).handleRequest(
|
||||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||||
.withParams(params)
|
.withParams(params)
|
||||||
.withContent(new BytesArray("""
|
.withContent(new BytesArray("""
|
||||||
|
@ -165,4 +214,95 @@ public class RestBulkActionTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIncrementalParsing() {
|
||||||
|
ArrayList<DocWriteRequest<?>> docs = new ArrayList<>();
|
||||||
|
AtomicBoolean isLast = new AtomicBoolean(false);
|
||||||
|
AtomicBoolean next = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||||
|
.withMethod(RestRequest.Method.POST)
|
||||||
|
.withBody(new HttpBody.Stream() {
|
||||||
|
@Override
|
||||||
|
public void close() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChunkHandler handler() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setHandler(ChunkHandler chunkHandler) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void next() {
|
||||||
|
next.set(true);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.withHeaders(Map.of("Content-Type", Collections.singletonList("application/json")))
|
||||||
|
.build();
|
||||||
|
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
|
||||||
|
|
||||||
|
RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(
|
||||||
|
true,
|
||||||
|
request,
|
||||||
|
() -> new IncrementalBulkService.Handler(null, new ThreadContext(Settings.EMPTY), null, null, null, null) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
|
||||||
|
releasable.close();
|
||||||
|
docs.addAll(items);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
|
||||||
|
releasable.close();
|
||||||
|
docs.addAll(items);
|
||||||
|
isLast.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
chunkHandler.accept(channel);
|
||||||
|
ReleasableBytesReference r1 = new ReleasableBytesReference(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n"), () -> {});
|
||||||
|
chunkHandler.handleChunk(channel, r1, false);
|
||||||
|
assertThat(docs, empty());
|
||||||
|
assertTrue(next.get());
|
||||||
|
next.set(false);
|
||||||
|
assertFalse(isLast.get());
|
||||||
|
|
||||||
|
ReleasableBytesReference r2 = new ReleasableBytesReference(new BytesArray("{\"field\":1}"), () -> {});
|
||||||
|
chunkHandler.handleChunk(channel, r2, false);
|
||||||
|
assertThat(docs, empty());
|
||||||
|
assertTrue(next.get());
|
||||||
|
next.set(false);
|
||||||
|
assertFalse(isLast.get());
|
||||||
|
assertTrue(r1.hasReferences());
|
||||||
|
assertTrue(r2.hasReferences());
|
||||||
|
|
||||||
|
ReleasableBytesReference r3 = new ReleasableBytesReference(new BytesArray("\n{\"delete\":"), () -> {});
|
||||||
|
chunkHandler.handleChunk(channel, r3, false);
|
||||||
|
assertThat(docs, hasSize(1));
|
||||||
|
assertFalse(next.get());
|
||||||
|
assertFalse(isLast.get());
|
||||||
|
assertFalse(r1.hasReferences());
|
||||||
|
assertFalse(r2.hasReferences());
|
||||||
|
assertTrue(r3.hasReferences());
|
||||||
|
|
||||||
|
ReleasableBytesReference r4 = new ReleasableBytesReference(new BytesArray("{\"_index\":\"test\",\"_id\":\"2\"}}"), () -> {});
|
||||||
|
chunkHandler.handleChunk(channel, r4, false);
|
||||||
|
assertThat(docs, hasSize(1));
|
||||||
|
assertTrue(next.get());
|
||||||
|
next.set(false);
|
||||||
|
assertFalse(isLast.get());
|
||||||
|
|
||||||
|
ReleasableBytesReference r5 = new ReleasableBytesReference(new BytesArray("\n"), () -> {});
|
||||||
|
chunkHandler.handleChunk(channel, r5, true);
|
||||||
|
assertThat(docs, hasSize(2));
|
||||||
|
assertFalse(next.get());
|
||||||
|
assertTrue(isLast.get());
|
||||||
|
assertFalse(r3.hasReferences());
|
||||||
|
assertFalse(r4.hasReferences());
|
||||||
|
assertFalse(r5.hasReferences());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -229,6 +229,11 @@ public class FakeRestRequest extends RestRequest {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withBody(HttpBody body) {
|
||||||
|
this.content = body;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withPath(String path) {
|
public Builder withPath(String path) {
|
||||||
this.path = path;
|
this.path = path;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionModule;
|
import org.elasticsearch.action.ActionModule;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
|
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||||
import org.elasticsearch.client.internal.Client;
|
import org.elasticsearch.client.internal.Client;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -822,7 +823,8 @@ public class SecurityTests extends ESTestCase {
|
||||||
mock(ClusterService.class),
|
mock(ClusterService.class),
|
||||||
null,
|
null,
|
||||||
List.of(),
|
List.of(),
|
||||||
RestExtension.allowAll()
|
RestExtension.allowAll(),
|
||||||
|
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||||
);
|
);
|
||||||
actionModule.initRestHandlers(null, null);
|
actionModule.initRestHandlers(null, null);
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue