Remove unused BulkProcessor

Niels Bauman 2025-06-23 14:26:28 -03:00
parent 321a39738a
commit 25e8ddc9ab
No known key found for this signature in database
GPG key ID: 1E23BD8DDAC3C49C
6 changed files with 6 additions and 1815 deletions

BulkProcessorIT.java

@@ -1,373 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.action.bulk;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class BulkProcessorIT extends ESIntegTestCase {
public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
int numDocs = randomIntBetween(10, 100);
try (
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
// let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1))
.setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
.build()
) {
MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
latch.await();
assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs);
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
}
}
public void testBulkProcessorFlush() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
int numDocs = randomIntBetween(10, 100);
try (
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
// let's make sure that this bulk won't be automatically flushed
.setConcurrentRequests(randomIntBetween(0, 10))
.setBulkActions(numDocs + randomIntBetween(1, 100))
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
.build()
) {
MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
assertThat(latch.await(randomInt(500), TimeUnit.MILLISECONDS), equalTo(false));
// we really need an explicit flush as none of the bulk thresholds was reached
processor.flush();
latch.await();
assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs);
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
}
}
public void testBulkProcessorFlushDisabled() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
int numDocs = randomIntBetween(10, 100);
AtomicBoolean flushEnabled = new AtomicBoolean(false);
try (
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
// let's make sure that this bulk won't be automatically flushed
.setConcurrentRequests(randomIntBetween(0, 10))
.setBulkActions(numDocs + randomIntBetween(1, 100))
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
.setFlushCondition(flushEnabled::get)
.build()
) {
MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
assertThat(latch.await(randomInt(500), TimeUnit.MILLISECONDS), equalTo(false));
// no documents will be indexed here
processor.flush();
flushEnabled.set(true);
processor.flush();
latch.await();
// disabled flush resulted in listener being triggered only once
assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs);
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
}
}
public void testBulkProcessorConcurrentRequests() throws Exception {
int bulkActions = randomIntBetween(10, 100);
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
int concurrentRequests = randomIntBetween(0, 7);
int expectedBulkActions = numDocs / bulkActions;
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
MultiGetRequestBuilder multiGetRequestBuilder;
try (
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
.setConcurrentRequests(concurrentRequests)
.setBulkActions(bulkActions)
// set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
.build()
) {
multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
latch.await();
assertThat(listener.beforeCounts.get(), equalTo(expectedBulkActions));
assertThat(listener.afterCounts.get(), equalTo(expectedBulkActions));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertThat(listener.bulkItems.size(), equalTo(numDocs - numDocs % bulkActions));
}
closeLatch.await();
assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertThat(listener.bulkItems.size(), equalTo(numDocs));
Set<String> ids = new HashSet<>();
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
// with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
// we do want to check that we don't get duplicate ids back
assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
}
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
}
public void testBulkProcessorWaitOnClose() throws Exception {
BulkProcessorTestListener listener = new BulkProcessorTestListener();
int numDocs = randomIntBetween(10, 100);
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
// let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1))
.setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(ByteSizeValue.of(randomIntBetween(1, 10), RandomPicks.randomFrom(random(), ByteSizeUnit.values())))
.build();
MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
assertThat(processor.isOpen(), is(true));
assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
if (randomBoolean()) { // check if we can call it multiple times
if (randomBoolean()) {
assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
} else {
processor.close();
}
}
assertThat(processor.isOpen(), is(false));
assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1));
assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs);
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
}
public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception {
createIndex("test-ro");
updateIndexSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true), "test-ro");
ensureGreen();
int bulkActions = randomIntBetween(10, 100);
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
int concurrentRequests = randomIntBetween(0, 10);
int expectedBulkActions = numDocs / bulkActions;
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
int testDocs = 0;
int testReadOnlyDocs = 0;
MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
try (
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
.setConcurrentRequests(concurrentRequests)
.setBulkActions(bulkActions)
// set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(ByteSizeValue.of(1, ByteSizeUnit.GB))
.build()
) {
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
testDocs++;
processor.add(
new IndexRequest("test").id(Integer.toString(testDocs)).source(Requests.INDEX_CONTENT_TYPE, "field", "value")
);
multiGetRequestBuilder.add("test", Integer.toString(testDocs));
} else {
testReadOnlyDocs++;
processor.add(
new IndexRequest("test-ro").id(Integer.toString(testReadOnlyDocs))
.source(Requests.INDEX_CONTENT_TYPE, "field", "value")
);
}
}
}
closeLatch.await();
assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
Set<String> ids = new HashSet<>();
Set<String> readOnlyIds = new HashSet<>();
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
if (bulkItemResponse.getIndex().equals("test")) {
assertThat(bulkItemResponse.isFailed(), equalTo(false));
// with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
// we do want to check that we don't get duplicate ids back
assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
} else {
assertThat(bulkItemResponse.isFailed(), equalTo(true));
// with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
// we do want to check that we don't get duplicate ids back
assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
}
}
assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
}
private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) throws Exception {
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
for (int i = 1; i <= numDocs; i++) {
processor.add(
new IndexRequest("test").id(Integer.toString(i))
.source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30))
);
multiGetRequestBuilder.add("test", Integer.toString(i));
}
return multiGetRequestBuilder;
}
private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
assertThat(bulkItemResponses.size(), is(numDocs));
int i = 1;
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
assertThat(
"item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
bulkItemResponse.isFailed(),
equalTo(false)
);
}
}
private static void assertMultiGetResponse(MultiGetResponse multiGetResponse, int numDocs) {
assertThat(multiGetResponse.getResponses().length, equalTo(numDocs));
int i = 1;
for (MultiGetItemResponse multiGetItemResponse : multiGetResponse) {
assertThat(multiGetItemResponse.getIndex(), equalTo("test"));
assertThat(multiGetItemResponse.getId(), equalTo(Integer.toString(i++)));
}
}
private static class BulkProcessorTestListener implements BulkProcessor.Listener {
private final CountDownLatch[] latches;
private final AtomicInteger beforeCounts = new AtomicInteger();
private final AtomicInteger afterCounts = new AtomicInteger();
private final List<BulkItemResponse> bulkItems = new CopyOnWriteArrayList<>();
private final List<Throwable> bulkFailures = new CopyOnWriteArrayList<>();
private BulkProcessorTestListener(CountDownLatch... latches) {
this.latches = latches;
}
@Override
public void beforeBulk(long executionId, BulkRequest request) {
beforeCounts.incrementAndGet();
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
bulkItems.addAll(Arrays.asList(response.getItems()));
afterCounts.incrementAndGet();
for (CountDownLatch latch : latches) {
latch.countDown();
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
bulkFailures.add(failure);
afterCounts.incrementAndGet();
for (CountDownLatch latch : latches) {
latch.countDown();
}
}
}
}

BulkProcessorRetryIT.java

@@ -1,235 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
public class BulkProcessorRetryIT extends ESIntegTestCase {
private static final String INDEX_NAME = "test";
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
// Have very low pool and queue sizes to overwhelm internal pools easily
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// don't mess with this one! It's quite sensitive to a low queue size
// (see also ThreadedActionListener which is happily spawning threads even when we already got rejected)
// .put("thread_pool.listener.queue_size", 1)
.put("thread_pool.get.queue_size", 1)
// default is 200
.put("thread_pool.write.queue_size", 30)
.build();
}
public void testBulkRejectionLoadWithoutBackoff() throws Throwable {
boolean rejectedExecutionExpected = true;
executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
}
public void testBulkRejectionLoadWithBackoff() throws Throwable {
boolean rejectedExecutionExpected = false;
executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
}
private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Throwable {
final CorrelatingBackoffPolicy internalPolicy = new CorrelatingBackoffPolicy(backoffPolicy);
int numberOfAsyncOps = randomIntBetween(600, 700);
final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
final Set<Object> responses = ConcurrentCollections.newConcurrentSet();
assertAcked(prepareCreate(INDEX_NAME));
ensureGreen();
BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// no op
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
internalPolicy.logResponse(response);
responses.add(response);
latch.countDown();
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
internalPolicy.logResponse(failure);
responses.add(failure);
latch.countDown();
}
}, "BulkProcssorRetryIT")
.setBulkActions(1)
// zero means that we're in the sync case, more means that we're in the async case
.setConcurrentRequests(randomIntBetween(0, 100))
.setBackoffPolicy(internalPolicy)
.build();
indexDocs(bulkProcessor, numberOfAsyncOps);
latch.await(10, TimeUnit.SECONDS);
bulkProcessor.close();
assertThat(responses.size(), equalTo(numberOfAsyncOps));
// validate all responses
boolean rejectedAfterAllRetries = false;
for (Object response : responses) {
if (response instanceof BulkResponse bulkResponse) {
for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
if (rejectedExecutionExpected == false) {
assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause());
rejectedAfterAllRetries = true;
}
} else {
throw new AssertionError("Unexpected failure status: " + failure.getStatus());
}
}
}
} else {
if (ExceptionsHelper.status((Throwable) response) == RestStatus.TOO_MANY_REQUESTS) {
if (rejectedExecutionExpected == false) {
assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause());
rejectedAfterAllRetries = true;
}
// ignored, we exceeded the write queue size when dispatching the initial bulk request
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
throw new AssertionError("Unexpected failure", t);
}
}
}
indicesAdmin().refresh(new RefreshRequest()).get();
final boolean finalRejectedAfterAllRetries = rejectedAfterAllRetries;
assertResponse(prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).setSize(0), results -> {
if (rejectedExecutionExpected) {
assertThat((int) results.getHits().getTotalHits().value(), lessThanOrEqualTo(numberOfAsyncOps));
} else if (finalRejectedAfterAllRetries) {
assertThat((int) results.getHits().getTotalHits().value(), lessThan(numberOfAsyncOps));
} else {
assertThat((int) results.getHits().getTotalHits().value(), equalTo(numberOfAsyncOps));
}
});
}
private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) {
Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
if (backoffState.hasNext()) {
// we're not expecting that we overwhelmed it even once when we maxed out the number of retries
throw new AssertionError("Got rejected although backoff policy would allow more retries", failure);
} else {
logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
}
}
private static void indexDocs(BulkProcessor processor, int numDocs) {
for (int i = 1; i <= numDocs; i++) {
processor.add(
prepareIndex(INDEX_NAME).setId(Integer.toString(i))
.setSource("field", randomRealisticUnicodeOfLengthBetween(1, 30))
.request()
);
}
}
/**
* Internal helper class to correlate backoff states with bulk responses. This is needed to check whether we maxed out the number
* of retries but still got rejected (which is perfectly fine and can also happen from time to time under heavy load).
*
* This implementation relies on an implementation detail in Retry, namely that the bulk listener is notified on the same thread
* as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
*/
private static class CorrelatingBackoffPolicy extends BackoffPolicy {
private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
// this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
// thread local to be eligible for garbage collection right after the test to avoid leaks.
private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
private final BackoffPolicy delegate;
private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
this.delegate = delegate;
}
public Iterator<TimeValue> backoffStateFor(Object response) {
return correlations.get(response);
}
// Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
// see also Retry.AbstractRetryHandler#onResponse().
public void logResponse(Object response) {
Iterator<TimeValue> iterator = iterators.get();
// did we ever retry?
if (iterator != null) {
// we should correlate any iterator only once
iterators.remove();
correlations.put(response, iterator);
}
}
@Override
public Iterator<TimeValue> iterator() {
return new CorrelatingIterator(iterators, delegate.iterator());
}
private static class CorrelatingIterator implements Iterator<TimeValue> {
private final Iterator<TimeValue> delegate;
private final ThreadLocal<Iterator<TimeValue>> iterators;
private CorrelatingIterator(ThreadLocal<Iterator<TimeValue>> iterators, Iterator<TimeValue> delegate) {
this.iterators = iterators;
this.delegate = delegate;
}
@Override
public boolean hasNext() {
// update on every invocation as we might get rescheduled on a different thread. Unfortunately, there is a chance that
// we pollute the thread local map with stale values. Due to the implementation of Retry and the life cycle of the
// enclosing class CorrelatingBackoffPolicy this should not pose a major problem though.
iterators.set(this);
return delegate.hasNext();
}
@Override
public TimeValue next() {
// update on every invocation
iterators.set(this);
return delegate.next();
}
}
}
}

BulkProcessor.java

@@ -1,554 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.threadpool.ScheduledExecutorServiceScheduler;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.xcontent.XContentType;
import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
/**
* A bulk processor is a thread-safe bulk processing class that makes it easy to set when to "flush" a new bulk request
* (based on the number of actions, their total size, or a time interval) and to control the number of concurrent bulk
* requests allowed to execute in parallel.
* <p>
* In order to create a new bulk processor, use the {@link Builder}.
*/
public class BulkProcessor implements Closeable {
static final String FLUSH_SCHEDULER_NAME_SUFFIX = "-flush-scheduler";
static final String RETRY_SCHEDULER_NAME_SUFFIX = "-retry-scheduler";
/**
* A listener for the execution.
*/
public interface Listener {
/**
* Callback before the bulk is executed.
*/
void beforeBulk(long executionId, BulkRequest request);
/**
* Callback after a successful execution of bulk request.
*/
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
/**
* Callback after a failed execution of bulk request.
* <p>
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
*/
void afterBulk(long executionId, BulkRequest request, Throwable failure);
}
/**
* A builder used to create an instance of a bulk processor.
*/
public static class Builder {
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final Listener listener;
private final Scheduler flushScheduler;
private final Scheduler retryScheduler;
private final Runnable onClose;
private int concurrentRequests = 1;
private int bulkActions = 1000;
private ByteSizeValue bulkSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
private TimeValue flushInterval = null;
private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff();
private String globalIndex;
private String globalRouting;
private String globalPipeline;
private Supplier<Boolean> flushCondition = () -> true;
private Builder(
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
Listener listener,
Scheduler flushScheduler,
Scheduler retryScheduler,
Runnable onClose
) {
this.consumer = consumer;
this.listener = listener;
this.flushScheduler = flushScheduler;
this.retryScheduler = retryScheduler;
this.onClose = onClose;
}
/**
* Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
* request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
* while accumulating new bulk requests. Defaults to {@code 1}.
*/
public Builder setConcurrentRequests(int concurrentRequests) {
this.concurrentRequests = concurrentRequests;
return this;
}
/**
* Sets when to flush a new bulk request based on the number of actions currently added. Defaults to
* {@code 1000}. Can be set to {@code -1} to disable it.
*/
public Builder setBulkActions(int bulkActions) {
this.bulkActions = bulkActions;
return this;
}
/**
* Sets when to flush a new bulk request based on the size of actions currently added. Defaults to
* {@code 5mb}. Can be set to {@code -1} to disable it.
*/
public Builder setBulkSize(ByteSizeValue bulkSize) {
this.bulkSize = bulkSize;
return this;
}
/**
* Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set.
* <p>
* Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)}
* can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions.
*/
public Builder setFlushInterval(TimeValue flushInterval) {
this.flushInterval = flushInterval;
return this;
}
public Builder setGlobalIndex(String globalIndex) {
this.globalIndex = globalIndex;
return this;
}
public Builder setGlobalRouting(String globalRouting) {
this.globalRouting = globalRouting;
return this;
}
public Builder setGlobalPipeline(String globalPipeline) {
this.globalPipeline = globalPipeline;
return this;
}
/**
* Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally
* in case they have failed due to resource constraints (i.e. a thread pool was full).
*
* The default is to back off exponentially.
*
* @see BackoffPolicy#exponentialBackoff()
*/
public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
if (backoffPolicy == null) {
throw new NullPointerException("'backoffPolicy' must not be null. To disable backoff, pass BackoffPolicy.noBackoff()");
}
this.backoffPolicy = backoffPolicy;
return this;
}
/**
* Builds a new bulk processor.
*/
public BulkProcessor build() {
return new BulkProcessor(
consumer,
backoffPolicy,
listener,
concurrentRequests,
bulkActions,
bulkSize,
flushInterval,
flushScheduler,
retryScheduler,
onClose,
createBulkRequestWithGlobalDefaults(),
flushCondition
);
}
private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
return () -> new BulkRequest(globalIndex).pipeline(globalPipeline).routing(globalRouting);
}
public Builder setFlushCondition(Supplier<Boolean> flushCondition) {
this.flushCondition = flushCondition;
return this;
}
}
/**
* @param consumer The consumer that is called to fulfil bulk operations
* @param listener The BulkProcessor listener that gets called on bulk events
* @param name The name of this processor, e.g. to identify the scheduler threads
* @return the builder for BulkProcessor
*/
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, String name) {
Objects.requireNonNull(consumer, "consumer");
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor flushScheduler = Scheduler.initScheduler(Settings.EMPTY, name + FLUSH_SCHEDULER_NAME_SUFFIX);
final ScheduledThreadPoolExecutor retryScheduler = Scheduler.initScheduler(Settings.EMPTY, name + RETRY_SCHEDULER_NAME_SUFFIX);
return new Builder(consumer, listener, buildScheduler(flushScheduler), buildScheduler(retryScheduler), () -> {
Scheduler.terminate(flushScheduler, 10, TimeUnit.SECONDS);
Scheduler.terminate(retryScheduler, 10, TimeUnit.SECONDS);
});
}
private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
return new ScheduledExecutorServiceScheduler(scheduledThreadPoolExecutor);
}
private final int bulkActions;
private final long bulkSize;
private final Scheduler.Cancellable cancellableFlushTask;
private final AtomicLong executionIdGen = new AtomicLong();
private BulkRequest bulkRequest;
private final Supplier<BulkRequest> bulkRequestSupplier;
private final Supplier<Boolean> flushSupplier;
private final BulkRequestHandler bulkRequestHandler;
private final Runnable onClose;
private volatile boolean closed = false;
private final ReentrantLock lock = new ReentrantLock();
BulkProcessor(
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
BackoffPolicy backoffPolicy,
Listener listener,
int concurrentRequests,
int bulkActions,
ByteSizeValue bulkSize,
@Nullable TimeValue flushInterval,
Scheduler flushScheduler,
Scheduler retryScheduler,
Runnable onClose,
Supplier<BulkRequest> bulkRequestSupplier,
Supplier<Boolean> flushSupplier
) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = bulkRequestSupplier.get();
this.bulkRequestSupplier = bulkRequestSupplier;
this.flushSupplier = flushSupplier;
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests);
// Start the periodic flushing task after everything is set up
this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler);
this.onClose = onClose;
}
BulkProcessor(
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
BackoffPolicy backoffPolicy,
Listener listener,
int concurrentRequests,
int bulkActions,
ByteSizeValue bulkSize,
@Nullable TimeValue flushInterval,
Scheduler flushScheduler,
Scheduler retryScheduler,
Runnable onClose,
Supplier<BulkRequest> bulkRequestSupplier
) {
this(
consumer,
backoffPolicy,
listener,
concurrentRequests,
bulkActions,
bulkSize,
flushInterval,
flushScheduler,
retryScheduler,
onClose,
bulkRequestSupplier,
() -> true
);
}
/**
* @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry
*/
@Deprecated
BulkProcessor(
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
BackoffPolicy backoffPolicy,
Listener listener,
int concurrentRequests,
int bulkActions,
ByteSizeValue bulkSize,
@Nullable TimeValue flushInterval,
Scheduler scheduler,
Runnable onClose,
Supplier<BulkRequest> bulkRequestSupplier
) {
this(
consumer,
backoffPolicy,
listener,
concurrentRequests,
bulkActions,
bulkSize,
flushInterval,
scheduler,
scheduler,
onClose,
bulkRequestSupplier
);
}
/**
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
*/
@Override
public void close() {
try {
awaitClose(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
}
}
/**
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
* <p>
* If concurrent requests are not enabled, returns {@code true} immediately.
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true}
* If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
*
* @param timeout The maximum time to wait for the bulk requests to complete
* @param unit The time unit of the {@code timeout} argument
* @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests
* completed
* @throws InterruptedException If the current thread is interrupted
*/
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
if (closed) {
return true;
}
closed = true;
this.cancellableFlushTask.cancel();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
try {
return this.bulkRequestHandler.awaitClose(timeout, unit);
} finally {
onClose.run();
}
} finally {
lock.unlock();
}
}
/**
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
*/
public BulkProcessor add(IndexRequest request) {
return add((DocWriteRequest<?>) request);
}
/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
public BulkProcessor add(DeleteRequest request) {
return add((DocWriteRequest<?>) request);
}
/**
* Adds either a delete or an index request.
*/
public BulkProcessor add(DocWriteRequest<?> request) {
internalAdd(request);
return this;
}
boolean isOpen() {
return closed == false;
}
protected void ensureOpen() {
if (closed) {
throw new IllegalStateException("bulk process already closed");
}
}
private void internalAdd(DocWriteRequest<?> request) {
// bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock.
// once the bulk request is ready to be shipped, swap the instance reference, unlock, and send the local reference to the handler.
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
ensureOpen();
bulkRequest.add(request);
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
}
// execute sending the local reference outside the lock to allow the handler to control the concurrency via its configuration.
if (bulkRequestToExecute != null) {
execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
}
}
/**
* Adds the data from the bytes to be processed by the bulk processor
*/
public BulkProcessor add(
BytesReference data,
@Nullable String defaultIndex,
@Nullable String defaultPipeline,
XContentType xContentType
) throws Exception {
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
ensureOpen();
bulkRequest.add(
data,
defaultIndex,
null,
null,
defaultPipeline,
null,
null,
null,
true,
xContentType,
RestApiVersion.current()
);
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
}
if (bulkRequestToExecute != null) {
execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
}
return this;
}
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
if (flushInterval == null) {
return new Scheduler.Cancellable() {
@Override
public boolean cancel() {
return false;
}
@Override
public boolean isCancelled() {
return true;
}
};
}
return scheduler.scheduleWithFixedDelay(new Flush(), flushInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
// needs to be executed under a lock
private Tuple<BulkRequest, Long> newBulkRequestIfNeeded() {
ensureOpen();
if (isOverTheLimit() == false) {
return null;
}
final BulkRequest bulkRequest = this.bulkRequest;
this.bulkRequest = bulkRequestSupplier.get();
return new Tuple<>(bulkRequest, executionIdGen.incrementAndGet());
}
// may be executed without a lock
private void execute(BulkRequest bulkRequest, long executionId) {
this.bulkRequestHandler.execute(bulkRequest, executionId);
}
// needs to be executed under a lock
private void execute() {
if (flushSupplier.get()) {
final BulkRequest bulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet();
this.bulkRequest = bulkRequestSupplier.get();
execute(bulkRequest, executionId);
}
}
// needs to be executed under a lock
private boolean isOverTheLimit() {
if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
return true;
}
if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
return true;
}
return false;
}
/**
* Flush pending delete or index requests.
*/
public void flush() {
lock.lock();
try {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
} finally {
lock.unlock();
}
}
class Flush implements Runnable {
@Override
public void run() {
lock.lock();
try {
if (closed) {
return;
}
if (bulkRequest.numberOfActions() == 0) {
return;
}
execute();
} finally {
lock.unlock();
}
}
}
}

BulkRequestHandler.java

@@ -1,95 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.action.bulk;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.threadpool.Scheduler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
* Implements the low-level details of bulk request handling
*/
public final class BulkRequestHandler {
private static final Logger logger = LogManager.getLogger(BulkRequestHandler.class);
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final BulkProcessor.Listener listener;
private final Semaphore semaphore;
private final Retry retry;
private final int concurrentRequests;
BulkRequestHandler(
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
BackoffPolicy backoffPolicy,
BulkProcessor.Listener listener,
Scheduler scheduler,
int concurrentRequests
) {
assert concurrentRequests >= 0;
this.consumer = consumer;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
this.retry = new Retry(backoffPolicy, scheduler);
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
listener.afterBulk(executionId, bulkRequest, response);
}
@Override
public void onFailure(Exception e) {
listener.afterBulk(executionId, bulkRequest, e);
}
}, () -> {
semaphore.release();
latch.countDown();
}));
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> "Bulk request " + executionId + " has been cancelled.", e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> "Failed to execute bulk request " + executionId + ".", e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
toRelease.run();
}
}
}
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
semaphore.release(this.concurrentRequests);
return true;
}
return false;
}
}

BulkProcessorTests.java

@@ -1,557 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ScheduledExecutorServiceScheduler;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;
import org.junit.Before;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class BulkProcessorTests extends ESTestCase {
private ThreadPool threadPool;
@Before
public void startThreadPool() {
threadPool = new TestThreadPool("BulkProcessorTests");
}
@After
public void stopThreadPool() throws InterruptedException {
terminate(threadPool);
}
public void testBulkProcessorFlushPreservesContext() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final String headerKey = randomAlphaOfLengthBetween(1, 8);
final String transientKey = randomAlphaOfLengthBetween(1, 8);
final String headerValue = randomAlphaOfLengthBetween(1, 32);
final Object transientValue = new Object();
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
ThreadContext threadContext = threadPool.getThreadContext();
assertEquals(headerValue, threadContext.getHeader(headerKey));
assertSame(transientValue, threadContext.getTransient(transientKey));
latch.countDown();
};
final int bulkSize = randomIntBetween(2, 32);
final TimeValue flushInterval = TimeValue.timeValueSeconds(1L);
final BulkProcessor bulkProcessor;
assertNull(threadPool.getThreadContext().getHeader(headerKey));
assertNull(threadPool.getThreadContext().getTransient(transientKey));
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
threadPool.getThreadContext().putHeader(headerKey, headerValue);
threadPool.getThreadContext().putTransient(transientKey, transientValue);
bulkProcessor = new BulkProcessor(
consumer,
BackoffPolicy.noBackoff(),
emptyListener(),
1,
bulkSize,
ByteSizeValue.of(5, ByteSizeUnit.MB),
flushInterval,
threadPool,
() -> {},
BulkRequest::new
);
}
assertNull(threadPool.getThreadContext().getHeader(headerKey));
assertNull(threadPool.getThreadContext().getTransient(transientKey));
// add a single item which won't be over the size or number of items
bulkProcessor.add(new IndexRequest());
// wait for flush to execute
latch.await();
assertNull(threadPool.getThreadContext().getHeader(headerKey));
assertNull(threadPool.getThreadContext().getTransient(transientKey));
bulkProcessor.close();
}
public void testRetry() throws Exception {
final int maxAttempts = between(1, 3);
final AtomicInteger attemptRef = new AtomicInteger();
final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
final int attempt = attemptRef.incrementAndGet();
assertThat(attempt, lessThanOrEqualTo(maxAttempts));
if (attempt != 1) {
assertThat(Thread.currentThread().getName(), containsString("[BulkProcessorTests-retry-scheduler]"));
}
if (attempt == maxAttempts) {
listener.onFailure(new ElasticsearchException("final failure"));
} else {
listener.onFailure(new RemoteTransportException("remote", new EsRejectedExecutionException("retryable failure")));
}
};
final CountDownLatch countDownLatch = new CountDownLatch(1);
final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
fail("afterBulk should not return success");
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
assertThat(failure, instanceOf(ElasticsearchException.class));
assertThat(failure.getMessage(), equalTo("final failure"));
countDownLatch.countDown();
}
};
try (
BulkProcessor bulkProcessor = BulkProcessor.builder(consumer, listener, "BulkProcessorTests")
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.ZERO, Integer.MAX_VALUE))
.build()
) {
bulkProcessor.add(new IndexRequest());
bulkProcessor.flush();
assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
}
assertThat(attemptRef.get(), equalTo(maxAttempts));
}
public void testConcurrentExecutions() throws Exception {
final AtomicBoolean called = new AtomicBoolean(false);
final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
int estimatedTimeForTest = Integer.MAX_VALUE;
final int simulateWorkTimeInMillis = 5;
int concurrentClients = 0;
int concurrentBulkRequests = 0;
int expectedExecutions = 0;
int maxBatchSize = 0;
int maxDocuments = 0;
int iterations = 0;
boolean runTest = true;
// find some randoms that allow this test to take under ~ 10 seconds
while (estimatedTimeForTest > 10_000) {
if (iterations++ > 1_000) { // extremely unlikely
runTest = false;
break;
}
maxBatchSize = randomIntBetween(1, 100);
maxDocuments = randomIntBetween(maxBatchSize, 1_000_000);
concurrentClients = randomIntBetween(1, 20);
concurrentBulkRequests = randomIntBetween(0, 20);
expectedExecutions = maxDocuments / maxBatchSize;
estimatedTimeForTest = (expectedExecutions * simulateWorkTimeInMillis) / Math.min(
concurrentBulkRequests + 1,
concurrentClients
);
}
assumeTrue("failed to find random values that allows test to run quickly", runTest);
BulkResponse bulkResponse = new BulkResponse(
new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
0
);
AtomicInteger failureCount = new AtomicInteger(0);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger requestCount = new AtomicInteger(0);
AtomicInteger docCount = new AtomicInteger(0);
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
try {
Thread.sleep(simulateWorkTimeInMillis); // simulate work
listener.onResponse(bulkResponse);
} catch (InterruptedException e) {
// should never happen
Thread.currentThread().interrupt();
failureCount.getAndIncrement();
exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
}
};
try (
BulkProcessor bulkProcessor = new BulkProcessor(
consumer,
BackoffPolicy.noBackoff(),
countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
concurrentBulkRequests,
maxBatchSize,
ByteSizeValue.ofBytes(Integer.MAX_VALUE),
null,
UnusedScheduler.INSTANCE,
() -> called.set(true),
BulkRequest::new
)
) {
ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
IndexRequest indexRequest = new IndexRequest();
String bulkRequest = """
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
""";
BytesReference bytesReference = BytesReference.fromByteBuffers(
new ByteBuffer[] { ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) }
);
List<Future<?>> futures = new ArrayList<>();
for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) {
futures.add(executorService.submit(() -> {
try {
// don't start any work until all tasks are submitted
startGate.countDown();
startGate.await();
// alternate between ways to add to the bulk processor
if (randomBoolean()) {
bulkProcessor.add(indexRequest);
} else {
bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
}
} catch (Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}));
}
startGate.countDown();
startGate.await();
for (Future<?> f : futures) {
try {
f.get();
} catch (Exception e) {
failureCount.incrementAndGet();
exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
}
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
if (failureCount.get() > 0 || successCount.get() != expectedExecutions || requestCount.get() != successCount.get()) {
if (exceptionRef.get() != null) {
logger.error("exception(s) caught during test", exceptionRef.get());
}
String message = """
Expected Bulks: %s
Requested Bulks: %s
Successful Bulks: %s
Failed Bulks: %s
Max Documents: %s
Max Batch Size: %s
Concurrent Clients: %s
Concurrent Bulk Requests: %s
""";
fail(
Strings.format(
message,
expectedExecutions,
requestCount.get(),
successCount.get(),
failureCount.get(),
maxDocuments,
maxBatchSize,
concurrentClients,
concurrentBulkRequests
)
);
}
}
// count total docs after processor is closed since there may have been partial batches that are flushed on close.
assertEquals(docCount.get(), maxDocuments);
}
public void testConcurrentExecutionsWithFlush() throws Exception {
final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
final int maxDocuments = 100_000;
final int concurrentClients = 2;
final int maxBatchSize = Integer.MAX_VALUE; // don't flush based on size
final int concurrentBulkRequests = randomIntBetween(0, 20);
final int simulateWorkTimeInMillis = 5;
BulkResponse bulkResponse = new BulkResponse(
new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
0
);
AtomicInteger failureCount = new AtomicInteger(0);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger requestCount = new AtomicInteger(0);
AtomicInteger docCount = new AtomicInteger(0);
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
try {
Thread.sleep(simulateWorkTimeInMillis); // simulate work
listener.onResponse(bulkResponse);
} catch (InterruptedException e) {
// should never happen
Thread.currentThread().interrupt();
failureCount.getAndIncrement();
exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
}
};
ScheduledExecutorService flushExecutor = Executors.newScheduledThreadPool(1);
try (
BulkProcessor bulkProcessor = new BulkProcessor(
consumer,
BackoffPolicy.noBackoff(),
countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
concurrentBulkRequests,
maxBatchSize,
ByteSizeValue.ofBytes(Integer.MAX_VALUE),
TimeValue.timeValueMillis(simulateWorkTimeInMillis * 2),
new ScheduledExecutorServiceScheduler(flushExecutor),
() -> {
flushExecutor.shutdown();
try {
flushExecutor.awaitTermination(10L, TimeUnit.SECONDS);
if (flushExecutor.isTerminated() == false) {
flushExecutor.shutdownNow();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
},
BulkRequest::new
)
) {
ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
IndexRequest indexRequest = new IndexRequest();
String bulkRequest = """
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
""";
BytesReference bytesReference = BytesReference.fromByteBuffers(
new ByteBuffer[] { ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) }
);
List<Future<?>> futures = new ArrayList<>();
CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) {
futures.add(executorService.submit(() -> {
try {
// don't start any work until all tasks are submitted
startGate.countDown();
startGate.await();
// alternate between ways to add to the bulk processor
if (randomBoolean()) {
bulkProcessor.add(indexRequest);
} else {
bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
}
} catch (Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}));
}
startGate.countDown();
startGate.await();
for (Future<?> f : futures) {
try {
f.get();
} catch (Exception e) {
failureCount.incrementAndGet();
exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
}
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
if (failureCount.get() > 0 || requestCount.get() != successCount.get() || maxDocuments != docCount.get()) {
if (exceptionRef.get() != null) {
logger.error("exception(s) caught during test", exceptionRef.get());
}
String message = """
Requested Bulks: %d
Successful Bulks: %d
Failed Bulks: %d
Total Documents: %d
Max Documents: %d
Max Batch Size: %d
Concurrent Clients: %d
Concurrent Bulk Requests: %d
""";
fail(
Strings.format(
message,
requestCount.get(),
successCount.get(),
failureCount.get(),
docCount.get(),
maxDocuments,
maxBatchSize,
concurrentClients,
concurrentBulkRequests
)
);
}
}
public void testAwaitOnCloseCallsOnClose() throws Exception {
final AtomicBoolean called = new AtomicBoolean(false);
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {};
BulkProcessor bulkProcessor = new BulkProcessor(
consumer,
BackoffPolicy.noBackoff(),
emptyListener(),
0,
10,
ByteSizeValue.ofBytes(1000),
null,
UnusedScheduler.INSTANCE,
() -> called.set(true),
BulkRequest::new
);
assertFalse(called.get());
bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS);
assertTrue(called.get());
}
public void testDisableFlush() {
final AtomicInteger attemptRef = new AtomicInteger();
BulkResponse bulkResponse = new BulkResponse(
new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
0
);
final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
listener.onResponse(bulkResponse);
};
final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
attemptRef.incrementAndGet();
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
};
AtomicBoolean flushEnabled = new AtomicBoolean(false);
try (
BulkProcessor bulkProcessor = BulkProcessor.builder(consumer, listener, "BulkProcessorTests")
.setFlushCondition(flushEnabled::get)
.build()
) {
bulkProcessor.add(new IndexRequest());
bulkProcessor.flush();
assertThat(attemptRef.get(), equalTo(0));
flushEnabled.set(true);
bulkProcessor.flush();
assertThat(attemptRef.get(), equalTo(1));
}
}
private BulkProcessor.Listener emptyListener() {
return new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
};
}
private BulkProcessor.Listener countingListener(
AtomicInteger requestCount,
AtomicInteger successCount,
AtomicInteger failureCount,
AtomicInteger docCount,
AtomicReference<Throwable> exceptionRef
) {
return new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
requestCount.incrementAndGet();
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
successCount.incrementAndGet();
docCount.addAndGet(request.requests().size());
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
if (failure != null) {
failureCount.incrementAndGet();
exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), failure));
}
}
};
}
private DocWriteResponse mockResponse() {
return new IndexResponse(new ShardId("index", "uid", 0), "id", 1, 1, 1, true);
}
private static class UnusedScheduler implements Scheduler {
static UnusedScheduler INSTANCE = new UnusedScheduler();
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
throw new AssertionError("should not be called");
}
}
}

ILMHistoryStore.java

@@ -19,6 +19,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -95,7 +96,11 @@ public class ILMHistoryStore implements Closeable {
             new BulkProcessor2.Listener() {
                 @Override
                 public void beforeBulk(long executionId, BulkRequest request) {
-                    if (clusterService.state().getMetadata().getProject().templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
+                    if (clusterService.state()
+                        .getMetadata()
+                        .getProject(ProjectId.DEFAULT)
+                        .templatesV2()
+                        .containsKey(ILM_TEMPLATE_NAME) == false) {
                         ElasticsearchException e = new ElasticsearchException("no ILM history template");
                         logger.warn(
                             () -> format(