From b18c036761e67c06f18fa66452b8fcead1c5b9c4 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 3 Mar 2023 12:51:12 -0600 Subject: [PATCH] Updating assorted integration tests to use BulkProcessor2 instead of BulkProcessor (#94172) In https://github.com/elastic/elasticsearch/pull/91238 we rewrote BulkProcessor to avoid deadlock that had been seen in the IlmHistoryStore. At some point we will remove BulkProcessor altogether. This PR ports a couple of integration tests that were using BulkProcesor over to BulkProcessor2. --- .../test/java/org/elasticsearch/search/CCSDuelIT.java | 11 ++++++----- .../org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 9 ++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java b/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java index a21917a41e30..af0246dada05 100644 --- a/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java +++ b/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java @@ -17,7 +17,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkProcessor2; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -32,6 +32,7 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.query.InnerHitBuilder; @@ -194,9 +195,9 @@ public class CCSDuelIT extends ESRestTestCase { response = createIndex(INDEX_NAME, settings, mapping); assertTrue(response.isAcknowledged()); - BulkProcessor bulkProcessor = BulkProcessor.builder( + BulkProcessor2 bulkProcessor = BulkProcessor2.builder( (r, l) -> restHighLevelClient.bulkAsync(r, RequestOptions.DEFAULT, l), - new BulkProcessor.Listener() { + new BulkProcessor2.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {} @@ -206,11 +207,11 @@ public class CCSDuelIT extends ESRestTestCase { } @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + public void afterBulk(long executionId, BulkRequest request, Exception failure) { throw new AssertionError("Failed to execute bulk", failure); } }, - "CCSDuelIT" + new DeterministicTaskQueue(random()).getThreadPool() ).build(); int numQuestions = randomIntBetween(50, 100); diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index c20c3d215a30..309805f9b80c 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -35,7 +35,7 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; -import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkProcessor2; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; @@ -495,7 +495,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { int numberOfShards = between(1, 5); String leaderIndexSettings = getIndexSettings(numberOfShards, between(0, 1)); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - BulkProcessor.Listener listener = new BulkProcessor.Listener() { + BulkProcessor2.Listener listener = new BulkProcessor2.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {} @@ -503,12 +503,11 @@ public class IndexFollowingIT extends CcrIntegTestCase { public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {} @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} + public void afterBulk(long executionId, BulkRequest request, Exception failure) {} }; int bulkSize = between(1, 20); - BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener, "IndexFollowingIT") + BulkProcessor2 bulkProcessor = BulkProcessor2.builder(leaderClient()::bulk, listener, leaderClient().threadPool()) .setBulkActions(bulkSize) - .setConcurrentRequests(4) .build(); AtomicBoolean run = new AtomicBoolean(true); Semaphore availableDocs = new Semaphore(0);