Mirror of https://github.com/elastic/elasticsearch.git
Split bulks based on memory usage (#112267)
This commit splits bulks once memory usage for indexing pressure has passed a configurable threshold.
Parent: cbcbc34863
Commit: c00768a116

9 changed files with 179 additions and 48 deletions
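Before the per-file diffs, here is a minimal, self-contained sketch of the splitting policy this commit introduces. It is not the Elasticsearch implementation, and every name in it is illustrative: each incoming page of items is charged against an in-flight byte counter before it is buffered, and once that counter crosses a configurable threshold the buffered partial bulk is dispatched immediately, splitting one logical bulk into several requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class SplitBulkSketch {
    static final long SPLIT_THRESHOLD_BYTES = 512; // the test settings below use "512B"

    final AtomicLong inFlightBytes = new AtomicLong();
    final List<String> buffered = new ArrayList<>();

    void addItems(List<String> items) {
        // Account the page's memory before buffering it, mirroring how each
        // page is registered with indexing pressure in the real change.
        long bytes = items.stream().mapToLong(String::length).sum();
        inFlightBytes.addAndGet(bytes);
        buffered.addAll(items);
        if (inFlightBytes.get() >= SPLIT_THRESHOLD_BYTES) {
            flush(); // split: send the partial bulk now instead of waiting for the last page
        }
    }

    void flush() {
        System.out.println("dispatching bulk of " + buffered.size() + " items");
        buffered.clear();
        inFlightBytes.set(0); // in the real change, bytes are released when the bulk completes
    }

    public static void main(String[] args) {
        SplitBulkSketch sketch = new SplitBulkSketch();
        for (int i = 0; i < 10; i++) {
            sketch.addItems(List.of("x".repeat(100))); // ~100 "bytes" per item
        }
    }
}
```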
IncrementalBulkIT.java

```diff
@@ -46,6 +46,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;

 public class IncrementalBulkIT extends ESIntegTestCase {
@@ -55,6 +56,14 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
     }

+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B")
+            .build();
+    }
+
     public void testSingleBulkRequest() {
         String index = "test";
         createIndex(index);
@@ -81,6 +90,71 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         assertFalse(refCounted.hasReferences());
     }

+    public void testIndexingPressureRejection() {
+        String index = "test";
+        createIndex(index);
+
+        String nodeName = internalCluster().getRandomNodeName();
+        IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
+        IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
+
+        try (Releasable r = indexingPressure.markCoordinatingOperationStarted(1, indexingPressure.stats().getMemoryLimit(), true)) {
+            IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+            AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+
+            if (randomBoolean()) {
+                AtomicBoolean nextPage = new AtomicBoolean(false);
+                refCounted.incRef();
+                handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
+                assertTrue(nextPage.get());
+            }
+
+            PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+            handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+
+            expectThrows(EsRejectedExecutionException.class, future::actionGet);
+            assertFalse(refCounted.hasReferences());
+        }
+    }
+
+    public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
+        String index = "test";
+        createIndex(index);
+
+        String nodeName = internalCluster().getRandomNodeName();
+        IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
+        IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
+
+        IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+
+        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        AtomicBoolean nextPage = new AtomicBoolean(false);
+
+        IndexRequest indexRequest = indexRequest(index);
+        long total = indexRequest.ramBytesUsed();
+        while (total < 512) {
+            refCounted.incRef();
+            handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
+            assertTrue(nextPage.get());
+            nextPage.set(false);
+            indexRequest = indexRequest(index);
+            total += indexRequest.ramBytesUsed();
+        }
+
+        assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
+        refCounted.incRef();
+        handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
+
+        assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
+
+        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+        handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
+
+        BulkResponse bulkResponse = future.actionGet();
+        assertNoFailures(bulkResponse);
+        assertFalse(refCounted.hasReferences());
+    }
+
     public void testMultipleBulkPartsWithBackoff() {
         ExecutorService executorService = Executors.newFixedThreadPool(1);

```
BulkOperation.java

```diff
@@ -409,7 +409,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
                 responses.toArray(new BulkItemResponse[responses.length()]),
                 buildTookInMillis(startTimeNanos),
                 BulkResponse.NO_INGEST_TOOK,
-                new BulkRequest.IncrementalState(shortCircuitShardFailures)
+                new BulkRequest.IncrementalState(shortCircuitShardFailures, bulkRequest.incrementalState().indexingPressureAccounted())
             )
         );
         // Allow memory for bulk shard request items to be reclaimed before all items have been completed
```
BulkRequest.java

```diff
@@ -506,12 +506,12 @@ public class BulkRequest extends ActionRequest
         return Map.of();
     }

-    record IncrementalState(Map<ShardId, Exception> shardLevelFailures) implements Writeable {
+    record IncrementalState(Map<ShardId, Exception> shardLevelFailures, boolean indexingPressureAccounted) implements Writeable {

-        static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap());
+        static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap(), false);

         IncrementalState(StreamInput in) throws IOException {
-            this(in.readMap(ShardId::new, input -> input.readException()));
+            this(in.readMap(ShardId::new, input -> input.readException()), false);
         }

         @Override
```
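One detail worth noting in the hunk above: the StreamInput constructor pins the new flag to false, so the accounting marker never crosses the wire; a request deserialized on another node always starts un-accounted and is charged against that node's indexing pressure as usual. A compiling sketch of that local-only-flag pattern, with hypothetical names:

```java
public class LocalFlagSketch {
    record State(boolean indexingPressureAccounted) {
        static State readFromWire() {
            return new State(false); // flag deliberately not transmitted
        }
    }

    public static void main(String[] args) {
        State local = new State(true);        // coordinator already accounted for the bytes
        State remote = State.readFromWire();  // a remote copy must re-account locally
        System.out.println(local + " " + remote);
    }
}
```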
IncrementalBulkService.java

```diff
@@ -9,24 +9,30 @@
 
 package org.elasticsearch.action.bulk;

+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexingPressure;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;

 public class IncrementalBulkService {

     private final Client client;
+    private final IndexingPressure indexingPressure;

-    public IncrementalBulkService(Client client) {
+    public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
         this.client = client;
+        this.indexingPressure = indexingPressure;
     }

     public Handler newBulkRequest() {
@@ -34,12 +40,15 @@
     }

     public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
-        return new Handler(client, waitForActiveShards, timeout, refresh);
+        return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
     }

     public static class Handler {

+        public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
+
         private final Client client;
+        private final IndexingPressure indexingPressure;
         private final ActiveShardCount waitForActiveShards;
         private final TimeValue timeout;
         private final String refresh;
@@ -51,12 +60,19 @@
         private Exception bulkActionLevelFailure = null;
         private BulkRequest bulkRequest = null;

-        private Handler(Client client, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
+        private Handler(
+            Client client,
+            IndexingPressure indexingPressure,
+            @Nullable String waitForActiveShards,
+            @Nullable TimeValue timeout,
+            @Nullable String refresh
+        ) {
             this.client = client;
+            this.indexingPressure = indexingPressure;
             this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
             this.timeout = timeout;
             this.refresh = refresh;
-            createNewBulkRequest(BulkRequest.IncrementalState.EMPTY);
+            createNewBulkRequest(EMPTY_STATE);
         }

         public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
@@ -65,35 +81,39 @@
                 nextItems.run();
             } else {
                 assert bulkRequest != null;
-                internalAddItems(items, releasable);
-
-                if (shouldBackOff()) {
-                    final boolean isFirstRequest = incrementalRequestSubmitted == false;
-                    incrementalRequestSubmitted = true;
-
-                    client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
-
-                        @Override
-                        public void onResponse(BulkResponse bulkResponse) {
-                            responses.add(bulkResponse);
-                            releaseCurrentReferences();
-                            createNewBulkRequest(bulkResponse.getIncrementalState());
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            handleBulkFailure(isFirstRequest, e);
-                        }
-                    }, nextItems::run));
-                } else {
-                    nextItems.run();
-                }
+                if (internalAddItems(items, releasable)) {
+                    if (shouldBackOff()) {
+                        final boolean isFirstRequest = incrementalRequestSubmitted == false;
+                        incrementalRequestSubmitted = true;
+
+                        client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
+
+                            @Override
+                            public void onResponse(BulkResponse bulkResponse) {
+                                responses.add(bulkResponse);
+                                releaseCurrentReferences();
+                                createNewBulkRequest(
+                                    new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
+                                );
+                            }

+                            @Override
+                            public void onFailure(Exception e) {
+                                handleBulkFailure(isFirstRequest, e);
+                            }
+                        }, nextItems));
+                    } else {
+                        nextItems.run();
+                    }
+                } else {
+                    nextItems.run();
+                }
             }
         }

         private boolean shouldBackOff() {
-            // TODO: Implement Real Memory Logic
-            return bulkRequest.requests().size() >= 16;
+            return indexingPressure.shouldSplitBulks();
         }

         public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
@@ -102,25 +122,27 @@
                 errorResponse(listener);
             } else {
                 assert bulkRequest != null;
-                internalAddItems(items, releasable);
-
-                client.bulk(bulkRequest, new ActionListener<>() {
-
-                    private final boolean isFirstRequest = incrementalRequestSubmitted == false;
-
-                    @Override
-                    public void onResponse(BulkResponse bulkResponse) {
-                        responses.add(bulkResponse);
-                        releaseCurrentReferences();
-                        listener.onResponse(combineResponses());
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        handleBulkFailure(isFirstRequest, e);
-                        errorResponse(listener);
-                    }
-                });
+                if (internalAddItems(items, releasable)) {
+                    client.bulk(bulkRequest, new ActionListener<>() {
+
+                        private final boolean isFirstRequest = incrementalRequestSubmitted == false;
+
+                        @Override
+                        public void onResponse(BulkResponse bulkResponse) {
+                            responses.add(bulkResponse);
+                            releaseCurrentReferences();
+                            listener.onResponse(combineResponses());
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            handleBulkFailure(isFirstRequest, e);
+                            errorResponse(listener);
+                        }
+                    });
+                } else {
+                    errorResponse(listener);
+                }
             }
         }
@@ -160,9 +182,22 @@
             responses.add(new BulkResponse(bulkItemResponses, 0, 0));
         }

-        private void internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
-            bulkRequest.add(items);
-            releasables.add(releasable);
+        private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
+            try {
+                bulkRequest.add(items);
+                releasables.add(releasable);
+                releasables.add(
+                    indexingPressure.markCoordinatingOperationStarted(
+                        items.size(),
+                        items.stream().mapToLong(Accountable::ramBytesUsed).sum(),
+                        false
+                    )
+                );
+                return true;
+            } catch (EsRejectedExecutionException e) {
+                handleBulkFailure(incrementalRequestSubmitted == false, e);
+                return false;
+            }
         }

         private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {
```
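The reshaped internalAddItems above reports rejection instead of throwing: reserving the page's bytes with markCoordinatingOperationStarted can fail with EsRejectedExecutionException, in which case the failure is recorded and both callers short-circuit the rest of the bulk. A standalone sketch of that reserve-or-refuse pattern, with illustrative names and a simplified exception type:

```java
import java.util.ArrayList;
import java.util.List;

class ReservationSketch {
    static final long LIMIT = 1000;
    long reserved = 0;
    final List<Runnable> releasables = new ArrayList<>();

    boolean tryAddPage(long bytes, Runnable pageReleasable) {
        try {
            reserve(bytes);                      // may throw when over the limit
            releasables.add(pageReleasable);
            releasables.add(() -> reserved -= bytes);
            return true;                         // page accepted
        } catch (IllegalStateException e) {
            return false;                        // rejected: caller fails the whole bulk
        }
    }

    void reserve(long bytes) {
        if (reserved + bytes > LIMIT) {
            throw new IllegalStateException("over limit");
        }
        reserved += bytes;
    }

    public static void main(String[] args) {
        ReservationSketch s = new ReservationSketch();
        System.out.println(s.tryAddPage(800, () -> {})); // true
        System.out.println(s.tryAddPage(800, () -> {})); // false: would exceed LIMIT
    }
}
```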
TransportAbstractBulkAction.java

```diff
@@ -112,7 +112,12 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
             clusterService.state().metadata().getIndicesLookup(),
             systemIndices
         );
-        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
+        final Releasable releasable;
+        if (bulkRequest.incrementalState().indexingPressureAccounted()) {
+            releasable = () -> {};
+        } else {
+            releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
+        }
         final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
         final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
         ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
```
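The branch above avoids double accounting: when the incremental handler has already reserved the bytes (indexingPressureAccounted() is true), the transport action installs a no-op Releasable rather than reserving the same bytes again. A minimal sketch of that reserve-exactly-once guard, with hypothetical names:

```java
import java.util.concurrent.atomic.AtomicLong;

class OnceAccounting {
    static final AtomicLong reserved = new AtomicLong();

    static Runnable reserveUnlessAccounted(boolean alreadyAccounted, long bytes) {
        if (alreadyAccounted) {
            return () -> {};                      // upstream owns the reservation
        }
        reserved.addAndGet(bytes);                // reserve here...
        return () -> reserved.addAndGet(-bytes);  // ...and release on close
    }

    public static void main(String[] args) {
        Runnable release = reserveUnlessAccounted(false, 1024);
        System.out.println("reserved=" + reserved.get()); // 1024
        release.run();
        System.out.println("reserved=" + reserved.get()); // 0
    }
}
```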
ClusterSettings.java

```diff
@@ -560,6 +560,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
         FsHealthService.REFRESH_INTERVAL_SETTING,
         FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
         IndexingPressure.MAX_INDEXING_BYTES,
+        IndexingPressure.SPLIT_BULK_THRESHOLD,
         ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
         DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
         CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING,
```
IndexingPressure.java

```diff
@@ -30,6 +30,12 @@ public class IndexingPressure {
         Setting.Property.NodeScope
     );

+    public static final Setting<ByteSizeValue> SPLIT_BULK_THRESHOLD = Setting.memorySizeSetting(
+        "indexing_pressure.memory.split_bulk_threshold",
+        "8.5%",
+        Setting.Property.NodeScope
+    );
+
     private static final Logger logger = LogManager.getLogger(IndexingPressure.class);

     private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
@@ -57,10 +63,12 @@
     private final AtomicLong primaryDocumentRejections = new AtomicLong(0);

     private final long primaryAndCoordinatingLimits;
+    private final long splitBulkThreshold;
     private final long replicaLimits;

     public IndexingPressure(Settings settings) {
         this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
+        this.splitBulkThreshold = SPLIT_BULK_THRESHOLD.get(settings).getBytes();
         this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
     }

@@ -204,6 +212,10 @@
         });
     }

+    public boolean shouldSplitBulks() {
+        return currentCombinedCoordinatingAndPrimaryBytes.get() >= splitBulkThreshold;
+    }
+
     public IndexingPressureStats stats() {
         return new IndexingPressureStats(
             totalCombinedCoordinatingAndPrimaryBytes.get(),
```
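The "8.5%" default is a memory-size setting, which Elasticsearch resolves against the JVM heap, while the tests above override it with an absolute value such as "512B" through the indexing_pressure.memory.split_bulk_threshold key. A small sketch of the resulting arithmetic, assuming the percentage is taken from the maximum heap size:

```java
public class ThresholdMath {
    public static void main(String[] args) {
        // 8.5% of the maximum heap, mirroring how a "%" default resolves to bytes.
        long heapBytes = Runtime.getRuntime().maxMemory();
        long splitThreshold = (long) (heapBytes * 0.085);
        System.out.println("split_bulk_threshold ≈ " + splitThreshold + " bytes");
    }
}
```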
NodeConstruction.java

```diff
@@ -980,7 +980,7 @@ class NodeConstruction {
         );
         final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
         final IndexingPressure indexingLimits = new IndexingPressure(settings);
-        final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client);
+        final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);

         SnapshotsService snapshotsService = new SnapshotsService(
             settings,
```
ESIntegTestCase.java

```diff
@@ -125,6 +125,7 @@ import org.elasticsearch.http.HttpInfo;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.index.MergeSchedulerConfig;
 import org.elasticsearch.index.MockEngineFactoryPlugin;
@@ -2103,6 +2104,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
             TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE.getKey(),
             randomFrom(1, 2, SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE)
         );
+        if (randomBoolean()) {
+            builder.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), randomFrom("256B", "1KB", "64KB"));
+        }
         return builder.build();
     }
```