Default incremental bulk functionality to false (#113416)

This commit flips the incremental bulk setting to false. Additionally,
it removes some test code which intermittently causes issues with
security test cases.
This commit is contained in:
Tim Brooks 2024-09-23 14:26:48 -06:00 committed by GitHub
parent f7ba89bcb1
commit d146b27a26
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 13 additions and 46 deletions

View file

@ -29,6 +29,14 @@ import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), true)
.build();
}
public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOException {
Request request = new Request("GET", "/_capabilities?method=GET&path=%2F_bulk&capabilities=failure_store_status&pretty");
Response response = getRestClient().performRequest(request);

View file

@ -36,7 +36,7 @@ public class IncrementalBulkService {
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
"rest.incremental_bulk",
true,
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

View file

@ -26,7 +26,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
@ -49,8 +48,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
@ -196,7 +193,6 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@ -1782,48 +1778,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
logger.info("Index [{}] docs async: [{}] bulk: [{}] partitions [{}]", builders.size(), false, true, partition.size());
for (List<IndexRequestBuilder> segmented : partition) {
BulkResponse actionGet;
if (randomBoolean()) {
BulkRequestBuilder bulkBuilder = client().prepareBulk();
for (IndexRequestBuilder indexRequestBuilder : segmented) {
bulkBuilder.add(indexRequestBuilder);
}
actionGet = bulkBuilder.get();
} else {
IncrementalBulkService bulkService = internalCluster().getInstance(IncrementalBulkService.class);
IncrementalBulkService.Handler handler = bulkService.newBulkRequest();
ConcurrentLinkedQueue<IndexRequest> queue = new ConcurrentLinkedQueue<>();
segmented.forEach(b -> queue.add(b.request()));
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
AtomicInteger runs = new AtomicInteger(0);
Runnable r = new Runnable() {
@Override
public void run() {
int toRemove = Math.min(randomIntBetween(5, 10), queue.size());
ArrayList<DocWriteRequest<?>> docs = new ArrayList<>();
for (int i = 0; i < toRemove; i++) {
docs.add(queue.poll());
}
if (queue.isEmpty()) {
handler.lastItems(docs, () -> {}, future);
} else {
handler.addItems(docs, () -> {}, () -> {
// Every 10 runs dispatch to new thread to prevent stackoverflow
if (runs.incrementAndGet() % 10 == 0) {
new Thread(this).start();
} else {
this.run();
}
});
}
}
};
r.run();
actionGet = future.actionGet();
BulkRequestBuilder bulkBuilder = client().prepareBulk();
for (IndexRequestBuilder indexRequestBuilder : segmented) {
bulkBuilder.add(indexRequestBuilder);
}
actionGet = bulkBuilder.get();
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
}
}