diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 8982fbba9b61..4e9034e5a4c3 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -296,6 +296,7 @@ public class PersistentTasksService { /** * Notifies the master node to remove a persistent task from the cluster state. Accepts operation timeout as optional parameter */ + @Deprecated(forRemoval = true) // Use the explict cluster/project version instead public void sendRemoveRequest(final String taskId, final TimeValue timeout, final ActionListener> listener) { sendRemoveRequest(null, taskId, timeout, listener); } diff --git a/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseDownloaderPlugin.java b/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseDownloaderPlugin.java index e34ecdda81d7..d33201a3fdc9 100644 --- a/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseDownloaderPlugin.java +++ b/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseDownloaderPlugin.java @@ -40,7 +40,8 @@ public class EnterpriseDownloaderPlugin extends Plugin { services.client(), services.clusterService(), services.threadPool(), - getLicenseState() + getLicenseState(), + services.projectResolver() ); enterpriseGeoIpDownloaderLicenseListener.init(); return List.of(enterpriseGeoIpDownloaderLicenseListener); diff --git a/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListener.java b/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListener.java index 545a5132a201..78b161f60d27 100644 --- a/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListener.java +++ b/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListener.java @@ -16,9 +16,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.NotMultiProjectCapable; import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams; import org.elasticsearch.license.License; import org.elasticsearch.license.LicenseStateListener; @@ -31,12 +33,14 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.core.XPackField; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER; public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateListener, ClusterStateListener { private static final Logger logger = LogManager.getLogger(EnterpriseGeoIpDownloaderLicenseListener.class); - // Note: This custom type is GeoIpMetadata.TYPE, but that class is not exposed to this plugin + // Note: This custom type is IngestGeoIpMetadata.TYPE, but that class is not exposed to this plugin static final String INGEST_GEOIP_CUSTOM_METADATA_TYPE = "ingest_geoip"; private final PersistentTasksService persistentTasksService; @@ -47,18 +51,21 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis XPackField.ENTERPRISE_GEOIP_DOWNLOADER, License.OperationMode.PLATINUM ); - private volatile boolean licenseIsValid = false; - private volatile boolean hasIngestGeoIpMetadata = false; + private final ConcurrentMap licenseIsValid = new ConcurrentHashMap<>(); + private final ConcurrentMap hasIngestGeoIpMetadata = new ConcurrentHashMap<>(); + private final ProjectResolver projectResolver; protected EnterpriseGeoIpDownloaderLicenseListener( Client client, ClusterService clusterService, ThreadPool threadPool, - XPackLicenseState licenseState + XPackLicenseState licenseState, + ProjectResolver projectResolver ) { this.persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); this.clusterService = clusterService; this.licenseState = licenseState; + this.projectResolver = projectResolver; } private volatile boolean licenseStateListenerRegistered; @@ -74,47 +81,55 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis licenseState.addListener(this); } + @NotMultiProjectCapable(description = "Replace DEFAULT project after enterprise license is supported in serverless and project-aware") @Override public void licenseStateChanged() { - licenseIsValid = ENTERPRISE_GEOIP_FEATURE.checkWithoutTracking(licenseState); - maybeUpdateTaskState(clusterService.state()); + licenseIsValid.put(ProjectId.DEFAULT, ENTERPRISE_GEOIP_FEATURE.checkWithoutTracking(licenseState)); + final boolean isLocalNodeMaster = clusterService.state().nodes().isLocalNodeElectedMaster(); + maybeUpdateTaskState(ProjectId.DEFAULT, isLocalNodeMaster); } @Override public void clusterChanged(ClusterChangedEvent event) { - hasIngestGeoIpMetadata = event.state().metadata().getProject().custom(INGEST_GEOIP_CUSTOM_METADATA_TYPE) != null; - final boolean ingestGeoIpCustomMetaChangedInEvent = event.metadataChanged() - && event.changedCustomProjectMetadataSet().contains(INGEST_GEOIP_CUSTOM_METADATA_TYPE); final boolean masterNodeChanged = Objects.equals( event.state().nodes().getMasterNode(), event.previousState().nodes().getMasterNode() ) == false; - /* - * We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState if this cluster change - * event involved the modification of custom geoip metadata OR a master node change - */ - if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata)) { - maybeUpdateTaskState(event.state()); - } + final boolean isLocalNodeMaster = event.state().nodes().isLocalNodeElectedMaster(); + event.state().metadata().projects().values().forEach(projectMetadata -> { + ProjectId projectId = projectMetadata.id(); + final boolean hasMetadata = projectMetadata.custom(INGEST_GEOIP_CUSTOM_METADATA_TYPE) != null; + hasIngestGeoIpMetadata.put(projectId, hasMetadata); + final boolean ingestGeoIpCustomMetaChangedInEvent = event.metadataChanged() + && event.customMetadataChanged(projectId, INGEST_GEOIP_CUSTOM_METADATA_TYPE); + /* + * We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState + * if this cluster change event involved the modification of custom geoip metadata OR a master node change + */ + if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata.getOrDefault(projectId, false))) { + maybeUpdateTaskState(projectId, isLocalNodeMaster); + } + }); } - private void maybeUpdateTaskState(ClusterState state) { + private void maybeUpdateTaskState(ProjectId projectId, boolean isLocalNodeMaster) { // We should only start/stop task from single node, master is the best as it will go through it anyway - if (state.nodes().isLocalNodeElectedMaster()) { - if (licenseIsValid) { - if (hasIngestGeoIpMetadata) { - ensureTaskStarted(); + if (isLocalNodeMaster) { + if (licenseIsValid.getOrDefault(projectId, false)) { + if (hasIngestGeoIpMetadata.getOrDefault(projectId, false)) { + ensureTaskStarted(projectId); } } else { - ensureTaskStopped(); + ensureTaskStopped(projectId); } } } - private void ensureTaskStarted() { - assert licenseIsValid : "Task should never be started without valid license"; - persistentTasksService.sendStartRequest( - ENTERPRISE_GEOIP_DOWNLOADER, + private void ensureTaskStarted(ProjectId projectId) { + assert licenseIsValid.getOrDefault(projectId, false) : "Task should never be started without valid license"; + persistentTasksService.sendProjectStartRequest( + projectId, + getTaskId(projectId, projectResolver.supportsMultipleProjects()), ENTERPRISE_GEOIP_DOWNLOADER, new EnterpriseGeoIpTaskParams(), MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, @@ -127,7 +142,7 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis ); } - private void ensureTaskStopped() { + private void ensureTaskStopped(ProjectId projectId) { ActionListener> listener = ActionListener.wrap( r -> logger.debug("Stopped enterprise geoip downloader task"), e -> { @@ -137,6 +152,15 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis } } ); - persistentTasksService.sendRemoveRequest(ENTERPRISE_GEOIP_DOWNLOADER, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, listener); + persistentTasksService.sendProjectRemoveRequest( + projectId, + getTaskId(projectId, projectResolver.supportsMultipleProjects()), + MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, + listener + ); + } + + protected static String getTaskId(ProjectId projectId, boolean supportsMultipleProjects) { + return supportsMultipleProjects ? projectId + "/" + ENTERPRISE_GEOIP_DOWNLOADER : ENTERPRISE_GEOIP_DOWNLOADER; } } diff --git a/x-pack/plugin/geoip-enterprise-downloader/src/test/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListenerTests.java b/x-pack/plugin/geoip-enterprise-downloader/src/test/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListenerTests.java index 8b5b2b84c3ca..ccdcf9c31699 100644 --- a/x-pack/plugin/geoip-enterprise-downloader/src/test/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListenerTests.java +++ b/x-pack/plugin/geoip-enterprise-downloader/src/test/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListenerTests.java @@ -16,10 +16,14 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.NotMultiProjectCapable; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.license.License; @@ -48,6 +52,8 @@ import static org.mockito.Mockito.when; public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase { private ThreadPool threadPool; + @NotMultiProjectCapable(description = "Enterprise license not available in serverless or multi-project yet") + private final ProjectResolver projectResolver = TestProjectResolvers.DEFAULT_PROJECT_ONLY; @Before public void setup() { @@ -68,12 +74,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase { // Should never start if not master node, even if all other conditions have been met final XPackLicenseState licenseState = getAlwaysValidLicense(); ClusterService clusterService = createClusterService(true, false); - TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, true, false); + TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, true, false, projectResolver); EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener( client, clusterService, threadPool, - licenseState + licenseState, + projectResolver ); listener.init(); listener.licenseStateChanged(); @@ -85,12 +92,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase { final TestUtils.UpdatableLicenseState licenseState = new TestUtils.UpdatableLicenseState(); licenseState.update(new XPackLicenseStatus(License.OperationMode.TRIAL, false, "")); ClusterService clusterService = createClusterService(true, true); - TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, true); + TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, true, projectResolver); EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener( client, clusterService, threadPool, - licenseState + licenseState, + projectResolver ); listener.init(); listener.licenseStateChanged(); @@ -110,12 +118,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase { public void testDatabaseChanges() { final XPackLicenseState licenseState = getAlwaysValidLicense(); ClusterService clusterService = createClusterService(true, false); - TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false); + TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false, projectResolver); EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener( client, clusterService, threadPool, - licenseState + licenseState, + projectResolver ); listener.init(); listener.licenseStateChanged(); @@ -134,12 +143,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase { // Should never start if not master node, even if all other conditions have been met final XPackLicenseState licenseState = getAlwaysValidLicense(); ClusterService clusterService = createClusterService(false, false); - TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false); + TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false, projectResolver); EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener( client, clusterService, threadPool, - licenseState + licenseState, + projectResolver ); listener.init(); listener.licenseStateChanged(); @@ -172,7 +182,15 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase { ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("name")); if (hasGeoIpDatabases) { PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of()); - clusterStateBuilder.metadata(Metadata.builder().putCustom(INGEST_GEOIP_CUSTOM_METADATA_TYPE, tasksCustomMetadata).put(idxMeta)); + clusterStateBuilder.metadata( + Metadata.builder() + .put( + ProjectMetadata.builder(projectResolver.getProjectId()) + .putCustom(INGEST_GEOIP_CUSTOM_METADATA_TYPE, tasksCustomMetadata) + .put(idxMeta) + .build() + ) + ); } return clusterStateBuilder.nodes(discoveryNodesBuilder).build(); } @@ -184,8 +202,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase { private boolean taskStartCalled = false; private boolean taskRemoveCalled = false; - private TaskStartAndRemoveMockClient(ThreadPool threadPool, boolean expectStartTask, boolean expectRemoveTask) { - super(threadPool); + private TaskStartAndRemoveMockClient( + ThreadPool threadPool, + boolean expectStartTask, + boolean expectRemoveTask, + ProjectResolver projectResolver + ) { + super(threadPool, projectResolver); this.expectStartTask = expectStartTask; this.expectRemoveTask = expectRemoveTask; }