diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java index 1fa870286b14..7804921ac378 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java @@ -221,9 +221,9 @@ public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask LifecycleExecutionState exState = indexMetadata.getLifecycleExecutionState(); if (ErrorStep.NAME.equals(exState.step()) && this.failure != null) { - lifecycleRunner.registerFailedOperation(indexMetadata, failure); + lifecycleRunner.registerFailedOperation(projectId, indexMetadata, failure); } else { - lifecycleRunner.registerSuccessfulOperation(indexMetadata); + lifecycleRunner.registerSuccessfulOperation(projectId, indexMetadata); } if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 6038f6ddb1b0..14958dd971f3 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -153,7 +153,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin, HealthPlugin new ILMHistoryStore( new OriginSettingClient(services.client(), INDEX_LIFECYCLE_ORIGIN), services.clusterService(), - services.threadPool() + services.threadPool(), + services.projectResolver() ) ); /* diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index 48854972f5ed..d6669f9701ec 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -371,7 +371,7 @@ class IndexLifecycleRunner { // Delete needs special handling, because after this step we // will no longer have access to any information about the // index since it will be... deleted. - registerDeleteOperation(indexMetadata); + registerDeleteOperation(projectId, indexMetadata); } } @@ -479,7 +479,7 @@ class IndexLifecycleRunner { ), new MoveToNextStepUpdateTask(projectId, index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> { IndexMetadata indexMetadata = state.metadata().index(index); - registerSuccessfulOperation(indexMetadata); + registerSuccessfulOperation(projectId, indexMetadata); if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) { maybeRunAsyncAction(state, indexMetadata, policy, newStepKey); } @@ -499,7 +499,7 @@ class IndexLifecycleRunner { Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey), new MoveToErrorStepUpdateTask(projectId, index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> { IndexMetadata indexMetadata = state.metadata().index(index); - registerFailedOperation(indexMetadata, e); + registerFailedOperation(projectId, indexMetadata, e); }) ); } @@ -556,13 +556,14 @@ class IndexLifecycleRunner { * For the given index metadata, register (index a document) that the index has transitioned * successfully into this new state using the {@link ILMHistoryStore} */ - void registerSuccessfulOperation(IndexMetadata indexMetadata) { + void registerSuccessfulOperation(ProjectId projectId, IndexMetadata indexMetadata) { if (indexMetadata == null) { // This index may have been deleted and has no metadata, so ignore it return; } Long origination = calculateOriginationMillis(indexMetadata); ilmHistoryStore.putAsync( + projectId, ILMHistoryItem.success( indexMetadata.getIndex().getName(), indexMetadata.getLifecyclePolicyName(), @@ -577,12 +578,13 @@ class IndexLifecycleRunner { * For the given index metadata, register (index a document) that the index * has been deleted by ILM using the {@link ILMHistoryStore} */ - void registerDeleteOperation(IndexMetadata metadataBeforeDeletion) { + void registerDeleteOperation(ProjectId projectId, IndexMetadata metadataBeforeDeletion) { if (metadataBeforeDeletion == null) { throw new IllegalStateException("cannot register deletion of an index that did not previously exist"); } Long origination = calculateOriginationMillis(metadataBeforeDeletion); ilmHistoryStore.putAsync( + projectId, ILMHistoryItem.success( metadataBeforeDeletion.getIndex().getName(), metadataBeforeDeletion.getLifecyclePolicyName(), @@ -600,13 +602,14 @@ class IndexLifecycleRunner { * For the given index metadata, register (index a document) that the index has transitioned * into the ERROR state using the {@link ILMHistoryStore} */ - void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) { + void registerFailedOperation(ProjectId projectId, IndexMetadata indexMetadata, Exception failure) { if (indexMetadata == null) { // This index may have been deleted and has no metadata, so ignore it return; } Long origination = calculateOriginationMillis(indexMetadata); ilmHistoryStore.putAsync( + projectId, ILMHistoryItem.failure( indexMetadata.getIndex().getName(), indexMetadata.getLifecyclePolicyName(), diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java index b25a722abcab..3b0c6303d86b 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java @@ -19,6 +19,8 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeValue; @@ -66,27 +68,25 @@ public class ILMHistoryStore implements Closeable { ); private volatile boolean ilmHistoryEnabled = true; + private final ProjectResolver projectResolver; private final BulkProcessor2 processor; - public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool) { - this(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueSeconds(5)); + public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool, ProjectResolver projectResolver) { + this(client, clusterService, threadPool, projectResolver, ActionListener.noop(), TimeValue.timeValueSeconds(5)); } /** - * For unit testing, allows a more frequent flushInterval - * @param client - * @param clusterService - * @param threadPool - * @param listener - * @param flushInterval + * For unit testing, allows a more frequent flushInterval */ ILMHistoryStore( Client client, ClusterService clusterService, ThreadPool threadPool, + ProjectResolver projectResolver, ActionListener listener, TimeValue flushInterval ) { + this.projectResolver = projectResolver; this.setIlmHistoryEnabled(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(clusterService.getSettings())); clusterService.getClusterSettings().addSettingsUpdateConsumer(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, this::setIlmHistoryEnabled); @@ -95,7 +95,8 @@ public class ILMHistoryStore implements Closeable { new BulkProcessor2.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { - if (clusterService.state().getMetadata().getProject().templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) { + final var project = projectResolver.getProjectMetadata(clusterService.state()); + if (project.templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) { ElasticsearchException e = new ElasticsearchException("no ILM history template"); logger.warn( () -> format( @@ -169,7 +170,7 @@ public class ILMHistoryStore implements Closeable { /** * Attempts to asynchronously index an ILM history entry */ - public void putAsync(ILMHistoryItem item) { + public void putAsync(ProjectId projectId, ILMHistoryItem item) { if (ilmHistoryEnabled == false) { logger.trace( "not recording ILM history item because [{}] is [false]: [{}]", @@ -182,7 +183,7 @@ public class ILMHistoryStore implements Closeable { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { item.toXContent(builder, ToXContent.EMPTY_PARAMS); IndexRequest request = new IndexRequest(ILM_HISTORY_DATA_STREAM).source(builder).opType(DocWriteRequest.OpType.CREATE); - processor.add(request); + projectResolver.executeOnProject(projectId, () -> processor.add(request)); } catch (Exception e) { logger.error(() -> format("failed to send ILM history item to index [%s]: [%s]", ILM_HISTORY_DATA_STREAM, item), e); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index 2db166a91cbb..41cda5ebf491 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -16,9 +16,11 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; @@ -1266,7 +1268,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { private final List items = new CopyOnWriteArrayList<>(); NoOpHistoryStore(Client noopClient, ClusterService clusterService) { - super(noopClient, clusterService, clusterService.threadPool()); + super(noopClient, clusterService, clusterService.threadPool(), TestProjectResolvers.alwaysThrow()); } public List getItems() { @@ -1274,7 +1276,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { } @Override - public void putAsync(ILMHistoryItem item) { + public void putAsync(ProjectId projectId, ILMHistoryItem item) { logger.info("--> adding ILM history item: [{}]", item); items.add(item); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java index 1797f6b10f3c..8594a9dfdf7a 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java @@ -25,6 +25,9 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.settings.ClusterSettings; @@ -65,6 +68,7 @@ public class ILMHistoryStoreTests extends ESTestCase { private VerifyingClient client; private ClusterService clusterService; private ILMHistoryStore historyStore; + private ProjectId projectId; @Before public void setup() { @@ -83,13 +87,21 @@ public class ILMHistoryStoreTests extends ESTestCase { NamedXContentRegistry.EMPTY ); ClusterState state = clusterService.state(); + projectId = randomProjectIdOrDefault(); ClusterServiceUtils.setState( clusterService, ClusterState.builder(state) - .metadata(Metadata.builder(state.metadata()).indexTemplates(registry.getComposableTemplateConfigs())) + .putProjectMetadata(ProjectMetadata.builder(projectId).indexTemplates(registry.getComposableTemplateConfigs())) .build() ); - historyStore = new ILMHistoryStore(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueMillis(500)); + historyStore = new ILMHistoryStore( + client, + clusterService, + threadPool, + TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()), + ActionListener.noop(), + TimeValue.timeValueMillis(500) + ); } @After @@ -115,7 +127,7 @@ public class ILMHistoryStoreTests extends ESTestCase { latch.countDown(); return null; }); - historyStore.putAsync(record); + historyStore.putAsync(projectId, record); assertFalse(latch.await(2, TimeUnit.SECONDS)); } @@ -156,7 +168,7 @@ public class ILMHistoryStoreTests extends ESTestCase { ); }); - historyStore.putAsync(record); + historyStore.putAsync(projectId, record); assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); } @@ -207,7 +219,7 @@ public class ILMHistoryStoreTests extends ESTestCase { ); }); - historyStore.putAsync(record); + historyStore.putAsync(projectId, record); assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); } } @@ -255,23 +267,32 @@ public class ILMHistoryStoreTests extends ESTestCase { ); return bulkItemResponse; }); - try (ILMHistoryStore localHistoryStore = new ILMHistoryStore(client, clusterService, threadPool, new ActionListener<>() { - @Override - public void onResponse(BulkResponse response) { - int itemsInResponse = response.getItems().length; - actions.addAndGet(itemsInResponse); - for (int i = 0; i < itemsInResponse; i++) { - latch.countDown(); - } - logger.info("cumulative responses: {}", actions.get()); - } + try ( + ILMHistoryStore localHistoryStore = new ILMHistoryStore( + client, + clusterService, + threadPool, + TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()), + new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + int itemsInResponse = response.getItems().length; + actions.addAndGet(itemsInResponse); + for (int i = 0; i < itemsInResponse; i++) { + latch.countDown(); + } + logger.info("cumulative responses: {}", actions.get()); + } - @Override - public void onFailure(Exception e) { - logger.error(e); - fail(e.getMessage()); - } - }, TimeValue.timeValueMillis(randomIntBetween(50, 1000)))) { + @Override + public void onFailure(Exception e) { + logger.error(e); + fail(e.getMessage()); + } + }, + TimeValue.timeValueMillis(randomIntBetween(50, 1000)) + ) + ) { for (int i = 0; i < numberOfDocs; i++) { ILMHistoryItem record1 = ILMHistoryItem.success( "index", @@ -280,7 +301,7 @@ public class ILMHistoryStoreTests extends ESTestCase { 10L, LifecycleExecutionState.builder().setPhase("phase").build() ); - localHistoryStore.putAsync(record1); + localHistoryStore.putAsync(projectId, record1); } latch.await(5, TimeUnit.SECONDS); assertThat(actions.get(), equalTo(numberOfDocs));