Incremental bulk integration with rest layer (#112154)

Integrate incremental bulk handling into RestBulkAction
Tim Brooks 2024-08-30 12:06:04 -06:00
parent c00768a116
commit a03fb12b09
17 changed files with 675 additions and 100 deletions

View file

@@ -26,6 +26,7 @@ public class RequestsWithoutContentIT extends ESRestTestCase {
assertResponseException(responseException, "request body is required");
}
@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
public void testBulkMissingBody() throws IOException {
ResponseException responseException = expectThrows(
ResponseException.class,

View file

@@ -21,6 +21,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -52,6 +53,8 @@ public class Netty4HttpRequestSizeLimitIT extends ESNetty4IntegTestCase {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
.put(RestBulkAction.INCREMENTAL_BULK.getKey(), false)
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
.build();
}

View file

@@ -36,10 +36,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
private boolean aggregating = true;
private boolean ignoreContentAfterContinueResponse = false;
public Netty4HttpAggregator(int maxContentLength) {
this(maxContentLength, IGNORE_TEST);
}
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
super(maxContentLength);
this.decider = decider;
@@ -50,7 +46,7 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
assert msg instanceof HttpObject;
if (msg instanceof HttpRequest request) {
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
aggregating = decider.test(preReq);
aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq);
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);

View file

@@ -37,6 +37,7 @@ import io.netty.util.AttributeKey;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.ThreadWatchdog;
@@ -97,6 +98,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final TLSConfig tlsConfig;
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
private final HttpValidator httpValidator;
private final IncrementalBulkService.Enabled enabled;
private final ThreadWatchdog threadWatchdog;
private final int readTimeoutMillis;
@@ -135,6 +137,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
this.acceptChannelPredicate = acceptChannelPredicate;
this.httpValidator = httpValidator;
this.threadWatchdog = networkService.getThreadWatchdog();
this.enabled = new IncrementalBulkService.Enabled(clusterSettings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
@@ -280,7 +283,7 @@
}
public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator);
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled);
}
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
@@ -293,19 +296,22 @@
private final TLSConfig tlsConfig;
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
private final HttpValidator httpValidator;
private final IncrementalBulkService.Enabled enabled;
protected HttpChannelHandler(
final Netty4HttpServerTransport transport,
final HttpHandlingSettings handlingSettings,
final TLSConfig tlsConfig,
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
@Nullable final HttpValidator httpValidator
@Nullable final HttpValidator httpValidator,
IncrementalBulkService.Enabled enabled
) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.tlsConfig = tlsConfig;
this.acceptChannelPredicate = acceptChannelPredicate;
this.httpValidator = httpValidator;
this.enabled = enabled;
}
@Override
@@ -366,7 +372,13 @@
);
}
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength());
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(
handlingSettings.maxContentLength(),
httpPreRequest -> enabled.get() == false
|| (httpPreRequest.uri().contains("_bulk") == false
|| httpPreRequest.uri().contains("_bulk_update")
|| httpPreRequest.uri().contains("/_xpack/monitoring/_bulk"))
);
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline()
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression

View file

@@ -46,6 +46,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
@@ -419,7 +420,8 @@ public class Netty4HttpServerTransportTests extends AbstractHttpServerTransportT
handlingSettings,
TLSConfig.noTLS(),
null,
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null),
new IncrementalBulkService.Enabled(clusterSettings)
) {
@Override
protected void initChannel(Channel ch) throws Exception {

View file

@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.http;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.json.JsonXContent;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
@SuppressWarnings("unchecked")
public void testIncrementalBulk() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"write.wait_for_active_shards": 2
}
}
}""");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
// index a couple of documents with a plain bulk request
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
+ "{\"field\":1}\n"
+ "\r\n";
firstBulkRequest.setJsonEntity(bulkBody);
final Response indexSuccessful = getRestClient().performRequest(firstBulkRequest);
assertThat(indexSuccessful.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Request bulkRequest = new Request("POST", "/index_name/_bulk");
// build a bulk body large enough to arrive in multiple chunks
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
int updates = 0;
for (int i = 0; i < 1000; i++) {
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":").append(i).append("}\n");
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
++updates;
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
}
}
bulk.append("\r\n");
bulkRequest.setJsonEntity(bulk.toString());
final Response bulkResponse = getRestClient().performRequest(bulkRequest);
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Map<String, Object> responseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
bulkResponse.getEntity().getContent(),
true
);
assertFalse((Boolean) responseMap.get("errors"));
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
}
public void testIncrementalMalformed() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"write.wait_for_active_shards": 2
}
}
}""");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Request bulkRequest = new Request("POST", "/index_name/_bulk");
// build a bulk body containing a malformed (empty) action line
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":1}\n");
bulk.append("{}\n");
bulk.append("\r\n");
bulkRequest.setJsonEntity(bulk.toString());
expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
}
}
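Why testIncrementalBulk expects 1001 + updates items: the body carries one delete action, 1000 index actions, and updates update actions, and every action produces exactly one item in the bulk response:

// delete + index actions + randomized update actions, one response item each
int expectedItems = 1 + 1000 + updates; // == 1001 + updates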

View file

@@ -160,6 +160,7 @@ import org.elasticsearch.action.admin.indices.template.put.TransportPutComposabl
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.bulk.SimulateBulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
@@ -448,6 +449,7 @@ public class ActionModule extends AbstractModule {
private final List<ActionPlugin> actionPlugins;
private final Map<String, ActionHandler<?, ?>> actions;
private final ActionFilters actionFilters;
private final IncrementalBulkService bulkService;
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
private final RestController restController;
@@ -476,7 +478,8 @@
ClusterService clusterService,
RerouteService rerouteService,
List<ReservedClusterStateHandler<?>> reservedStateHandlers,
RestExtension restExtension
RestExtension restExtension,
IncrementalBulkService bulkService
) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -488,6 +491,7 @@
this.threadPool = threadPool;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
this.bulkService = bulkService;
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
@@ -928,7 +932,7 @@
registerHandler.accept(new RestCountAction());
registerHandler.accept(new RestTermVectorsAction());
registerHandler.accept(new RestMultiTermVectorsAction());
registerHandler.accept(new RestBulkAction(settings));
registerHandler.accept(new RestBulkAction(settings, bulkService));
registerHandler.accept(new RestUpdateAction());
registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature));

View file

@@ -86,13 +86,13 @@ public final class BulkRequestParser {
.withRestApiVersion(restApiVersion);
}
private static int findNextMarker(byte marker, int from, BytesReference data) {
private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
final int res = data.indexOf(marker, from);
if (res != -1) {
assert res >= 0;
return res;
}
if (from != data.length()) {
if (from != data.length() && isIncremental == false) {
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
}
return res;
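Threading isIncremental through findNextMarker changes the parser's contract: a buffer that ends without a trailing newline is no longer a malformed request, it is simply an incomplete chunk. A self-contained sketch of that behavior, simplified to byte arrays (the real method operates on BytesReference):

public class FindMarkerSketch {
    // Returns the index of the next '\n', or -1. In incremental mode a missing
    // terminator means "wait for more bytes", not "reject the request".
    static int findNextNewline(byte[] data, int from, boolean isIncremental) {
        for (int i = from; i < data.length; i++) {
            if (data[i] == '\n') {
                return i;
            }
        }
        if (from != data.length && isIncremental == false) {
            throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] partial = "{\"index\":{}}".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(findNextNewline(partial, 0, true)); // -1: incomplete chunk, try again later
        // findNextNewline(partial, 0, false) would throw IllegalArgumentException
    }
}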
@@ -137,18 +137,57 @@
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
XContent xContent = xContentType.xContent();
int line = 0;
int from = 0;
byte marker = xContent.bulkSeparator();
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
final Map<String, String> stringDeduplicator = new HashMap<>();
incrementalParse(
data,
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
indexRequestConsumer,
updateRequestConsumer,
deleteRequestConsumer,
false,
stringDeduplicator
);
}
public int incrementalParse(
BytesReference data,
String defaultIndex,
String defaultRouting,
FetchSourceContext defaultFetchSourceContext,
String defaultPipeline,
Boolean defaultRequireAlias,
Boolean defaultRequireDataStream,
Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer,
boolean isIncremental,
Map<String, String> stringDeduplicator
) throws IOException {
XContent xContent = xContentType.xContent();
byte marker = xContent.bulkSeparator();
boolean typesDeprecationLogged = false;
int line = 0;
int from = 0;
int consumed = 0;
while (true) {
int nextMarker = findNextMarker(marker, from, data);
int nextMarker = findNextMarker(marker, from, data, isIncremental);
if (nextMarker == -1) {
break;
}
@@ -333,8 +372,9 @@
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
);
consumed = from;
} else {
nextMarker = findNextMarker(marker, from, data);
nextMarker = findNextMarker(marker, from, data, isIncremental);
if (nextMarker == -1) {
break;
}
@@ -407,9 +447,11 @@
}
// move pointers
from = nextMarker + 1;
consumed = from;
}
}
}
return isIncremental ? consumed : from;
}
@UpdateForV9
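The stringDeduplicator passed through to incrementalParse follows the pattern the comment above describes: a HashMap scoped to one parse call, so repeated index, pipeline, and routing strings collapse onto a single instance while parsing runs. A minimal sketch of the idea (computeIfAbsent stands in for the parser internals, which this diff does not show):

import java.util.HashMap;
import java.util.Map;

public class ParseScopedDeduplication {
    public static void main(String[] args) {
        Map<String, String> stringDeduplicator = new HashMap<>();
        // two parsed occurrences of the same index name...
        String first = stringDeduplicator.computeIfAbsent(new String("logs-2024"), k -> k);
        String second = stringDeduplicator.computeIfAbsent(new String("logs-2024"), k -> k);
        // ...now share one instance for the lifetime of this parse call
        System.out.println(first == second); // true
    }
}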

View file

@@ -14,25 +14,56 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.rest.action.document.RestBulkAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class IncrementalBulkService {
private final Client client;
private final IndexingPressure indexingPressure;
private final ThreadContext threadContext;
private final Supplier<Boolean> enabled;
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) {
this(client, indexingPressure, threadContext, new Enabled());
}
public IncrementalBulkService(
Client client,
IndexingPressure indexingPressure,
ThreadContext threadContext,
ClusterSettings clusterSettings
) {
this(client, indexingPressure, threadContext, new Enabled(clusterSettings));
}
public IncrementalBulkService(
Client client,
IndexingPressure indexingPressure,
ThreadContext threadContext,
Supplier<Boolean> enabled
) {
this.client = client;
this.indexingPressure = indexingPressure;
this.threadContext = threadContext;
this.enabled = enabled;
}
public boolean incrementalBulkEnabled() {
return enabled.get();
}
public Handler newBulkRequest() {
@@ -40,14 +71,32 @@
}
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh);
}
public static class Handler {
public static class Enabled implements Supplier<Boolean> {
private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true);
public Enabled() {}
public Enabled(ClusterSettings clusterSettings) {
incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK));
clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set);
}
@Override
public Boolean get() {
return incrementalBulksEnabled.get();
}
}
public static class Handler implements Releasable {
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
private final Client client;
private final ThreadContext threadContext;
private final IndexingPressure indexingPressure;
private final ActiveShardCount waitForActiveShards;
private final TimeValue timeout;
@@ -57,17 +106,21 @@
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
private boolean globalFailure = false;
private boolean incrementalRequestSubmitted = false;
private ThreadContext.StoredContext requestContext;
private Exception bulkActionLevelFailure = null;
private BulkRequest bulkRequest = null;
private Handler(
protected Handler(
Client client,
ThreadContext threadContext,
IndexingPressure indexingPressure,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
) {
this.client = client;
this.threadContext = threadContext;
this.requestContext = threadContext.newStoredContext();
this.indexingPressure = indexingPressure;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
@@ -85,30 +138,34 @@
if (shouldBackOff()) {
final boolean isFirstRequest = incrementalRequestSubmitted == false;
incrementalRequestSubmitted = true;
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
requestContext.restore();
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
createNewBulkRequest(
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
);
}
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
createNewBulkRequest(
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
);
}
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, nextItems));
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, () -> {
requestContext = threadContext.newStoredContext();
nextItems.run();
}));
}
} else {
nextItems.run();
}
} else {
nextItems.run();
}
}
}
@@ -123,23 +180,26 @@
} else {
assert bulkRequest != null;
if (internalAddItems(items, releasable)) {
client.bulk(bulkRequest, new ActionListener<>() {
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
requestContext.restore();
client.bulk(bulkRequest, new ActionListener<>() {
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
listener.onResponse(combineResponses());
}
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
listener.onResponse(combineResponses());
}
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
errorResponse(listener);
}
});
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
errorResponse(listener);
}
});
}
} else {
errorResponse(listener);
}
@@ -240,5 +300,10 @@
return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
}
@Override
public void close() {
// TODO: Implement
}
}
}
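The Enabled supplier above is small but load-bearing: a dynamic cluster setting feeds an AtomicBoolean, and downstream code only ever reads the Supplier&lt;Boolean&gt;. A plain-Java sketch of the same shape, with the ClusterSettings wiring replaced by a bare Consumer:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class EnabledFlag implements Supplier<Boolean> {
    private final AtomicBoolean enabled = new AtomicBoolean(true);

    // stands in for clusterSettings.addSettingsUpdateConsumer(INCREMENTAL_BULK, ...)
    Consumer<Boolean> settingsUpdateConsumer() {
        return enabled::set;
    }

    @Override
    public Boolean get() {
        return enabled.get();
    }

    public static void main(String[] args) {
        EnabledFlag flag = new EnabledFlag();
        System.out.println(flag.get());              // true by default
        flag.settingsUpdateConsumer().accept(false); // simulated dynamic settings update
        System.out.println(flag.get());              // false
    }
}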

View file

@@ -113,6 +113,7 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.readiness.ReadinessService;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
@@ -242,6 +243,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
Metadata.SETTING_READ_ONLY_SETTING,
Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
RestBulkAction.INCREMENTAL_BULK,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
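Because RestBulkAction.INCREMENTAL_BULK is registered here and declared Dynamic, it should be togglable at runtime through the standard cluster settings API rather than only at node startup. A hedged sketch in the style of this commit's REST tests (the endpoint and client calls are standard Elasticsearch, not part of this diff):

// flip incremental bulk handling off on a running cluster
Request toggle = new Request("PUT", "/_cluster/settings");
toggle.setJsonEntity("""
    {
      "persistent": {
        "rest.incremental_bulk": false
      }
    }""");
Response response = getRestClient().performRequest(toggle);
assertThat(response.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));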

View file

@@ -891,6 +891,14 @@ class NodeConstruction {
.map(TerminationHandlerProvider::handler);
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
final IndexingPressure indexingLimits = new IndexingPressure(settings);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
client,
indexingLimits,
threadPool.getThreadContext(),
clusterService.getClusterSettings()
);
ActionModule actionModule = new ActionModule(
settings,
clusterModule.getIndexNameExpressionResolver(),
@@ -916,7 +924,8 @@
metadataCreateIndexService,
dataStreamGlobalRetentionSettings
),
pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll)
pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll),
incrementalBulkService
);
modules.add(actionModule);
@@ -979,8 +988,6 @@
SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
);
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
final IndexingPressure indexingLimits = new IndexingPressure(settings);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
SnapshotsService snapshotsService = new SnapshotsService(
settings,

View file

@@ -11,21 +11,38 @@ package org.elasticsearch.rest.action.document;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestParser;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
@@ -40,12 +57,21 @@ import static org.elasticsearch.rest.RestRequest.Method.PUT;
*/
@ServerlessScope(Scope.PUBLIC)
public class RestBulkAction extends BaseRestHandler {
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
"rest.incremental_bulk",
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
private final boolean allowExplicitIndex;
private final IncrementalBulkService bulkHandler;
public RestBulkAction(Settings settings) {
public RestBulkAction(Settings settings, IncrementalBulkService bulkHandler) {
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
this.bulkHandler = bulkHandler;
}
@Override
@@ -67,38 +93,181 @@ public class RestBulkAction extends BaseRestHandler {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
BulkRequest bulkRequest = new BulkRequest();
String defaultIndex = request.param("index");
String defaultRouting = request.param("routing");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
request.getRestApiVersion()
);
if (bulkHandler.incrementalBulkEnabled() == false) {
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
BulkRequest bulkRequest = new BulkRequest();
String defaultIndex = request.param("index");
String defaultRouting = request.param("routing");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
request.getRestApiVersion()
);
return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
} else {
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
String waitForActiveShards = request.param("wait_for_active_shards");
TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
String refresh = request.param("refresh");
return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh));
}
}
static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
private final boolean allowExplicitIndex;
private final RestRequest request;
private final Map<String, String> stringDeduplicator = new HashMap<>();
private final String defaultIndex;
private final String defaultRouting;
private final FetchSourceContext defaultFetchSourceContext;
private final String defaultPipeline;
private final boolean defaultListExecutedPipelines;
private final Boolean defaultRequireAlias;
private final boolean defaultRequireDataStream;
private final BulkRequestParser parser;
private final Supplier<IncrementalBulkService.Handler> handlerSupplier;
private IncrementalBulkService.Handler handler;
private volatile RestChannel restChannel;
private boolean isException;
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
this.allowExplicitIndex = allowExplicitIndex;
this.request = request;
this.defaultIndex = request.param("index");
this.defaultRouting = request.param("routing");
this.defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
this.defaultPipeline = request.param("pipeline");
this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
// TODO: Fix type deprecation logging
this.parser = new BulkRequestParser(false, request.getRestApiVersion());
this.handlerSupplier = handlerSupplier;
}
@Override
public void accept(RestChannel restChannel) {
this.restChannel = restChannel;
this.handler = handlerSupplier.get();
request.contentStream().next();
}
@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
assert handler != null;
assert channel == restChannel;
if (isException) {
chunk.close();
return;
}
final BytesReference data;
int bytesConsumed;
try {
unParsedChunks.add(chunk);
if (unParsedChunks.size() > 1) {
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
} else {
data = chunk;
}
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDataStream in
// BulkRequest#add is fine
bytesConsumed = parser.incrementalParse(
data,
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
(request, type) -> items.add(request),
items::add,
items::add,
isLast == false,
stringDeduplicator
);
} catch (Exception e) {
// TODO: This needs to be better
Releasables.close(handler);
Releasables.close(unParsedChunks);
unParsedChunks.clear();
new RestToXContentListener<>(channel).onFailure(e);
isException = true;
return;
}
final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
if (isLast) {
assert unParsedChunks.isEmpty();
assert channel != null;
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
} else if (items.isEmpty() == false) {
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();
handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
} else {
assert releasables.isEmpty();
request.contentStream().next();
}
}
@Override
public void close() {
RequestBodyChunkConsumer.super.close();
}
private ArrayList<Releasable> accountParsing(int bytesConsumed) {
ArrayList<Releasable> releasables = new ArrayList<>(unParsedChunks.size());
while (bytesConsumed > 0) {
ReleasableBytesReference reference = unParsedChunks.removeFirst();
releasables.add(reference);
if (bytesConsumed >= reference.length()) {
bytesConsumed -= reference.length();
} else {
unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed));
bytesConsumed = 0;
}
}
return releasables;
}
}
@Override
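accountParsing above is the heart of the chunk lifecycle: bytes the parser reports as consumed release whole queued chunks, and a partially consumed chunk is re-queued as its retained tail. A runnable analogue over plain byte arrays (ReleasableBytesReference and retainedSlice simplified away):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkAccounting {
    // pop fully consumed chunks for release; push the unconsumed tail of a
    // partially consumed chunk back onto the front of the queue
    static List<byte[]> account(ArrayDeque<byte[]> unParsedChunks, int bytesConsumed) {
        List<byte[]> releasables = new ArrayList<>(unParsedChunks.size());
        while (bytesConsumed > 0) {
            byte[] reference = unParsedChunks.removeFirst();
            releasables.add(reference);
            if (bytesConsumed >= reference.length) {
                bytesConsumed -= reference.length;
            } else {
                unParsedChunks.addFirst(Arrays.copyOfRange(reference, bytesConsumed, reference.length));
                bytesConsumed = 0;
            }
        }
        return releasables;
    }

    public static void main(String[] args) {
        ArrayDeque<byte[]> chunks = new ArrayDeque<>(List.of(new byte[10], new byte[10]));
        List<byte[]> released = account(chunks, 14);
        System.out.println(released.size());           // 2: first chunk plus the partially consumed second
        System.out.println(chunks.peekFirst().length); // 6: unconsumed tail re-queued
    }
}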

View file

@@ -10,6 +10,7 @@
package org.elasticsearch.action;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.internal.node.NodeClient;
@@ -129,7 +130,8 @@ public class ActionModuleTests extends ESTestCase {
mock(ClusterService.class),
null,
List.of(),
RestExtension.allowAll()
RestExtension.allowAll(),
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
);
actionModule.initRestHandlers(null, null);
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
@@ -193,7 +195,8 @@
mock(ClusterService.class),
null,
List.of(),
RestExtension.allowAll()
RestExtension.allowAll(),
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
);
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET"));
@@ -250,7 +253,8 @@
mock(ClusterService.class),
null,
List.of(),
RestExtension.allowAll()
RestExtension.allowAll(),
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
);
actionModule.initRestHandlers(null, null);
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
@@ -300,7 +304,8 @@
mock(ClusterService.class),
null,
List.of(),
RestExtension.allowAll()
RestExtension.allowAll(),
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
)
);
assertThat(
@@ -341,7 +346,8 @@
mock(ClusterService.class),
null,
List.of(),
RestExtension.allowAll()
RestExtension.allowAll(),
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
)
);
assertThat(

View file

@@ -15,6 +15,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
@@ -1177,7 +1178,8 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
mock(ClusterService.class),
null,
List.of(),
RestExtension.allowAll()
RestExtension.allowAll(),
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
);
}

View file

@@ -11,23 +11,37 @@ package org.elasticsearch.rest.action.document;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpNodeClient;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xcontent.XContentType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.mock;
@@ -51,7 +65,10 @@ public class RestBulkActionTests extends ESTestCase {
};
final Map<String, String> params = new HashMap<>();
params.put("pipeline", "timestamps");
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY), () -> false)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
{"index":{"_id":"1"}}
{"field1":"val1"}
@@ -83,7 +100,15 @@
};
Map<String, String> params = new HashMap<>();
{
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
new ThreadContext(Settings.EMPTY),
() -> false
)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
@@ -104,7 +129,15 @@
{
params.put("list_executed_pipelines", "true");
bulkCalled.set(false);
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
new ThreadContext(Settings.EMPTY),
() -> false
)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
@@ -124,7 +157,15 @@
}
{
bulkCalled.set(false);
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
new ThreadContext(Settings.EMPTY),
() -> false
)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
@@ -145,7 +186,15 @@
{
params.remove("list_executed_pipelines");
bulkCalled.set(false);
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
new ThreadContext(Settings.EMPTY),
() -> false
)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
@@ -165,4 +214,95 @@
}
}
}
public void testIncrementalParsing() {
ArrayList<DocWriteRequest<?>> docs = new ArrayList<>();
AtomicBoolean isLast = new AtomicBoolean(false);
AtomicBoolean next = new AtomicBoolean(false);
FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withMethod(RestRequest.Method.POST)
.withBody(new HttpBody.Stream() {
@Override
public void close() {}
@Override
public ChunkHandler handler() {
return null;
}
@Override
public void setHandler(ChunkHandler chunkHandler) {}
@Override
public void next() {
next.set(true);
}
})
.withHeaders(Map.of("Content-Type", Collections.singletonList("application/json")))
.build();
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(
true,
request,
() -> new IncrementalBulkService.Handler(null, new ThreadContext(Settings.EMPTY), null, null, null, null) {
@Override
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
releasable.close();
docs.addAll(items);
}
@Override
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
releasable.close();
docs.addAll(items);
isLast.set(true);
}
}
);
chunkHandler.accept(channel);
ReleasableBytesReference r1 = new ReleasableBytesReference(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n"), () -> {});
chunkHandler.handleChunk(channel, r1, false);
assertThat(docs, empty());
assertTrue(next.get());
next.set(false);
assertFalse(isLast.get());
ReleasableBytesReference r2 = new ReleasableBytesReference(new BytesArray("{\"field\":1}"), () -> {});
chunkHandler.handleChunk(channel, r2, false);
assertThat(docs, empty());
assertTrue(next.get());
next.set(false);
assertFalse(isLast.get());
assertTrue(r1.hasReferences());
assertTrue(r2.hasReferences());
ReleasableBytesReference r3 = new ReleasableBytesReference(new BytesArray("\n{\"delete\":"), () -> {});
chunkHandler.handleChunk(channel, r3, false);
assertThat(docs, hasSize(1));
assertFalse(next.get());
assertFalse(isLast.get());
assertFalse(r1.hasReferences());
assertFalse(r2.hasReferences());
assertTrue(r3.hasReferences());
ReleasableBytesReference r4 = new ReleasableBytesReference(new BytesArray("{\"_index\":\"test\",\"_id\":\"2\"}}"), () -> {});
chunkHandler.handleChunk(channel, r4, false);
assertThat(docs, hasSize(1));
assertTrue(next.get());
next.set(false);
assertFalse(isLast.get());
ReleasableBytesReference r5 = new ReleasableBytesReference(new BytesArray("\n"), () -> {});
chunkHandler.handleChunk(channel, r5, true);
assertThat(docs, hasSize(2));
assertFalse(next.get());
assertTrue(isLast.get());
assertFalse(r3.hasReferences());
assertFalse(r4.hasReferences());
assertFalse(r5.hasReferences());
}
}
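A worked trace of the reference counting in testIncrementalParsing, from the string literals above: r1 is 34 bytes (the action line plus its newline), r2 is 11 bytes (the document with no terminator), and the leading newline of r3 completes the document:

// byte accounting when the third chunk arrives
int consumed = 34 + 11 + 1; // 46: all of r1, all of r2, one byte of r3

The parser therefore reports 46 consumed bytes, accountParsing releases r1 and r2 in full, and r3 stays alive through its retained unparsed tail, which is exactly what the hasReferences() assertions check.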

View file

@@ -229,6 +229,11 @@ public class FakeRestRequest extends RestRequest {
return this;
}
public Builder withBody(HttpBody body) {
this.content = body;
return this;
}
public Builder withPath(String path) {
this.path = path;
return this;

View file

@@ -14,6 +14,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@@ -822,7 +823,8 @@ public class SecurityTests extends ESTestCase {
mock(ClusterService.class),
null,
List.of(),
RestExtension.allowAll()
RestExtension.allowAll(),
new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY))
);
actionModule.initRestHandlers(null, null);