mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 01:22:26 -04:00
Make ILM operation mode APIs project-aware (#129678)
Also updates the lifecycle service.
This commit is contained in:
parent
f5963959c7
commit
1e9db8b248
8 changed files with 75 additions and 46 deletions
|
@ -80,6 +80,11 @@ public class LifecycleOperationMetadata implements Metadata.ProjectCustom {
|
|||
);
|
||||
}
|
||||
|
||||
@Deprecated(forRemoval = true)
|
||||
public static OperationMode currentSLMMode(final ClusterState state) {
|
||||
return currentSLMMode(state.metadata().getProject());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current ILM mode based on the given cluster state. It first checks the newer
|
||||
* storage mechanism ({@link LifecycleOperationMetadata#getSLMOperationMode()}) before falling
|
||||
|
@ -87,9 +92,9 @@ public class LifecycleOperationMetadata implements Metadata.ProjectCustom {
|
|||
* value for an empty state is used.
|
||||
*/
|
||||
@SuppressWarnings("deprecated")
|
||||
public static OperationMode currentSLMMode(final ClusterState state) {
|
||||
SnapshotLifecycleMetadata oldMetadata = state.metadata().getProject().custom(SnapshotLifecycleMetadata.TYPE);
|
||||
LifecycleOperationMetadata currentMetadata = state.metadata().getProject().custom(LifecycleOperationMetadata.TYPE);
|
||||
public static OperationMode currentSLMMode(ProjectMetadata project) {
|
||||
SnapshotLifecycleMetadata oldMetadata = project.custom(SnapshotLifecycleMetadata.TYPE);
|
||||
LifecycleOperationMetadata currentMetadata = project.custom(LifecycleOperationMetadata.TYPE);
|
||||
return Optional.ofNullable(currentMetadata)
|
||||
.map(LifecycleOperationMetadata::getSLMOperationMode)
|
||||
.orElse(
|
||||
|
|
|
@ -14,7 +14,10 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||
import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.core.FixForMultiProject;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.core.Strings;
|
||||
|
||||
|
@ -29,6 +32,8 @@ import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.curren
|
|||
*/
|
||||
public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
||||
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
|
||||
|
||||
private final ProjectId projectId;
|
||||
@Nullable
|
||||
private final OperationMode ilmMode;
|
||||
@Nullable
|
||||
|
@ -47,18 +52,21 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
|||
};
|
||||
}
|
||||
|
||||
private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, OperationMode slmMode) {
|
||||
private OperationModeUpdateTask(Priority priority, ProjectId projectId, OperationMode ilmMode, OperationMode slmMode) {
|
||||
super(priority);
|
||||
this.projectId = projectId;
|
||||
this.ilmMode = ilmMode;
|
||||
this.slmMode = slmMode;
|
||||
}
|
||||
|
||||
public static OperationModeUpdateTask ilmMode(OperationMode mode) {
|
||||
return new OperationModeUpdateTask(getPriority(mode), mode, null);
|
||||
public static OperationModeUpdateTask ilmMode(ProjectId projectId, OperationMode mode) {
|
||||
return new OperationModeUpdateTask(getPriority(mode), projectId, mode, null);
|
||||
}
|
||||
|
||||
public static OperationModeUpdateTask slmMode(OperationMode mode) {
|
||||
return new OperationModeUpdateTask(getPriority(mode), null, mode);
|
||||
@FixForMultiProject // Use non-default ID when SLM has been made project-aware
|
||||
final var projectId = ProjectId.DEFAULT;
|
||||
return new OperationModeUpdateTask(getPriority(mode), projectId, null, mode);
|
||||
}
|
||||
|
||||
private static Priority getPriority(OperationMode mode) {
|
||||
|
@ -79,22 +87,24 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
ClusterState newState = currentState;
|
||||
newState = updateILMState(newState);
|
||||
newState = updateSLMState(newState);
|
||||
return newState;
|
||||
}
|
||||
|
||||
private ClusterState updateILMState(final ClusterState currentState) {
|
||||
if (ilmMode == null) {
|
||||
ProjectMetadata oldProject = currentState.metadata().getProject(projectId);
|
||||
ProjectMetadata newProject = updateILMState(oldProject);
|
||||
newProject = updateSLMState(newProject);
|
||||
if (newProject == oldProject) {
|
||||
return currentState;
|
||||
}
|
||||
return ClusterState.builder(currentState).putProjectMetadata(newProject).build();
|
||||
}
|
||||
|
||||
final var project = currentState.metadata().getProject();
|
||||
final OperationMode currentMode = currentILMMode(project);
|
||||
private ProjectMetadata updateILMState(final ProjectMetadata currentProject) {
|
||||
if (ilmMode == null) {
|
||||
return currentProject;
|
||||
}
|
||||
|
||||
final OperationMode currentMode = currentILMMode(currentProject);
|
||||
if (currentMode.equals(ilmMode)) {
|
||||
// No need for a new state
|
||||
return currentState;
|
||||
return currentProject;
|
||||
}
|
||||
|
||||
final OperationMode newMode;
|
||||
|
@ -102,24 +112,23 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
|||
newMode = ilmMode;
|
||||
} else {
|
||||
// The transition is invalid, return the current state
|
||||
return currentState;
|
||||
return currentProject;
|
||||
}
|
||||
|
||||
logger.info("updating ILM operation mode to {}", newMode);
|
||||
final var updatedMetadata = new LifecycleOperationMetadata(newMode, currentSLMMode(currentState));
|
||||
return currentState.copyAndUpdateProject(project.id(), b -> b.putCustom(LifecycleOperationMetadata.TYPE, updatedMetadata));
|
||||
final var updatedMetadata = new LifecycleOperationMetadata(newMode, currentSLMMode(currentProject));
|
||||
return currentProject.copyAndUpdate(b -> b.putCustom(LifecycleOperationMetadata.TYPE, updatedMetadata));
|
||||
}
|
||||
|
||||
private ClusterState updateSLMState(final ClusterState currentState) {
|
||||
private ProjectMetadata updateSLMState(final ProjectMetadata currentProject) {
|
||||
if (slmMode == null) {
|
||||
return currentState;
|
||||
return currentProject;
|
||||
}
|
||||
|
||||
final var project = currentState.metadata().getProject();
|
||||
final OperationMode currentMode = currentSLMMode(currentState);
|
||||
final OperationMode currentMode = currentSLMMode(currentProject);
|
||||
if (currentMode.equals(slmMode)) {
|
||||
// No need for a new state
|
||||
return currentState;
|
||||
return currentProject;
|
||||
}
|
||||
|
||||
final OperationMode newMode;
|
||||
|
@ -127,12 +136,12 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
|
|||
newMode = slmMode;
|
||||
} else {
|
||||
// The transition is invalid, return the current state
|
||||
return currentState;
|
||||
return currentProject;
|
||||
}
|
||||
|
||||
logger.info("updating SLM operation mode to {}", newMode);
|
||||
final var updatedMetadata = new LifecycleOperationMetadata(currentILMMode(project), newMode);
|
||||
return currentState.copyAndUpdateProject(project.id(), b -> b.putCustom(LifecycleOperationMetadata.TYPE, updatedMetadata));
|
||||
final var updatedMetadata = new LifecycleOperationMetadata(currentILMMode(currentProject), newMode);
|
||||
return currentProject.copyAndUpdate(b -> b.putCustom(LifecycleOperationMetadata.TYPE, updatedMetadata));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -9,6 +9,7 @@ package org.elasticsearch.xpack.core.ilm;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
||||
import org.elasticsearch.index.IndexVersion;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||
|
@ -102,23 +103,23 @@ public class OperationModeUpdateTaskTests extends ESTestCase {
|
|||
currentMode,
|
||||
new SnapshotLifecycleStats()
|
||||
);
|
||||
Metadata.Builder metadata = Metadata.builder().persistentSettings(settings(IndexVersion.current()).build());
|
||||
ProjectMetadata.Builder project = ProjectMetadata.builder(randomProjectIdOrDefault());
|
||||
if (metadataInstalled) {
|
||||
metadata.projectCustoms(
|
||||
project.customs(
|
||||
Map.of(IndexLifecycleMetadata.TYPE, indexLifecycleMetadata, SnapshotLifecycleMetadata.TYPE, snapshotLifecycleMetadata)
|
||||
);
|
||||
}
|
||||
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();
|
||||
OperationModeUpdateTask task = OperationModeUpdateTask.ilmMode(requestMode);
|
||||
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build();
|
||||
OperationModeUpdateTask task = OperationModeUpdateTask.ilmMode(project.getId(), requestMode);
|
||||
ClusterState newState = task.execute(state);
|
||||
if (assertSameClusterState) {
|
||||
assertSame("expected the same state instance but they were different", state, newState);
|
||||
} else {
|
||||
assertThat("expected a different state instance but they were the same", state, not(equalTo(newState)));
|
||||
}
|
||||
LifecycleOperationMetadata newMetadata = newState.metadata().getProject().custom(LifecycleOperationMetadata.TYPE);
|
||||
LifecycleOperationMetadata newMetadata = newState.metadata().getProject(project.getId()).custom(LifecycleOperationMetadata.TYPE);
|
||||
IndexLifecycleMetadata oldMetadata = newState.metadata()
|
||||
.getProject()
|
||||
.getProject(project.getId())
|
||||
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
|
||||
return Optional.ofNullable(newMetadata)
|
||||
.map(LifecycleOperationMetadata::getILMOperationMode)
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|||
import org.elasticsearch.cluster.ProjectState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
|
||||
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||
import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
||||
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -262,13 +263,13 @@ public class IndexLifecycleService
|
|||
}
|
||||
|
||||
if (safeToStop && OperationMode.STOPPING == currentMode) {
|
||||
stopILM();
|
||||
stopILM(state.projectId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void stopILM() {
|
||||
submitUnbatchedTask("ilm_operation_mode_update[stopped]", OperationModeUpdateTask.ilmMode(OperationMode.STOPPED));
|
||||
private void stopILM(ProjectId projectId) {
|
||||
submitUnbatchedTask("ilm_operation_mode_update[stopped]", OperationModeUpdateTask.ilmMode(projectId, OperationMode.STOPPED));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -479,7 +480,7 @@ public class IndexLifecycleService
|
|||
if (currentMetadata == null) {
|
||||
if (currentMode == OperationMode.STOPPING) {
|
||||
// There are no policies and ILM is in stopping mode, so stop ILM and get out of here
|
||||
stopILM();
|
||||
stopILM(state.projectId());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -562,7 +563,7 @@ public class IndexLifecycleService
|
|||
}
|
||||
|
||||
if (safeToStop && OperationMode.STOPPING == currentMode) {
|
||||
stopILM();
|
||||
stopILM(state.projectId());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.project.ProjectResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.core.SuppressForbidden;
|
||||
|
@ -29,12 +30,15 @@ import org.elasticsearch.xpack.core.ilm.action.ILMActions;
|
|||
|
||||
public class TransportStartILMAction extends AcknowledgedTransportMasterNodeAction<StartILMRequest> {
|
||||
|
||||
private final ProjectResolver projectResolver;
|
||||
|
||||
@Inject
|
||||
public TransportStartILMAction(
|
||||
TransportService transportService,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
ActionFilters actionFilters
|
||||
ActionFilters actionFilters,
|
||||
ProjectResolver projectResolver
|
||||
) {
|
||||
super(
|
||||
ILMActions.START.name(),
|
||||
|
@ -45,13 +49,15 @@ public class TransportStartILMAction extends AcknowledgedTransportMasterNodeActi
|
|||
StartILMRequest::new,
|
||||
EsExecutors.DIRECT_EXECUTOR_SERVICE
|
||||
);
|
||||
this.projectResolver = projectResolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Task task, StartILMRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
|
||||
final var projectId = projectResolver.getProjectId();
|
||||
submitUnbatchedTask(
|
||||
"ilm_operation_mode_update[running]",
|
||||
OperationModeUpdateTask.wrap(OperationModeUpdateTask.ilmMode(OperationMode.RUNNING), request, listener)
|
||||
OperationModeUpdateTask.wrap(OperationModeUpdateTask.ilmMode(projectId, OperationMode.RUNNING), request, listener)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.project.ProjectResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.core.SuppressForbidden;
|
||||
|
@ -29,12 +30,15 @@ import org.elasticsearch.xpack.core.ilm.action.ILMActions;
|
|||
|
||||
public class TransportStopILMAction extends AcknowledgedTransportMasterNodeAction<StopILMRequest> {
|
||||
|
||||
private final ProjectResolver projectResolver;
|
||||
|
||||
@Inject
|
||||
public TransportStopILMAction(
|
||||
TransportService transportService,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
ActionFilters actionFilters
|
||||
ActionFilters actionFilters,
|
||||
ProjectResolver projectResolver
|
||||
) {
|
||||
super(
|
||||
ILMActions.STOP.name(),
|
||||
|
@ -45,13 +49,15 @@ public class TransportStopILMAction extends AcknowledgedTransportMasterNodeActio
|
|||
StopILMRequest::new,
|
||||
EsExecutors.DIRECT_EXECUTOR_SERVICE
|
||||
);
|
||||
this.projectResolver = projectResolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Task task, StopILMRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
|
||||
final var projectId = projectResolver.getProjectId();
|
||||
submitUnbatchedTask(
|
||||
"ilm_operation_mode_update[stopping]",
|
||||
OperationModeUpdateTask.wrap(OperationModeUpdateTask.ilmMode(OperationMode.STOPPING), request, listener)
|
||||
OperationModeUpdateTask.wrap(OperationModeUpdateTask.ilmMode(projectId, OperationMode.STOPPING), request, listener)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
@ -41,7 +42,8 @@ public class TransportStopILMActionTests extends ESTestCase {
|
|||
transportService,
|
||||
clusterService,
|
||||
threadPool,
|
||||
mock(ActionFilters.class)
|
||||
mock(ActionFilters.class),
|
||||
TestProjectResolvers.singleProject(randomProjectIdOrDefault())
|
||||
);
|
||||
Task task = new Task(
|
||||
randomLong(),
|
||||
|
|
|
@ -46,7 +46,6 @@ tasks.named("yamlRestTest").configure {
|
|||
'^esql/191_lookup_join_text/*',
|
||||
'^esql/192_lookup_join_on_aliases/*',
|
||||
'^health/10_usage/*',
|
||||
'^ilm/60_operation_mode/*',
|
||||
'^ilm/80_health/*',
|
||||
'^logsdb/10_usage/*',
|
||||
'^migrate/10_reindex/*',
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue