Updating assorted integration tests to use BulkProcessor2 instead of BulkProcessor (#94172)

In https://github.com/elastic/elasticsearch/pull/91238 we rewrote
BulkProcessor to avoid deadlock that had been seen in the
IlmHistoryStore. At some point we will remove BulkProcessor altogether.
This PR ports a couple of integration tests that were using BulkProcesor
over to BulkProcessor2.
This commit is contained in:
Keith Massey 2023-03-03 12:51:12 -06:00 committed by GitHub
parent 86b7dfc8db
commit b18c036761
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 10 deletions

View file

@ -17,7 +17,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkProcessor2;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -32,6 +32,7 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.InnerHitBuilder;
@ -194,9 +195,9 @@ public class CCSDuelIT extends ESRestTestCase {
response = createIndex(INDEX_NAME, settings, mapping); response = createIndex(INDEX_NAME, settings, mapping);
assertTrue(response.isAcknowledged()); assertTrue(response.isAcknowledged());
BulkProcessor bulkProcessor = BulkProcessor.builder( BulkProcessor2 bulkProcessor = BulkProcessor2.builder(
(r, l) -> restHighLevelClient.bulkAsync(r, RequestOptions.DEFAULT, l), (r, l) -> restHighLevelClient.bulkAsync(r, RequestOptions.DEFAULT, l),
new BulkProcessor.Listener() { new BulkProcessor2.Listener() {
@Override @Override
public void beforeBulk(long executionId, BulkRequest request) {} public void beforeBulk(long executionId, BulkRequest request) {}
@ -206,11 +207,11 @@ public class CCSDuelIT extends ESRestTestCase {
} }
@Override @Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) { public void afterBulk(long executionId, BulkRequest request, Exception failure) {
throw new AssertionError("Failed to execute bulk", failure); throw new AssertionError("Failed to execute bulk", failure);
} }
}, },
"CCSDuelIT" new DeterministicTaskQueue(random()).getThreadPool()
).build(); ).build();
int numQuestions = randomIntBetween(50, 100); int numQuestions = randomIntBetween(50, 100);

View file

@ -35,7 +35,7 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkProcessor2;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
@ -495,7 +495,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
int numberOfShards = between(1, 5); int numberOfShards = between(1, 5);
String leaderIndexSettings = getIndexSettings(numberOfShards, between(0, 1)); String leaderIndexSettings = getIndexSettings(numberOfShards, between(0, 1));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
BulkProcessor.Listener listener = new BulkProcessor.Listener() { BulkProcessor2.Listener listener = new BulkProcessor2.Listener() {
@Override @Override
public void beforeBulk(long executionId, BulkRequest request) {} public void beforeBulk(long executionId, BulkRequest request) {}
@ -503,12 +503,11 @@ public class IndexFollowingIT extends CcrIntegTestCase {
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {} public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
@Override @Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} public void afterBulk(long executionId, BulkRequest request, Exception failure) {}
}; };
int bulkSize = between(1, 20); int bulkSize = between(1, 20);
BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener, "IndexFollowingIT") BulkProcessor2 bulkProcessor = BulkProcessor2.builder(leaderClient()::bulk, listener, leaderClient().threadPool())
.setBulkActions(bulkSize) .setBulkActions(bulkSize)
.setConcurrentRequests(4)
.build(); .build();
AtomicBoolean run = new AtomicBoolean(true); AtomicBoolean run = new AtomicBoolean(true);
Semaphore availableDocs = new Semaphore(0); Semaphore availableDocs = new Semaphore(0);