Make EnterpriseGeoIpDownloaderLicenseListener project aware (#129992)

This commit is contained in:
Sam Xiao 2025-06-27 16:12:09 +08:00 committed by GitHub
parent 93e4e01277
commit 3200abc4ce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 90 additions and 41 deletions

View file

@ -296,6 +296,7 @@ public class PersistentTasksService {
/** /**
* Notifies the master node to remove a persistent task from the cluster state. Accepts operation timeout as optional parameter * Notifies the master node to remove a persistent task from the cluster state. Accepts operation timeout as optional parameter
*/ */
@Deprecated(forRemoval = true) // Use the explict cluster/project version instead
public void sendRemoveRequest(final String taskId, final TimeValue timeout, final ActionListener<PersistentTask<?>> listener) { public void sendRemoveRequest(final String taskId, final TimeValue timeout, final ActionListener<PersistentTask<?>> listener) {
sendRemoveRequest(null, taskId, timeout, listener); sendRemoveRequest(null, taskId, timeout, listener);
} }

View file

@ -40,7 +40,8 @@ public class EnterpriseDownloaderPlugin extends Plugin {
services.client(), services.client(),
services.clusterService(), services.clusterService(),
services.threadPool(), services.threadPool(),
getLicenseState() getLicenseState(),
services.projectResolver()
); );
enterpriseGeoIpDownloaderLicenseListener.init(); enterpriseGeoIpDownloaderLicenseListener.init();
return List.of(enterpriseGeoIpDownloaderLicenseListener); return List.of(enterpriseGeoIpDownloaderLicenseListener);

View file

@ -16,9 +16,11 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams; import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams;
import org.elasticsearch.license.License; import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseStateListener; import org.elasticsearch.license.LicenseStateListener;
@ -31,12 +33,14 @@ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackField;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER; import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;
public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateListener, ClusterStateListener { public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateListener, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(EnterpriseGeoIpDownloaderLicenseListener.class); private static final Logger logger = LogManager.getLogger(EnterpriseGeoIpDownloaderLicenseListener.class);
// Note: This custom type is GeoIpMetadata.TYPE, but that class is not exposed to this plugin // Note: This custom type is IngestGeoIpMetadata.TYPE, but that class is not exposed to this plugin
static final String INGEST_GEOIP_CUSTOM_METADATA_TYPE = "ingest_geoip"; static final String INGEST_GEOIP_CUSTOM_METADATA_TYPE = "ingest_geoip";
private final PersistentTasksService persistentTasksService; private final PersistentTasksService persistentTasksService;
@ -47,18 +51,21 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis
XPackField.ENTERPRISE_GEOIP_DOWNLOADER, XPackField.ENTERPRISE_GEOIP_DOWNLOADER,
License.OperationMode.PLATINUM License.OperationMode.PLATINUM
); );
private volatile boolean licenseIsValid = false; private final ConcurrentMap<ProjectId, Boolean> licenseIsValid = new ConcurrentHashMap<>();
private volatile boolean hasIngestGeoIpMetadata = false; private final ConcurrentMap<ProjectId, Boolean> hasIngestGeoIpMetadata = new ConcurrentHashMap<>();
private final ProjectResolver projectResolver;
protected EnterpriseGeoIpDownloaderLicenseListener( protected EnterpriseGeoIpDownloaderLicenseListener(
Client client, Client client,
ClusterService clusterService, ClusterService clusterService,
ThreadPool threadPool, ThreadPool threadPool,
XPackLicenseState licenseState XPackLicenseState licenseState,
ProjectResolver projectResolver
) { ) {
this.persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); this.persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
this.clusterService = clusterService; this.clusterService = clusterService;
this.licenseState = licenseState; this.licenseState = licenseState;
this.projectResolver = projectResolver;
} }
private volatile boolean licenseStateListenerRegistered; private volatile boolean licenseStateListenerRegistered;
@ -74,47 +81,55 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis
licenseState.addListener(this); licenseState.addListener(this);
} }
@NotMultiProjectCapable(description = "Replace DEFAULT project after enterprise license is supported in serverless and project-aware")
@Override @Override
public void licenseStateChanged() { public void licenseStateChanged() {
licenseIsValid = ENTERPRISE_GEOIP_FEATURE.checkWithoutTracking(licenseState); licenseIsValid.put(ProjectId.DEFAULT, ENTERPRISE_GEOIP_FEATURE.checkWithoutTracking(licenseState));
maybeUpdateTaskState(clusterService.state()); final boolean isLocalNodeMaster = clusterService.state().nodes().isLocalNodeElectedMaster();
maybeUpdateTaskState(ProjectId.DEFAULT, isLocalNodeMaster);
} }
@Override @Override
public void clusterChanged(ClusterChangedEvent event) { public void clusterChanged(ClusterChangedEvent event) {
hasIngestGeoIpMetadata = event.state().metadata().getProject().custom(INGEST_GEOIP_CUSTOM_METADATA_TYPE) != null;
final boolean ingestGeoIpCustomMetaChangedInEvent = event.metadataChanged()
&& event.changedCustomProjectMetadataSet().contains(INGEST_GEOIP_CUSTOM_METADATA_TYPE);
final boolean masterNodeChanged = Objects.equals( final boolean masterNodeChanged = Objects.equals(
event.state().nodes().getMasterNode(), event.state().nodes().getMasterNode(),
event.previousState().nodes().getMasterNode() event.previousState().nodes().getMasterNode()
) == false; ) == false;
final boolean isLocalNodeMaster = event.state().nodes().isLocalNodeElectedMaster();
event.state().metadata().projects().values().forEach(projectMetadata -> {
ProjectId projectId = projectMetadata.id();
final boolean hasMetadata = projectMetadata.custom(INGEST_GEOIP_CUSTOM_METADATA_TYPE) != null;
hasIngestGeoIpMetadata.put(projectId, hasMetadata);
final boolean ingestGeoIpCustomMetaChangedInEvent = event.metadataChanged()
&& event.customMetadataChanged(projectId, INGEST_GEOIP_CUSTOM_METADATA_TYPE);
/* /*
* We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState if this cluster change * We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState
* event involved the modification of custom geoip metadata OR a master node change * if this cluster change event involved the modification of custom geoip metadata OR a master node change
*/ */
if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata)) { if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata.getOrDefault(projectId, false))) {
maybeUpdateTaskState(event.state()); maybeUpdateTaskState(projectId, isLocalNodeMaster);
} }
});
} }
private void maybeUpdateTaskState(ClusterState state) { private void maybeUpdateTaskState(ProjectId projectId, boolean isLocalNodeMaster) {
// We should only start/stop task from single node, master is the best as it will go through it anyway // We should only start/stop task from single node, master is the best as it will go through it anyway
if (state.nodes().isLocalNodeElectedMaster()) { if (isLocalNodeMaster) {
if (licenseIsValid) { if (licenseIsValid.getOrDefault(projectId, false)) {
if (hasIngestGeoIpMetadata) { if (hasIngestGeoIpMetadata.getOrDefault(projectId, false)) {
ensureTaskStarted(); ensureTaskStarted(projectId);
} }
} else { } else {
ensureTaskStopped(); ensureTaskStopped(projectId);
} }
} }
} }
private void ensureTaskStarted() { private void ensureTaskStarted(ProjectId projectId) {
assert licenseIsValid : "Task should never be started without valid license"; assert licenseIsValid.getOrDefault(projectId, false) : "Task should never be started without valid license";
persistentTasksService.sendStartRequest( persistentTasksService.sendProjectStartRequest(
ENTERPRISE_GEOIP_DOWNLOADER, projectId,
getTaskId(projectId, projectResolver.supportsMultipleProjects()),
ENTERPRISE_GEOIP_DOWNLOADER, ENTERPRISE_GEOIP_DOWNLOADER,
new EnterpriseGeoIpTaskParams(), new EnterpriseGeoIpTaskParams(),
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
@ -127,7 +142,7 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis
); );
} }
private void ensureTaskStopped() { private void ensureTaskStopped(ProjectId projectId) {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap( ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(
r -> logger.debug("Stopped enterprise geoip downloader task"), r -> logger.debug("Stopped enterprise geoip downloader task"),
e -> { e -> {
@ -137,6 +152,15 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis
} }
} }
); );
persistentTasksService.sendRemoveRequest(ENTERPRISE_GEOIP_DOWNLOADER, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, listener); persistentTasksService.sendProjectRemoveRequest(
projectId,
getTaskId(projectId, projectResolver.supportsMultipleProjects()),
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
listener
);
}
protected static String getTaskId(ProjectId projectId, boolean supportsMultipleProjects) {
return supportsMultipleProjects ? projectId + "/" + ENTERPRISE_GEOIP_DOWNLOADER : ENTERPRISE_GEOIP_DOWNLOADER;
} }
} }

View file

@ -16,10 +16,14 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.license.License; import org.elasticsearch.license.License;
@ -48,6 +52,8 @@ import static org.mockito.Mockito.when;
public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase { public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase {
private ThreadPool threadPool; private ThreadPool threadPool;
@NotMultiProjectCapable(description = "Enterprise license not available in serverless or multi-project yet")
private final ProjectResolver projectResolver = TestProjectResolvers.DEFAULT_PROJECT_ONLY;
@Before @Before
public void setup() { public void setup() {
@ -68,12 +74,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase {
// Should never start if not master node, even if all other conditions have been met // Should never start if not master node, even if all other conditions have been met
final XPackLicenseState licenseState = getAlwaysValidLicense(); final XPackLicenseState licenseState = getAlwaysValidLicense();
ClusterService clusterService = createClusterService(true, false); ClusterService clusterService = createClusterService(true, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, true, false); TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, true, false, projectResolver);
EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener( EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener(
client, client,
clusterService, clusterService,
threadPool, threadPool,
licenseState licenseState,
projectResolver
); );
listener.init(); listener.init();
listener.licenseStateChanged(); listener.licenseStateChanged();
@ -85,12 +92,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase {
final TestUtils.UpdatableLicenseState licenseState = new TestUtils.UpdatableLicenseState(); final TestUtils.UpdatableLicenseState licenseState = new TestUtils.UpdatableLicenseState();
licenseState.update(new XPackLicenseStatus(License.OperationMode.TRIAL, false, "")); licenseState.update(new XPackLicenseStatus(License.OperationMode.TRIAL, false, ""));
ClusterService clusterService = createClusterService(true, true); ClusterService clusterService = createClusterService(true, true);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, true); TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, true, projectResolver);
EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener( EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener(
client, client,
clusterService, clusterService,
threadPool, threadPool,
licenseState licenseState,
projectResolver
); );
listener.init(); listener.init();
listener.licenseStateChanged(); listener.licenseStateChanged();
@ -110,12 +118,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase {
public void testDatabaseChanges() { public void testDatabaseChanges() {
final XPackLicenseState licenseState = getAlwaysValidLicense(); final XPackLicenseState licenseState = getAlwaysValidLicense();
ClusterService clusterService = createClusterService(true, false); ClusterService clusterService = createClusterService(true, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false); TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false, projectResolver);
EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener( EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener(
client, client,
clusterService, clusterService,
threadPool, threadPool,
licenseState licenseState,
projectResolver
); );
listener.init(); listener.init();
listener.licenseStateChanged(); listener.licenseStateChanged();
@ -134,12 +143,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase {
// Should never start if not master node, even if all other conditions have been met // Should never start if not master node, even if all other conditions have been met
final XPackLicenseState licenseState = getAlwaysValidLicense(); final XPackLicenseState licenseState = getAlwaysValidLicense();
ClusterService clusterService = createClusterService(false, false); ClusterService clusterService = createClusterService(false, false);
TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false); TaskStartAndRemoveMockClient client = new TaskStartAndRemoveMockClient(threadPool, false, false, projectResolver);
EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener( EnterpriseGeoIpDownloaderLicenseListener listener = new EnterpriseGeoIpDownloaderLicenseListener(
client, client,
clusterService, clusterService,
threadPool, threadPool,
licenseState licenseState,
projectResolver
); );
listener.init(); listener.init();
listener.licenseStateChanged(); listener.licenseStateChanged();
@ -172,7 +182,15 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase {
ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("name")); ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("name"));
if (hasGeoIpDatabases) { if (hasGeoIpDatabases) {
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of()); PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of());
clusterStateBuilder.metadata(Metadata.builder().putCustom(INGEST_GEOIP_CUSTOM_METADATA_TYPE, tasksCustomMetadata).put(idxMeta)); clusterStateBuilder.metadata(
Metadata.builder()
.put(
ProjectMetadata.builder(projectResolver.getProjectId())
.putCustom(INGEST_GEOIP_CUSTOM_METADATA_TYPE, tasksCustomMetadata)
.put(idxMeta)
.build()
)
);
} }
return clusterStateBuilder.nodes(discoveryNodesBuilder).build(); return clusterStateBuilder.nodes(discoveryNodesBuilder).build();
} }
@ -184,8 +202,13 @@ public class EnterpriseGeoIpDownloaderLicenseListenerTests extends ESTestCase {
private boolean taskStartCalled = false; private boolean taskStartCalled = false;
private boolean taskRemoveCalled = false; private boolean taskRemoveCalled = false;
private TaskStartAndRemoveMockClient(ThreadPool threadPool, boolean expectStartTask, boolean expectRemoveTask) { private TaskStartAndRemoveMockClient(
super(threadPool); ThreadPool threadPool,
boolean expectStartTask,
boolean expectRemoveTask,
ProjectResolver projectResolver
) {
super(threadPool, projectResolver);
this.expectStartTask = expectStartTask; this.expectStartTask = expectStartTask;
this.expectRemoveTask = expectRemoveTask; this.expectRemoveTask = expectRemoveTask;
} }