mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-07-19 04:03:27 -04:00
Split bulks based on memory usage (#112267)
This commit splits bulks once memory usage for indexing pressure has passed a configurable threshold.
This commit is contained in:
parent
cbcbc34863
commit
c00768a116
9 changed files with 179 additions and 48 deletions
|
@ -46,6 +46,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
|
||||||
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
|
||||||
public class IncrementalBulkIT extends ESIntegTestCase {
|
public class IncrementalBulkIT extends ESIntegTestCase {
|
||||||
|
@ -55,6 +56,14 @@ public class IncrementalBulkIT extends ESIntegTestCase {
|
||||||
return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
|
return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
|
||||||
|
return Settings.builder()
|
||||||
|
.put(super.nodeSettings(nodeOrdinal, otherSettings))
|
||||||
|
.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
public void testSingleBulkRequest() {
|
public void testSingleBulkRequest() {
|
||||||
String index = "test";
|
String index = "test";
|
||||||
createIndex(index);
|
createIndex(index);
|
||||||
|
@ -81,6 +90,71 @@ public class IncrementalBulkIT extends ESIntegTestCase {
|
||||||
assertFalse(refCounted.hasReferences());
|
assertFalse(refCounted.hasReferences());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIndexingPressureRejection() {
|
||||||
|
String index = "test";
|
||||||
|
createIndex(index);
|
||||||
|
|
||||||
|
String nodeName = internalCluster().getRandomNodeName();
|
||||||
|
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
|
||||||
|
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
|
||||||
|
|
||||||
|
try (Releasable r = indexingPressure.markCoordinatingOperationStarted(1, indexingPressure.stats().getMemoryLimit(), true)) {
|
||||||
|
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
|
||||||
|
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
AtomicBoolean nextPage = new AtomicBoolean(false);
|
||||||
|
refCounted.incRef();
|
||||||
|
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
|
||||||
|
assertTrue(nextPage.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
|
||||||
|
handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
|
||||||
|
|
||||||
|
expectThrows(EsRejectedExecutionException.class, future::actionGet);
|
||||||
|
assertFalse(refCounted.hasReferences());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
|
||||||
|
String index = "test";
|
||||||
|
createIndex(index);
|
||||||
|
|
||||||
|
String nodeName = internalCluster().getRandomNodeName();
|
||||||
|
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
|
||||||
|
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
|
||||||
|
|
||||||
|
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
|
||||||
|
|
||||||
|
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
|
||||||
|
AtomicBoolean nextPage = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
IndexRequest indexRequest = indexRequest(index);
|
||||||
|
long total = indexRequest.ramBytesUsed();
|
||||||
|
while (total < 512) {
|
||||||
|
refCounted.incRef();
|
||||||
|
handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
|
||||||
|
assertTrue(nextPage.get());
|
||||||
|
nextPage.set(false);
|
||||||
|
indexRequest = indexRequest(index);
|
||||||
|
total += indexRequest.ramBytesUsed();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
|
||||||
|
refCounted.incRef();
|
||||||
|
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
|
||||||
|
|
||||||
|
assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
|
||||||
|
|
||||||
|
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
|
||||||
|
handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
|
||||||
|
|
||||||
|
BulkResponse bulkResponse = future.actionGet();
|
||||||
|
assertNoFailures(bulkResponse);
|
||||||
|
assertFalse(refCounted.hasReferences());
|
||||||
|
}
|
||||||
|
|
||||||
public void testMultipleBulkPartsWithBackoff() {
|
public void testMultipleBulkPartsWithBackoff() {
|
||||||
ExecutorService executorService = Executors.newFixedThreadPool(1);
|
ExecutorService executorService = Executors.newFixedThreadPool(1);
|
||||||
|
|
||||||
|
|
|
@ -409,7 +409,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
|
||||||
responses.toArray(new BulkItemResponse[responses.length()]),
|
responses.toArray(new BulkItemResponse[responses.length()]),
|
||||||
buildTookInMillis(startTimeNanos),
|
buildTookInMillis(startTimeNanos),
|
||||||
BulkResponse.NO_INGEST_TOOK,
|
BulkResponse.NO_INGEST_TOOK,
|
||||||
new BulkRequest.IncrementalState(shortCircuitShardFailures)
|
new BulkRequest.IncrementalState(shortCircuitShardFailures, bulkRequest.incrementalState().indexingPressureAccounted())
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
// Allow memory for bulk shard request items to be reclaimed before all items have been completed
|
// Allow memory for bulk shard request items to be reclaimed before all items have been completed
|
||||||
|
|
|
@ -506,12 +506,12 @@ public class BulkRequest extends ActionRequest
|
||||||
return Map.of();
|
return Map.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
record IncrementalState(Map<ShardId, Exception> shardLevelFailures) implements Writeable {
|
record IncrementalState(Map<ShardId, Exception> shardLevelFailures, boolean indexingPressureAccounted) implements Writeable {
|
||||||
|
|
||||||
static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap());
|
static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap(), false);
|
||||||
|
|
||||||
IncrementalState(StreamInput in) throws IOException {
|
IncrementalState(StreamInput in) throws IOException {
|
||||||
this(in.readMap(ShardId::new, input -> input.readException()));
|
this(in.readMap(ShardId::new, input -> input.readException()), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -9,24 +9,30 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.bulk;
|
package org.elasticsearch.action.bulk;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.Accountable;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
||||||
import org.elasticsearch.client.internal.Client;
|
import org.elasticsearch.client.internal.Client;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.core.Releasable;
|
import org.elasticsearch.core.Releasable;
|
||||||
import org.elasticsearch.core.Releasables;
|
import org.elasticsearch.core.Releasables;
|
||||||
import org.elasticsearch.core.TimeValue;
|
import org.elasticsearch.core.TimeValue;
|
||||||
|
import org.elasticsearch.index.IndexingPressure;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class IncrementalBulkService {
|
public class IncrementalBulkService {
|
||||||
|
|
||||||
private final Client client;
|
private final Client client;
|
||||||
|
private final IndexingPressure indexingPressure;
|
||||||
|
|
||||||
public IncrementalBulkService(Client client) {
|
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
|
this.indexingPressure = indexingPressure;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Handler newBulkRequest() {
|
public Handler newBulkRequest() {
|
||||||
|
@ -34,12 +40,15 @@ public class IncrementalBulkService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
|
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
|
||||||
return new Handler(client, waitForActiveShards, timeout, refresh);
|
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Handler {
|
public static class Handler {
|
||||||
|
|
||||||
|
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
|
||||||
|
|
||||||
private final Client client;
|
private final Client client;
|
||||||
|
private final IndexingPressure indexingPressure;
|
||||||
private final ActiveShardCount waitForActiveShards;
|
private final ActiveShardCount waitForActiveShards;
|
||||||
private final TimeValue timeout;
|
private final TimeValue timeout;
|
||||||
private final String refresh;
|
private final String refresh;
|
||||||
|
@ -51,12 +60,19 @@ public class IncrementalBulkService {
|
||||||
private Exception bulkActionLevelFailure = null;
|
private Exception bulkActionLevelFailure = null;
|
||||||
private BulkRequest bulkRequest = null;
|
private BulkRequest bulkRequest = null;
|
||||||
|
|
||||||
private Handler(Client client, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
|
private Handler(
|
||||||
|
Client client,
|
||||||
|
IndexingPressure indexingPressure,
|
||||||
|
@Nullable String waitForActiveShards,
|
||||||
|
@Nullable TimeValue timeout,
|
||||||
|
@Nullable String refresh
|
||||||
|
) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
|
this.indexingPressure = indexingPressure;
|
||||||
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
|
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
this.refresh = refresh;
|
this.refresh = refresh;
|
||||||
createNewBulkRequest(BulkRequest.IncrementalState.EMPTY);
|
createNewBulkRequest(EMPTY_STATE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
|
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
|
||||||
|
@ -65,35 +81,39 @@ public class IncrementalBulkService {
|
||||||
nextItems.run();
|
nextItems.run();
|
||||||
} else {
|
} else {
|
||||||
assert bulkRequest != null;
|
assert bulkRequest != null;
|
||||||
internalAddItems(items, releasable);
|
if (internalAddItems(items, releasable)) {
|
||||||
|
if (shouldBackOff()) {
|
||||||
|
final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
||||||
|
incrementalRequestSubmitted = true;
|
||||||
|
|
||||||
if (shouldBackOff()) {
|
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
|
||||||
final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
|
||||||
incrementalRequestSubmitted = true;
|
|
||||||
|
|
||||||
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {
|
@Override
|
||||||
|
public void onResponse(BulkResponse bulkResponse) {
|
||||||
|
responses.add(bulkResponse);
|
||||||
|
releaseCurrentReferences();
|
||||||
|
createNewBulkRequest(
|
||||||
|
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(BulkResponse bulkResponse) {
|
public void onFailure(Exception e) {
|
||||||
responses.add(bulkResponse);
|
handleBulkFailure(isFirstRequest, e);
|
||||||
releaseCurrentReferences();
|
}
|
||||||
createNewBulkRequest(bulkResponse.getIncrementalState());
|
}, nextItems));
|
||||||
}
|
} else {
|
||||||
|
nextItems.run();
|
||||||
@Override
|
}
|
||||||
public void onFailure(Exception e) {
|
|
||||||
handleBulkFailure(isFirstRequest, e);
|
|
||||||
}
|
|
||||||
}, nextItems::run));
|
|
||||||
} else {
|
} else {
|
||||||
nextItems.run();
|
nextItems.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean shouldBackOff() {
|
private boolean shouldBackOff() {
|
||||||
// TODO: Implement Real Memory Logic
|
return indexingPressure.shouldSplitBulks();
|
||||||
return bulkRequest.requests().size() >= 16;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
|
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
|
||||||
|
@ -102,25 +122,27 @@ public class IncrementalBulkService {
|
||||||
errorResponse(listener);
|
errorResponse(listener);
|
||||||
} else {
|
} else {
|
||||||
assert bulkRequest != null;
|
assert bulkRequest != null;
|
||||||
internalAddItems(items, releasable);
|
if (internalAddItems(items, releasable)) {
|
||||||
|
client.bulk(bulkRequest, new ActionListener<>() {
|
||||||
|
|
||||||
client.bulk(bulkRequest, new ActionListener<>() {
|
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
||||||
|
|
||||||
private final boolean isFirstRequest = incrementalRequestSubmitted == false;
|
@Override
|
||||||
|
public void onResponse(BulkResponse bulkResponse) {
|
||||||
|
responses.add(bulkResponse);
|
||||||
|
releaseCurrentReferences();
|
||||||
|
listener.onResponse(combineResponses());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(BulkResponse bulkResponse) {
|
public void onFailure(Exception e) {
|
||||||
responses.add(bulkResponse);
|
handleBulkFailure(isFirstRequest, e);
|
||||||
releaseCurrentReferences();
|
errorResponse(listener);
|
||||||
listener.onResponse(combineResponses());
|
}
|
||||||
}
|
});
|
||||||
|
} else {
|
||||||
@Override
|
errorResponse(listener);
|
||||||
public void onFailure(Exception e) {
|
}
|
||||||
handleBulkFailure(isFirstRequest, e);
|
|
||||||
errorResponse(listener);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,9 +182,22 @@ public class IncrementalBulkService {
|
||||||
responses.add(new BulkResponse(bulkItemResponses, 0, 0));
|
responses.add(new BulkResponse(bulkItemResponses, 0, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
|
private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
|
||||||
bulkRequest.add(items);
|
try {
|
||||||
releasables.add(releasable);
|
bulkRequest.add(items);
|
||||||
|
releasables.add(releasable);
|
||||||
|
releasables.add(
|
||||||
|
indexingPressure.markCoordinatingOperationStarted(
|
||||||
|
items.size(),
|
||||||
|
items.stream().mapToLong(Accountable::ramBytesUsed).sum(),
|
||||||
|
false
|
||||||
|
)
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
} catch (EsRejectedExecutionException e) {
|
||||||
|
handleBulkFailure(incrementalRequestSubmitted == false, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {
|
private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {
|
||||||
|
|
|
@ -112,7 +112,12 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
|
||||||
clusterService.state().metadata().getIndicesLookup(),
|
clusterService.state().metadata().getIndicesLookup(),
|
||||||
systemIndices
|
systemIndices
|
||||||
);
|
);
|
||||||
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
|
final Releasable releasable;
|
||||||
|
if (bulkRequest.incrementalState().indexingPressureAccounted()) {
|
||||||
|
releasable = () -> {};
|
||||||
|
} else {
|
||||||
|
releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
|
||||||
|
}
|
||||||
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
|
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
|
||||||
final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
|
final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
|
||||||
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
|
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
|
||||||
|
|
|
@ -560,6 +560,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
FsHealthService.REFRESH_INTERVAL_SETTING,
|
FsHealthService.REFRESH_INTERVAL_SETTING,
|
||||||
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
|
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
|
||||||
IndexingPressure.MAX_INDEXING_BYTES,
|
IndexingPressure.MAX_INDEXING_BYTES,
|
||||||
|
IndexingPressure.SPLIT_BULK_THRESHOLD,
|
||||||
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
|
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
|
||||||
DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
|
DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
|
||||||
CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING,
|
CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING,
|
||||||
|
|
|
@ -30,6 +30,12 @@ public class IndexingPressure {
|
||||||
Setting.Property.NodeScope
|
Setting.Property.NodeScope
|
||||||
);
|
);
|
||||||
|
|
||||||
|
public static final Setting<ByteSizeValue> SPLIT_BULK_THRESHOLD = Setting.memorySizeSetting(
|
||||||
|
"indexing_pressure.memory.split_bulk_threshold",
|
||||||
|
"8.5%",
|
||||||
|
Setting.Property.NodeScope
|
||||||
|
);
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(IndexingPressure.class);
|
private static final Logger logger = LogManager.getLogger(IndexingPressure.class);
|
||||||
|
|
||||||
private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
|
private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
|
||||||
|
@ -57,10 +63,12 @@ public class IndexingPressure {
|
||||||
private final AtomicLong primaryDocumentRejections = new AtomicLong(0);
|
private final AtomicLong primaryDocumentRejections = new AtomicLong(0);
|
||||||
|
|
||||||
private final long primaryAndCoordinatingLimits;
|
private final long primaryAndCoordinatingLimits;
|
||||||
|
private final long splitBulkThreshold;
|
||||||
private final long replicaLimits;
|
private final long replicaLimits;
|
||||||
|
|
||||||
public IndexingPressure(Settings settings) {
|
public IndexingPressure(Settings settings) {
|
||||||
this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
|
this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
|
||||||
|
this.splitBulkThreshold = SPLIT_BULK_THRESHOLD.get(settings).getBytes();
|
||||||
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
|
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,6 +212,10 @@ public class IndexingPressure {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean shouldSplitBulks() {
|
||||||
|
return currentCombinedCoordinatingAndPrimaryBytes.get() >= splitBulkThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
public IndexingPressureStats stats() {
|
public IndexingPressureStats stats() {
|
||||||
return new IndexingPressureStats(
|
return new IndexingPressureStats(
|
||||||
totalCombinedCoordinatingAndPrimaryBytes.get(),
|
totalCombinedCoordinatingAndPrimaryBytes.get(),
|
||||||
|
|
|
@ -980,7 +980,7 @@ class NodeConstruction {
|
||||||
);
|
);
|
||||||
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
|
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
|
||||||
final IndexingPressure indexingLimits = new IndexingPressure(settings);
|
final IndexingPressure indexingLimits = new IndexingPressure(settings);
|
||||||
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client);
|
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
|
||||||
|
|
||||||
SnapshotsService snapshotsService = new SnapshotsService(
|
SnapshotsService snapshotsService = new SnapshotsService(
|
||||||
settings,
|
settings,
|
||||||
|
|
|
@ -125,6 +125,7 @@ import org.elasticsearch.http.HttpInfo;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexModule;
|
import org.elasticsearch.index.IndexModule;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
|
import org.elasticsearch.index.IndexingPressure;
|
||||||
import org.elasticsearch.index.MergePolicyConfig;
|
import org.elasticsearch.index.MergePolicyConfig;
|
||||||
import org.elasticsearch.index.MergeSchedulerConfig;
|
import org.elasticsearch.index.MergeSchedulerConfig;
|
||||||
import org.elasticsearch.index.MockEngineFactoryPlugin;
|
import org.elasticsearch.index.MockEngineFactoryPlugin;
|
||||||
|
@ -2103,6 +2104,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
||||||
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE.getKey(),
|
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE.getKey(),
|
||||||
randomFrom(1, 2, SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE)
|
randomFrom(1, 2, SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE)
|
||||||
);
|
);
|
||||||
|
if (randomBoolean()) {
|
||||||
|
builder.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), randomFrom("256B", "1KB", "64KB"));
|
||||||
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue