Split bulks based on memory usage (#112267)

This commit splits incremental bulks into multiple bulk requests once the
memory usage for indexing pressure has passed a configurable threshold
(indexing_pressure.memory.split_bulk_threshold, which defaults to 8.5% of the heap).
Tim Brooks 2024-08-28 16:49:30 -06:00
parent cbcbc34863
commit c00768a116
9 changed files with 179 additions and 48 deletions
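
At a glance, the change works like this: each time the incremental bulk handler receives a batch of items, it now registers their memory with IndexingPressure; once the tracked coordinating and primary bytes reach the split threshold, the handler dispatches the bulk accumulated so far and starts a fresh one, marking the new request so indexing pressure is not accounted a second time downstream. A minimal, self-contained sketch of that decision follows — the SplitBulkSketch class is hypothetical and the 512-byte threshold is only illustrative; the real logic lives in IndexingPressure#shouldSplitBulks and IncrementalBulkService.Handler#addItems in the diff below:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch of the split decision introduced by this commit.
    class SplitBulkSketch {
        // Bytes currently held for coordinating + primary operations
        // (tracked by IndexingPressure in the real code).
        private final AtomicLong currentCombinedBytes = new AtomicLong();
        // Configurable via indexing_pressure.memory.split_bulk_threshold;
        // defaults to 8.5% of the heap. 512 bytes here is only illustrative.
        private final long splitBulkThreshold = 512;

        private List<Object> pendingBulk = new ArrayList<>();

        // Mirrors IndexingPressure#shouldSplitBulks from the diff below.
        boolean shouldSplitBulks() {
            return currentCombinedBytes.get() >= splitBulkThreshold;
        }

        // Called per chunk of an incremental bulk request, like Handler#addItems.
        void addItems(List<Object> items, long ramBytesUsed) {
            pendingBulk.addAll(items);
            currentCombinedBytes.addAndGet(ramBytesUsed);
            if (shouldSplitBulks()) {
                // Send what has accumulated as its own bulk request and start a
                // fresh one; the held bytes are released when that bulk completes.
                dispatch(pendingBulk);
                pendingBulk = new ArrayList<>();
            }
        }

        private void dispatch(List<Object> bulk) {
            // client.bulk(...) in the real code; omitted here.
        }
    }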


@@ -46,6 +46,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
@@ -55,6 +56,14 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
     }
 
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B")
+            .build();
+    }
+
     public void testSingleBulkRequest() {
         String index = "test";
         createIndex(index);
@@ -81,6 +90,71 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         assertFalse(refCounted.hasReferences());
     }
 
+    public void testIndexingPressureRejection() {
+        String index = "test";
+        createIndex(index);
+
+        String nodeName = internalCluster().getRandomNodeName();
+        IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
+        IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
+
+        try (Releasable r = indexingPressure.markCoordinatingOperationStarted(1, indexingPressure.stats().getMemoryLimit(), true)) {
+            IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+            AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+
+            if (randomBoolean()) {
+                AtomicBoolean nextPage = new AtomicBoolean(false);
+                refCounted.incRef();
+                handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
+                assertTrue(nextPage.get());
+            }
+
+            PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+            handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+
+            expectThrows(EsRejectedExecutionException.class, future::actionGet);
+            assertFalse(refCounted.hasReferences());
+        }
+    }
+
+    public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
+        String index = "test";
+        createIndex(index);
+
+        String nodeName = internalCluster().getRandomNodeName();
+        IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
+        IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
+
+        IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+
+        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        AtomicBoolean nextPage = new AtomicBoolean(false);
+
+        IndexRequest indexRequest = indexRequest(index);
+        long total = indexRequest.ramBytesUsed();
+        while (total < 512) {
+            refCounted.incRef();
+            handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
+            assertTrue(nextPage.get());
+            nextPage.set(false);
+            indexRequest = indexRequest(index);
+            total += indexRequest.ramBytesUsed();
+        }
+
+        assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
+        refCounted.incRef();
+        handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
+
+        assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
+
+        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+        handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
+
+        BulkResponse bulkResponse = future.actionGet();
+        assertNoFailures(bulkResponse);
+        assertFalse(refCounted.hasReferences());
+    }
+
     public void testMultipleBulkPartsWithBackoff() {
         ExecutorService executorService = Executors.newFixedThreadPool(1);
 


@@ -409,7 +409,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
                 responses.toArray(new BulkItemResponse[responses.length()]),
                 buildTookInMillis(startTimeNanos),
                 BulkResponse.NO_INGEST_TOOK,
-                new BulkRequest.IncrementalState(shortCircuitShardFailures)
+                new BulkRequest.IncrementalState(shortCircuitShardFailures, bulkRequest.incrementalState().indexingPressureAccounted())
             )
         );
         // Allow memory for bulk shard request items to be reclaimed before all items have been completed


@@ -506,12 +506,12 @@ public class BulkRequest extends ActionRequest
         return Map.of();
     }
 
-    record IncrementalState(Map<ShardId, Exception> shardLevelFailures) implements Writeable {
+    record IncrementalState(Map<ShardId, Exception> shardLevelFailures, boolean indexingPressureAccounted) implements Writeable {
 
-        static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap());
+        static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap(), false);
 
         IncrementalState(StreamInput in) throws IOException {
-            this(in.readMap(ShardId::new, input -> input.readException()));
+            this(in.readMap(ShardId::new, input -> input.readException()), false);
         }
 
         @Override


@@ -9,24 +9,30 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexingPressure;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class IncrementalBulkService {
 
     private final Client client;
+    private final IndexingPressure indexingPressure;
 
-    public IncrementalBulkService(Client client) {
+    public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
         this.client = client;
+        this.indexingPressure = indexingPressure;
     }
 
     public Handler newBulkRequest() {
@@ -34,12 +40,15 @@ public class IncrementalBulkService {
     }
 
     public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
-        return new Handler(client, waitForActiveShards, timeout, refresh);
+        return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
     }
 
     public static class Handler {
 
+        public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
+
         private final Client client;
+        private final IndexingPressure indexingPressure;
         private final ActiveShardCount waitForActiveShards;
         private final TimeValue timeout;
         private final String refresh;
@@ -51,12 +60,19 @@ public class IncrementalBulkService {
         private Exception bulkActionLevelFailure = null;
         private BulkRequest bulkRequest = null;
 
-        private Handler(Client client, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
+        private Handler(
+            Client client,
+            IndexingPressure indexingPressure,
+            @Nullable String waitForActiveShards,
+            @Nullable TimeValue timeout,
+            @Nullable String refresh
+        ) {
             this.client = client;
+            this.indexingPressure = indexingPressure;
             this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
             this.timeout = timeout;
             this.refresh = refresh;
-            createNewBulkRequest(BulkRequest.IncrementalState.EMPTY);
+            createNewBulkRequest(EMPTY_STATE);
         }
 
         public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
@@ -65,35 +81,39 @@ public class IncrementalBulkService {
                 nextItems.run();
             } else {
                 assert bulkRequest != null;
-                internalAddItems(items, releasable);
-
-                if (shouldBackOff()) {
-                    final boolean isFirstRequest = incrementalRequestSubmitted == false;
-                    incrementalRequestSubmitted = true;
-
-                    client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
-
-                        @Override
-                        public void onResponse(BulkResponse bulkResponse) {
-                            responses.add(bulkResponse);
-                            releaseCurrentReferences();
-                            createNewBulkRequest(bulkResponse.getIncrementalState());
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            handleBulkFailure(isFirstRequest, e);
-                        }
-                    }, nextItems::run));
+                if (internalAddItems(items, releasable)) {
+                    if (shouldBackOff()) {
+                        final boolean isFirstRequest = incrementalRequestSubmitted == false;
+                        incrementalRequestSubmitted = true;
+
+                        client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
+
+                            @Override
+                            public void onResponse(BulkResponse bulkResponse) {
+                                responses.add(bulkResponse);
+                                releaseCurrentReferences();
+                                createNewBulkRequest(
+                                    new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
+                                );
+                            }
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                handleBulkFailure(isFirstRequest, e);
+                            }
+                        }, nextItems));
+                    } else {
+                        nextItems.run();
+                    }
+
                 } else {
                     nextItems.run();
                 }
             }
         }
 
         private boolean shouldBackOff() {
-            // TODO: Implement Real Memory Logic
-            return bulkRequest.requests().size() >= 16;
+            return indexingPressure.shouldSplitBulks();
         }
 
         public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
@@ -102,25 +122,27 @@ public class IncrementalBulkService {
                 errorResponse(listener);
             } else {
                 assert bulkRequest != null;
-                internalAddItems(items, releasable);
-
-                client.bulk(bulkRequest, new ActionListener<>() {
-
-                    private final boolean isFirstRequest = incrementalRequestSubmitted == false;
-
-                    @Override
-                    public void onResponse(BulkResponse bulkResponse) {
-                        responses.add(bulkResponse);
-                        releaseCurrentReferences();
-                        listener.onResponse(combineResponses());
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        handleBulkFailure(isFirstRequest, e);
-                        errorResponse(listener);
-                    }
-                });
+                if (internalAddItems(items, releasable)) {
+                    client.bulk(bulkRequest, new ActionListener<>() {
+
+                        private final boolean isFirstRequest = incrementalRequestSubmitted == false;
+
+                        @Override
+                        public void onResponse(BulkResponse bulkResponse) {
+                            responses.add(bulkResponse);
+                            releaseCurrentReferences();
+                            listener.onResponse(combineResponses());
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            handleBulkFailure(isFirstRequest, e);
+                            errorResponse(listener);
+                        }
+                    });
+                } else {
+                    errorResponse(listener);
+                }
             }
         }
 
@@ -160,9 +182,22 @@ public class IncrementalBulkService {
             responses.add(new BulkResponse(bulkItemResponses, 0, 0));
         }
 
-        private void internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
-            bulkRequest.add(items);
-            releasables.add(releasable);
+        private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
+            try {
+                bulkRequest.add(items);
+                releasables.add(releasable);
+                releasables.add(
+                    indexingPressure.markCoordinatingOperationStarted(
+                        items.size(),
+                        items.stream().mapToLong(Accountable::ramBytesUsed).sum(),
+                        false
+                    )
+                );
+                return true;
+            } catch (EsRejectedExecutionException e) {
+                handleBulkFailure(incrementalRequestSubmitted == false, e);
+                return false;
+            }
         }
 
         private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {


@@ -112,7 +112,12 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
             clusterService.state().metadata().getIndicesLookup(),
             systemIndices
         );
-        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
+        final Releasable releasable;
+        if (bulkRequest.incrementalState().indexingPressureAccounted()) {
+            releasable = () -> {};
+        } else {
+            releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
+        }
         final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
         final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
         ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);


@@ -560,6 +560,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             FsHealthService.REFRESH_INTERVAL_SETTING,
             FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
             IndexingPressure.MAX_INDEXING_BYTES,
+            IndexingPressure.SPLIT_BULK_THRESHOLD,
             ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
             DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
             CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING,


@@ -30,6 +30,12 @@ public class IndexingPressure {
         Setting.Property.NodeScope
     );
 
+    public static final Setting<ByteSizeValue> SPLIT_BULK_THRESHOLD = Setting.memorySizeSetting(
+        "indexing_pressure.memory.split_bulk_threshold",
+        "8.5%",
+        Setting.Property.NodeScope
+    );
+
     private static final Logger logger = LogManager.getLogger(IndexingPressure.class);
 
     private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
@@ -57,10 +63,12 @@ public class IndexingPressure {
     private final AtomicLong primaryDocumentRejections = new AtomicLong(0);
 
     private final long primaryAndCoordinatingLimits;
+    private final long splitBulkThreshold;
     private final long replicaLimits;
 
     public IndexingPressure(Settings settings) {
         this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
+        this.splitBulkThreshold = SPLIT_BULK_THRESHOLD.get(settings).getBytes();
         this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
     }
@@ -204,6 +212,10 @@ public class IndexingPressure {
         });
     }
 
+    public boolean shouldSplitBulks() {
+        return currentCombinedCoordinatingAndPrimaryBytes.get() >= splitBulkThreshold;
+    }
+
     public IndexingPressureStats stats() {
         return new IndexingPressureStats(
             totalCombinedCoordinatingAndPrimaryBytes.get(),
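
Since SPLIT_BULK_THRESHOLD is declared with Setting.memorySizeSetting, the "8.5%" default is resolved against the JVM heap at node startup, and absolute byte sizes are accepted as well. A small illustrative sketch of overriding it, mirroring the test setup above — the SplitThresholdExample class and the 512B value are only for illustration:

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.index.IndexingPressure;

    class SplitThresholdExample {
        static IndexingPressure withLowThreshold() {
            // Use an absolute threshold instead of the "8.5%" heap-relative default.
            Settings settings = Settings.builder()
                .put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B")
                .build();
            return new IndexingPressure(settings);
        }
    }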


@@ -980,7 +980,7 @@ class NodeConstruction {
         );
 
         final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
         final IndexingPressure indexingLimits = new IndexingPressure(settings);
-        final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client);
+        final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
         SnapshotsService snapshotsService = new SnapshotsService(
             settings,


@@ -125,6 +125,7 @@ import org.elasticsearch.http.HttpInfo;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.index.MergeSchedulerConfig;
 import org.elasticsearch.index.MockEngineFactoryPlugin;
@@ -2103,6 +2104,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
             TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE.getKey(),
             randomFrom(1, 2, SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE)
         );
+        if (randomBoolean()) {
+            builder.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), randomFrom("256B", "1KB", "64KB"));
+        }
         return builder.build();
     }
 