Make ILMHistoryStore semi-project-aware

Niels Bauman 2025-06-23 19:40:03 -03:00
parent a671505c8a
commit dc210d9752
GPG key ID: 1E23BD8DDAC3C49C (no known key found for this signature in the database)
6 changed files with 72 additions and 44 deletions

ExecuteStepsUpdateTask.java

@@ -221,9 +221,9 @@ public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask
                 LifecycleExecutionState exState = indexMetadata.getLifecycleExecutionState();
                 if (ErrorStep.NAME.equals(exState.step()) && this.failure != null) {
-                    lifecycleRunner.registerFailedOperation(indexMetadata, failure);
+                    lifecycleRunner.registerFailedOperation(projectId, indexMetadata, failure);
                 } else {
-                    lifecycleRunner.registerSuccessfulOperation(indexMetadata);
+                    lifecycleRunner.registerSuccessfulOperation(projectId, indexMetadata);
                 }
                 if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) {

IndexLifecycle.java

@@ -153,7 +153,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin, HealthPlugin
             new ILMHistoryStore(
                 new OriginSettingClient(services.client(), INDEX_LIFECYCLE_ORIGIN),
                 services.clusterService(),
-                services.threadPool()
+                services.threadPool(),
+                services.projectResolver()
             )
         );
         /*

IndexLifecycleRunner.java

@@ -371,7 +371,7 @@ class IndexLifecycleRunner {
                 // Delete needs special handling, because after this step we
                 // will no longer have access to any information about the
                 // index since it will be... deleted.
-                registerDeleteOperation(indexMetadata);
+                registerDeleteOperation(projectId, indexMetadata);
             }
         }
@@ -479,7 +479,7 @@
             ),
             new MoveToNextStepUpdateTask(projectId, index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> {
                 IndexMetadata indexMetadata = state.metadata().index(index);
-                registerSuccessfulOperation(indexMetadata);
+                registerSuccessfulOperation(projectId, indexMetadata);
                 if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) {
                     maybeRunAsyncAction(state, indexMetadata, policy, newStepKey);
                 }
@@ -499,7 +499,7 @@
             Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
             new MoveToErrorStepUpdateTask(projectId, index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> {
                 IndexMetadata indexMetadata = state.metadata().index(index);
-                registerFailedOperation(indexMetadata, e);
+                registerFailedOperation(projectId, indexMetadata, e);
             })
         );
     }
@@ -556,13 +556,14 @@
      * For the given index metadata, register (index a document) that the index has transitioned
      * successfully into this new state using the {@link ILMHistoryStore}
      */
-    void registerSuccessfulOperation(IndexMetadata indexMetadata) {
+    void registerSuccessfulOperation(ProjectId projectId, IndexMetadata indexMetadata) {
         if (indexMetadata == null) {
             // This index may have been deleted and has no metadata, so ignore it
             return;
         }
         Long origination = calculateOriginationMillis(indexMetadata);
         ilmHistoryStore.putAsync(
+            projectId,
             ILMHistoryItem.success(
                 indexMetadata.getIndex().getName(),
                 indexMetadata.getLifecyclePolicyName(),
@@ -577,12 +578,13 @@
      * For the given index metadata, register (index a document) that the index
      * has been deleted by ILM using the {@link ILMHistoryStore}
      */
-    void registerDeleteOperation(IndexMetadata metadataBeforeDeletion) {
+    void registerDeleteOperation(ProjectId projectId, IndexMetadata metadataBeforeDeletion) {
         if (metadataBeforeDeletion == null) {
             throw new IllegalStateException("cannot register deletion of an index that did not previously exist");
         }
         Long origination = calculateOriginationMillis(metadataBeforeDeletion);
         ilmHistoryStore.putAsync(
+            projectId,
             ILMHistoryItem.success(
                 metadataBeforeDeletion.getIndex().getName(),
                 metadataBeforeDeletion.getLifecyclePolicyName(),
@@ -600,13 +602,14 @@
      * For the given index metadata, register (index a document) that the index has transitioned
      * into the ERROR state using the {@link ILMHistoryStore}
      */
-    void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {
+    void registerFailedOperation(ProjectId projectId, IndexMetadata indexMetadata, Exception failure) {
         if (indexMetadata == null) {
             // This index may have been deleted and has no metadata, so ignore it
             return;
         }
         Long origination = calculateOriginationMillis(indexMetadata);
         ilmHistoryStore.putAsync(
+            projectId,
             ILMHistoryItem.failure(
                 indexMetadata.getIndex().getName(),
                 indexMetadata.getLifecyclePolicyName(),
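
Note: the runner-side change is mechanical. Each history-registration helper now accepts the ProjectId of the project that owns the index and threads it through to ILMHistoryStore.putAsync. A minimal sketch of the resulting shape, where buildSuccessItem(...) is a hypothetical stand-in for the ILMHistoryItem.success(...) call whose trailing arguments are truncated in the hunk above:

    void registerSuccessfulOperation(ProjectId projectId, IndexMetadata indexMetadata) {
        if (indexMetadata == null) {
            // The index may already have been deleted; there is nothing to record.
            return;
        }
        // buildSuccessItem(...) is illustrative only, not a real helper in this class.
        ILMHistoryItem item = buildSuccessItem(indexMetadata);
        // The project id travels with every history write so the store can scope the write to that project.
        ilmHistoryStore.putAsync(projectId, item);
    }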

ILMHistoryStore.java

@@ -19,6 +19,8 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -66,27 +68,25 @@
     );
     private volatile boolean ilmHistoryEnabled = true;

+    private final ProjectResolver projectResolver;
     private final BulkProcessor2 processor;

-    public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool) {
-        this(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueSeconds(5));
+    public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool, ProjectResolver projectResolver) {
+        this(client, clusterService, threadPool, projectResolver, ActionListener.noop(), TimeValue.timeValueSeconds(5));
     }

     /**
      * For unit testing, allows a more frequent flushInterval
-     * @param client
-     * @param clusterService
-     * @param threadPool
-     * @param listener
-     * @param flushInterval
      */
     ILMHistoryStore(
         Client client,
         ClusterService clusterService,
         ThreadPool threadPool,
+        ProjectResolver projectResolver,
         ActionListener<BulkResponse> listener,
         TimeValue flushInterval
     ) {
+        this.projectResolver = projectResolver;
         this.setIlmHistoryEnabled(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(clusterService.getSettings()));
         clusterService.getClusterSettings().addSettingsUpdateConsumer(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, this::setIlmHistoryEnabled);
@@ -95,7 +95,8 @@
             new BulkProcessor2.Listener() {
                 @Override
                 public void beforeBulk(long executionId, BulkRequest request) {
-                    if (clusterService.state().getMetadata().getProject().templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
+                    final var project = projectResolver.getProjectMetadata(clusterService.state());
+                    if (project.templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
                         ElasticsearchException e = new ElasticsearchException("no ILM history template");
                         logger.warn(
                             () -> format(
@@ -169,7 +170,7 @@
     /**
      * Attempts to asynchronously index an ILM history entry
      */
-    public void putAsync(ILMHistoryItem item) {
+    public void putAsync(ProjectId projectId, ILMHistoryItem item) {
         if (ilmHistoryEnabled == false) {
             logger.trace(
                 "not recording ILM history item because [{}] is [false]: [{}]",
@@ -182,7 +183,7 @@
         try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
             item.toXContent(builder, ToXContent.EMPTY_PARAMS);
             IndexRequest request = new IndexRequest(ILM_HISTORY_DATA_STREAM).source(builder).opType(DocWriteRequest.OpType.CREATE);
-            processor.add(request);
+            projectResolver.executeOnProject(projectId, () -> processor.add(request));
         } catch (Exception e) {
             logger.error(() -> format("failed to send ILM history item to index [%s]: [%s]", ILM_HISTORY_DATA_STREAM, item), e);
         }
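
Note: taken together, the store is now constructed with a ProjectResolver, and every write is tagged with a ProjectId that the store uses to scope the bulk add via executeOnProject. A caller-side sketch, assuming client, clusterService, threadPool, projectResolver, projectId and historyItem are already in scope (they are placeholders here, not new API):

    // Construction now requires the resolver (see the IndexLifecycle plugin hunk above).
    ILMHistoryStore store = new ILMHistoryStore(
        new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN),
        clusterService,
        threadPool,
        projectResolver
    );
    // Writes are per-project: the caller passes the owning project's id along with the item.
    store.putAsync(projectId, historyItem);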

IndexLifecycleRunnerTests.java

@@ -16,9 +16,11 @@ import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
+import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.project.TestProjectResolvers;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
 import org.elasticsearch.common.Priority;
@@ -1266,7 +1268,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
         private final List<ILMHistoryItem> items = new CopyOnWriteArrayList<>();

         NoOpHistoryStore(Client noopClient, ClusterService clusterService) {
-            super(noopClient, clusterService, clusterService.threadPool());
+            super(noopClient, clusterService, clusterService.threadPool(), TestProjectResolvers.alwaysThrow());
         }

         public List<ILMHistoryItem> getItems() {
@@ -1274,7 +1276,7 @@
         }

         @Override
-        public void putAsync(ILMHistoryItem item) {
+        public void putAsync(ProjectId projectId, ILMHistoryItem item) {
             logger.info("--> adding ILM history item: [{}]", item);
             items.add(item);
         }

ILMHistoryStoreTests.java

@@ -25,6 +25,9 @@ import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.cluster.project.TestProjectResolvers;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -65,6 +68,7 @@ public class ILMHistoryStoreTests extends ESTestCase {
     private VerifyingClient client;
     private ClusterService clusterService;
     private ILMHistoryStore historyStore;
+    private ProjectId projectId;

     @Before
     public void setup() {
@@ -83,13 +87,21 @@
             NamedXContentRegistry.EMPTY
         );
         ClusterState state = clusterService.state();
+        projectId = randomProjectIdOrDefault();
         ClusterServiceUtils.setState(
             clusterService,
             ClusterState.builder(state)
-                .metadata(Metadata.builder(state.metadata()).indexTemplates(registry.getComposableTemplateConfigs()))
+                .putProjectMetadata(ProjectMetadata.builder(projectId).indexTemplates(registry.getComposableTemplateConfigs()))
                 .build()
         );
-        historyStore = new ILMHistoryStore(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueMillis(500));
+        historyStore = new ILMHistoryStore(
+            client,
+            clusterService,
+            threadPool,
+            TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()),
+            ActionListener.noop(),
+            TimeValue.timeValueMillis(500)
+        );
     }

     @After
@@ -115,7 +127,7 @@
                 latch.countDown();
                 return null;
             });
-            historyStore.putAsync(record);
+            historyStore.putAsync(projectId, record);
             assertFalse(latch.await(2, TimeUnit.SECONDS));
         }
@@ -156,7 +168,7 @@
                 );
             });

-            historyStore.putAsync(record);
+            historyStore.putAsync(projectId, record);
             assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
         }
@@ -207,7 +219,7 @@
                 );
             });

-            historyStore.putAsync(record);
+            historyStore.putAsync(projectId, record);
             assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
         }
     }
@@ -255,7 +267,13 @@
             );
             return bulkItemResponse;
         });
-        try (ILMHistoryStore localHistoryStore = new ILMHistoryStore(client, clusterService, threadPool, new ActionListener<>() {
+        try (
+            ILMHistoryStore localHistoryStore = new ILMHistoryStore(
+                client,
+                clusterService,
+                threadPool,
+                TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()),
+                new ActionListener<>() {
             @Override
             public void onResponse(BulkResponse response) {
                 int itemsInResponse = response.getItems().length;
@@ -271,7 +289,10 @@
                 logger.error(e);
                 fail(e.getMessage());
             }
-        }, TimeValue.timeValueMillis(randomIntBetween(50, 1000)))) {
+                },
+                TimeValue.timeValueMillis(randomIntBetween(50, 1000))
+            )
+        ) {
             for (int i = 0; i < numberOfDocs; i++) {
                 ILMHistoryItem record1 = ILMHistoryItem.success(
                     "index",
@@ -280,7 +301,7 @@
                     10L,
                     LifecycleExecutionState.builder().setPhase("phase").build()
                 );
-                localHistoryStore.putAsync(record1);
+                localHistoryStore.putAsync(projectId, record1);
             }
             latch.await(5, TimeUnit.SECONDS);
             assertThat(actions.get(), equalTo(numberOfDocs));