diff --git a/modules/ingest-geoip/qa/multi-project/build.gradle b/modules/ingest-geoip/qa/multi-project/build.gradle new file mode 100644 index 000000000000..c67fa94e2763 --- /dev/null +++ b/modules/ingest-geoip/qa/multi-project/build.gradle @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +apply plugin: 'elasticsearch.internal-java-rest-test' + +dependencies { + javaRestTestImplementation project(':modules:ingest-geoip') + javaRestTestImplementation project(':test:external-modules:test-multi-project') + javaRestTestImplementation project(':test:fixtures:geoip-fixture') + + clusterModules project(':modules:ingest-geoip') + clusterModules project(':modules:reindex') // needed for database cleanup + clusterModules project(':test:external-modules:test-multi-project') +} + +tasks.withType(Test).configureEach { + it.systemProperty "tests.multi_project.enabled", true +} diff --git a/modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/geoip/GeoIpMultiProjectIT.java b/modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/geoip/GeoIpMultiProjectIT.java new file mode 100644 index 000000000000..1ed4c9e2172d --- /dev/null +++ b/modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/geoip/GeoIpMultiProjectIT.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package geoip; + +import fixture.geoip.GeoIpHttpFixture; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.core.Booleans; +import org.elasticsearch.ingest.geoip.GeoIpDownloader; +import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +public class GeoIpMultiProjectIT extends ESRestTestCase { + // default true + private static final boolean useFixture = Booleans.parseBoolean(System.getProperty("geoip_use_service", "false")) == false; + + public static final GeoIpHttpFixture fixture = new GeoIpHttpFixture(useFixture); + + public static final ElasticsearchCluster cluster = ElasticsearchCluster.local() + .module("ingest-geoip") + .module("reindex") // for database cleanup + .module("test-multi-project") + .setting("test.multi_project.enabled", "true") + .setting(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), "true") + .setting(GeoIpDownloader.ENDPOINT_SETTING.getKey(), fixture::getAddress, (k) -> useFixture) + .build(); + + @ClassRule + public static TestRule ruleChain = RuleChain.outerRule(fixture).around(cluster); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected boolean shouldConfigureProjects() { + return false; + } + + public void testGeoIpDownloader() throws Exception { + String project1 = randomUniqueProjectId().id(); + String project2 = randomUniqueProjectId().id(); + createProject(project1); + createProject(project2); + + // download databases for project1 + putGeoIpPipeline(project1); + assertBusy(() -> assertDatabases(project1, true), 30, TimeUnit.SECONDS); + assertBusy(() -> assertDatabases(project2, false), 30, TimeUnit.SECONDS); + + // download databases for project2 + putGeoIpPipeline(project2); + assertBusy(() -> assertDatabases(project2, true), 30, TimeUnit.SECONDS); + } + + private void putGeoIpPipeline(String projectId) throws IOException { + Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/geoip-pipeline"); + putPipelineRequest.setJsonEntity(""" + { + "processors" : [ + { + "geoip" : { + "field" : "ip", + "target_field" : "geo", + "database_file" : "GeoLite2-Country.mmdb" + } + } + ] + } + """); + setRequestProjectId(projectId, putPipelineRequest); + assertOK(client().performRequest(putPipelineRequest)); + } + + private static Request setRequestProjectId(String projectId, Request request) { + RequestOptions.Builder options = request.getOptions().toBuilder(); + options.removeHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER); + options.addHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, projectId); + request.setOptions(options); + return request; + } + + @SuppressWarnings("unchecked") + private void assertDatabases(String projectId, boolean shouldDownload) throws IOException { + Request getTaskState = new Request("GET", "/_cluster/state"); + setRequestProjectId(projectId, getTaskState); + + ObjectPath state = ObjectPath.createFromResponse(assertOK(client().performRequest(getTaskState))); + + List> tasks = state.evaluate("metadata.persistent_tasks.tasks"); + // Short-circuit to avoid using steams if the list is empty + if (tasks.isEmpty()) { + fail("persistent tasks list is empty, expected at least one task for geoip-downloader"); + } + + // verify project task id + Set> id = tasks.stream() + .filter(task -> String.format("%s/geoip-downloader", projectId).equals(task.get("id"))) + .collect(Collectors.toSet()); + assertThat(id.size(), equalTo(1)); + + // verify database download + Map databases = (Map) tasks.stream().map(task -> { + try { + return ObjectPath.evaluate(task, "task.geoip-downloader.state.databases"); + } catch (IOException e) { + return null; + } + }).filter(Objects::nonNull).findFirst().orElse(null); + + if (shouldDownload) { + // verify database downloaded + assertNotNull(databases); + for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) { + Object database = databases.get(name); + assertNotNull(database); + assertNotNull(ObjectPath.evaluate(database, "md5")); + } + } else { + // verify database not downloaded + assertNull(databases); + } + + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index 1617386111b2..b2c3c29fe070 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.settings.Setting; @@ -95,6 +96,8 @@ public class GeoIpDownloader extends AllocatedPersistentTask { */ private final Supplier atLeastOneGeoipProcessorSupplier; + private final ProjectId projectId; + GeoIpDownloader( Client client, HttpClient httpClient, @@ -109,10 +112,11 @@ public class GeoIpDownloader extends AllocatedPersistentTask { Map headers, Supplier pollIntervalSupplier, Supplier eagerDownloadSupplier, - Supplier atLeastOneGeoipProcessorSupplier + Supplier atLeastOneGeoipProcessorSupplier, + ProjectId projectId ) { super(id, type, action, description, parentTask, headers); - this.client = client; + this.client = client.projectClient(projectId); this.httpClient = httpClient; this.clusterService = clusterService; this.threadPool = threadPool; @@ -120,6 +124,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { this.pollIntervalSupplier = pollIntervalSupplier; this.eagerDownloadSupplier = eagerDownloadSupplier; this.atLeastOneGeoipProcessorSupplier = atLeastOneGeoipProcessorSupplier; + this.projectId = projectId; } void setState(GeoIpTaskState state) { @@ -134,16 +139,17 @@ public class GeoIpDownloader extends AllocatedPersistentTask { // visible for testing void updateDatabases() throws IOException { var clusterState = clusterService.state(); - var geoipIndex = clusterState.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); + var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); if (geoipIndex != null) { logger.trace("The {} index is not null", GeoIpDownloader.DATABASES_INDEX); - if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) { + if (clusterState.routingTable(projectId).index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) { logger.debug( "Not updating geoip database because not all primary shards of the [" + DATABASES_INDEX + "] index are active." ); return; } - var blockException = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName()); + var blockException = clusterState.blocks() + .indexBlockedException(projectId, ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName()); if (blockException != null) { logger.debug( "Not updating geoip database because there is a write block on the " + geoipIndex.getWriteIndex().getName() + " index", @@ -196,7 +202,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { updateTimestamp(name, metadata); return; } - logger.debug("downloading geoip database [{}]", name); + logger.debug("downloading geoip database [{}] for project [{}]", name, projectId); long start = System.currentTimeMillis(); try (InputStream is = httpClient.get(url)) { int firstChunk = metadata.lastChunk() + 1; // if there is no metadata, then Metadata.EMPTY.lastChunk() + 1 = 0 @@ -205,12 +211,12 @@ public class GeoIpDownloader extends AllocatedPersistentTask { state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start)); updateTaskState(); stats = stats.successfulDownload(System.currentTimeMillis() - start).databasesCount(state.getDatabases().size()); - logger.info("successfully downloaded geoip database [{}]", name); + logger.info("successfully downloaded geoip database [{}] for project [{}]", name, projectId); deleteOldChunks(name, firstChunk); } } catch (Exception e) { stats = stats.failedDownload(); - logger.error(() -> "error downloading geoip database [" + name + "]", e); + logger.error(() -> "error downloading geoip database [" + name + "] for project [" + projectId + "]", e); } } @@ -230,7 +236,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { // visible for testing protected void updateTimestamp(String name, Metadata old) { - logger.debug("geoip database [{}] is up to date, updated timestamp", name); + logger.debug("geoip database [{}] is up to date for project [{}], updated timestamp", name, projectId); state = state.put(name, new Metadata(old.lastUpdate(), old.firstChunk(), old.lastChunk(), old.md5(), System.currentTimeMillis())); stats = stats.skippedDownload(); updateTaskState(); @@ -238,7 +244,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { void updateTaskState() { PlainActionFuture> future = new PlainActionFuture<>(); - updatePersistentTaskState(state, future); + updateProjectPersistentTaskState(projectId, state, future); state = ((GeoIpTaskState) future.actionGet().getState()); } @@ -360,5 +366,4 @@ public class GeoIpDownloader extends AllocatedPersistentTask { scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic()); } } - } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 39fec22dc1bd..087a5b4e6296 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -19,13 +19,16 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; @@ -49,8 +52,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; @@ -97,11 +100,14 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor currentTask = new AtomicReference<>(); + @FixForMultiProject(description = "These settings need to be project-scoped") private volatile TimeValue pollInterval; private volatile boolean eagerDownload; - private volatile boolean atLeastOneGeoipProcessor; - private final AtomicBoolean taskIsBootstrapped = new AtomicBoolean(false); + + private final ConcurrentHashMap atLeastOneGeoipProcessorByProject = new ConcurrentHashMap<>(); + private final ConcurrentHashMap taskIsBootstrappedByProject = new ConcurrentHashMap<>(); + private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); + private final ProjectResolver projectResolver; GeoIpDownloaderTaskExecutor(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool) { super(GEOIP_DOWNLOADER, threadPool.generic()); @@ -113,6 +119,7 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor {}); + startTask(ProjectId.DEFAULT, () -> {}); } else { - stopTask(() -> {}); + stopTask(ProjectId.DEFAULT, () -> {}); } } + @FixForMultiProject(description = "Should execute in the context of the current project after settings are project-aware") private void setEagerDownload(Boolean eagerDownload) { if (Objects.equals(this.eagerDownload, eagerDownload) == false) { this.eagerDownload = eagerDownload; - GeoIpDownloader currentDownloader = getCurrentTask(); + GeoIpDownloader currentDownloader = getTask(ProjectId.DEFAULT); if (currentDownloader != null && Objects.equals(eagerDownload, Boolean.TRUE)) { currentDownloader.requestReschedule(); } } } + @FixForMultiProject(description = "Should execute in the context of the current project after settings are project-aware") private void setPollInterval(TimeValue pollInterval) { if (Objects.equals(this.pollInterval, pollInterval) == false) { this.pollInterval = pollInterval; - GeoIpDownloader currentDownloader = getCurrentTask(); + GeoIpDownloader currentDownloader = getTask(ProjectId.DEFAULT); if (currentDownloader != null) { currentDownloader.requestReschedule(); } @@ -162,7 +172,7 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor taskInProgress, Map headers ) { + ProjectId projectId = projectResolver.getProjectId(); return new GeoIpDownloader( client, httpClient, @@ -191,10 +202,12 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor pollInterval, () -> eagerDownload, - () -> atLeastOneGeoipProcessor + () -> atLeastOneGeoipProcessorByProject.getOrDefault(projectId, false), + projectId ); } + @FixForMultiProject(description = "Make sure removed project tasks are cancelled: https://elasticco.atlassian.net/browse/ES-12054") @Override public void clusterChanged(ClusterChangedEvent event) { if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { @@ -208,52 +221,66 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor taskIsBootstrapped.set(false)); - } else { - stopTask(() -> taskIsBootstrapped.set(false)); - } - } - if (event.metadataChanged() == false) { return; } - boolean hasIndicesChanges = event.previousState() - .metadata() - .getProject() - .indices() - .equals(event.state().metadata().getProject().indices()) == false; - boolean hasIngestPipelineChanges = event.metadataChanged() && event.changedCustomProjectMetadataSet().contains(IngestMetadata.TYPE); + for (var projectMetadata : event.state().metadata().projects().values()) { + ProjectId projectId = projectMetadata.id(); - if (hasIngestPipelineChanges || hasIndicesChanges) { - boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(event.state()); - if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) { - atLeastOneGeoipProcessor = true; - logger.trace("Scheduling runDownloader because a geoip processor has been added"); - GeoIpDownloader currentDownloader = getCurrentTask(); - if (currentDownloader != null) { - currentDownloader.requestReschedule(); + // bootstrap task once iff it is not already bootstrapped + AtomicBoolean taskIsBootstrapped = taskIsBootstrappedByProject.computeIfAbsent(projectId, k -> new AtomicBoolean(false)); + if (taskIsBootstrapped.getAndSet(true) == false) { + atLeastOneGeoipProcessorByProject.computeIfAbsent(projectId, k -> hasAtLeastOneGeoipProcessor(projectMetadata)); + if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) { + logger.debug("Bootstrapping geoip downloader task for project [{}]", projectId); + startTask(projectId, () -> taskIsBootstrapped.set(false)); + } else { + logger.debug("Stopping geoip downloader task for project [{}]", projectId); + stopTask(projectId, () -> taskIsBootstrapped.set(false)); + } + } + + boolean hasIngestPipelineChanges = event.customMetadataChanged(projectId, IngestMetadata.TYPE); + boolean hasIndicesChanges = false; + boolean projectExisted = event.previousState().metadata().hasProject(projectId); + if (projectExisted) { + hasIndicesChanges = event.previousState() + .metadata() + .getProject(projectId) + .indices() + .equals(projectMetadata.indices()) == false; + } + + if (hasIngestPipelineChanges || hasIndicesChanges) { + boolean atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject.getOrDefault(projectId, false); + boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(projectMetadata); + // update if necessary + if (newAtLeastOneGeoipProcessor != atLeastOneGeoipProcessor) { + atLeastOneGeoipProcessorByProject.put(projectId, newAtLeastOneGeoipProcessor); + } + if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) { + logger.trace("Scheduling runDownloader for project [{}] because a geoip processor has been added", projectId); + GeoIpDownloader currentDownloader = getTask(projectId); + if (currentDownloader != null) { + currentDownloader.requestReschedule(); + } } - } else { - atLeastOneGeoipProcessor = newAtLeastOneGeoipProcessor; } } } - static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) { - if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) { + static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) { + if (pipelinesWithGeoIpProcessor(projectMetadata, true).isEmpty() == false) { return true; } - final Set checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false); + final Set checkReferencedPipelines = pipelinesWithGeoIpProcessor(projectMetadata, false); if (checkReferencedPipelines.isEmpty()) { return false; } - return clusterState.getMetadata().getProject().indices().values().stream().anyMatch(indexMetadata -> { + return projectMetadata.indices().values().stream().anyMatch(indexMetadata -> { String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings()); String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings()); return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline); @@ -262,14 +289,14 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) { - List configurations = IngestService.getPipelines(clusterState.metadata().getProject()); + private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) { + List configurations = IngestService.getPipelines(projectMetadata); Set ids = new HashSet<>(); // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (PipelineConfiguration configuration : configurations) { @@ -366,9 +393,11 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation); } - private void startTask(Runnable onFailure) { - persistentTasksService.sendStartRequest( - GEOIP_DOWNLOADER, + // starts GeoIP downloader task for a single project + private void startTask(ProjectId projectId, Runnable onFailure) { + persistentTasksService.sendProjectStartRequest( + projectId, + getTaskId(projectId, projectResolver.supportsMultipleProjects()), GEOIP_DOWNLOADER, new GeoIpTaskParams(), MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, @@ -382,7 +411,8 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor> listener = ActionListener.wrap( r -> logger.debug("Stopped geoip downloader task"), e -> { @@ -393,30 +423,44 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor { IndexAbstraction databasesAbstraction = clusterService.state() .metadata() - .getDefaultProject() + .getProject(projectId) .getIndicesLookup() .get(DATABASES_INDEX); if (databasesAbstraction != null) { // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index Index databasesIndex = databasesAbstraction.getWriteIndex(); - client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> { - Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e; - if (t instanceof ResourceNotFoundException == false) { - logger.warn("failed to remove " + databasesIndex, e); - } - })); + client.projectClient(projectId) + .admin() + .indices() + .prepareDelete(databasesIndex.getName()) + .execute(ActionListener.wrap(rr -> { + // remove task reference in the map so it can be garbage collected + tasks.remove(projectId); + taskIsBootstrappedByProject.remove(projectId); + atLeastOneGeoipProcessorByProject.remove(projectId); + }, e -> { + Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e; + if (t instanceof ResourceNotFoundException == false) { + logger.warn("failed to remove " + databasesIndex, e); + } + })); } }) ); } - public GeoIpDownloader getCurrentTask() { - return currentTask.get(); + public GeoIpDownloader getTask(ProjectId projectId) { + return tasks.get(projectId); + } + + public static String getTaskId(ProjectId projectId, boolean supportsMultipleProjects) { + return supportsMultipleProjects ? projectId + "/" + GEOIP_DOWNLOADER : GEOIP_DOWNLOADER; } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java index 9ebf97ca4e9e..e11566d409c3 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.ingest.geoip.DatabaseNodeService; @@ -34,6 +35,7 @@ public class GeoIpStatsTransportAction extends TransportNodesAction {}, ingestMetadata); - assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + var projectMetadata = projectMetadataWithIndex(b -> {}, ingestMetadata); + assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); // The pipeline is set as default pipeline in an index, expected to return true. - clusterState = clusterStateWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id1"), ingestMetadata); - assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id1"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); // The pipeline is set as final pipeline in an index, expected to return true. - clusterState = clusterStateWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "_id1"), ingestMetadata); - assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "_id1"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); } } @@ -64,10 +62,8 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { var ingestMetadata = new IngestMetadata( Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON)) ); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build()) - .build(); - assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + ProjectMetadata projectMetadata = ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build(); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); } } { @@ -76,10 +72,8 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { var ingestMetadata = new IngestMetadata( Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON)) ); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build()) - .build(); - assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + ProjectMetadata projectMetadata = ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build(); + assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); } } { @@ -97,10 +91,8 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON)); } var ingestMetadata = new IngestMetadata(configs); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build()) - .build(); - assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + ProjectMetadata projectMetadata = ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build(); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); } } @@ -277,14 +269,13 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { } } - private ClusterState clusterStateWithIndex(Consumer consumer, IngestMetadata ingestMetadata) { + private ProjectMetadata projectMetadataWithIndex(Consumer consumer, IngestMetadata ingestMetadata) { var builder = indexSettings(IndexVersion.current(), 1, 1); consumer.accept(builder); var indexMetadata = new IndexMetadata.Builder("index").settings(builder.build()).build(); - var project = ProjectMetadata.builder(Metadata.DEFAULT_PROJECT_ID) + return ProjectMetadata.builder(randomProjectIdOrDefault()) .putCustom(IngestMetadata.TYPE, ingestMetadata) .put(indexMetadata, false) .build(); - return ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build(); } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java index dce4faf6bcef..ab20a925fced 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java @@ -26,6 +26,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -85,9 +87,11 @@ public class GeoIpDownloaderTests extends ESTestCase { private ThreadPool threadPool; private MockClient client; private GeoIpDownloader geoIpDownloader; + private ProjectId projectId; @Before public void setup() throws IOException { + projectId = randomProjectIdOrDefault(); httpClient = mock(HttpClient.class); when(httpClient.getBytes(anyString())).thenReturn("[]".getBytes(StandardCharsets.UTF_8)); clusterService = mock(ClusterService.class); @@ -107,9 +111,9 @@ public class GeoIpDownloaderTests extends ESTestCase { ) ) ); - ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of())); + ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of())); when(clusterService.state()).thenReturn(state); - client = new MockClient(threadPool); + client = new MockClient(threadPool, projectId); geoIpDownloader = new GeoIpDownloader( client, httpClient, @@ -124,7 +128,8 @@ public class GeoIpDownloaderTests extends ESTestCase { Map.of(), () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), - () -> true + () -> true, + projectId ) { { GeoIpTaskParams geoIpTaskParams = mock(GeoIpTaskParams.class); @@ -296,7 +301,8 @@ public class GeoIpDownloaderTests extends ESTestCase { Map.of(), () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), - () -> true + () -> true, + projectId ) { @Override protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) { @@ -347,7 +353,8 @@ public class GeoIpDownloaderTests extends ESTestCase { Map.of(), () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), - () -> true + () -> true, + projectId ) { @Override protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) { @@ -400,7 +407,8 @@ public class GeoIpDownloaderTests extends ESTestCase { Map.of(), () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), - () -> true + () -> true, + projectId ) { @Override protected void updateTimestamp(String name, GeoIpTaskState.Metadata newMetadata) { @@ -450,7 +458,8 @@ public class GeoIpDownloaderTests extends ESTestCase { Map.of(), () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), - () -> true + () -> true, + projectId ) { @Override void updateDatabases() throws IOException { @@ -495,10 +504,15 @@ public class GeoIpDownloaderTests extends ESTestCase { Map.of(), () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), - () -> true + () -> true, + projectId ) { @Override - public void updatePersistentTaskState(PersistentTaskState state, ActionListener> listener) { + public void updateProjectPersistentTaskState( + ProjectId projectId, + PersistentTaskState state, + ActionListener> listener + ) { assertSame(GeoIpTaskState.EMPTY, state); PersistentTask task = mock(PersistentTask.class); when(task.getState()).thenReturn(GeoIpTaskState.EMPTY); @@ -525,10 +539,15 @@ public class GeoIpDownloaderTests extends ESTestCase { Map.of(), () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), - () -> true + () -> true, + projectId ) { @Override - public void updatePersistentTaskState(PersistentTaskState state, ActionListener> listener) { + public void updateProjectPersistentTaskState( + ProjectId projectId, + PersistentTaskState state, + ActionListener> listener + ) { assertSame(GeoIpTaskState.EMPTY, state); PersistentTask task = mock(PersistentTask.class); when(task.getState()).thenReturn(GeoIpTaskState.EMPTY); @@ -566,7 +585,8 @@ public class GeoIpDownloaderTests extends ESTestCase { Map.of(), () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY), () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY), - atLeastOneGeoipProcessor::get + atLeastOneGeoipProcessor::get, + projectId ) { @Override void processDatabase(Map databaseInfo) { @@ -584,10 +604,15 @@ public class GeoIpDownloaderTests extends ESTestCase { /* * Here we make sure that we bail out before making an httpClient request if there is write block on the .geoip_databases index */ - ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of())); - var geoIpIndex = state.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX).getWriteIndex().getName(); + ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of())); + var geoIpIndex = state.getMetadata() + .getProject(projectId) + .getIndicesLookup() + .get(GeoIpDownloader.DATABASES_INDEX) + .getWriteIndex() + .getName(); state = ClusterState.builder(state) - .blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) + .blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) .build(); when(clusterService.state()).thenReturn(state); geoIpDownloader.updateDatabases(); @@ -599,7 +624,7 @@ public class GeoIpDownloaderTests extends ESTestCase { * Here we make sure that we bail out before making an httpClient request if there are unallocated shards on the .geoip_databases * index */ - ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()), true); + ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()), true); when(clusterService.state()).thenReturn(state); geoIpDownloader.updateDatabases(); verifyNoInteractions(httpClient); @@ -610,7 +635,7 @@ public class GeoIpDownloaderTests extends ESTestCase { * This test puts some expired databases and some non-expired ones into the GeoIpTaskState, and then calls runDownloader(), making * sure that the expired databases have been deleted. */ - AtomicInteger updatePersistentTaskStateCount = new AtomicInteger(0); + AtomicInteger updateProjectPersistentTaskStateCount = new AtomicInteger(0); AtomicInteger deleteCount = new AtomicInteger(0); int expiredDatabasesCount = randomIntBetween(1, 100); int unexpiredDatabasesCount = randomIntBetween(0, 100); @@ -634,7 +659,7 @@ public class GeoIpDownloaderTests extends ESTestCase { request.getAllocationId(), assignment ); - updatePersistentTaskStateCount.incrementAndGet(); + updateProjectPersistentTaskStateCount.incrementAndGet(); taskResponseListener.onResponse(new PersistentTaskResponse(new PersistentTask<>(persistentTask, request.getState()))); } ); @@ -657,14 +682,14 @@ public class GeoIpDownloaderTests extends ESTestCase { ); } assertThat(deleteCount.get(), equalTo(expiredDatabasesCount)); - assertThat(updatePersistentTaskStateCount.get(), equalTo(expiredDatabasesCount)); + assertThat(updateProjectPersistentTaskStateCount.get(), equalTo(expiredDatabasesCount)); geoIpDownloader.runDownloader(); /* * The following two lines assert current behavior that might not be desirable -- we continue to delete expired databases every * time that runDownloader runs. This seems unnecessary. */ assertThat(deleteCount.get(), equalTo(expiredDatabasesCount * 2)); - assertThat(updatePersistentTaskStateCount.get(), equalTo(expiredDatabasesCount * 2)); + assertThat(updateProjectPersistentTaskStateCount.get(), equalTo(expiredDatabasesCount * 2)); } private GeoIpTaskState.Metadata newGeoIpTaskStateMetadata(boolean expired) { @@ -681,8 +706,8 @@ public class GeoIpDownloaderTests extends ESTestCase { private final Map, BiConsumer>> handlers = new HashMap<>(); - private MockClient(ThreadPool threadPool) { - super(threadPool); + private MockClient(ThreadPool threadPool, ProjectId projectId) { + super(threadPool, TestProjectResolvers.singleProject(projectId)); } public void addHandler( diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index 8285900f0b00..5ae8be378082 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -163,6 +163,19 @@ public class ClusterChangedEvent { return result; } + /** + * Checks whether custom metadata type for a project has changed between the previous cluster state + * and the new cluster state. Custom metadata types are considered changed iff they have been added, + * updated or removed between the previous and the current state + */ + public boolean customMetadataChanged(ProjectId projectId, String customMetadataType) { + ProjectMetadata previousProject = previousState.metadata().projects().get(projectId); + ProjectMetadata project = state.metadata().projects().get(projectId); + Object previousValue = previousProject == null ? null : previousProject.customs().get(customMetadataType); + Object value = project == null ? null : project.customs().get(customMetadataType); + return Objects.equals(previousValue, value) == false; + } + private > Set changedCustoms( Map currentCustoms, Map previousCustoms diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index cda73d4fa0bc..20c80533c659 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.CancellableTask; @@ -74,6 +75,26 @@ public class AllocatedPersistentTask extends CancellableTask { ); } + /** + * Updates the persistent state for the corresponding project scope persistent task. + *

+ * This doesn't affect the status of this allocated task. + */ + public void updateProjectPersistentTaskState( + final ProjectId projectId, + final PersistentTaskState state, + final ActionListener> listener + ) { + persistentTasksService.sendProjectUpdateStateRequest( + projectId, + persistentTaskId, + allocationId, + state, + TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */, + listener + ); + } + public String getPersistentTaskId() { return persistentTaskId; } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index ea83c92bfcdd..e112df1072c3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -483,6 +483,91 @@ public class ClusterChangedEventTests extends ESTestCase { assertThat(event.changedCustomProjectMetadataSet(), containsInAnyOrder(customProjectMetadata.getWriteableName())); } + public void testChangedCustomProjectMetadataSet() { + final CustomProjectMetadata custom1 = new CustomProjectMetadata("custom1"); + final CustomProjectMetadata custom2 = new CustomProjectMetadata("custom2"); + final ProjectMetadata project1 = ProjectMetadata.builder(randomUniqueProjectId()) + .putCustom(custom1.getWriteableName(), custom1) + .build(); + final ProjectMetadata project2 = ProjectMetadata.builder(randomUniqueProjectId()) + .putCustom(custom2.getWriteableName(), custom2) + .build(); + + final ClusterState originalState = ClusterState.builder(TEST_CLUSTER_NAME) + .metadata(Metadata.builder().put(project1).build()) + .build(); + + // No changes + { + ClusterState newState = ClusterState.builder(originalState).build(); + ClusterChangedEvent event = new ClusterChangedEvent("_na_", newState, originalState); + // existing project + assertFalse(event.customMetadataChanged(project1.id(), custom1.getWriteableName())); + // non-existing project + assertFalse(event.customMetadataChanged(project2.id(), custom2.getWriteableName())); + } + + // Add custom to existing project + { + ClusterState newState = ClusterState.builder(originalState) + .putProjectMetadata(ProjectMetadata.builder(project1).putCustom(custom2.getWriteableName(), custom2).build()) + .build(); + ClusterChangedEvent event = new ClusterChangedEvent("_na_", newState, originalState); + assertTrue(event.customMetadataChanged(project1.id(), custom2.getWriteableName())); + } + + // Remove custom from existing project + { + ClusterState newState = ClusterState.builder(originalState) + .putProjectMetadata(ProjectMetadata.builder(project1).removeCustom(custom1.getWriteableName()).build()) + .build(); + ClusterChangedEvent event = new ClusterChangedEvent("_na_", newState, originalState); + assertTrue(event.customMetadataChanged(project1.id(), custom1.getWriteableName())); + } + + // Add new project with custom + { + ClusterState newState = ClusterState.builder(originalState) + .putProjectMetadata(ProjectMetadata.builder(project2).build()) + .build(); + ClusterChangedEvent event = new ClusterChangedEvent("_na_", newState, originalState); + // IndexGraveyard is always added when a new project is created, this checks that IndexGraveyard is "changed + assertTrue(event.customMetadataChanged(project2.id(), IndexGraveyard.TYPE)); + assertTrue(event.customMetadataChanged(project2.id(), custom2.getWriteableName())); + // No change to other project + assertFalse(event.customMetadataChanged(project1.id(), custom1.getWriteableName())); + } + + // remove project + { + ClusterState oldState = ClusterState.builder(originalState) + .putProjectMetadata(ProjectMetadata.builder(project2).build()) + .build(); + // project2 is removed + ClusterState newState = originalState; + ClusterChangedEvent event = new ClusterChangedEvent("_na_", newState, oldState); + // IndexGraveyard is always added when a new project is created, this checks that IndexGraveyard is "changed" + assertTrue(event.customMetadataChanged(project2.id(), IndexGraveyard.TYPE)); + assertTrue(event.customMetadataChanged(project2.id(), custom2.getWriteableName())); + // No change to other project + assertFalse(event.customMetadataChanged(project1.id(), custom1.getWriteableName())); + } + + // add custom to project1 + remove project2 + { + ClusterState oldState = ClusterState.builder(originalState) + .putProjectMetadata(ProjectMetadata.builder(project2).build()) + .build(); + ClusterState newState = ClusterState.builder(originalState) + .putProjectMetadata(ProjectMetadata.builder(project1).putCustom(custom2.getWriteableName(), custom2).build()) + .build(); + ClusterChangedEvent event = new ClusterChangedEvent("_na_", newState, oldState); + assertTrue(event.customMetadataChanged(project2.id(), IndexGraveyard.TYPE)); + assertTrue(event.customMetadataChanged(project2.id(), custom2.getWriteableName())); + assertTrue(event.customMetadataChanged(project1.id(), custom2.getWriteableName())); + } + } + public void testChangedCustomMetadataSetMultiProject() { final CustomProjectMetadata project1Custom = new CustomProjectMetadata("project1"); final CustomProjectMetadata project2Custom = new CustomProjectMetadata("project2"); @@ -519,6 +604,7 @@ public class ClusterChangedEventTests extends ESTestCase { ) .build(); event = new ClusterChangedEvent("_na_", originalState, newState); + // IndexGraveyard is always added when a new project is created, this checks that IndexGraveyard is "changed" assertEquals(Set.of(IndexGraveyard.TYPE, project2Custom.getWriteableName()), event.changedCustomProjectMetadataSet()); }