Split bulks based on memory usage (#112267)

This commit splits bulks once memory usage for indexing pressure has
passed a configurable threshold.
This commit is contained in:
Tim Brooks 2024-08-28 16:49:30 -06:00
parent cbcbc34863
commit c00768a116
9 changed files with 179 additions and 48 deletions

View file

@ -46,6 +46,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
public class IncrementalBulkIT extends ESIntegTestCase {
@ -55,6 +56,14 @@ public class IncrementalBulkIT extends ESIntegTestCase {
return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
}
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B")
.build();
}
public void testSingleBulkRequest() {
String index = "test";
createIndex(index);
@ -81,6 +90,71 @@ public class IncrementalBulkIT extends ESIntegTestCase {
assertFalse(refCounted.hasReferences());
}
public void testIndexingPressureRejection() {
String index = "test";
createIndex(index);
String nodeName = internalCluster().getRandomNodeName();
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
try (Releasable r = indexingPressure.markCoordinatingOperationStarted(1, indexingPressure.stats().getMemoryLimit(), true)) {
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
if (randomBoolean()) {
AtomicBoolean nextPage = new AtomicBoolean(false);
refCounted.incRef();
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
assertTrue(nextPage.get());
}
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
expectThrows(EsRejectedExecutionException.class, future::actionGet);
assertFalse(refCounted.hasReferences());
}
}
public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
String index = "test";
createIndex(index);
String nodeName = internalCluster().getRandomNodeName();
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
AtomicBoolean nextPage = new AtomicBoolean(false);
IndexRequest indexRequest = indexRequest(index);
long total = indexRequest.ramBytesUsed();
while (total < 512) {
refCounted.incRef();
handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
assertTrue(nextPage.get());
nextPage.set(false);
indexRequest = indexRequest(index);
total += indexRequest.ramBytesUsed();
}
assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
refCounted.incRef();
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
BulkResponse bulkResponse = future.actionGet();
assertNoFailures(bulkResponse);
assertFalse(refCounted.hasReferences());
}
public void testMultipleBulkPartsWithBackoff() {
ExecutorService executorService = Executors.newFixedThreadPool(1);

View file

@ -409,7 +409,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos),
BulkResponse.NO_INGEST_TOOK,
new BulkRequest.IncrementalState(shortCircuitShardFailures)
new BulkRequest.IncrementalState(shortCircuitShardFailures, bulkRequest.incrementalState().indexingPressureAccounted())
)
);
// Allow memory for bulk shard request items to be reclaimed before all items have been completed

View file

@ -506,12 +506,12 @@ public class BulkRequest extends ActionRequest
return Map.of();
}
record IncrementalState(Map<ShardId, Exception> shardLevelFailures) implements Writeable {
record IncrementalState(Map<ShardId, Exception> shardLevelFailures, boolean indexingPressureAccounted) implements Writeable {
static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap());
static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap(), false);
IncrementalState(StreamInput in) throws IOException {
this(in.readMap(ShardId::new, input -> input.readException()));
this(in.readMap(ShardId::new, input -> input.readException()), false);
}
@Override

View file

@ -9,24 +9,30 @@
package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class IncrementalBulkService {
private final Client client;
private final IndexingPressure indexingPressure;
public IncrementalBulkService(Client client) {
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
this.client = client;
this.indexingPressure = indexingPressure;
}
public Handler newBulkRequest() {
@ -34,12 +40,15 @@ public class IncrementalBulkService {
}
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
return new Handler(client, waitForActiveShards, timeout, refresh);
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
}
public static class Handler {
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
private final Client client;
private final IndexingPressure indexingPressure;
private final ActiveShardCount waitForActiveShards;
private final TimeValue timeout;
private final String refresh;
@ -51,12 +60,19 @@ public class IncrementalBulkService {
private Exception bulkActionLevelFailure = null;
private BulkRequest bulkRequest = null;
private Handler(Client client, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
private Handler(
Client client,
IndexingPressure indexingPressure,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
) {
this.client = client;
this.indexingPressure = indexingPressure;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
this.refresh = refresh;
createNewBulkRequest(BulkRequest.IncrementalState.EMPTY);
createNewBulkRequest(EMPTY_STATE);
}
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
@ -65,35 +81,39 @@ public class IncrementalBulkService {
nextItems.run();
} else {
assert bulkRequest != null;
internalAddItems(items, releasable);
if (internalAddItems(items, releasable)) {
if (shouldBackOff()) {
final boolean isFirstRequest = incrementalRequestSubmitted == false;
incrementalRequestSubmitted = true;
if (shouldBackOff()) {
final boolean isFirstRequest = incrementalRequestSubmitted == false;
incrementalRequestSubmitted = true;
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
createNewBulkRequest(
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
);
}
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
createNewBulkRequest(bulkResponse.getIncrementalState());
}
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, nextItems::run));
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, nextItems));
} else {
nextItems.run();
}
} else {
nextItems.run();
}
}
}
private boolean shouldBackOff() {
// TODO: Implement Real Memory Logic
return bulkRequest.requests().size() >= 16;
return indexingPressure.shouldSplitBulks();
}
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
@ -102,25 +122,27 @@ public class IncrementalBulkService {
errorResponse(listener);
} else {
assert bulkRequest != null;
internalAddItems(items, releasable);
if (internalAddItems(items, releasable)) {
client.bulk(bulkRequest, new ActionListener<>() {
client.bulk(bulkRequest, new ActionListener<>() {
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
listener.onResponse(combineResponses());
}
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
listener.onResponse(combineResponses());
}
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
errorResponse(listener);
}
});
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
errorResponse(listener);
}
});
} else {
errorResponse(listener);
}
}
}
@ -160,9 +182,22 @@ public class IncrementalBulkService {
responses.add(new BulkResponse(bulkItemResponses, 0, 0));
}
private void internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
bulkRequest.add(items);
releasables.add(releasable);
private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
try {
bulkRequest.add(items);
releasables.add(releasable);
releasables.add(
indexingPressure.markCoordinatingOperationStarted(
items.size(),
items.stream().mapToLong(Accountable::ramBytesUsed).sum(),
false
)
);
return true;
} catch (EsRejectedExecutionException e) {
handleBulkFailure(incrementalRequestSubmitted == false, e);
return false;
}
}
private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {

View file

@ -112,7 +112,12 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
clusterService.state().metadata().getIndicesLookup(),
systemIndices
);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final Releasable releasable;
if (bulkRequest.incrementalState().indexingPressureAccounted()) {
releasable = () -> {};
} else {
releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
}
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);

View file

@ -560,6 +560,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
IndexingPressure.MAX_INDEXING_BYTES,
IndexingPressure.SPLIT_BULK_THRESHOLD,
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING,

View file

@ -30,6 +30,12 @@ public class IndexingPressure {
Setting.Property.NodeScope
);
public static final Setting<ByteSizeValue> SPLIT_BULK_THRESHOLD = Setting.memorySizeSetting(
"indexing_pressure.memory.split_bulk_threshold",
"8.5%",
Setting.Property.NodeScope
);
private static final Logger logger = LogManager.getLogger(IndexingPressure.class);
private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
@ -57,10 +63,12 @@ public class IndexingPressure {
private final AtomicLong primaryDocumentRejections = new AtomicLong(0);
private final long primaryAndCoordinatingLimits;
private final long splitBulkThreshold;
private final long replicaLimits;
public IndexingPressure(Settings settings) {
this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
this.splitBulkThreshold = SPLIT_BULK_THRESHOLD.get(settings).getBytes();
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
}
@ -204,6 +212,10 @@ public class IndexingPressure {
});
}
public boolean shouldSplitBulks() {
return currentCombinedCoordinatingAndPrimaryBytes.get() >= splitBulkThreshold;
}
public IndexingPressureStats stats() {
return new IndexingPressureStats(
totalCombinedCoordinatingAndPrimaryBytes.get(),

View file

@ -980,7 +980,7 @@ class NodeConstruction {
);
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
final IndexingPressure indexingLimits = new IndexingPressure(settings);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
SnapshotsService snapshotsService = new SnapshotsService(
settings,

View file

@ -125,6 +125,7 @@ import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.MockEngineFactoryPlugin;
@ -2103,6 +2104,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE.getKey(),
randomFrom(1, 2, SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE)
);
if (randomBoolean()) {
builder.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), randomFrom("256B", "1KB", "64KB"));
}
return builder.build();
}