From 25e8ddc9abe6249b718d631c853ca375e8f95782 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Mon, 23 Jun 2025 14:26:28 -0300 Subject: [PATCH] Remove unused `BulkProcessor` --- .../action/bulk/BulkProcessorIT.java | 373 ------------ .../action/bulk/BulkProcessorRetryIT.java | 235 -------- .../action/bulk/BulkProcessor.java | 554 ----------------- .../action/bulk/BulkRequestHandler.java | 95 --- .../action/bulk/BulkProcessorTests.java | 557 ------------------ .../xpack/ilm/history/ILMHistoryStore.java | 7 +- 6 files changed, 6 insertions(+), 1815 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java delete mode 100644 server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java delete mode 100644 server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java delete mode 100644 server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java delete mode 100644 server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java deleted file mode 100644 index 21e20226e657..000000000000 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java +++ /dev/null @@ -1,373 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ - -package org.elasticsearch.action.bulk; - -import com.carrotsearch.randomizedtesting.generators.RandomPicks; - -import org.elasticsearch.action.get.MultiGetItemResponse; -import org.elasticsearch.action.get.MultiGetRequestBuilder; -import org.elasticsearch.action.get.MultiGetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.client.internal.Requests; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.test.ESIntegTestCase; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.hamcrest.Matchers.both; -import static org.hamcrest.Matchers.either; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThanOrEqualTo; - -public class BulkProcessorIT extends ESIntegTestCase { - - public void testThatBulkProcessorCountIsCorrect() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); - - int numDocs = randomIntBetween(10, 100); - try ( - BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") - // let's make sure that the bulk action limit trips, one single execution will index all the documents - .setConcurrentRequests(randomIntBetween(0, 1)) - .setBulkActions(numDocs) - .setFlushInterval(TimeValue.timeValueHours(24)) - .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB)) - .build() - ) { - - MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs); - - latch.await(); - - assertThat(listener.beforeCounts.get(), equalTo(1)); - assertThat(listener.afterCounts.get(), equalTo(1)); - assertThat(listener.bulkFailures.size(), equalTo(0)); - assertResponseItems(listener.bulkItems, numDocs); - assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs); - } - } - - public void testBulkProcessorFlush() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); - - int numDocs = randomIntBetween(10, 100); - - try ( - BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") - // let's make sure that this bulk won't be automatically flushed - .setConcurrentRequests(randomIntBetween(0, 10)) - .setBulkActions(numDocs + randomIntBetween(1, 100)) - .setFlushInterval(TimeValue.timeValueHours(24)) - .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB)) - .build() - ) { - - MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs); - - assertThat(latch.await(randomInt(500), TimeUnit.MILLISECONDS), equalTo(false)); - // we really need an explicit flush as none of the bulk thresholds was reached - processor.flush(); - latch.await(); - - assertThat(listener.beforeCounts.get(), equalTo(1)); - 
assertThat(listener.afterCounts.get(), equalTo(1)); - assertThat(listener.bulkFailures.size(), equalTo(0)); - assertResponseItems(listener.bulkItems, numDocs); - assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs); - } - } - - public void testBulkProcessorFlushDisabled() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); - - int numDocs = randomIntBetween(10, 100); - - AtomicBoolean flushEnabled = new AtomicBoolean(false); - try ( - BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") - // let's make sure that this bulk won't be automatically flushed - .setConcurrentRequests(randomIntBetween(0, 10)) - .setBulkActions(numDocs + randomIntBetween(1, 100)) - .setFlushInterval(TimeValue.timeValueHours(24)) - .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB)) - .setFlushCondition(flushEnabled::get) - .build() - ) { - - MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs); - assertThat(latch.await(randomInt(500), TimeUnit.MILLISECONDS), equalTo(false)); - // no documents will be indexed here - processor.flush(); - - flushEnabled.set(true); - processor.flush(); - latch.await(); - - // disabled flush resulted in listener being triggered only once - assertThat(listener.beforeCounts.get(), equalTo(1)); - assertThat(listener.afterCounts.get(), equalTo(1)); - assertThat(listener.bulkFailures.size(), equalTo(0)); - assertResponseItems(listener.bulkItems, numDocs); - assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs); - } - } - - public void testBulkProcessorConcurrentRequests() throws Exception { - int bulkActions = randomIntBetween(10, 100); - int numDocs = randomIntBetween(bulkActions, bulkActions + 100); - int concurrentRequests = randomIntBetween(0, 7); - - int expectedBulkActions = numDocs / bulkActions; - - final CountDownLatch latch = new CountDownLatch(expectedBulkActions); - int totalExpectedBulkActions = numDocs % bulkActions == 0 ? 
expectedBulkActions : expectedBulkActions + 1; - final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions); - - BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); - - MultiGetRequestBuilder multiGetRequestBuilder; - - try ( - BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") - .setConcurrentRequests(concurrentRequests) - .setBulkActions(bulkActions) - // set interval and size to high values - .setFlushInterval(TimeValue.timeValueHours(24)) - .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB)) - .build() - ) { - - multiGetRequestBuilder = indexDocs(client(), processor, numDocs); - - latch.await(); - - assertThat(listener.beforeCounts.get(), equalTo(expectedBulkActions)); - assertThat(listener.afterCounts.get(), equalTo(expectedBulkActions)); - assertThat(listener.bulkFailures.size(), equalTo(0)); - assertThat(listener.bulkItems.size(), equalTo(numDocs - numDocs % bulkActions)); - } - - closeLatch.await(); - - assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions)); - assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions)); - assertThat(listener.bulkFailures.size(), equalTo(0)); - assertThat(listener.bulkItems.size(), equalTo(numDocs)); - - Set ids = new HashSet<>(); - for (BulkItemResponse bulkItemResponse : listener.bulkItems) { - assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false)); - assertThat(bulkItemResponse.getIndex(), equalTo("test")); - // with concurrent requests > 1 we can't rely on the order of the bulk requests - assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs))); - // we do want to check that we don't get duplicate ids back - assertThat(ids.add(bulkItemResponse.getId()), equalTo(true)); - } - - assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs); - } - - public void testBulkProcessorWaitOnClose() throws Exception { - BulkProcessorTestListener listener = new BulkProcessorTestListener(); - - int numDocs = randomIntBetween(10, 100); - BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") - // let's make sure that the bulk action limit trips, one single execution will index all the documents - .setConcurrentRequests(randomIntBetween(0, 1)) - .setBulkActions(numDocs) - .setFlushInterval(TimeValue.timeValueHours(24)) - .setBulkSize(ByteSizeValue.of(randomIntBetween(1, 10), RandomPicks.randomFrom(random(), ByteSizeUnit.values()))) - .build(); - - MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs); - assertThat(processor.isOpen(), is(true)); - assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); - if (randomBoolean()) { // check if we can call it multiple times - if (randomBoolean()) { - assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); - } else { - processor.close(); - } - } - assertThat(processor.isOpen(), is(false)); - - assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1)); - assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1)); - assertThat(listener.bulkFailures.size(), equalTo(0)); - assertResponseItems(listener.bulkItems, numDocs); - assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs); - } - - public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception { - createIndex("test-ro"); - updateIndexSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true), "test-ro"); - 
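// "index.blocks.write" (the setting behind IndexMetadata.SETTING_BLOCKS_WRITE) makes
// "test-ro" reject writes: each bulk item routed to it comes back as a failed
// BulkItemResponse rather than failing the whole bulk request, which is what the
// per-item assertions at the end of this test rely on.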
ensureGreen(); - - int bulkActions = randomIntBetween(10, 100); - int numDocs = randomIntBetween(bulkActions, bulkActions + 100); - int concurrentRequests = randomIntBetween(0, 10); - - int expectedBulkActions = numDocs / bulkActions; - - final CountDownLatch latch = new CountDownLatch(expectedBulkActions); - int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1; - final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions); - - int testDocs = 0; - int testReadOnlyDocs = 0; - MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); - BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); - - try ( - BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") - .setConcurrentRequests(concurrentRequests) - .setBulkActions(bulkActions) - // set interval and size to high values - .setFlushInterval(TimeValue.timeValueHours(24)) - .setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB)) - .build() - ) { - - for (int i = 1; i <= numDocs; i++) { - if (randomBoolean()) { - testDocs++; - processor.add( - new IndexRequest("test").id(Integer.toString(testDocs)).source(Requests.INDEX_CONTENT_TYPE, "field", "value") - ); - multiGetRequestBuilder.add("test", Integer.toString(testDocs)); - } else { - testReadOnlyDocs++; - processor.add( - new IndexRequest("test-ro").id(Integer.toString(testReadOnlyDocs)) - .source(Requests.INDEX_CONTENT_TYPE, "field", "value") - ); - } - } - } - - closeLatch.await(); - - assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions)); - assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions)); - assertThat(listener.bulkFailures.size(), equalTo(0)); - assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs)); - - Set ids = new HashSet<>(); - Set readOnlyIds = new HashSet<>(); - for (BulkItemResponse bulkItemResponse : listener.bulkItems) { - assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro"))); - if (bulkItemResponse.getIndex().equals("test")) { - assertThat(bulkItemResponse.isFailed(), equalTo(false)); - // with concurrent requests > 1 we can't rely on the order of the bulk requests - assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs))); - // we do want to check that we don't get duplicate ids back - assertThat(ids.add(bulkItemResponse.getId()), equalTo(true)); - } else { - assertThat(bulkItemResponse.isFailed(), equalTo(true)); - // with concurrent requests > 1 we can't rely on the order of the bulk requests - assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs))); - // we do want to check that we don't get duplicate ids back - assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true)); - } - } - - assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs); - } - - private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) throws Exception { - MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet(); - for (int i = 1; i <= numDocs; i++) { - processor.add( - new IndexRequest("test").id(Integer.toString(i)) - .source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30)) - ); - multiGetRequestBuilder.add("test", Integer.toString(i)); - } - return multiGetRequestBuilder; - } - - private static void assertResponseItems(List 
<BulkItemResponse>
bulkItemResponses, int numDocs) { - assertThat(bulkItemResponses.size(), is(numDocs)); - int i = 1; - for (BulkItemResponse bulkItemResponse : bulkItemResponses) { - assertThat(bulkItemResponse.getIndex(), equalTo("test")); - assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++))); - assertThat( - "item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(), - bulkItemResponse.isFailed(), - equalTo(false) - ); - } - } - - private static void assertMultiGetResponse(MultiGetResponse multiGetResponse, int numDocs) { - assertThat(multiGetResponse.getResponses().length, equalTo(numDocs)); - int i = 1; - for (MultiGetItemResponse multiGetItemResponse : multiGetResponse) { - assertThat(multiGetItemResponse.getIndex(), equalTo("test")); - assertThat(multiGetItemResponse.getId(), equalTo(Integer.toString(i++))); - } - } - - private static class BulkProcessorTestListener implements BulkProcessor.Listener { - - private final CountDownLatch[] latches; - private final AtomicInteger beforeCounts = new AtomicInteger(); - private final AtomicInteger afterCounts = new AtomicInteger(); - private final List bulkItems = new CopyOnWriteArrayList<>(); - private final List bulkFailures = new CopyOnWriteArrayList<>(); - - private BulkProcessorTestListener(CountDownLatch... latches) { - this.latches = latches; - } - - @Override - public void beforeBulk(long executionId, BulkRequest request) { - beforeCounts.incrementAndGet(); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - bulkItems.addAll(Arrays.asList(response.getItems())); - afterCounts.incrementAndGet(); - for (CountDownLatch latch : latches) { - latch.countDown(); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - bulkFailures.add(failure); - afterCounts.incrementAndGet(); - for (CountDownLatch latch : latches) { - latch.countDown(); - } - } - } -} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java deleted file mode 100644 index 4ed19065f32f..000000000000 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ -package org.elasticsearch.action.bulk; - -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.common.BackoffPolicy; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESIntegTestCase; - -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) -public class BulkProcessorRetryIT extends ESIntegTestCase { - private static final String INDEX_NAME = "test"; - - @Override - protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { - // Have very low pool and queue sizes to overwhelm internal pools easily - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal, otherSettings)) - // don't mess with this one! It's quite sensitive to a low queue size - // (see also ThreadedActionListener which is happily spawning threads even when we already got rejected) - // .put("thread_pool.listener.queue_size", 1) - .put("thread_pool.get.queue_size", 1) - // default is 200 - .put("thread_pool.write.queue_size", 30) - .build(); - } - - public void testBulkRejectionLoadWithoutBackoff() throws Throwable { - boolean rejectedExecutionExpected = true; - executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected); - } - - public void testBulkRejectionLoadWithBackoff() throws Throwable { - boolean rejectedExecutionExpected = false; - executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected); - } - - private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Throwable { - final CorrelatingBackoffPolicy internalPolicy = new CorrelatingBackoffPolicy(backoffPolicy); - int numberOfAsyncOps = randomIntBetween(600, 700); - final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps); - final Set responses = ConcurrentCollections.newConcurrentSet(); - - assertAcked(prepareCreate(INDEX_NAME)); - ensureGreen(); - - BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - // no op - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - internalPolicy.logResponse(response); - responses.add(response); - latch.countDown(); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - internalPolicy.logResponse(failure); - responses.add(failure); - latch.countDown(); - } - }, "BulkProcssorRetryIT") - .setBulkActions(1) - // zero means that we're in the sync case, more means that we're in the async case - .setConcurrentRequests(randomIntBetween(0, 100)) - .setBackoffPolicy(internalPolicy) - .build(); - 
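// Grounded in BulkRequestHandler (also removed by this patch): with
// setConcurrentRequests(0) the flushing thread blocks on a latch until the bulk
// round-trip completes (the "sync case" above), while any value > 0 sizes a
// semaphore that throttles in-flight bulks and lets the flushing thread return
// immediately (the "async case").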
indexDocs(bulkProcessor, numberOfAsyncOps); - latch.await(10, TimeUnit.SECONDS); - bulkProcessor.close(); - - assertThat(responses.size(), equalTo(numberOfAsyncOps)); - - // validate all responses - boolean rejectedAfterAllRetries = false; - for (Object response : responses) { - if (response instanceof BulkResponse bulkResponse) { - for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { - if (bulkItemResponse.isFailed()) { - BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { - if (rejectedExecutionExpected == false) { - assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause()); - rejectedAfterAllRetries = true; - } - } else { - throw new AssertionError("Unexpected failure status: " + failure.getStatus()); - } - } - } - } else { - if (ExceptionsHelper.status((Throwable) response) == RestStatus.TOO_MANY_REQUESTS) { - if (rejectedExecutionExpected == false) { - assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause()); - rejectedAfterAllRetries = true; - } - // ignored, we exceeded the write queue size when dispatching the initial bulk request - } else { - Throwable t = (Throwable) response; - // we're not expecting any other errors - throw new AssertionError("Unexpected failure", t); - } - } - } - - indicesAdmin().refresh(new RefreshRequest()).get(); - - final boolean finalRejectedAfterAllRetries = rejectedAfterAllRetries; - assertResponse(prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).setSize(0), results -> { - if (rejectedExecutionExpected) { - assertThat((int) results.getHits().getTotalHits().value(), lessThanOrEqualTo(numberOfAsyncOps)); - } else if (finalRejectedAfterAllRetries) { - assertThat((int) results.getHits().getTotalHits().value(), lessThan(numberOfAsyncOps)); - } else { - assertThat((int) results.getHits().getTotalHits().value(), equalTo(numberOfAsyncOps)); - } - }); - } - - private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) { - Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); - assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); - if (backoffState.hasNext()) { - // we're not expecting that we overwhelmed it even once when we maxed out the number of retries - throw new AssertionError("Got rejected although backoff policy would allow more retries", failure); - } else { - logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); - } - } - - private static void indexDocs(BulkProcessor processor, int numDocs) { - for (int i = 1; i <= numDocs; i++) { - processor.add( - prepareIndex(INDEX_NAME).setId(Integer.toString(i)) - .setSource("field", randomRealisticUnicodeOfLengthBetween(1, 30)) - .request() - ); - } - } - - /** - * Internal helper class to correlate backoff states with bulk responses. This is needed to check whether we maxed out the number - * of retries but still got rejected (which is perfectly fine and can also happen from time to time under heavy load). - * - * This implementation relies on an implementation detail in Retry, namely that the bulk listener is notified on the same thread - * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code. 
- */ - private static class CorrelatingBackoffPolicy extends BackoffPolicy { - private final Map> correlations = new ConcurrentHashMap<>(); - // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the - // thread local to be eligible for garbage collection right after the test to avoid leaks. - private final ThreadLocal> iterators = new ThreadLocal<>(); - - private final BackoffPolicy delegate; - - private CorrelatingBackoffPolicy(BackoffPolicy delegate) { - this.delegate = delegate; - } - - public Iterator backoffStateFor(Object response) { - return correlations.get(response); - } - - // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next() - // see also Retry.AbstractRetryHandler#onResponse(). - public void logResponse(Object response) { - Iterator iterator = iterators.get(); - // did we ever retry? - if (iterator != null) { - // we should correlate any iterator only once - iterators.remove(); - correlations.put(response, iterator); - } - } - - @Override - public Iterator iterator() { - return new CorrelatingIterator(iterators, delegate.iterator()); - } - - private static class CorrelatingIterator implements Iterator { - private final Iterator delegate; - private final ThreadLocal> iterators; - - private CorrelatingIterator(ThreadLocal> iterators, Iterator delegate) { - this.iterators = iterators; - this.delegate = delegate; - } - - @Override - public boolean hasNext() { - // update on every invocation as we might get rescheduled on a different thread. Unfortunately, there is a chance that - // we pollute the thread local map with stale values. Due to the implementation of Retry and the life cycle of the - // enclosing class CorrelatingBackoffPolicy this should not pose a major problem though. - iterators.set(this); - return delegate.hasNext(); - } - - @Override - public TimeValue next() { - // update on every invocation - iterators.set(this); - return delegate.next(); - } - } - } -} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java deleted file mode 100644 index a8bd6ac35103..000000000000 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ /dev/null @@ -1,554 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ - -package org.elasticsearch.action.bulk; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.common.BackoffPolicy; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.RestApiVersion; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.threadpool.ScheduledExecutorServiceScheduler; -import org.elasticsearch.threadpool.Scheduler; -import org.elasticsearch.xcontent.XContentType; - -import java.io.Closeable; -import java.util.Objects; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; -import java.util.function.Supplier; - -/** - * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request - * (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk - * requests allowed to be executed in parallel. - *
<p>
- * In order to create a new bulk processor, use the {@link Builder}. - */ -public class BulkProcessor implements Closeable { - - static final String FLUSH_SCHEDULER_NAME_SUFFIX = "-flush-scheduler"; - static final String RETRY_SCHEDULER_NAME_SUFFIX = "-retry-scheduler"; - - /** - * A listener for the execution. - */ - public interface Listener { - - /** - * Callback before the bulk is executed. - */ - void beforeBulk(long executionId, BulkRequest request); - - /** - * Callback after a successful execution of bulk request. - */ - void afterBulk(long executionId, BulkRequest request, BulkResponse response); - - /** - * Callback after a failed execution of bulk request. - *
<p>
- * Note that in case an instance of InterruptedException is passed, which means that request processing has been - * cancelled externally, the thread's interruption status has been restored prior to calling this method. - */ - void afterBulk(long executionId, BulkRequest request, Throwable failure); - } - - /** - * A builder used to create a build an instance of a bulk processor. - */ - public static class Builder { - - private final BiConsumer> consumer; - private final Listener listener; - private final Scheduler flushScheduler; - private final Scheduler retryScheduler; - private final Runnable onClose; - private int concurrentRequests = 1; - private int bulkActions = 1000; - private ByteSizeValue bulkSize = ByteSizeValue.of(5, ByteSizeUnit.MB); - private TimeValue flushInterval = null; - private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff(); - private String globalIndex; - private String globalRouting; - private String globalPipeline; - private Supplier flushCondition = () -> true; - - private Builder( - BiConsumer> consumer, - Listener listener, - Scheduler flushScheduler, - Scheduler retryScheduler, - Runnable onClose - ) { - this.consumer = consumer; - this.listener = listener; - this.flushScheduler = flushScheduler; - this.retryScheduler = retryScheduler; - this.onClose = onClose; - } - - /** - * Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single - * request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed - * while accumulating new bulk requests. Defaults to {@code 1}. - */ - public Builder setConcurrentRequests(int concurrentRequests) { - this.concurrentRequests = concurrentRequests; - return this; - } - - /** - * Sets when to flush a new bulk request based on the number of actions currently added. Defaults to - * {@code 1000}. Can be set to {@code -1} to disable it. - */ - public Builder setBulkActions(int bulkActions) { - this.bulkActions = bulkActions; - return this; - } - - /** - * Sets when to flush a new bulk request based on the size of actions currently added. Defaults to - * {@code 5mb}. Can be set to {@code -1} to disable it. - */ - public Builder setBulkSize(ByteSizeValue bulkSize) { - this.bulkSize = bulkSize; - return this; - } - - /** - * Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set. - *
<p>
- * Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)} - * can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions. - */ - public Builder setFlushInterval(TimeValue flushInterval) { - this.flushInterval = flushInterval; - return this; - } - - public Builder setGlobalIndex(String globalIndex) { - this.globalIndex = globalIndex; - return this; - } - - public Builder setGlobalRouting(String globalRouting) { - this.globalRouting = globalRouting; - return this; - } - - public Builder setGlobalPipeline(String globalPipeline) { - this.globalPipeline = globalPipeline; - return this; - } - - /** - * Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally - * in case they have failed due to resource constraints (i.e. a thread pool was full). - * - * The default is to back off exponentially. - * - * @see BackoffPolicy#exponentialBackoff() - */ - public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) { - if (backoffPolicy == null) { - throw new NullPointerException("'backoffPolicy' must not be null. To disable backoff, pass BackoffPolicy.noBackoff()"); - } - this.backoffPolicy = backoffPolicy; - return this; - } - - /** - * Builds a new bulk processor. - */ - public BulkProcessor build() { - return new BulkProcessor( - consumer, - backoffPolicy, - listener, - concurrentRequests, - bulkActions, - bulkSize, - flushInterval, - flushScheduler, - retryScheduler, - onClose, - createBulkRequestWithGlobalDefaults(), - flushCondition - ); - } - - private Supplier createBulkRequestWithGlobalDefaults() { - return () -> new BulkRequest(globalIndex).pipeline(globalPipeline).routing(globalRouting); - } - - public Builder setFlushCondition(Supplier flushCondition) { - this.flushCondition = flushCondition; - return this; - } - } - - /** - * @param consumer The consumer that is called to fulfil bulk operations - * @param listener The BulkProcessor listener that gets called on bulk events - * @param name The name of this processor, e.g. 
to identify the scheduler threads - * @return the builder for BulkProcessor - */ - public static Builder builder(BiConsumer> consumer, Listener listener, String name) { - Objects.requireNonNull(consumer, "consumer"); - Objects.requireNonNull(listener, "listener"); - final ScheduledThreadPoolExecutor flushScheduler = Scheduler.initScheduler(Settings.EMPTY, name + FLUSH_SCHEDULER_NAME_SUFFIX); - final ScheduledThreadPoolExecutor retryScheduler = Scheduler.initScheduler(Settings.EMPTY, name + RETRY_SCHEDULER_NAME_SUFFIX); - return new Builder(consumer, listener, buildScheduler(flushScheduler), buildScheduler(retryScheduler), () -> { - Scheduler.terminate(flushScheduler, 10, TimeUnit.SECONDS); - Scheduler.terminate(retryScheduler, 10, TimeUnit.SECONDS); - }); - } - - private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { - return new ScheduledExecutorServiceScheduler(scheduledThreadPoolExecutor); - } - - private final int bulkActions; - private final long bulkSize; - - private final Scheduler.Cancellable cancellableFlushTask; - - private final AtomicLong executionIdGen = new AtomicLong(); - - private BulkRequest bulkRequest; - private final Supplier bulkRequestSupplier; - private final Supplier flushSupplier; - private final BulkRequestHandler bulkRequestHandler; - private final Runnable onClose; - - private volatile boolean closed = false; - private final ReentrantLock lock = new ReentrantLock(); - - BulkProcessor( - BiConsumer> consumer, - BackoffPolicy backoffPolicy, - Listener listener, - int concurrentRequests, - int bulkActions, - ByteSizeValue bulkSize, - @Nullable TimeValue flushInterval, - Scheduler flushScheduler, - Scheduler retryScheduler, - Runnable onClose, - Supplier bulkRequestSupplier, - Supplier flushSupplier - ) { - this.bulkActions = bulkActions; - this.bulkSize = bulkSize.getBytes(); - this.bulkRequest = bulkRequestSupplier.get(); - this.bulkRequestSupplier = bulkRequestSupplier; - this.flushSupplier = flushSupplier; - this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests); - // Start period flushing task after everything is setup - this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler); - this.onClose = onClose; - } - - BulkProcessor( - BiConsumer> consumer, - BackoffPolicy backoffPolicy, - Listener listener, - int concurrentRequests, - int bulkActions, - ByteSizeValue bulkSize, - @Nullable TimeValue flushInterval, - Scheduler flushScheduler, - Scheduler retryScheduler, - Runnable onClose, - Supplier bulkRequestSupplier - ) { - this( - consumer, - backoffPolicy, - listener, - concurrentRequests, - bulkActions, - bulkSize, - flushInterval, - flushScheduler, - retryScheduler, - onClose, - bulkRequestSupplier, - () -> true - ); - } - - /** - * @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry - */ - @Deprecated - BulkProcessor( - BiConsumer> consumer, - BackoffPolicy backoffPolicy, - Listener listener, - int concurrentRequests, - int bulkActions, - ByteSizeValue bulkSize, - @Nullable TimeValue flushInterval, - Scheduler scheduler, - Runnable onClose, - Supplier bulkRequestSupplier - ) { - this( - consumer, - backoffPolicy, - listener, - concurrentRequests, - bulkActions, - bulkSize, - flushInterval, - scheduler, - scheduler, - onClose, - bulkRequestSupplier - ); - } - - /** - * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. 
- */ - @Override - public void close() { - try { - awaitClose(0, TimeUnit.NANOSECONDS); - } catch (InterruptedException exc) { - Thread.currentThread().interrupt(); - } - } - - /** - * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. - *
<p>
- * If concurrent requests are not enabled, returns {@code true} immediately. - * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true} - * If the specified waiting time elapses before all bulk requests complete, {@code false} is returned. - * - * @param timeout The maximum time to wait for the bulk requests to complete - * @param unit The time unit of the {@code timeout} argument - * @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests - * completed - * @throws InterruptedException If the current thread is interrupted - */ - public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { - lock.lock(); - try { - if (closed) { - return true; - } - closed = true; - - this.cancellableFlushTask.cancel(); - - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - try { - return this.bulkRequestHandler.awaitClose(timeout, unit); - } finally { - onClose.run(); - } - } finally { - lock.unlock(); - } - } - - /** - * Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest} - * (for example, if no id is provided, one will be generated, or usage of the create flag). - */ - public BulkProcessor add(IndexRequest request) { - return add((DocWriteRequest) request); - } - - /** - * Adds an {@link DeleteRequest} to the list of actions to execute. - */ - public BulkProcessor add(DeleteRequest request) { - return add((DocWriteRequest) request); - } - - /** - * Adds either a delete or an index request. - */ - public BulkProcessor add(DocWriteRequest request) { - internalAdd(request); - return this; - } - - boolean isOpen() { - return closed == false; - } - - protected void ensureOpen() { - if (closed) { - throw new IllegalStateException("bulk process already closed"); - } - } - - private void internalAdd(DocWriteRequest request) { - // bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock. - // once the bulk request is ready to be shipped swap the instance reference unlock and send the local reference to the handler. - Tuple bulkRequestToExecute = null; - lock.lock(); - try { - ensureOpen(); - bulkRequest.add(request); - bulkRequestToExecute = newBulkRequestIfNeeded(); - } finally { - lock.unlock(); - } - // execute sending the local reference outside the lock to allow handler to control the concurrency via it's configuration. 
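// Running the handler outside the lock also avoids a stall in the synchronous case:
// with concurrentRequests == 0 the handler blocks until the bulk round-trip completes,
// and doing that while holding the lock would freeze every other add() call and the
// scheduled Flush task for the duration of the request.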
- if (bulkRequestToExecute != null) { - execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2()); - } - } - - /** - * Adds the data from the bytes to be processed by the bulk processor - */ - public BulkProcessor add( - BytesReference data, - @Nullable String defaultIndex, - @Nullable String defaultPipeline, - XContentType xContentType - ) throws Exception { - Tuple bulkRequestToExecute = null; - lock.lock(); - try { - ensureOpen(); - bulkRequest.add( - data, - defaultIndex, - null, - null, - defaultPipeline, - null, - null, - null, - true, - xContentType, - RestApiVersion.current() - ); - bulkRequestToExecute = newBulkRequestIfNeeded(); - } finally { - lock.unlock(); - } - - if (bulkRequestToExecute != null) { - execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2()); - } - return this; - } - - private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) { - if (flushInterval == null) { - return new Scheduler.Cancellable() { - @Override - public boolean cancel() { - return false; - } - - @Override - public boolean isCancelled() { - return true; - } - }; - } - return scheduler.scheduleWithFixedDelay(new Flush(), flushInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE); - } - - // needs to be executed under a lock - private Tuple newBulkRequestIfNeeded() { - ensureOpen(); - if (isOverTheLimit() == false) { - return null; - } - final BulkRequest bulkRequest = this.bulkRequest; - this.bulkRequest = bulkRequestSupplier.get(); - return new Tuple<>(bulkRequest, executionIdGen.incrementAndGet()); - } - - // may be executed without a lock - private void execute(BulkRequest bulkRequest, long executionId) { - this.bulkRequestHandler.execute(bulkRequest, executionId); - } - - // needs to be executed under a lock - private void execute() { - if (flushSupplier.get()) { - final BulkRequest bulkRequest = this.bulkRequest; - final long executionId = executionIdGen.incrementAndGet(); - - this.bulkRequest = bulkRequestSupplier.get(); - execute(bulkRequest, executionId); - } - } - - // needs to be executed under a lock - private boolean isOverTheLimit() { - if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) { - return true; - } - if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) { - return true; - } - return false; - } - - /** - * Flush pending delete or index requests. - */ - public void flush() { - lock.lock(); - try { - ensureOpen(); - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - } finally { - lock.unlock(); - } - } - - class Flush implements Runnable { - @Override - public void run() { - lock.lock(); - try { - if (closed) { - return; - } - if (bulkRequest.numberOfActions() == 0) { - return; - } - execute(); - } finally { - lock.unlock(); - } - } - } -} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java deleted file mode 100644 index c005799ac99c..000000000000 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. 
Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ -package org.elasticsearch.action.bulk; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.BackoffPolicy; -import org.elasticsearch.threadpool.Scheduler; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; - -/** - * Implements the low-level details of bulk request handling - */ -public final class BulkRequestHandler { - private static final Logger logger = LogManager.getLogger(BulkRequestHandler.class); - private final BiConsumer> consumer; - private final BulkProcessor.Listener listener; - private final Semaphore semaphore; - private final Retry retry; - private final int concurrentRequests; - - BulkRequestHandler( - BiConsumer> consumer, - BackoffPolicy backoffPolicy, - BulkProcessor.Listener listener, - Scheduler scheduler, - int concurrentRequests - ) { - assert concurrentRequests >= 0; - this.consumer = consumer; - this.listener = listener; - this.concurrentRequests = concurrentRequests; - this.retry = new Retry(backoffPolicy, scheduler); - this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1); - } - - public void execute(BulkRequest bulkRequest, long executionId) { - Runnable toRelease = () -> {}; - boolean bulkRequestSetupSuccessful = false; - try { - listener.beforeBulk(executionId, bulkRequest); - semaphore.acquire(); - toRelease = semaphore::release; - CountDownLatch latch = new CountDownLatch(1); - retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener() { - @Override - public void onResponse(BulkResponse response) { - listener.afterBulk(executionId, bulkRequest, response); - } - - @Override - public void onFailure(Exception e) { - listener.afterBulk(executionId, bulkRequest, e); - } - }, () -> { - semaphore.release(); - latch.countDown(); - })); - bulkRequestSetupSuccessful = true; - if (concurrentRequests == 0) { - latch.await(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.info(() -> "Bulk request " + executionId + " has been cancelled.", e); - listener.afterBulk(executionId, bulkRequest, e); - } catch (Exception e) { - logger.warn(() -> "Failed to execute bulk request " + executionId + ".", e); - listener.afterBulk(executionId, bulkRequest, e); - } finally { - if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore - toRelease.run(); - } - } - } - - boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { - if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) { - semaphore.release(this.concurrentRequests); - return true; - } - return false; - } -} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java deleted file mode 100644 index 918295a756eb..000000000000 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ /dev/null @@ -1,557 +0,0 @@ -/* - * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.action.bulk; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.common.BackoffPolicy; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.Strings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ScheduledExecutorServiceScheduler; -import org.elasticsearch.threadpool.Scheduler; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.xcontent.XContentType; -import org.junit.After; -import org.junit.Before; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.lessThanOrEqualTo; - -public class BulkProcessorTests extends ESTestCase { - - private ThreadPool threadPool; - - @Before - public void startThreadPool() { - threadPool = new TestThreadPool("BulkProcessorTests"); - } - - @After - public void stopThreadPool() throws InterruptedException { - terminate(threadPool); - } - - public void testBulkProcessorFlushPreservesContext() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - final String headerKey = randomAlphaOfLengthBetween(1, 8); - final String transientKey = randomAlphaOfLengthBetween(1, 8); - final String headerValue = randomAlphaOfLengthBetween(1, 32); - final Object transientValue = new Object(); - - BiConsumer> consumer = (request, listener) -> { - ThreadContext threadContext = threadPool.getThreadContext(); - assertEquals(headerValue, threadContext.getHeader(headerKey)); - assertSame(transientValue, threadContext.getTransient(transientKey)); - latch.countDown(); - }; - - final int bulkSize = 
randomIntBetween(2, 32); - final TimeValue flushInterval = TimeValue.timeValueSeconds(1L); - final BulkProcessor bulkProcessor; - assertNull(threadPool.getThreadContext().getHeader(headerKey)); - assertNull(threadPool.getThreadContext().getTransient(transientKey)); - try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { - threadPool.getThreadContext().putHeader(headerKey, headerValue); - threadPool.getThreadContext().putTransient(transientKey, transientValue); - bulkProcessor = new BulkProcessor( - consumer, - BackoffPolicy.noBackoff(), - emptyListener(), - 1, - bulkSize, - ByteSizeValue.of(5, ByteSizeUnit.MB), - flushInterval, - threadPool, - () -> {}, - BulkRequest::new - ); - } - assertNull(threadPool.getThreadContext().getHeader(headerKey)); - assertNull(threadPool.getThreadContext().getTransient(transientKey)); - - // add a single item which won't be over the size or number of items - bulkProcessor.add(new IndexRequest()); - - // wait for flush to execute - latch.await(); - - assertNull(threadPool.getThreadContext().getHeader(headerKey)); - assertNull(threadPool.getThreadContext().getTransient(transientKey)); - bulkProcessor.close(); - } - - public void testRetry() throws Exception { - final int maxAttempts = between(1, 3); - final AtomicInteger attemptRef = new AtomicInteger(); - - final BiConsumer> consumer = (request, listener) -> { - final int attempt = attemptRef.incrementAndGet(); - assertThat(attempt, lessThanOrEqualTo(maxAttempts)); - if (attempt != 1) { - assertThat(Thread.currentThread().getName(), containsString("[BulkProcessorTests-retry-scheduler]")); - } - - if (attempt == maxAttempts) { - listener.onFailure(new ElasticsearchException("final failure")); - } else { - listener.onFailure(new RemoteTransportException("remote", new EsRejectedExecutionException("retryable failure"))); - } - }; - - final CountDownLatch countDownLatch = new CountDownLatch(1); - final BulkProcessor.Listener listener = new BulkProcessor.Listener() { - - @Override - public void beforeBulk(long executionId, BulkRequest request) {} - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - fail("afterBulk should not return success"); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - assertThat(failure, instanceOf(ElasticsearchException.class)); - assertThat(failure.getMessage(), equalTo("final failure")); - countDownLatch.countDown(); - } - }; - - try ( - BulkProcessor bulkProcessor = BulkProcessor.builder(consumer, listener, "BulkProcessorTests") - .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.ZERO, Integer.MAX_VALUE)) - .build() - ) { - bulkProcessor.add(new IndexRequest()); - bulkProcessor.flush(); - assertTrue(countDownLatch.await(5, TimeUnit.SECONDS)); - } - - assertThat(attemptRef.get(), equalTo(maxAttempts)); - } - - public void testConcurrentExecutions() throws Exception { - final AtomicBoolean called = new AtomicBoolean(false); - final AtomicReference exceptionRef = new AtomicReference<>(); - int estimatedTimeForTest = Integer.MAX_VALUE; - final int simulateWorkTimeInMillis = 5; - int concurrentClients = 0; - int concurrentBulkRequests = 0; - int expectedExecutions = 0; - int maxBatchSize = 0; - int maxDocuments = 0; - int iterations = 0; - boolean runTest = true; - // find some randoms that allow this test to take under ~ 10 seconds - while (estimatedTimeForTest > 10_000) { - if (iterations++ > 1_000) { // extremely unlikely - runTest = false; - 
break; - } - maxBatchSize = randomIntBetween(1, 100); - maxDocuments = randomIntBetween(maxBatchSize, 1_000_000); - concurrentClients = randomIntBetween(1, 20); - concurrentBulkRequests = randomIntBetween(0, 20); - expectedExecutions = maxDocuments / maxBatchSize; - estimatedTimeForTest = (expectedExecutions * simulateWorkTimeInMillis) / Math.min( - concurrentBulkRequests + 1, - concurrentClients - ); - } - assumeTrue("failed to find random values that allows test to run quickly", runTest); - BulkResponse bulkResponse = new BulkResponse( - new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) }, - 0 - ); - AtomicInteger failureCount = new AtomicInteger(0); - AtomicInteger successCount = new AtomicInteger(0); - AtomicInteger requestCount = new AtomicInteger(0); - AtomicInteger docCount = new AtomicInteger(0); - BiConsumer> consumer = (request, listener) -> { - try { - Thread.sleep(simulateWorkTimeInMillis); // simulate work - listener.onResponse(bulkResponse); - } catch (InterruptedException e) { - // should never happen - Thread.currentThread().interrupt(); - failureCount.getAndIncrement(); - exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e)); - } - }; - try ( - BulkProcessor bulkProcessor = new BulkProcessor( - consumer, - BackoffPolicy.noBackoff(), - countingListener(requestCount, successCount, failureCount, docCount, exceptionRef), - concurrentBulkRequests, - maxBatchSize, - ByteSizeValue.ofBytes(Integer.MAX_VALUE), - null, - UnusedScheduler.INSTANCE, - () -> called.set(true), - BulkRequest::new - ) - ) { - - ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients); - CountDownLatch startGate = new CountDownLatch(1 + concurrentClients); - - IndexRequest indexRequest = new IndexRequest(); - String bulkRequest = """ - { "index" : { "_index" : "test", "_id" : "1" } } - { "field1" : "value1" } - """; - BytesReference bytesReference = BytesReference.fromByteBuffers( - new ByteBuffer[] { ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) } - ); - List> futures = new ArrayList<>(); - for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) { - futures.add(executorService.submit(() -> { - try { - // don't start any work until all tasks are submitted - startGate.countDown(); - startGate.await(); - // alternate between ways to add to the bulk processor - if (randomBoolean()) { - bulkProcessor.add(indexRequest); - } else { - bulkProcessor.add(bytesReference, null, null, XContentType.JSON); - } - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); - } - })); - } - startGate.countDown(); - startGate.await(); - - for (Future f : futures) { - try { - f.get(); - } catch (Exception e) { - failureCount.incrementAndGet(); - exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e)); - } - } - executorService.shutdown(); - executorService.awaitTermination(10, TimeUnit.SECONDS); - - if (failureCount.get() > 0 || successCount.get() != expectedExecutions || requestCount.get() != successCount.get()) { - if (exceptionRef.get() != null) { - logger.error("exception(s) caught during test", exceptionRef.get()); - } - String message = """ - - Expected Bulks: %s - Requested Bulks: %s - Successful Bulks: %s - Failed Bulks: %ds - Max Documents: %s - Max Batch Size: %s - Concurrent Clients: %s - Concurrent Bulk Requests: %s - """; - fail( - Strings.format( - message, - expectedExecutions, - requestCount.get(), - successCount.get(), - 
-
-    public void testConcurrentExecutionsWithFlush() throws Exception {
-        final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
-        final int maxDocuments = 100_000;
-        final int concurrentClients = 2;
-        final int maxBatchSize = Integer.MAX_VALUE; // don't flush based on size
-        final int concurrentBulkRequests = randomIntBetween(0, 20);
-        final int simulateWorkTimeInMillis = 5;
-        BulkResponse bulkResponse = new BulkResponse(
-            new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
-            0
-        );
-        AtomicInteger failureCount = new AtomicInteger(0);
-        AtomicInteger successCount = new AtomicInteger(0);
-        AtomicInteger requestCount = new AtomicInteger(0);
-        AtomicInteger docCount = new AtomicInteger(0);
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
-            try {
-                Thread.sleep(simulateWorkTimeInMillis); // simulate work
-                listener.onResponse(bulkResponse);
-            } catch (InterruptedException e) {
-                // should never happen
-                Thread.currentThread().interrupt();
-                failureCount.getAndIncrement();
-                exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
-            }
-        };
-        ScheduledExecutorService flushExecutor = Executors.newScheduledThreadPool(1);
-        try (
-            BulkProcessor bulkProcessor = new BulkProcessor(
-                consumer,
-                BackoffPolicy.noBackoff(),
-                countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
-                concurrentBulkRequests,
-                maxBatchSize,
-                ByteSizeValue.ofBytes(Integer.MAX_VALUE),
-                TimeValue.timeValueMillis(simulateWorkTimeInMillis * 2),
-                new ScheduledExecutorServiceScheduler(flushExecutor),
-                () -> {
-                    flushExecutor.shutdown();
-                    try {
-                        flushExecutor.awaitTermination(10L, TimeUnit.SECONDS);
-                        if (flushExecutor.isTerminated() == false) {
-                            flushExecutor.shutdownNow();
-                        }
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                    }
-                },
-                BulkRequest::new
-            )
-        ) {
-
-            ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
-            IndexRequest indexRequest = new IndexRequest();
-            String bulkRequest = """
-                { "index" : { "_index" : "test", "_id" : "1" } }
-                { "field1" : "value1" }
-                """;
-            BytesReference bytesReference = BytesReference.fromByteBuffers(
-                new ByteBuffer[] { ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) }
-            );
-            List<Future<?>> futures = new ArrayList<>();
-            CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
-            for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) {
-                futures.add(executorService.submit(() -> {
-                    try {
-                        // don't start any work until all tasks are submitted
-                        startGate.countDown();
-                        startGate.await();
-                        // alternate between ways to add to the bulk processor
-                        if (randomBoolean()) {
-                            bulkProcessor.add(indexRequest);
-                        } else {
-                            bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
-                        }
-                    } catch (Exception e) {
-                        throw ExceptionsHelper.convertToRuntime(e);
-                    }
-                }));
-            }
-            startGate.countDown();
-            startGate.await();
-
-            for (Future<?> f : futures) {
-                try {
-                    f.get();
-                } catch (Exception e) {
-                    failureCount.incrementAndGet();
-                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
-                }
-            }
-            executorService.shutdown();
-            executorService.awaitTermination(10, TimeUnit.SECONDS);
-        }
-
-        if (failureCount.get() > 0 || requestCount.get() != successCount.get() || maxDocuments != docCount.get()) {
-            if (exceptionRef.get() != null) {
-                logger.error("exception(s) caught during test", exceptionRef.get());
-            }
-            String message = """
-
-                Requested Bulks: %d
-                Successful Bulks: %d
-                Failed Bulks: %d
-                Total Documents: %d
-                Max Documents: %d
-                Max Batch Size: %d
-                Concurrent Clients: %d
-                Concurrent Bulk Requests: %d
-                """;
-            fail(
-                Strings.format(
-                    message,
-                    requestCount.get(),
-                    successCount.get(),
-                    failureCount.get(),
-                    docCount.get(),
-                    maxDocuments,
-                    maxBatchSize,
-                    concurrentClients,
-                    concurrentBulkRequests
-                )
-            );
-        }
-    }
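// [Editor's note - not part of the removed file] The test above supplies its own
// ScheduledExecutorServiceScheduler for the periodic flush and uses the processor's
// onClose hook to tear the executor down, so closing the processor also stops the flush
// task. A condensed sketch of that wiring (the no-op consumer, listener, and thresholds
// are illustrative placeholders):
//
//     ScheduledExecutorService flushExecutor = Executors.newScheduledThreadPool(1);
//     BulkProcessor processor = new BulkProcessor(
//         (request, listener) -> listener.onResponse(response),   // canned response, as in the test
//         BackoffPolicy.noBackoff(),
//         listener,
//         1,                                                      // concurrent requests
//         10_000,                                                 // max actions per bulk
//         ByteSizeValue.ofBytes(Integer.MAX_VALUE),               // effectively no size limit
//         TimeValue.timeValueMillis(10),                          // flush interval
//         new ScheduledExecutorServiceScheduler(flushExecutor),
//         flushExecutor::shutdown,                                // onClose
//         BulkRequest::new
//     );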
-
-    public void testAwaitOnCloseCallsOnClose() throws Exception {
-        final AtomicBoolean called = new AtomicBoolean(false);
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {};
-        BulkProcessor bulkProcessor = new BulkProcessor(
-            consumer,
-            BackoffPolicy.noBackoff(),
-            emptyListener(),
-            0,
-            10,
-            ByteSizeValue.ofBytes(1000),
-            null,
-            UnusedScheduler.INSTANCE,
-            () -> called.set(true),
-            BulkRequest::new
-        );
-
-        assertFalse(called.get());
-        bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS);
-        assertTrue(called.get());
-    }
-
-    public void testDisableFlush() {
-        final AtomicInteger attemptRef = new AtomicInteger();
-
-        BulkResponse bulkResponse = new BulkResponse(
-            new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
-            0
-        );
-
-        final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
-            listener.onResponse(bulkResponse);
-        };
-
-        final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
-
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {}
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                attemptRef.incrementAndGet();
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
-        };
-
-        AtomicBoolean flushEnabled = new AtomicBoolean(false);
-        try (
-            BulkProcessor bulkProcessor = BulkProcessor.builder(consumer, listener, "BulkProcessorTests")
-                .setFlushCondition(flushEnabled::get)
-                .build()
-        ) {
-            bulkProcessor.add(new IndexRequest());
-            bulkProcessor.flush();
-            assertThat(attemptRef.get(), equalTo(0));
-
-            flushEnabled.set(true);
-            bulkProcessor.flush();
-            assertThat(attemptRef.get(), equalTo(1));
-        }
-    }
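// [Editor's note - not part of the removed file] testDisableFlush shows that flush() is a
// no-op while the configured flush condition returns false. A hypothetical production-style
// condition (the clusterIsReady() readiness check is an assumption for illustration, not
// taken from this patch):
//
//     BulkProcessor processor = BulkProcessor.builder(client::bulk, listener, "history-store")
//         .setFlushCondition(() -> clusterIsReady())   // skip flushes until the cluster is ready
//         .build();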
-
-    private BulkProcessor.Listener emptyListener() {
-        return new BulkProcessor.Listener() {
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {}
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
-        };
-    }
-
-    private BulkProcessor.Listener countingListener(
-        AtomicInteger requestCount,
-        AtomicInteger successCount,
-        AtomicInteger failureCount,
-        AtomicInteger docCount,
-        AtomicReference<Throwable> exceptionRef
-    ) {
-
-        return new BulkProcessor.Listener() {
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {
-                requestCount.incrementAndGet();
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                successCount.incrementAndGet();
-                docCount.addAndGet(request.requests().size());
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                if (failure != null) {
-                    failureCount.incrementAndGet();
-                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), failure));
-                }
-            }
-        };
-    }
-
-    private DocWriteResponse mockResponse() {
-        return new IndexResponse(new ShardId("index", "uid", 0), "id", 1, 1, 1, true);
-    }
-
-    private static class UnusedScheduler implements Scheduler {
-        static UnusedScheduler INSTANCE = new UnusedScheduler();
-
-        @Override
-        public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
-            throw new AssertionError("should not be called");
-        }
-    }
-}
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java
index b25a722abcab..793bf865b646 100644
--- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java
@@ -19,6 +19,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -95,7 +96,11 @@ public class ILMHistoryStore implements Closeable {
             new BulkProcessor2.Listener() {
                 @Override
                 public void beforeBulk(long executionId, BulkRequest request) {
-                    if (clusterService.state().getMetadata().getProject().templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
+                    if (clusterService.state()
+                        .getMetadata()
+                        .getProject(ProjectId.DEFAULT)
+                        .templatesV2()
+                        .containsKey(ILM_TEMPLATE_NAME) == false) {
                         ElasticsearchException e = new ElasticsearchException("no ILM history template");
                         logger.warn(
                             () -> format(