mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 17:34:17 -04:00
Avoid hoarding cluster state references during rollover (#124107)
By keeping a list of all the rollover results in a rollover request batch, we were keeping references to all the intermediate cluster states that we built. We've seen this list take up ~1.4GB with 600 rollover requests in one batch. We only kept the list of results to compute the "reason" for the allocation reroute, so we can easily drop the cluster state reference from the list and only keep what we need. Fixes #123893
This commit is contained in:
parent
a1ee3c9291
commit
ff6465b83b
3 changed files with 14 additions and 24 deletions
6
docs/changelog/124107.yaml
Normal file
6
docs/changelog/124107.yaml
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
pr: 124107
|
||||||
|
summary: Avoid hoarding cluster state references during rollover
|
||||||
|
area: Indices APIs
|
||||||
|
type: bug
|
||||||
|
issues:
|
||||||
|
- 123893
|
|
@ -36,7 +36,6 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
|
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.collect.Iterators;
|
|
||||||
import org.elasticsearch.injection.guice.Inject;
|
import org.elasticsearch.injection.guice.Inject;
|
||||||
import org.elasticsearch.tasks.CancellableTask;
|
import org.elasticsearch.tasks.CancellableTask;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
|
@ -189,7 +188,7 @@ public final class LazyRolloverAction extends ActionType<RolloverResponse> {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecutionContext) {
|
public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecutionContext) {
|
||||||
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
|
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
|
||||||
final var results = new ArrayList<MetadataRolloverService.RolloverResult>(batchExecutionContext.taskContexts().size());
|
final var results = new ArrayList<String>(batchExecutionContext.taskContexts().size());
|
||||||
var state = batchExecutionContext.initialState();
|
var state = batchExecutionContext.initialState();
|
||||||
Map<ProjectId, Map<RolloverRequest, List<TaskContext<LazyRolloverTask>>>> groupedRequests = new HashMap<>();
|
Map<ProjectId, Map<RolloverRequest, List<TaskContext<LazyRolloverTask>>>> groupedRequests = new HashMap<>();
|
||||||
for (final var taskContext : batchExecutionContext.taskContexts()) {
|
for (final var taskContext : batchExecutionContext.taskContexts()) {
|
||||||
|
@ -214,14 +213,7 @@ public final class LazyRolloverAction extends ActionType<RolloverResponse> {
|
||||||
|
|
||||||
if (state != batchExecutionContext.initialState()) {
|
if (state != batchExecutionContext.initialState()) {
|
||||||
var reason = new StringBuilder();
|
var reason = new StringBuilder();
|
||||||
Strings.collectionToDelimitedStringWithLimit(
|
Strings.collectionToDelimitedStringWithLimit(results, ",", "lazy bulk rollover [", "]", 1024, reason);
|
||||||
(Iterable<String>) () -> Iterators.map(results.iterator(), t -> t.sourceIndexName() + "->" + t.rolloverIndexName()),
|
|
||||||
",",
|
|
||||||
"lazy bulk rollover [",
|
|
||||||
"]",
|
|
||||||
1024,
|
|
||||||
reason
|
|
||||||
);
|
|
||||||
try (var ignored = batchExecutionContext.dropHeadersContext()) {
|
try (var ignored = batchExecutionContext.dropHeadersContext()) {
|
||||||
state = allocationService.reroute(state, reason.toString(), listener.reroute());
|
state = allocationService.reroute(state, reason.toString(), listener.reroute());
|
||||||
}
|
}
|
||||||
|
@ -234,7 +226,7 @@ public final class LazyRolloverAction extends ActionType<RolloverResponse> {
|
||||||
public ClusterState executeTask(
|
public ClusterState executeTask(
|
||||||
ProjectState currentState,
|
ProjectState currentState,
|
||||||
RolloverRequest rolloverRequest,
|
RolloverRequest rolloverRequest,
|
||||||
List<MetadataRolloverService.RolloverResult> results,
|
ArrayList<String> results,
|
||||||
List<TaskContext<LazyRolloverTask>> rolloverTaskContexts,
|
List<TaskContext<LazyRolloverTask>> rolloverTaskContexts,
|
||||||
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
|
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
|
||||||
) throws Exception {
|
) throws Exception {
|
||||||
|
@ -271,7 +263,7 @@ public final class LazyRolloverAction extends ActionType<RolloverResponse> {
|
||||||
null,
|
null,
|
||||||
isFailureStoreRollover
|
isFailureStoreRollover
|
||||||
);
|
);
|
||||||
results.add(rolloverResult);
|
results.add(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
|
||||||
logger.trace("lazy rollover result [{}]", rolloverResult);
|
logger.trace("lazy rollover result [{}]", rolloverResult);
|
||||||
|
|
||||||
final var rolloverIndexName = rolloverResult.rolloverIndexName();
|
final var rolloverIndexName = rolloverResult.rolloverIndexName();
|
||||||
|
|
|
@ -49,7 +49,6 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
|
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.collect.Iterators;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
|
@ -494,7 +493,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionContext) {
|
public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionContext) {
|
||||||
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
|
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
|
||||||
final var results = new ArrayList<MetadataRolloverService.RolloverResult>(batchExecutionContext.taskContexts().size());
|
final var results = new ArrayList<String>(batchExecutionContext.taskContexts().size());
|
||||||
var state = batchExecutionContext.initialState();
|
var state = batchExecutionContext.initialState();
|
||||||
for (final var taskContext : batchExecutionContext.taskContexts()) {
|
for (final var taskContext : batchExecutionContext.taskContexts()) {
|
||||||
try (var ignored = taskContext.captureResponseHeaders()) {
|
try (var ignored = taskContext.captureResponseHeaders()) {
|
||||||
|
@ -506,14 +505,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
|
||||||
|
|
||||||
if (state != batchExecutionContext.initialState()) {
|
if (state != batchExecutionContext.initialState()) {
|
||||||
var reason = new StringBuilder();
|
var reason = new StringBuilder();
|
||||||
Strings.collectionToDelimitedStringWithLimit(
|
Strings.collectionToDelimitedStringWithLimit(results, ",", "bulk rollover [", "]", 1024, reason);
|
||||||
(Iterable<String>) () -> Iterators.map(results.iterator(), t -> t.sourceIndexName() + "->" + t.rolloverIndexName()),
|
|
||||||
",",
|
|
||||||
"bulk rollover [",
|
|
||||||
"]",
|
|
||||||
1024,
|
|
||||||
reason
|
|
||||||
);
|
|
||||||
try (var ignored = batchExecutionContext.dropHeadersContext()) {
|
try (var ignored = batchExecutionContext.dropHeadersContext()) {
|
||||||
state = allocationService.reroute(state, reason.toString(), listener.reroute());
|
state = allocationService.reroute(state, reason.toString(), listener.reroute());
|
||||||
}
|
}
|
||||||
|
@ -525,7 +517,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
|
||||||
|
|
||||||
public ClusterState executeTask(
|
public ClusterState executeTask(
|
||||||
ClusterState currentState,
|
ClusterState currentState,
|
||||||
List<MetadataRolloverService.RolloverResult> results,
|
ArrayList<String> results,
|
||||||
TaskContext<RolloverTask> rolloverTaskContext,
|
TaskContext<RolloverTask> rolloverTaskContext,
|
||||||
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
|
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
|
||||||
) throws Exception {
|
) throws Exception {
|
||||||
|
@ -596,7 +588,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
|
||||||
rolloverTask.autoShardingResult(),
|
rolloverTask.autoShardingResult(),
|
||||||
targetFailureStore
|
targetFailureStore
|
||||||
);
|
);
|
||||||
results.add(rolloverResult);
|
results.add(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
|
||||||
logger.trace("rollover result [{}]", rolloverResult);
|
logger.trace("rollover result [{}]", rolloverResult);
|
||||||
|
|
||||||
final var rolloverIndexName = rolloverResult.rolloverIndexName();
|
final var rolloverIndexName = rolloverResult.rolloverIndexName();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue