Make TransportPutDatabaseConfigurationAction project aware (#130063)

This commit is contained in:
Sam Xiao 2025-06-27 16:15:49 +08:00 committed by GitHub
parent 3200abc4ce
commit 6cb7b2d2f2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 24 additions and 15 deletions

View file

@ -20,7 +20,9 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SimpleBatchedExecutor; import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -58,13 +60,15 @@ public class TransportPutDatabaseConfigurationAction extends TransportMasterNode
}; };
private final MasterServiceTaskQueue<UpdateDatabaseConfigurationTask> updateDatabaseConfigurationTaskQueue; private final MasterServiceTaskQueue<UpdateDatabaseConfigurationTask> updateDatabaseConfigurationTaskQueue;
private final ProjectResolver projectResolver;
@Inject @Inject
public TransportPutDatabaseConfigurationAction( public TransportPutDatabaseConfigurationAction(
TransportService transportService, TransportService transportService,
ClusterService clusterService, ClusterService clusterService,
ThreadPool threadPool, ThreadPool threadPool,
ActionFilters actionFilters ActionFilters actionFilters,
ProjectResolver projectResolver
) { ) {
super( super(
PutDatabaseConfigurationAction.NAME, PutDatabaseConfigurationAction.NAME,
@ -81,6 +85,7 @@ public class TransportPutDatabaseConfigurationAction extends TransportMasterNode
Priority.NORMAL, Priority.NORMAL,
UPDATE_TASK_EXECUTOR UPDATE_TASK_EXECUTOR
); );
this.projectResolver = projectResolver;
} }
@Override @Override
@ -89,7 +94,7 @@ public class TransportPutDatabaseConfigurationAction extends TransportMasterNode
updateDatabaseConfigurationTaskQueue.submitTask( updateDatabaseConfigurationTaskQueue.submitTask(
Strings.format("update-geoip-database-configuration-[%s]", id), Strings.format("update-geoip-database-configuration-[%s]", id),
new UpdateDatabaseConfigurationTask(listener, request.getDatabase()), new UpdateDatabaseConfigurationTask(projectResolver.getProjectId(), listener, request.getDatabase()),
null null
); );
} }
@ -105,9 +110,9 @@ public class TransportPutDatabaseConfigurationAction extends TransportMasterNode
} }
} }
static void validatePrerequisites(DatabaseConfiguration database, ClusterState state) { static void validatePrerequisites(ProjectId projectId, DatabaseConfiguration database, ClusterState state) {
// we need to verify that the database represents a unique file (name) among the various databases for this same provider // we need to verify that the database represents a unique file (name) among the various databases for this same provider
IngestGeoIpMetadata geoIpMeta = state.metadata().getProject().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY); IngestGeoIpMetadata geoIpMeta = state.metadata().getProject(projectId).custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
Optional<DatabaseConfiguration> sameName = geoIpMeta.getDatabases() Optional<DatabaseConfiguration> sameName = geoIpMeta.getDatabases()
.values() .values()
@ -125,12 +130,14 @@ public class TransportPutDatabaseConfigurationAction extends TransportMasterNode
}); });
} }
private record UpdateDatabaseConfigurationTask(ActionListener<AcknowledgedResponse> listener, DatabaseConfiguration database) private record UpdateDatabaseConfigurationTask(
implements ProjectId projectId,
ClusterStateTaskListener { ActionListener<AcknowledgedResponse> listener,
DatabaseConfiguration database
) implements ClusterStateTaskListener {
ClusterState execute(ClusterState currentState) throws Exception { ClusterState execute(ClusterState currentState) throws Exception {
final var project = currentState.metadata().getProject(); final var project = currentState.metadata().getProject(projectId);
IngestGeoIpMetadata geoIpMeta = project.custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY); IngestGeoIpMetadata geoIpMeta = project.custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
String id = database.id(); String id = database.id();
@ -140,7 +147,7 @@ public class TransportPutDatabaseConfigurationAction extends TransportMasterNode
return currentState; return currentState;
} }
validatePrerequisites(database, currentState); validatePrerequisites(projectId, database, currentState);
Map<String, DatabaseConfigurationMetadata> databases = new HashMap<>(geoIpMeta.getDatabases()); Map<String, DatabaseConfigurationMetadata> databases = new HashMap<>(geoIpMeta.getDatabases());
databases.put( databases.put(

View file

@ -10,7 +10,8 @@
package org.elasticsearch.ingest.geoip.direct; package org.elasticsearch.ingest.geoip.direct;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.ingest.geoip.IngestGeoIpMetadata; import org.elasticsearch.ingest.geoip.IngestGeoIpMetadata;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -20,25 +21,26 @@ import java.util.Map;
public class TransportPutDatabaseConfigurationActionTests extends ESTestCase { public class TransportPutDatabaseConfigurationActionTests extends ESTestCase {
public void testValidatePrerequisites() { public void testValidatePrerequisites() {
ProjectId projectId = randomProjectIdOrDefault();
// Test that we reject two configurations with the same database name but different ids: // Test that we reject two configurations with the same database name but different ids:
String name = randomAlphaOfLengthBetween(1, 50); String name = randomAlphaOfLengthBetween(1, 50);
IngestGeoIpMetadata ingestGeoIpMetadata = randomIngestGeoIpMetadata(name); IngestGeoIpMetadata ingestGeoIpMetadata = randomIngestGeoIpMetadata(name);
ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE) ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder(Metadata.EMPTY_METADATA).putCustom(IngestGeoIpMetadata.TYPE, ingestGeoIpMetadata)) .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestGeoIpMetadata.TYPE, ingestGeoIpMetadata).build())
.build(); .build();
DatabaseConfiguration databaseConfiguration = randomDatabaseConfiguration(randomIdentifier(), name); DatabaseConfiguration databaseConfiguration = randomDatabaseConfiguration(randomIdentifier(), name);
expectThrows( expectThrows(
IllegalArgumentException.class, IllegalArgumentException.class,
() -> TransportPutDatabaseConfigurationAction.validatePrerequisites(databaseConfiguration, state) () -> TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfiguration, state)
); );
// Test that we do not reject two configurations with different database names: // Test that we do not reject two configurations with different database names:
String differentName = randomValueOtherThan(name, () -> randomAlphaOfLengthBetween(1, 50)); String differentName = randomValueOtherThan(name, () -> randomAlphaOfLengthBetween(1, 50));
DatabaseConfiguration databaseConfigurationForDifferentName = randomDatabaseConfiguration(randomIdentifier(), differentName); DatabaseConfiguration databaseConfigurationForDifferentName = randomDatabaseConfiguration(randomIdentifier(), differentName);
TransportPutDatabaseConfigurationAction.validatePrerequisites(databaseConfigurationForDifferentName, state); TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfigurationForDifferentName, state);
// Test that we do not reject a configuration if none already exists: // Test that we do not reject a configuration if none already exists:
TransportPutDatabaseConfigurationAction.validatePrerequisites(databaseConfiguration, ClusterState.EMPTY_STATE); TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfiguration, ClusterState.EMPTY_STATE);
// Test that we do not reject a configuration if one with the same database name AND id already exists: // Test that we do not reject a configuration if one with the same database name AND id already exists:
DatabaseConfiguration databaseConfigurationSameNameSameId = ingestGeoIpMetadata.getDatabases() DatabaseConfiguration databaseConfigurationSameNameSameId = ingestGeoIpMetadata.getDatabases()
@ -46,7 +48,7 @@ public class TransportPutDatabaseConfigurationActionTests extends ESTestCase {
.iterator() .iterator()
.next() .next()
.database(); .database();
TransportPutDatabaseConfigurationAction.validatePrerequisites(databaseConfigurationSameNameSameId, state); TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfigurationSameNameSameId, state);
} }
private IngestGeoIpMetadata randomIngestGeoIpMetadata(String name) { private IngestGeoIpMetadata randomIngestGeoIpMetadata(String name) {