mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-20 21:27:35 -04:00
Incremental bulk integration with rest layer (#112154)
Integrate the incremental bulks into RestBulkAction
This commit is contained in:
parent
c00768a116
commit
a03fb12b09
17 changed files with 675 additions and 100 deletions
|
@ -26,6 +26,7 @@ public class RequestsWithoutContentIT extends ESRestTestCase {
|
|||
assertResponseException(responseException, "request body is required");
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
|
||||
public void testBulkMissingBody() throws IOException {
|
||||
ResponseException responseException = expectThrows(
|
||||
ResponseException.class,
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.core.Tuple;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
|
||||
|
@ -52,6 +53,8 @@ public class Netty4HttpRequestSizeLimitIT extends ESNetty4IntegTestCase {
|
|||
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal, otherSettings))
|
||||
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
|
||||
.put(RestBulkAction.INCREMENTAL_BULK.getKey(), false)
|
||||
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -36,10 +36,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
|
|||
private boolean aggregating = true;
|
||||
private boolean ignoreContentAfterContinueResponse = false;
|
||||
|
||||
public Netty4HttpAggregator(int maxContentLength) {
|
||||
this(maxContentLength, IGNORE_TEST);
|
||||
}
|
||||
|
||||
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
|
||||
super(maxContentLength);
|
||||
this.decider = decider;
|
||||
|
@ -50,7 +46,7 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
|
|||
assert msg instanceof HttpObject;
|
||||
if (msg instanceof HttpRequest request) {
|
||||
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
|
||||
aggregating = decider.test(preReq);
|
||||
aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq);
|
||||
}
|
||||
if (aggregating || msg instanceof FullHttpRequest) {
|
||||
super.channelRead(ctx, msg);
|
||||
|
|
|
@ -37,6 +37,7 @@ import io.netty.util.AttributeKey;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||
import org.elasticsearch.common.network.CloseableChannel;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.network.ThreadWatchdog;
|
||||
|
@ -97,6 +98,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
|||
private final TLSConfig tlsConfig;
|
||||
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
|
||||
private final HttpValidator httpValidator;
|
||||
private final IncrementalBulkService.Enabled enabled;
|
||||
private final ThreadWatchdog threadWatchdog;
|
||||
private final int readTimeoutMillis;
|
||||
|
||||
|
@ -135,6 +137,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
|||
this.acceptChannelPredicate = acceptChannelPredicate;
|
||||
this.httpValidator = httpValidator;
|
||||
this.threadWatchdog = networkService.getThreadWatchdog();
|
||||
this.enabled = new IncrementalBulkService.Enabled(clusterSettings);
|
||||
|
||||
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
|
||||
|
||||
|
@ -280,7 +283,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
|||
}
|
||||
|
||||
public ChannelHandler configureServerChannelHandler() {
|
||||
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator);
|
||||
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled);
|
||||
}
|
||||
|
||||
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
|
||||
|
@ -293,19 +296,22 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
|||
private final TLSConfig tlsConfig;
|
||||
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
|
||||
private final HttpValidator httpValidator;
|
||||
private final IncrementalBulkService.Enabled enabled;
|
||||
|
||||
protected HttpChannelHandler(
|
||||
final Netty4HttpServerTransport transport,
|
||||
final HttpHandlingSettings handlingSettings,
|
||||
final TLSConfig tlsConfig,
|
||||
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
|
||||
@Nullable final HttpValidator httpValidator
|
||||
@Nullable final HttpValidator httpValidator,
|
||||
IncrementalBulkService.Enabled enabled
|
||||
) {
|
||||
this.transport = transport;
|
||||
this.handlingSettings = handlingSettings;
|
||||
this.tlsConfig = tlsConfig;
|
||||
this.acceptChannelPredicate = acceptChannelPredicate;
|
||||
this.httpValidator = httpValidator;
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -366,7 +372,13 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
|||
);
|
||||
}
|
||||
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
|
||||
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength());
|
||||
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(
|
||||
handlingSettings.maxContentLength(),
|
||||
httpPreRequest -> enabled.get() == false
|
||||
|| (httpPreRequest.uri().contains("_bulk") == false
|
||||
|| httpPreRequest.uri().contains("_bulk_update")
|
||||
|| httpPreRequest.uri().contains("/_xpack/monitoring/_bulk"))
|
||||
);
|
||||
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
|
||||
ch.pipeline()
|
||||
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.ElasticsearchSecurityException;
|
||||
import org.elasticsearch.ElasticsearchWrapperException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||
import org.elasticsearch.action.support.ActionTestUtils;
|
||||
import org.elasticsearch.action.support.SubscribableListener;
|
||||
import org.elasticsearch.client.Request;
|
||||
|
@ -419,7 +420,8 @@ public class Netty4HttpServerTransportTests extends AbstractHttpServerTransportT
|
|||
handlingSettings,
|
||||
TLSConfig.noTLS(),
|
||||
null,
|
||||
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
|
||||
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null),
|
||||
new IncrementalBulkService.Enabled(clusterSettings)
|
||||
) {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.xcontent.json.JsonXContent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
|
||||
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testIncrementalBulk() throws IOException {
|
||||
Request createRequest = new Request("PUT", "/index_name");
|
||||
createRequest.setJsonEntity("""
|
||||
{
|
||||
"settings": {
|
||||
"index": {
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 1,
|
||||
"write.wait_for_active_shards": 2
|
||||
}
|
||||
}
|
||||
}""");
|
||||
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
|
||||
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||
|
||||
Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
|
||||
|
||||
// index documents for the rollup job
|
||||
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
|
||||
+ "{\"field\":1}\n"
|
||||
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
|
||||
+ "{\"field\":1}\n"
|
||||
+ "\r\n";
|
||||
|
||||
firstBulkRequest.setJsonEntity(bulkBody);
|
||||
|
||||
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
|
||||
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||
|
||||
Request bulkRequest = new Request("POST", "/index_name/_bulk");
|
||||
|
||||
// index documents for the rollup job
|
||||
final StringBuilder bulk = new StringBuilder();
|
||||
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
|
||||
int updates = 0;
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
|
||||
bulk.append("{\"field\":").append(i).append("}\n");
|
||||
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
|
||||
++updates;
|
||||
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
|
||||
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
|
||||
}
|
||||
}
|
||||
bulk.append("\r\n");
|
||||
|
||||
bulkRequest.setJsonEntity(bulk.toString());
|
||||
|
||||
final Response bulkResponse = getRestClient().performRequest(bulkRequest);
|
||||
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||
Map<String, Object> responseMap = XContentHelper.convertToMap(
|
||||
JsonXContent.jsonXContent,
|
||||
bulkResponse.getEntity().getContent(),
|
||||
true
|
||||
);
|
||||
|
||||
assertFalse((Boolean) responseMap.get("errors"));
|
||||
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
|
||||
}
|
||||
|
||||
public void testIncrementalMalformed() throws IOException {
|
||||
Request createRequest = new Request("PUT", "/index_name");
|
||||
createRequest.setJsonEntity("""
|
||||
{
|
||||
"settings": {
|
||||
"index": {
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 1,
|
||||
"write.wait_for_active_shards": 2
|
||||
}
|
||||
}
|
||||
}""");
|
||||
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
|
||||
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||
|
||||
Request bulkRequest = new Request("POST", "/index_name/_bulk");
|
||||
|
||||
// index documents for the rollup job
|
||||
final StringBuilder bulk = new StringBuilder();
|
||||
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
|
||||
bulk.append("{\"field\":1}\n");
|
||||
bulk.append("{}\n");
|
||||
bulk.append("\r\n");
|
||||
|
||||
bulkRequest.setJsonEntity(bulk.toString());
|
||||
|
||||
expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
|
||||
}
|
||||
}
|
|
@ -160,6 +160,7 @@ import org.elasticsearch.action.admin.indices.template.put.TransportPutComposabl
|
|||
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
|
||||
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
|
||||
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
|
||||
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||
import org.elasticsearch.action.bulk.SimulateBulkAction;
|
||||
import org.elasticsearch.action.bulk.TransportBulkAction;
|
||||
import org.elasticsearch.action.bulk.TransportShardBulkAction;
|
||||
|
@ -448,6 +449,7 @@ public class ActionModule extends AbstractModule {
|
|||
private final List<ActionPlugin> actionPlugins;
|
||||
private final Map<String, ActionHandler<?, ?>> actions;
|
||||
private final ActionFilters actionFilters;
|
||||
private final IncrementalBulkService bulkService;
|
||||
private final AutoCreateIndex autoCreateIndex;
|
||||
private final DestructiveOperations destructiveOperations;
|
||||
private final RestController restController;
|
||||
|
@ -476,7 +478,8 @@ public class ActionModule extends AbstractModule {
|
|||
ClusterService clusterService,
|
||||
RerouteService rerouteService,
|
||||
List<ReservedClusterStateHandler<?>> reservedStateHandlers,
|
||||
RestExtension restExtension
|
||||
RestExtension restExtension,
|
||||
IncrementalBulkService bulkService
|
||||
) {
|
||||
this.settings = settings;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
|
@ -488,6 +491,7 @@ public class ActionModule extends AbstractModule {
|
|||
this.threadPool = threadPool;
|
||||
actions = setupActions(actionPlugins);
|
||||
actionFilters = setupActionFilters(actionPlugins);
|
||||
this.bulkService = bulkService;
|
||||
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
|
||||
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
|
||||
Set<RestHeaderDefinition> headers = Stream.concat(
|
||||
|
@ -928,7 +932,7 @@ public class ActionModule extends AbstractModule {
|
|||
registerHandler.accept(new RestCountAction());
|
||||
registerHandler.accept(new RestTermVectorsAction());
|
||||
registerHandler.accept(new RestMultiTermVectorsAction());
|
||||
registerHandler.accept(new RestBulkAction(settings));
|
||||
registerHandler.accept(new RestBulkAction(settings, bulkService));
|
||||
registerHandler.accept(new RestUpdateAction());
|
||||
|
||||
registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature));
|
||||
|
|
|
@ -86,13 +86,13 @@ public final class BulkRequestParser {
|
|||
.withRestApiVersion(restApiVersion);
|
||||
}
|
||||
|
||||
private static int findNextMarker(byte marker, int from, BytesReference data) {
|
||||
private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
|
||||
final int res = data.indexOf(marker, from);
|
||||
if (res != -1) {
|
||||
assert res >= 0;
|
||||
return res;
|
||||
}
|
||||
if (from != data.length()) {
|
||||
if (from != data.length() && isIncremental == false) {
|
||||
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
|
||||
}
|
||||
return res;
|
||||
|
@ -137,18 +137,57 @@ public final class BulkRequestParser {
|
|||
Consumer<UpdateRequest> updateRequestConsumer,
|
||||
Consumer<DeleteRequest> deleteRequestConsumer
|
||||
) throws IOException {
|
||||
XContent xContent = xContentType.xContent();
|
||||
int line = 0;
|
||||
int from = 0;
|
||||
byte marker = xContent.bulkSeparator();
|
||||
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
|
||||
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
|
||||
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
|
||||
final Map<String, String> stringDeduplicator = new HashMap<>();
|
||||
|
||||
incrementalParse(
|
||||
data,
|
||||
defaultIndex,
|
||||
defaultRouting,
|
||||
defaultFetchSourceContext,
|
||||
defaultPipeline,
|
||||
defaultRequireAlias,
|
||||
defaultRequireDataStream,
|
||||
defaultListExecutedPipelines,
|
||||
allowExplicitIndex,
|
||||
xContentType,
|
||||
indexRequestConsumer,
|
||||
updateRequestConsumer,
|
||||
deleteRequestConsumer,
|
||||
false,
|
||||
stringDeduplicator
|
||||
);
|
||||
}
|
||||
|
||||
public int incrementalParse(
|
||||
BytesReference data,
|
||||
String defaultIndex,
|
||||
String defaultRouting,
|
||||
FetchSourceContext defaultFetchSourceContext,
|
||||
String defaultPipeline,
|
||||
Boolean defaultRequireAlias,
|
||||
Boolean defaultRequireDataStream,
|
||||
Boolean defaultListExecutedPipelines,
|
||||
boolean allowExplicitIndex,
|
||||
XContentType xContentType,
|
||||
BiConsumer<IndexRequest, String> indexRequestConsumer,
|
||||
Consumer<UpdateRequest> updateRequestConsumer,
|
||||
Consumer<DeleteRequest> deleteRequestConsumer,
|
||||
boolean isIncremental,
|
||||
Map<String, String> stringDeduplicator
|
||||
) throws IOException {
|
||||
XContent xContent = xContentType.xContent();
|
||||
byte marker = xContent.bulkSeparator();
|
||||
boolean typesDeprecationLogged = false;
|
||||
|
||||
int line = 0;
|
||||
int from = 0;
|
||||
int consumed = 0;
|
||||
|
||||
while (true) {
|
||||
int nextMarker = findNextMarker(marker, from, data);
|
||||
int nextMarker = findNextMarker(marker, from, data, isIncremental);
|
||||
if (nextMarker == -1) {
|
||||
break;
|
||||
}
|
||||
|
@ -333,8 +372,9 @@ public final class BulkRequestParser {
|
|||
.setIfSeqNo(ifSeqNo)
|
||||
.setIfPrimaryTerm(ifPrimaryTerm)
|
||||
);
|
||||
consumed = from;
|
||||
} else {
|
||||
nextMarker = findNextMarker(marker, from, data);
|
||||
nextMarker = findNextMarker(marker, from, data, isIncremental);
|
||||
if (nextMarker == -1) {
|
||||
break;
|
||||
}
|
||||
|
@ -407,9 +447,11 @@ public final class BulkRequestParser {
|
|||
}
|
||||
// move pointers
|
||||
from = nextMarker + 1;
|
||||
consumed = from;
|
||||
}
|
||||
}
|
||||
}
|
||||
return isIncremental ? consumed : from;
|
||||
}
|
||||
|
||||
@UpdateForV9
|
||||
|
|
|
@ -14,25 +14,56 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.internal.Client;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.core.Releasable;
|
||||
import org.elasticsearch.core.Releasables;
|
||||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class IncrementalBulkService {
|
||||
|
||||
private final Client client;
|
||||
private final IndexingPressure indexingPressure;
|
||||
private final ThreadContext threadContext;
|
||||
private final Supplier<Boolean> enabled;
|
||||
|
||||
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
|
||||
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) {
|
||||
this(client, indexingPressure, threadContext, new Enabled());
|
||||
}
|
||||
|
||||
public IncrementalBulkService(
|
||||
Client client,
|
||||
IndexingPressure indexingPressure,
|
||||
ThreadContext threadContext,
|
||||
ClusterSettings clusterSettings
|
||||
) {
|
||||
this(client, indexingPressure, threadContext, new Enabled(clusterSettings));
|
||||
}
|
||||
|
||||
public IncrementalBulkService(
|
||||
Client client,
|
||||
IndexingPressure indexingPressure,
|
||||
ThreadContext threadContext,
|
||||
Supplier<Boolean> enabled
|
||||
) {
|
||||
this.client = client;
|
||||
this.indexingPressure = indexingPressure;
|
||||
this.threadContext = threadContext;
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public boolean incrementalBulkEnabled() {
|
||||
return enabled.get();
|
||||
}
|
||||
|
||||
public Handler newBulkRequest() {
|
||||
|
@ -40,14 +71,32 @@ public class IncrementalBulkService {
|
|||
}
|
||||
|
||||
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
|
||||
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
|
||||
return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh);
|
||||
}
|
||||
|
||||
public static class Handler {
|
||||
public static class Enabled implements Supplier<Boolean> {
|
||||
|
||||
private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true);
|
||||
|
||||
public Enabled() {}
|
||||
|
||||
public Enabled(ClusterSettings clusterSettings) {
|
||||
incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK));
|
||||
clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return incrementalBulksEnabled.get();
|
||||
}
|
||||
}
|
||||
|
||||
public static class Handler implements Releasable {
|
||||
|
||||
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
|
||||
|
||||
private final Client client;
|
||||
private final ThreadContext threadContext;
|
||||
private final IndexingPressure indexingPressure;
|
||||
private final ActiveShardCount waitForActiveShards;
|
||||
private final TimeValue timeout;
|
||||
|
@ -57,17 +106,21 @@ public class IncrementalBulkService {
|
|||
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
|
||||
private boolean globalFailure = false;
|
||||
private boolean incrementalRequestSubmitted = false;
|
||||
private ThreadContext.StoredContext requestContext;
|
||||
private Exception bulkActionLevelFailure = null;
|
||||
private BulkRequest bulkRequest = null;
|
||||
|
||||
private Handler(
|
||||
protected Handler(
|
||||
Client client,
|
||||
ThreadContext threadContext,
|
||||
IndexingPressure indexingPressure,
|
||||
@Nullable String waitForActiveShards,
|
||||
@Nullable TimeValue timeout,
|
||||
@Nullable String refresh
|
||||
) {
|
||||
this.client = client;
|
||||
this.threadContext = threadContext;
|
||||
this.requestContext = threadContext.newStoredContext();
|
||||
this.indexingPressure = indexingPressure;
|
||||
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
|
||||
this.timeout = timeout;
|
||||
|
@ -85,30 +138,34 @@ public class IncrementalBulkService {
|
|||
if (shouldBackOff()) {
|
||||
final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
||||
incrementalRequestSubmitted = true;
|
||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||
requestContext.restore();
|
||||
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
|
||||
|
||||
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkResponse) {
|
||||
responses.add(bulkResponse);
|
||||
releaseCurrentReferences();
|
||||
createNewBulkRequest(
|
||||
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkResponse) {
|
||||
responses.add(bulkResponse);
|
||||
releaseCurrentReferences();
|
||||
createNewBulkRequest(
|
||||
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
handleBulkFailure(isFirstRequest, e);
|
||||
}
|
||||
}, nextItems));
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
handleBulkFailure(isFirstRequest, e);
|
||||
}
|
||||
}, () -> {
|
||||
requestContext = threadContext.newStoredContext();
|
||||
nextItems.run();
|
||||
}));
|
||||
}
|
||||
} else {
|
||||
nextItems.run();
|
||||
}
|
||||
} else {
|
||||
nextItems.run();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -123,23 +180,26 @@ public class IncrementalBulkService {
|
|||
} else {
|
||||
assert bulkRequest != null;
|
||||
if (internalAddItems(items, releasable)) {
|
||||
client.bulk(bulkRequest, new ActionListener<>() {
|
||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||
requestContext.restore();
|
||||
client.bulk(bulkRequest, new ActionListener<>() {
|
||||
|
||||
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
||||
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkResponse) {
|
||||
responses.add(bulkResponse);
|
||||
releaseCurrentReferences();
|
||||
listener.onResponse(combineResponses());
|
||||
}
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkResponse) {
|
||||
responses.add(bulkResponse);
|
||||
releaseCurrentReferences();
|
||||
listener.onResponse(combineResponses());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
handleBulkFailure(isFirstRequest, e);
|
||||
errorResponse(listener);
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
handleBulkFailure(isFirstRequest, e);
|
||||
errorResponse(listener);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
errorResponse(listener);
|
||||
}
|
||||
|
@ -240,5 +300,10 @@ public class IncrementalBulkService {
|
|||
|
||||
return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// TODO: Implement
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -113,6 +113,7 @@ import org.elasticsearch.plugins.PluginsService;
|
|||
import org.elasticsearch.readiness.ReadinessService;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
|
@ -242,6 +243,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
Metadata.SETTING_READ_ONLY_SETTING,
|
||||
Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
|
||||
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
|
||||
RestBulkAction.INCREMENTAL_BULK,
|
||||
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
|
||||
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
|
||||
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
|
||||
|
|
|
@ -891,6 +891,14 @@ class NodeConstruction {
|
|||
.map(TerminationHandlerProvider::handler);
|
||||
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
|
||||
|
||||
final IndexingPressure indexingLimits = new IndexingPressure(settings);
|
||||
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
|
||||
client,
|
||||
indexingLimits,
|
||||
threadPool.getThreadContext(),
|
||||
clusterService.getClusterSettings()
|
||||
);
|
||||
|
||||
ActionModule actionModule = new ActionModule(
|
||||
settings,
|
||||
clusterModule.getIndexNameExpressionResolver(),
|
||||
|
@ -916,7 +924,8 @@ class NodeConstruction {
|
|||
metadataCreateIndexService,
|
||||
dataStreamGlobalRetentionSettings
|
||||
),
|
||||
pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll)
|
||||
pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll),
|
||||
incrementalBulkService
|
||||
);
|
||||
modules.add(actionModule);
|
||||
|
||||
|
@ -979,8 +988,6 @@ class NodeConstruction {
|
|||
SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
|
||||
);
|
||||
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
|
||||
final IndexingPressure indexingLimits = new IndexingPressure(settings);
|
||||
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
|
||||
|
||||
SnapshotsService snapshotsService = new SnapshotsService(
|
||||
settings,
|
||||
|
|
|
@ -11,21 +11,38 @@ package org.elasticsearch.rest.action.document;
|
|||
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequestParser;
|
||||
import org.elasticsearch.action.bulk.BulkShardRequest;
|
||||
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.internal.node.NodeClient;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.CompositeBytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.core.Releasable;
|
||||
import org.elasticsearch.core.Releasables;
|
||||
import org.elasticsearch.core.RestApiVersion;
|
||||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.Scope;
|
||||
import org.elasticsearch.rest.ServerlessScope;
|
||||
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.common.settings.Setting.boolSetting;
|
||||
import static org.elasticsearch.rest.RestRequest.Method.POST;
|
||||
import static org.elasticsearch.rest.RestRequest.Method.PUT;
|
||||
|
||||
|
@ -40,12 +57,21 @@ import static org.elasticsearch.rest.RestRequest.Method.PUT;
|
|||
*/
|
||||
@ServerlessScope(Scope.PUBLIC)
|
||||
public class RestBulkAction extends BaseRestHandler {
|
||||
|
||||
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
|
||||
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
|
||||
"rest.incremental_bulk",
|
||||
true,
|
||||
Setting.Property.NodeScope,
|
||||
Setting.Property.Dynamic
|
||||
);
|
||||
|
||||
private final boolean allowExplicitIndex;
|
||||
private final IncrementalBulkService bulkHandler;
|
||||
|
||||
public RestBulkAction(Settings settings) {
|
||||
public RestBulkAction(Settings settings, IncrementalBulkService bulkHandler) {
|
||||
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
|
||||
this.bulkHandler = bulkHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -67,38 +93,181 @@ public class RestBulkAction extends BaseRestHandler {
|
|||
|
||||
@Override
|
||||
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
|
||||
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
|
||||
request.param("type");
|
||||
}
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
String defaultIndex = request.param("index");
|
||||
String defaultRouting = request.param("routing");
|
||||
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
|
||||
String defaultPipeline = request.param("pipeline");
|
||||
boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
|
||||
String waitForActiveShards = request.param("wait_for_active_shards");
|
||||
if (waitForActiveShards != null) {
|
||||
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
|
||||
}
|
||||
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
|
||||
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
|
||||
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
|
||||
bulkRequest.setRefreshPolicy(request.param("refresh"));
|
||||
bulkRequest.add(
|
||||
request.requiredContent(),
|
||||
defaultIndex,
|
||||
defaultRouting,
|
||||
defaultFetchSourceContext,
|
||||
defaultPipeline,
|
||||
defaultRequireAlias,
|
||||
defaultRequireDataStream,
|
||||
defaultListExecutedPipelines,
|
||||
allowExplicitIndex,
|
||||
request.getXContentType(),
|
||||
request.getRestApiVersion()
|
||||
);
|
||||
if (bulkHandler.incrementalBulkEnabled() == false) {
|
||||
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
|
||||
request.param("type");
|
||||
}
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
String defaultIndex = request.param("index");
|
||||
String defaultRouting = request.param("routing");
|
||||
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
|
||||
String defaultPipeline = request.param("pipeline");
|
||||
boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
|
||||
String waitForActiveShards = request.param("wait_for_active_shards");
|
||||
if (waitForActiveShards != null) {
|
||||
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
|
||||
}
|
||||
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
|
||||
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
|
||||
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
|
||||
bulkRequest.setRefreshPolicy(request.param("refresh"));
|
||||
bulkRequest.add(
|
||||
request.requiredContent(),
|
||||
defaultIndex,
|
||||
defaultRouting,
|
||||
defaultFetchSourceContext,
|
||||
defaultPipeline,
|
||||
defaultRequireAlias,
|
||||
defaultRequireDataStream,
|
||||
defaultListExecutedPipelines,
|
||||
allowExplicitIndex,
|
||||
request.getXContentType(),
|
||||
request.getRestApiVersion()
|
||||
);
|
||||
|
||||
return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
|
||||
return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
|
||||
} else {
|
||||
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
|
||||
request.param("type");
|
||||
}
|
||||
|
||||
String waitForActiveShards = request.param("wait_for_active_shards");
|
||||
TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
|
||||
String refresh = request.param("refresh");
|
||||
return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh));
|
||||
}
|
||||
}
|
||||
|
||||
static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
|
||||
|
||||
private final boolean allowExplicitIndex;
|
||||
private final RestRequest request;
|
||||
|
||||
private final Map<String, String> stringDeduplicator = new HashMap<>();
|
||||
private final String defaultIndex;
|
||||
private final String defaultRouting;
|
||||
private final FetchSourceContext defaultFetchSourceContext;
|
||||
private final String defaultPipeline;
|
||||
private final boolean defaultListExecutedPipelines;
|
||||
private final Boolean defaultRequireAlias;
|
||||
private final boolean defaultRequireDataStream;
|
||||
private final BulkRequestParser parser;
|
||||
private final Supplier<IncrementalBulkService.Handler> handlerSupplier;
|
||||
private IncrementalBulkService.Handler handler;
|
||||
|
||||
private volatile RestChannel restChannel;
|
||||
private boolean isException;
|
||||
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
|
||||
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
|
||||
|
||||
ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
|
||||
this.allowExplicitIndex = allowExplicitIndex;
|
||||
this.request = request;
|
||||
this.defaultIndex = request.param("index");
|
||||
this.defaultRouting = request.param("routing");
|
||||
this.defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
|
||||
this.defaultPipeline = request.param("pipeline");
|
||||
this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
|
||||
this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
|
||||
this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
|
||||
// TODO: Fix type deprecation logging
|
||||
this.parser = new BulkRequestParser(false, request.getRestApiVersion());
|
||||
this.handlerSupplier = handlerSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(RestChannel restChannel) {
|
||||
this.restChannel = restChannel;
|
||||
this.handler = handlerSupplier.get();
|
||||
request.contentStream().next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
|
||||
assert handler != null;
|
||||
assert channel == restChannel;
|
||||
if (isException) {
|
||||
chunk.close();
|
||||
return;
|
||||
}
|
||||
|
||||
final BytesReference data;
|
||||
int bytesConsumed;
|
||||
try {
|
||||
unParsedChunks.add(chunk);
|
||||
|
||||
if (unParsedChunks.size() > 1) {
|
||||
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
|
||||
} else {
|
||||
data = chunk;
|
||||
}
|
||||
|
||||
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
|
||||
// BulkRequest#add is fine
|
||||
bytesConsumed = parser.incrementalParse(
|
||||
data,
|
||||
defaultIndex,
|
||||
defaultRouting,
|
||||
defaultFetchSourceContext,
|
||||
defaultPipeline,
|
||||
defaultRequireAlias,
|
||||
defaultRequireDataStream,
|
||||
defaultListExecutedPipelines,
|
||||
allowExplicitIndex,
|
||||
request.getXContentType(),
|
||||
(request, type) -> items.add(request),
|
||||
items::add,
|
||||
items::add,
|
||||
isLast == false,
|
||||
stringDeduplicator
|
||||
);
|
||||
|
||||
} catch (Exception e) {
|
||||
// TODO: This needs to be better
|
||||
Releasables.close(handler);
|
||||
Releasables.close(unParsedChunks);
|
||||
unParsedChunks.clear();
|
||||
new RestToXContentListener<>(channel).onFailure(e);
|
||||
isException = true;
|
||||
return;
|
||||
}
|
||||
|
||||
final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
|
||||
if (isLast) {
|
||||
assert unParsedChunks.isEmpty();
|
||||
assert channel != null;
|
||||
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
|
||||
items.clear();
|
||||
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
|
||||
} else if (items.isEmpty() == false) {
|
||||
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
|
||||
items.clear();
|
||||
handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
|
||||
} else {
|
||||
assert releasables.isEmpty();
|
||||
request.contentStream().next();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
RequestBodyChunkConsumer.super.close();
|
||||
}
|
||||
|
||||
private ArrayList<Releasable> accountParsing(int bytesConsumed) {
|
||||
ArrayList<Releasable> releasables = new ArrayList<>(unParsedChunks.size());
|
||||
while (bytesConsumed > 0) {
|
||||
ReleasableBytesReference reference = unParsedChunks.removeFirst();
|
||||
releasables.add(reference);
|
||||
if (bytesConsumed >= reference.length()) {
|
||||
bytesConsumed -= reference.length();
|
||||
} else {
|
||||
unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed));
|
||||
bytesConsumed = 0;
|
||||
}
|
||||
}
|
||||
return releasables;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
package org.elasticsearch.action;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
|
||||
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.client.internal.node.NodeClient;
|
||||
|
@ -129,7 +130,8 @@ public class ActionModuleTests extends ESTestCase {
|
|||
mock(ClusterService.class),
|
||||
null,
|
||||
List.of(),
|
||||
RestExtension.allowAll()
|
||||
RestExtension.allowAll(),
|
||||
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||
);
|
||||
actionModule.initRestHandlers(null, null);
|
||||
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
||||
|
@ -193,7 +195,8 @@ public class ActionModuleTests extends ESTestCase {
|
|||
mock(ClusterService.class),
|
||||
null,
|
||||
List.of(),
|
||||
RestExtension.allowAll()
|
||||
RestExtension.allowAll(),
|
||||
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||
);
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
|
||||
assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET"));
|
||||
|
@ -250,7 +253,8 @@ public class ActionModuleTests extends ESTestCase {
|
|||
mock(ClusterService.class),
|
||||
null,
|
||||
List.of(),
|
||||
RestExtension.allowAll()
|
||||
RestExtension.allowAll(),
|
||||
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||
);
|
||||
actionModule.initRestHandlers(null, null);
|
||||
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
||||
|
@ -300,7 +304,8 @@ public class ActionModuleTests extends ESTestCase {
|
|||
mock(ClusterService.class),
|
||||
null,
|
||||
List.of(),
|
||||
RestExtension.allowAll()
|
||||
RestExtension.allowAll(),
|
||||
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||
)
|
||||
);
|
||||
assertThat(
|
||||
|
@ -341,7 +346,8 @@ public class ActionModuleTests extends ESTestCase {
|
|||
mock(ClusterService.class),
|
||||
null,
|
||||
List.of(),
|
||||
RestExtension.allowAll()
|
||||
RestExtension.allowAll(),
|
||||
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||
)
|
||||
);
|
||||
assertThat(
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionModule;
|
||||
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
|
@ -1177,7 +1178,8 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
|
|||
mock(ClusterService.class),
|
||||
null,
|
||||
List.of(),
|
||||
RestExtension.allowAll()
|
||||
RestExtension.allowAll(),
|
||||
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -11,23 +11,37 @@ package org.elasticsearch.rest.action.document;
|
|||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.client.internal.Client;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.core.Releasable;
|
||||
import org.elasticsearch.http.HttpBody;
|
||||
import org.elasticsearch.index.IndexVersion;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.client.NoOpNodeClient;
|
||||
import org.elasticsearch.test.rest.FakeRestChannel;
|
||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
||||
import org.elasticsearch.xcontent.XContentType;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -51,7 +65,10 @@ public class RestBulkActionTests extends ESTestCase {
|
|||
};
|
||||
final Map<String, String> params = new HashMap<>();
|
||||
params.put("pipeline", "timestamps");
|
||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
||||
new RestBulkAction(
|
||||
settings(IndexVersion.current()).build(),
|
||||
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY), () -> false)
|
||||
).handleRequest(
|
||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
|
||||
{"index":{"_id":"1"}}
|
||||
{"field1":"val1"}
|
||||
|
@ -83,7 +100,15 @@ public class RestBulkActionTests extends ESTestCase {
|
|||
};
|
||||
Map<String, String> params = new HashMap<>();
|
||||
{
|
||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
||||
new RestBulkAction(
|
||||
settings(IndexVersion.current()).build(),
|
||||
new IncrementalBulkService(
|
||||
mock(Client.class),
|
||||
mock(IndexingPressure.class),
|
||||
new ThreadContext(Settings.EMPTY),
|
||||
() -> false
|
||||
)
|
||||
).handleRequest(
|
||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||
.withParams(params)
|
||||
.withContent(new BytesArray("""
|
||||
|
@ -104,7 +129,15 @@ public class RestBulkActionTests extends ESTestCase {
|
|||
{
|
||||
params.put("list_executed_pipelines", "true");
|
||||
bulkCalled.set(false);
|
||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
||||
new RestBulkAction(
|
||||
settings(IndexVersion.current()).build(),
|
||||
new IncrementalBulkService(
|
||||
mock(Client.class),
|
||||
mock(IndexingPressure.class),
|
||||
new ThreadContext(Settings.EMPTY),
|
||||
() -> false
|
||||
)
|
||||
).handleRequest(
|
||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||
.withParams(params)
|
||||
.withContent(new BytesArray("""
|
||||
|
@ -124,7 +157,15 @@ public class RestBulkActionTests extends ESTestCase {
|
|||
}
|
||||
{
|
||||
bulkCalled.set(false);
|
||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
||||
new RestBulkAction(
|
||||
settings(IndexVersion.current()).build(),
|
||||
new IncrementalBulkService(
|
||||
mock(Client.class),
|
||||
mock(IndexingPressure.class),
|
||||
new ThreadContext(Settings.EMPTY),
|
||||
() -> false
|
||||
)
|
||||
).handleRequest(
|
||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||
.withParams(params)
|
||||
.withContent(new BytesArray("""
|
||||
|
@ -145,7 +186,15 @@ public class RestBulkActionTests extends ESTestCase {
|
|||
{
|
||||
params.remove("list_executed_pipelines");
|
||||
bulkCalled.set(false);
|
||||
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
|
||||
new RestBulkAction(
|
||||
settings(IndexVersion.current()).build(),
|
||||
new IncrementalBulkService(
|
||||
mock(Client.class),
|
||||
mock(IndexingPressure.class),
|
||||
new ThreadContext(Settings.EMPTY),
|
||||
() -> false
|
||||
)
|
||||
).handleRequest(
|
||||
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||
.withParams(params)
|
||||
.withContent(new BytesArray("""
|
||||
|
@ -165,4 +214,95 @@ public class RestBulkActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementalParsing() {
|
||||
ArrayList<DocWriteRequest<?>> docs = new ArrayList<>();
|
||||
AtomicBoolean isLast = new AtomicBoolean(false);
|
||||
AtomicBoolean next = new AtomicBoolean(false);
|
||||
|
||||
FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
|
||||
.withMethod(RestRequest.Method.POST)
|
||||
.withBody(new HttpBody.Stream() {
|
||||
@Override
|
||||
public void close() {}
|
||||
|
||||
@Override
|
||||
public ChunkHandler handler() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setHandler(ChunkHandler chunkHandler) {}
|
||||
|
||||
@Override
|
||||
public void next() {
|
||||
next.set(true);
|
||||
}
|
||||
})
|
||||
.withHeaders(Map.of("Content-Type", Collections.singletonList("application/json")))
|
||||
.build();
|
||||
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
|
||||
|
||||
RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(
|
||||
true,
|
||||
request,
|
||||
() -> new IncrementalBulkService.Handler(null, new ThreadContext(Settings.EMPTY), null, null, null, null) {
|
||||
|
||||
@Override
|
||||
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
|
||||
releasable.close();
|
||||
docs.addAll(items);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
|
||||
releasable.close();
|
||||
docs.addAll(items);
|
||||
isLast.set(true);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
chunkHandler.accept(channel);
|
||||
ReleasableBytesReference r1 = new ReleasableBytesReference(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n"), () -> {});
|
||||
chunkHandler.handleChunk(channel, r1, false);
|
||||
assertThat(docs, empty());
|
||||
assertTrue(next.get());
|
||||
next.set(false);
|
||||
assertFalse(isLast.get());
|
||||
|
||||
ReleasableBytesReference r2 = new ReleasableBytesReference(new BytesArray("{\"field\":1}"), () -> {});
|
||||
chunkHandler.handleChunk(channel, r2, false);
|
||||
assertThat(docs, empty());
|
||||
assertTrue(next.get());
|
||||
next.set(false);
|
||||
assertFalse(isLast.get());
|
||||
assertTrue(r1.hasReferences());
|
||||
assertTrue(r2.hasReferences());
|
||||
|
||||
ReleasableBytesReference r3 = new ReleasableBytesReference(new BytesArray("\n{\"delete\":"), () -> {});
|
||||
chunkHandler.handleChunk(channel, r3, false);
|
||||
assertThat(docs, hasSize(1));
|
||||
assertFalse(next.get());
|
||||
assertFalse(isLast.get());
|
||||
assertFalse(r1.hasReferences());
|
||||
assertFalse(r2.hasReferences());
|
||||
assertTrue(r3.hasReferences());
|
||||
|
||||
ReleasableBytesReference r4 = new ReleasableBytesReference(new BytesArray("{\"_index\":\"test\",\"_id\":\"2\"}}"), () -> {});
|
||||
chunkHandler.handleChunk(channel, r4, false);
|
||||
assertThat(docs, hasSize(1));
|
||||
assertTrue(next.get());
|
||||
next.set(false);
|
||||
assertFalse(isLast.get());
|
||||
|
||||
ReleasableBytesReference r5 = new ReleasableBytesReference(new BytesArray("\n"), () -> {});
|
||||
chunkHandler.handleChunk(channel, r5, true);
|
||||
assertThat(docs, hasSize(2));
|
||||
assertFalse(next.get());
|
||||
assertTrue(isLast.get());
|
||||
assertFalse(r3.hasReferences());
|
||||
assertFalse(r4.hasReferences());
|
||||
assertFalse(r5.hasReferences());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -229,6 +229,11 @@ public class FakeRestRequest extends RestRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder withBody(HttpBody body) {
|
||||
this.content = body;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withPath(String path) {
|
||||
this.path = path;
|
||||
return this;
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionModule;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.IncrementalBulkService;
|
||||
import org.elasticsearch.client.internal.Client;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -822,7 +823,8 @@ public class SecurityTests extends ESTestCase {
|
|||
mock(ClusterService.class),
|
||||
null,
|
||||
List.of(),
|
||||
RestExtension.allowAll()
|
||||
RestExtension.allowAll(),
|
||||
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
|
||||
);
|
||||
actionModule.initRestHandlers(null, null);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue