diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceIT.java index 7331afdbf585..57d7a58b6797 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceIT.java @@ -15,7 +15,9 @@ import com.maxmind.geoip2.record.Country; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.hash.MessageDigests; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.xcontent.XContentType; import java.io.BufferedInputStream; @@ -40,6 +42,10 @@ import java.util.zip.GZIPOutputStream; import static org.hamcrest.Matchers.equalTo; public class DatabaseNodeServiceIT extends AbstractGeoIpIT { + + @FixForMultiProject(description = "Use random project ID after ESIntegTestCase is MP enabled") + private final ProjectId projectId = ProjectId.DEFAULT; + /* * This test makes sure that if we index an ordinary mmdb file into the .geoip_databases index, it is correctly handled upon retrieval. */ @@ -50,7 +56,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT { String databaseName = randomAlphaOfLength(20) + "-" + databaseFileName; byte[] mmdbBytes = getBytesForFile(databaseFileName); final DatabaseNodeService databaseNodeService = internalCluster().getInstance(DatabaseNodeService.class); - assertNull(databaseNodeService.getDatabase(databaseName)); + assertNull(databaseNodeService.getDatabase(projectId, databaseName)); int numChunks = indexData(databaseName, mmdbBytes); /* * If DatabaseNodeService::checkDatabases runs it will sometimes (rarely) remove the database we are using in this test while we @@ -58,7 +64,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT { */ assertBusy(() -> { retrieveDatabase(databaseNodeService, databaseName, mmdbBytes, numChunks); - assertNotNull(databaseNodeService.getDatabase(databaseName)); + assertNotNull(databaseNodeService.getDatabase(projectId, databaseName)); assertValidDatabase(databaseNodeService, databaseName, databaseType); }); } @@ -75,7 +81,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT { byte[] mmdbBytes = getBytesForFile(databaseFileName); byte[] gzipBytes = gzipFileBytes(databaseName, mmdbBytes); final DatabaseNodeService databaseNodeService = internalCluster().getInstance(DatabaseNodeService.class); - assertNull(databaseNodeService.getDatabase(databaseName)); + assertNull(databaseNodeService.getDatabase(projectId, databaseName)); int numChunks = indexData(databaseName, gzipBytes); /* * If DatabaseNodeService::checkDatabases runs it will sometimes (rarely) remove the database we are using in this test while we @@ -83,7 +89,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT { */ assertBusy(() -> { retrieveDatabase(databaseNodeService, databaseName, gzipBytes, numChunks); - assertNotNull(databaseNodeService.getDatabase(databaseName)); + assertNotNull(databaseNodeService.getDatabase(projectId, databaseName)); assertValidDatabase(databaseNodeService, databaseName, databaseType); }); } @@ -93,7 +99,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT { */ private void assertValidDatabase(DatabaseNodeService databaseNodeService, String databaseFileName, String databaseType) throws IOException { - IpDatabase database = databaseNodeService.getDatabase(databaseFileName); + IpDatabase database = databaseNodeService.getDatabase(projectId, databaseFileName); assertNotNull(database); assertThat(database.getDatabaseType(), equalTo(databaseType)); CountryResponse countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry); @@ -110,7 +116,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT { private void retrieveDatabase(DatabaseNodeService databaseNodeService, String databaseFileName, byte[] expectedBytes, int numChunks) throws IOException { GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(1, 0, numChunks - 1, getMd5(expectedBytes), 1); - databaseNodeService.retrieveAndUpdateDatabase(databaseFileName, metadata); + databaseNodeService.retrieveAndUpdateDatabase(projectId, databaseFileName, metadata); } private String getMd5(byte[] bytes) { diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java index 7c92636d3420..c65d9a2dc200 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java @@ -11,7 +11,11 @@ package org.elasticsearch.ingest.geoip; import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.IOUtils; @@ -32,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.cluster.ClusterState.builder; import static org.elasticsearch.ingest.geoip.GeoIpProcessor.GEOIP_TYPE; import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDatabase; import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDefaultDatabases; @@ -62,31 +67,39 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { * geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance */ public void test() throws Exception { + ProjectId projectId = randomProjectIdOrDefault(); Path geoIpConfigDir = createTempDir(); Path geoIpTmpDir = createTempDir(); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - DatabaseNodeService databaseNodeService = createRegistry(geoIpConfigDir, geoIpTmpDir, clusterService); + when(clusterService.state()).thenReturn( + builder(ClusterName.DEFAULT).putProjectMetadata(ProjectMetadata.builder(projectId).build()).build() + ); + DatabaseNodeService databaseNodeService = createRegistry( + geoIpConfigDir, + geoIpTmpDir, + clusterService, + TestProjectResolvers.singleProject(projectId) + ); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService); copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); - databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); - databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); - lazyLoadReaders(databaseNodeService); + databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); + databaseNodeService.updateDatabase(projectId, "GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); + lazyLoadReaders(projectId, databaseNodeService); final GeoIpProcessor processor1 = (GeoIpProcessor) factory.create( null, "_tag", null, new HashMap<>(Map.of("field", "_field")), - null + projectId ); final GeoIpProcessor processor2 = (GeoIpProcessor) factory.create( null, "_tag", null, new HashMap<>(Map.of("field", "_field", "database_file", "GeoLite2-City-Test.mmdb")), - null + projectId ); final AtomicBoolean completed = new AtomicBoolean(false); @@ -134,9 +147,9 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { Thread updateDatabaseThread = new Thread(() -> { for (int i = 0; i < numberOfDatabaseUpdates; i++) { try { - DatabaseReaderLazyLoader previous1 = databaseNodeService.get("GeoLite2-City.mmdb"); + DatabaseReaderLazyLoader previous1 = databaseNodeService.get(projectId, "GeoLite2-City.mmdb"); if (Files.exists(geoIpTmpDir.resolve("GeoLite2-City.mmdb"))) { - databaseNodeService.removeStaleEntries(List.of("GeoLite2-City.mmdb")); + databaseNodeService.removeStaleEntries(projectId, List.of("GeoLite2-City.mmdb")); assertBusy(() -> { // lazy loader may still be in use by an ingest thread, // wait for any potential ingest thread to release the lazy loader (DatabaseReaderLazyLoader#postLookup(...)), @@ -146,22 +159,32 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { }); } else { copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); - databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); + databaseNodeService.updateDatabase( + projectId, + "GeoLite2-City.mmdb", + "md5", + geoIpTmpDir.resolve("GeoLite2-City.mmdb") + ); } - DatabaseReaderLazyLoader previous2 = databaseNodeService.get("GeoLite2-City-Test.mmdb"); + DatabaseReaderLazyLoader previous2 = databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb"); copyDatabase( i % 2 == 0 ? "GeoIP2-City-Test.mmdb" : "GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb") ); - databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); + databaseNodeService.updateDatabase( + projectId, + "GeoLite2-City-Test.mmdb", + "md5", + geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb") + ); - DatabaseReaderLazyLoader current1 = databaseNodeService.get("GeoLite2-City.mmdb"); - DatabaseReaderLazyLoader current2 = databaseNodeService.get("GeoLite2-City-Test.mmdb"); + DatabaseReaderLazyLoader current1 = databaseNodeService.get(projectId, "GeoLite2-City.mmdb"); + DatabaseReaderLazyLoader current2 = databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb"); assertThat(current1, not(sameInstance(previous1))); assertThat(current2, not(sameInstance(previous2))); // lazy load type and reader: - lazyLoadReaders(databaseNodeService); + lazyLoadReaders(projectId, databaseNodeService); } catch (Exception | AssertionError e) { logger.error("error in update databases thread after run [" + i + "]", e); failureHolder2.set(e); @@ -193,8 +216,12 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { IOUtils.rm(geoIpConfigDir, geoIpTmpDir); } - private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoIpTmpDir, ClusterService clusterService) - throws IOException { + private static DatabaseNodeService createRegistry( + Path geoIpConfigDir, + Path geoIpTmpDir, + ClusterService clusterService, + ProjectResolver projectResolver + ) throws IOException { GeoIpCache cache = new GeoIpCache(0); ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache); copyDefaultDatabases(geoIpConfigDir, configDatabases); @@ -204,19 +231,21 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { cache, configDatabases, Runnable::run, - clusterService + clusterService, + mock(IngestService.class), + projectResolver ); - databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class)); + databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class)); return databaseNodeService; } - private static void lazyLoadReaders(DatabaseNodeService databaseNodeService) throws IOException { - if (databaseNodeService.get("GeoLite2-City.mmdb") != null) { - databaseNodeService.get("GeoLite2-City.mmdb").getDatabaseType(); - databaseNodeService.get("GeoLite2-City.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity); + private static void lazyLoadReaders(ProjectId projectId, DatabaseNodeService databaseNodeService) throws IOException { + if (databaseNodeService.get(projectId, "GeoLite2-City.mmdb") != null) { + databaseNodeService.get(projectId, "GeoLite2-City.mmdb").getDatabaseType(); + databaseNodeService.get(projectId, "GeoLite2-City.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity); } - databaseNodeService.get("GeoLite2-City-Test.mmdb").getDatabaseType(); - databaseNodeService.get("GeoLite2-City-Test.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity); + databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb").getDatabaseType(); + databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity); } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/ConfigDatabases.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/ConfigDatabases.java index 289008236a85..cf677558785f 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/ConfigDatabases.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/ConfigDatabases.java @@ -10,6 +10,8 @@ package org.elasticsearch.ingest.geoip; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.env.Environment; import org.elasticsearch.watcher.FileChangesListener; import org.elasticsearch.watcher.FileWatcher; @@ -69,12 +71,13 @@ final class ConfigDatabases implements Closeable { return configDatabases; } + @FixForMultiProject(description = "Replace DEFAULT project") void updateDatabase(Path file, boolean update) { String databaseFileName = file.getFileName().toString(); try { if (update) { logger.info("database file changed [{}], reloading database...", file); - DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, null); + DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, file, null); DatabaseReaderLazyLoader existing = configDatabases.put(databaseFileName, loader); if (existing != null) { existing.shutdown(); @@ -90,6 +93,7 @@ final class ConfigDatabases implements Closeable { } } + @FixForMultiProject(description = "Replace DEFAULT project") Map initConfigDatabases() throws IOException { Map databases = new HashMap<>(); @@ -103,7 +107,7 @@ final class ConfigDatabases implements Closeable { if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { assert Files.exists(databasePath); String databaseFileName = databasePath.getFileName().toString(); - DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, databasePath, null); + DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, databasePath, null); databases.put(databaseFileName, loader); } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java index 614a81da08f4..6de92f373e64 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java @@ -17,8 +17,10 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexAbstraction; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -26,7 +28,6 @@ import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedRunnable; -import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; @@ -57,6 +58,7 @@ import java.security.MessageDigest; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -72,6 +74,7 @@ import java.util.zip.GZIPInputStream; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpTaskState.getEnterpriseGeoIpTaskState; +import static org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor.getTaskId; import static org.elasticsearch.ingest.geoip.GeoIpTaskState.getGeoIpTaskState; /** @@ -104,16 +107,19 @@ public final class DatabaseNodeService implements IpDatabaseProvider { private final ConfigDatabases configDatabases; private final Consumer genericExecutor; private final ClusterService clusterService; - private IngestService ingestService; + private final IngestService ingestService; + private final ProjectResolver projectResolver; - private final ConcurrentMap databases = new ConcurrentHashMap<>(); + private final ConcurrentMap> databases = new ConcurrentHashMap<>(); DatabaseNodeService( Environment environment, Client client, GeoIpCache cache, Consumer genericExecutor, - ClusterService clusterService + ClusterService clusterService, + IngestService ingestService, + ProjectResolver projectResolver ) { this( environment.tmpDir(), @@ -121,7 +127,9 @@ public final class DatabaseNodeService implements IpDatabaseProvider { cache, new ConfigDatabases(environment, cache), genericExecutor, - clusterService + clusterService, + ingestService, + projectResolver ); } @@ -131,7 +139,9 @@ public final class DatabaseNodeService implements IpDatabaseProvider { GeoIpCache cache, ConfigDatabases configDatabases, Consumer genericExecutor, - ClusterService clusterService + ClusterService clusterService, + IngestService ingestService, + ProjectResolver projectResolver ) { this.client = client; this.cache = cache; @@ -139,11 +149,14 @@ public final class DatabaseNodeService implements IpDatabaseProvider { this.configDatabases = configDatabases; this.genericExecutor = genericExecutor; this.clusterService = clusterService; + this.ingestService = ingestService; + this.projectResolver = projectResolver; } - public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestServiceArg) throws IOException { + public void initialize(String nodeId, ResourceWatcherService resourceWatcher) throws IOException { configDatabases.initialize(resourceWatcher); geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId); + // delete all stale files in the geoip tmp directory Files.walkFileTree(geoipTmpDirectory, new FileVisitor<>() { @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) { @@ -164,6 +177,7 @@ public final class DatabaseNodeService implements IpDatabaseProvider { @Override public FileVisitResult visitFileFailed(Path file, IOException e) { if (e instanceof NoSuchFileException == false) { + // parameterized log fails logger check, see https://github.com/elastic/elasticsearch/issues/104782 logger.warn("can't delete stale file [" + file + "]", e); } return FileVisitResult.CONTINUE; @@ -178,16 +192,15 @@ public final class DatabaseNodeService implements IpDatabaseProvider { Files.createDirectories(geoipTmpDirectory); } logger.debug("initialized database node service, using geoip-databases directory [{}]", geoipTmpDirectory); - this.ingestService = ingestServiceArg; clusterService.addListener(event -> checkDatabases(event.state())); } @Override - public Boolean isValid(String databaseFile) { + public Boolean isValid(ProjectId projectId, String databaseFile) { ClusterState currentState = clusterService.state(); - assert currentState != null; + ProjectMetadata projectMetadata = currentState.metadata().getProject(projectId); - GeoIpTaskState state = getGeoIpTaskState(currentState); + GeoIpTaskState state = getGeoIpTaskState(projectMetadata, getTaskId(projectId, projectResolver.supportsMultipleProjects())); if (state == null) { return true; } @@ -210,11 +223,11 @@ public final class DatabaseNodeService implements IpDatabaseProvider { } // for testing only: - DatabaseReaderLazyLoader getDatabaseReaderLazyLoader(String name) { + DatabaseReaderLazyLoader getDatabaseReaderLazyLoader(ProjectId projectId, String name) { // There is a need for reference counting in order to avoid using an instance // that gets closed while using it. (this can happen during a database update) while (true) { - DatabaseReaderLazyLoader instance = databases.get(name); + DatabaseReaderLazyLoader instance = getProjectLazyLoader(projectId, name); if (instance == null) { instance = configDatabases.getDatabase(name); } @@ -227,25 +240,29 @@ public final class DatabaseNodeService implements IpDatabaseProvider { } @Override - public IpDatabase getDatabase(String name) { - return getDatabaseReaderLazyLoader(name); + public IpDatabase getDatabase(ProjectId projectId, String name) { + return getDatabaseReaderLazyLoader(projectId, name); } List getAllDatabases() { List all = new ArrayList<>(configDatabases.getConfigDatabases().values()); - this.databases.forEach((key, value) -> all.add(value)); + this.databases.forEach((key, value) -> all.addAll(value.values())); return all; } // for testing only: - DatabaseReaderLazyLoader get(String key) { - return databases.get(key); + DatabaseReaderLazyLoader get(ProjectId projectId, String key) { + return databases.computeIfAbsent(projectId, (k) -> new ConcurrentHashMap<>()).get(key); } public void shutdown() throws IOException { // this is a little 'fun' looking, but it's just adapting IOUtils.close() into something // that can call a bunch of shutdown methods (rather than close methods) - final var loadersToShutdown = databases.values().stream().map(ShutdownCloseable::new).toList(); + final var loadersToShutdown = databases.values() + .stream() + .flatMap(map -> map.values().stream()) + .map(ShutdownCloseable::new) + .toList(); databases.clear(); IOUtils.close(loadersToShutdown); } @@ -270,103 +287,120 @@ public final class DatabaseNodeService implements IpDatabaseProvider { return; } - PersistentTasksCustomMetadata persistentTasks = state.metadata().getProject().custom(PersistentTasksCustomMetadata.TYPE); - if (persistentTasks == null) { - logger.trace("Not checking databases because persistent tasks are null"); - return; - } + // Optimization: only load the .geoip_databases for projects that are allocated to this node + for (ProjectMetadata projectMetadata : state.metadata().projects().values()) { + ProjectId projectId = projectMetadata.id(); - IndexAbstraction databasesAbstraction = state.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); - if (databasesAbstraction == null) { - logger.trace("Not checking databases because geoip databases index does not exist"); - return; - } else { - // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index - Index databasesIndex = databasesAbstraction.getWriteIndex(); - IndexRoutingTable databasesIndexRT = state.getRoutingTable().index(databasesIndex); - if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { - logger.trace("Not checking databases because geoip databases index does not have all active primary shards"); + PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE); + if (persistentTasks == null) { + logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId); + continue; + } + + IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); + if (databasesAbstraction == null) { + logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId); return; + } else { + // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index + Index databasesIndex = databasesAbstraction.getWriteIndex(); + IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex); + if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { + logger.trace( + "Not checking databases because geoip databases index does not have all active primary shards for" + + " project [{}]", + projectId + ); + return; + } + } + + // we'll consult each of the geoip downloaders to build up a list of database metadatas to work with + List> validMetadatas = new ArrayList<>(); + + // process the geoip task state for the (ordinary) geoip downloader + { + GeoIpTaskState taskState = getGeoIpTaskState( + projectMetadata, + getTaskId(projectId, projectResolver.supportsMultipleProjects()) + ); + if (taskState == null) { + // Note: an empty state will purge stale entries in databases map + taskState = GeoIpTaskState.EMPTY; + } + validMetadatas.addAll( + taskState.getDatabases() + .entrySet() + .stream() + .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) + .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) + .toList() + ); + } + + // process the geoip task state for the enterprise geoip downloader + { + EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state); + if (taskState == null) { + // Note: an empty state will purge stale entries in databases map + taskState = EnterpriseGeoIpTaskState.EMPTY; + } + validMetadatas.addAll( + taskState.getDatabases() + .entrySet() + .stream() + .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) + .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) + .toList() + ); + } + + // run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task + // has downloaded a new version of the databases + validMetadatas.forEach(e -> { + String name = e.v1(); + GeoIpTaskState.Metadata metadata = e.v2(); + DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name); + String remoteMd5 = metadata.md5(); + String localMd5 = reference != null ? reference.getMd5() : null; + if (Objects.equals(localMd5, remoteMd5)) { + logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5); + return; + } + + try { + retrieveAndUpdateDatabase(projectId, name, metadata); + } catch (Exception ex) { + logger.error(() -> "failed to retrieve database [" + name + "]", ex); + } + }); + + // TODO perhaps we need to handle the license flap persistent task state better than we do + // i think the ideal end state is that we *do not* drop the files that the enterprise downloader + // handled if they fall out -- which means we need to track that in the databases map itself + + // start with the list of all databases we currently know about in this service, + // then drop the ones that didn't check out as valid from the task states + if (databases.containsKey(projectId)) { + Set staleDatabases = new HashSet<>(databases.get(projectId).keySet()); + staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet())); + removeStaleEntries(projectId, staleDatabases); } } - - // we'll consult each of the geoip downloaders to build up a list of database metadatas to work with - List> validMetadatas = new ArrayList<>(); - - // process the geoip task state for the (ordinary) geoip downloader - { - GeoIpTaskState taskState = getGeoIpTaskState(state); - if (taskState == null) { - // Note: an empty state will purge stale entries in databases map - taskState = GeoIpTaskState.EMPTY; - } - validMetadatas.addAll( - taskState.getDatabases() - .entrySet() - .stream() - .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) - .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) - .toList() - ); - } - - // process the geoip task state for the enterprise geoip downloader - { - EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state); - if (taskState == null) { - // Note: an empty state will purge stale entries in databases map - taskState = EnterpriseGeoIpTaskState.EMPTY; - } - validMetadatas.addAll( - taskState.getDatabases() - .entrySet() - .stream() - .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) - .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) - .toList() - ); - } - - // run through all the valid metadatas, regardless of source, and retrieve them - validMetadatas.forEach(e -> { - String name = e.v1(); - GeoIpTaskState.Metadata metadata = e.v2(); - DatabaseReaderLazyLoader reference = databases.get(name); - String remoteMd5 = metadata.md5(); - String localMd5 = reference != null ? reference.getMd5() : null; - if (Objects.equals(localMd5, remoteMd5)) { - logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5); - return; - } - - try { - retrieveAndUpdateDatabase(name, metadata); - } catch (Exception ex) { - logger.error(() -> "failed to retrieve database [" + name + "]", ex); - } - }); - - // TODO perhaps we need to handle the license flap persistent task state better than we do - // i think the ideal end state is that we *do not* drop the files that the enterprise downloader - // handled if they fall out -- which means we need to track that in the databases map itself - - // start with the list of all databases we currently know about in this service, - // then drop the ones that didn't check out as valid from the task states - List staleEntries = new ArrayList<>(databases.keySet()); - staleEntries.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet())); - removeStaleEntries(staleEntries); } - void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata metadata) throws IOException { + void retrieveAndUpdateDatabase(ProjectId projectId, String databaseName, GeoIpTaskState.Metadata metadata) throws IOException { logger.trace("retrieving database [{}]", databaseName); final String recordedMd5 = metadata.md5(); - // This acts as a lock, if this method for a specific db is executed later and downloaded for this db is still ongoing then - // FileAlreadyExistsException is thrown and this method silently returns. + Path databaseTmpDirectory = getDatabaseTmpDirectory(projectId); + // This acts as a lock to avoid multiple retrievals of the same database at the same time. If this method for a specific db is + // executed later again while a previous retrival of this db is still ongoing then FileAlreadyExistsException is thrown and + // this method silently returns. // (this method is never invoked concurrently and is invoked by a cluster state applier thread) final Path retrievedFile; try { - retrievedFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp.retrieved")); + retrievedFile = Files.createFile(databaseTmpDirectory.resolve(databaseName + ".tmp.retrieved")); } catch (FileAlreadyExistsException e) { logger.debug("database update [{}] already in progress, skipping...", databaseName); return; @@ -378,75 +412,86 @@ public final class DatabaseNodeService implements IpDatabaseProvider { // Thread 2 may have updated the databases map after thread 1 detects that there is no entry (or md5 mismatch) for a database. // If thread 2 then also removes the tmp file before thread 1 attempts to create it then we're about to retrieve the same database // twice. This check is here to avoid this: - DatabaseReaderLazyLoader lazyLoader = databases.get(databaseName); + DatabaseReaderLazyLoader lazyLoader = getProjectLazyLoader(projectId, databaseName); if (lazyLoader != null && recordedMd5.equals(lazyLoader.getMd5())) { logger.debug("deleting tmp file because database [{}] has already been updated.", databaseName); Files.delete(retrievedFile); return; } - final Path databaseTmpFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp")); + final Path databaseTmpFile = Files.createFile(databaseTmpDirectory.resolve(databaseName + ".tmp")); logger.debug("retrieving database [{}] from [{}] to [{}]", databaseName, GeoIpDownloader.DATABASES_INDEX, retrievedFile); - retrieveDatabase(databaseName, recordedMd5, metadata, bytes -> Files.write(retrievedFile, bytes, StandardOpenOption.APPEND), () -> { - final Path databaseFile = geoipTmpDirectory.resolve(databaseName); + retrieveDatabase( + projectId, + databaseName, + recordedMd5, + metadata, + bytes -> Files.write(retrievedFile, bytes, StandardOpenOption.APPEND), + () -> { + final Path databaseFile = databaseTmpDirectory.resolve(databaseName); - boolean isTarGz = MMDBUtil.isGzip(retrievedFile); - if (isTarGz) { - // tarball contains .mmdb, LICENSE.txt, COPYRIGHTS.txt and optional README.txt files. - // we store mmdb file as is and prepend database name to all other entries to avoid conflicts - logger.debug("decompressing [{}]", retrievedFile.getFileName()); - try (TarInputStream is = new TarInputStream(new GZIPInputStream(Files.newInputStream(retrievedFile), 8192))) { - TarInputStream.TarEntry entry; - while ((entry = is.getNextEntry()) != null) { - // there might be ./ entry in tar, we should skip it - if (entry.notFile()) { - continue; - } - // flatten structure, remove any directories present from the path (should be ./ only) - String name = entry.name().substring(entry.name().lastIndexOf('/') + 1); - if (name.startsWith(databaseName)) { - Files.copy(is, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING); - } else { - Files.copy(is, geoipTmpDirectory.resolve(databaseName + "_" + name), StandardCopyOption.REPLACE_EXISTING); + boolean isTarGz = MMDBUtil.isGzip(retrievedFile); + if (isTarGz) { + // tarball contains .mmdb, LICENSE.txt, COPYRIGHTS.txt and optional README.txt files. + // we store mmdb file as is and prepend database name to all other entries to avoid conflicts + logger.debug("decompressing [{}]", retrievedFile.getFileName()); + try (TarInputStream is = new TarInputStream(new GZIPInputStream(Files.newInputStream(retrievedFile), 8192))) { + TarInputStream.TarEntry entry; + while ((entry = is.getNextEntry()) != null) { + // there might be ./ entry in tar, we should skip it + if (entry.notFile()) { + continue; + } + // flatten structure, remove any directories present from the path (should be ./ only) + String name = entry.name().substring(entry.name().lastIndexOf('/') + 1); + if (name.startsWith(databaseName)) { + Files.copy(is, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING); + } else { + Files.copy( + is, + databaseTmpDirectory.resolve(databaseName + "_" + name), + StandardCopyOption.REPLACE_EXISTING + ); + } } } + } else { + /* + * Given that this is not code that will be called extremely frequently, we copy the file to the + * expected location here in order to avoid making the rest of the code more complex to avoid this. + */ + Files.copy(retrievedFile, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING); + } + // finally, atomically move some-database.mmdb.tmp to some-database.mmdb + logger.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile); + Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + updateDatabase(projectId, databaseName, recordedMd5, databaseFile); + Files.delete(retrievedFile); + }, + failure -> { + logger.error(() -> "failed to retrieve database [" + databaseName + "]", failure); + try { + Files.deleteIfExists(databaseTmpFile); + Files.deleteIfExists(retrievedFile); + } catch (IOException ioe) { + ioe.addSuppressed(failure); + logger.error("unable to delete tmp database file after failure", ioe); } - } else { - /* - * Given that this is not code that will be called extremely frequently, we copy the file to the expected location here in - * order to avoid making the rest of the code more complex to avoid this. - */ - Files.copy(retrievedFile, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING); } - // finally, atomically move some-database.mmdb.tmp to some-database.mmdb - logger.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile); - Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); - updateDatabase(databaseName, recordedMd5, databaseFile); - Files.delete(retrievedFile); - }, failure -> { - logger.error(() -> "failed to retrieve database [" + databaseName + "]", failure); - try { - Files.deleteIfExists(databaseTmpFile); - Files.deleteIfExists(retrievedFile); - } catch (IOException ioe) { - ioe.addSuppressed(failure); - logger.error("unable to delete tmp database file after failure", ioe); - } - }); + ); } - @FixForMultiProject // Don't use default project id - void updateDatabase(String databaseFileName, String recordedMd5, Path file) { + void updateDatabase(ProjectId projectId, String databaseFileName, String recordedMd5, Path file) { try { logger.debug("starting reload of changed database file [{}]", file); - DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5); - DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader); + DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(projectId, cache, file, recordedMd5); + DatabaseReaderLazyLoader existing = databases.computeIfAbsent(projectId, (k) -> new ConcurrentHashMap<>()) + .put(databaseFileName, loader); if (existing != null) { existing.shutdown(); } else { // Loaded a database for the first time, so reload pipelines for which a database was not available: Predicate predicate = p -> databaseFileName.equals(p.getDatabaseName()); - var projectId = Metadata.DEFAULT_PROJECT_ID; var ids = ingestService.getPipelineWithProcessorType( projectId, GeoIpProcessor.DatabaseUnavailableProcessor.class, @@ -479,20 +524,25 @@ public final class DatabaseNodeService implements IpDatabaseProvider { } } - void removeStaleEntries(Collection staleEntries) { + void removeStaleEntries(ProjectId projectId, Collection staleEntries) { + ConcurrentMap projectLoaders = databases.get(projectId); + assert projectLoaders != null; for (String staleEntry : staleEntries) { try { - logger.debug("database [{}] no longer exists, cleaning up...", staleEntry); - DatabaseReaderLazyLoader existing = databases.remove(staleEntry); + logger.debug("database [{}] for project [{}] no longer exists, cleaning up...", staleEntry, projectId); + DatabaseReaderLazyLoader existing = projectLoaders.remove(staleEntry); assert existing != null; existing.shutdown(true); } catch (Exception e) { - logger.error(() -> "failed to clean database [" + staleEntry + "]", e); + logger.error(() -> "failed to clean database [" + staleEntry + "] for project [" + projectId + "]", e); } } } + // This method issues search request to retrieves the database chunks from the .geoip_databases index and passes + // them to the chunkConsumer (which appends the data to a tmp file). This method forks to the generic thread pool to do the search. void retrieveDatabase( + ProjectId projectId, String databaseName, String expectedMd5, GeoIpTaskState.Metadata metadata, @@ -500,6 +550,7 @@ public final class DatabaseNodeService implements IpDatabaseProvider { CheckedRunnable completedHandler, Consumer failureHandler ) { + // Search in the project specific .geoip_databases // Need to run the search from a different thread, since this is executed from cluster state applier thread: genericExecutor.accept(() -> { MessageDigest md = MessageDigests.md5(); @@ -517,7 +568,8 @@ public final class DatabaseNodeService implements IpDatabaseProvider { // At most once a day a few searches may be executed to fetch the new files, // so it is ok if this happens in a blocking manner on a thread from generic thread pool. // This makes the code easier to understand and maintain. - SearchResponse searchResponse = client.search(searchRequest).actionGet(); + // Probably revisit if blocking the generic thread pool is acceptable in multi-project if we see performance issue + SearchResponse searchResponse = client.projectClient(projectId).search(searchRequest).actionGet(); try { SearchHit[] hits = searchResponse.getHits().getHits(); @@ -546,8 +598,9 @@ public final class DatabaseNodeService implements IpDatabaseProvider { }); } - public Set getAvailableDatabases() { - return Set.copyOf(databases.keySet()); + public Set getAvailableDatabases(ProjectId projectId) { + var loaders = databases.get(projectId); + return loaders == null ? Set.of() : Set.copyOf(loaders.keySet()); } public Set getConfigDatabases() { @@ -583,8 +636,8 @@ public final class DatabaseNodeService implements IpDatabaseProvider { public record ConfigDatabaseDetail(String name, @Nullable String md5, @Nullable Long buildDateInMillis, @Nullable String type) {} - public Set getFilesInTemp() { - try (Stream files = Files.list(geoipTmpDirectory)) { + public Set getFilesInTemp(ProjectId projectId) { + try (Stream files = Files.list(getDatabaseTmpDirectory(projectId))) { return files.map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); } catch (IOException e) { throw new UncheckedIOException(e); @@ -595,4 +648,19 @@ public final class DatabaseNodeService implements IpDatabaseProvider { return cache.getCacheStats(); } + private DatabaseReaderLazyLoader getProjectLazyLoader(ProjectId projectId, String databaseName) { + return databases.computeIfAbsent(projectId, (k) -> new ConcurrentHashMap<>()).get(databaseName); + } + + private Path getDatabaseTmpDirectory(ProjectId projectId) { + Path path = projectResolver.supportsMultipleProjects() ? geoipTmpDirectory.resolve(projectId.toString()) : geoipTmpDirectory; + try { + if (Files.exists(path) == false) { + Files.createDirectories(path); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to create geoip tmp directory for project [" + projectId + "]", e); + } + return path; + } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index e318beeb3ae6..db4dc1a9dab8 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -20,7 +20,6 @@ import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.core.Booleans; -import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -46,6 +45,7 @@ public class DatabaseReaderLazyLoader implements IpDatabase { private final GeoIpCache cache; private final Path databasePath; private final CheckedSupplier loader; + private final ProjectId projectId; final SetOnce databaseReader; // cache the database type so that we do not re-read it on every pipeline execution @@ -59,7 +59,7 @@ public class DatabaseReaderLazyLoader implements IpDatabase { // than calling it on every call to cache.putIfAbsent that it makes the slight additional internal complication worth it private final String cachedDatabasePathToString; - DatabaseReaderLazyLoader(GeoIpCache cache, Path databasePath, String md5) { + DatabaseReaderLazyLoader(ProjectId projectId, GeoIpCache cache, Path databasePath, String md5) { this.cache = cache; this.databasePath = Objects.requireNonNull(databasePath); this.md5 = md5; @@ -67,6 +67,7 @@ public class DatabaseReaderLazyLoader implements IpDatabase { this.databaseReader = new SetOnce<>(); this.databaseType = new SetOnce<>(); this.buildDate = new SetOnce<>(); + this.projectId = projectId; // cache the toString on construction this.cachedDatabasePathToString = databasePath.toString(); @@ -90,6 +91,13 @@ public class DatabaseReaderLazyLoader implements IpDatabase { return databaseType.get(); } + /** + * Prepares the database for lookup by incrementing the usage count. + * If the usage count is already negative, it indicates that the database is being closed, + * and this method will return false to indicate that no lookup should be performed. + * + * @return true if the database is ready for lookup, false if it is being closed + */ boolean preLookup() { return currentUsages.updateAndGet(current -> current < 0 ? current : current + 1) > 0; } @@ -107,9 +115,8 @@ public class DatabaseReaderLazyLoader implements IpDatabase { @Override @Nullable - @FixForMultiProject // do not use ProjectId.DEFAULT public RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { - return cache.putIfAbsent(ProjectId.DEFAULT, ipAddress, cachedDatabasePathToString, ip -> { + return cache.putIfAbsent(projectId, ipAddress, cachedDatabasePathToString, ip -> { try { return responseProvider.apply(get(), ipAddress); } catch (Exception e) { @@ -146,10 +153,9 @@ public class DatabaseReaderLazyLoader implements IpDatabase { } // Visible for Testing - @FixForMultiProject // do not use ProjectId.DEFAULT protected void doShutdown() throws IOException { IOUtils.close(databaseReader.get()); - int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(ProjectId.DEFAULT, databasePath); + int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(projectId, databasePath); logger.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath); if (deleteDatabaseFileOnShutdown) { logger.info("deleting [{}]", databasePath); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpTaskState.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpTaskState.java index c128af69009b..7608fced2460 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpTaskState.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpTaskState.java @@ -12,9 +12,11 @@ package org.elasticsearch.ingest.geoip; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.VersionedNamedWriteable; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.ingest.EnterpriseGeoIpTask; @@ -146,8 +148,12 @@ class EnterpriseGeoIpTaskState implements PersistentTaskState, VersionedNamedWri * @return the geoip downloader's task state or null if there is not a state to read */ @Nullable + @FixForMultiProject(description = "Replace ProjectId.DEFAULT") static EnterpriseGeoIpTaskState getEnterpriseGeoIpTaskState(ClusterState state) { - PersistentTasksCustomMetadata.PersistentTask task = getTaskWithId(state, EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER); + PersistentTasksCustomMetadata.PersistentTask task = getTaskWithId( + state.projectState(ProjectId.DEFAULT).metadata(), + EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER + ); return (task == null) ? null : (EnterpriseGeoIpTaskState) task.getState(); } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 5b36e98b66f2..23affd5e4315 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -180,16 +180,23 @@ public final class GeoIpProcessor extends AbstractProcessor { private final IpDatabaseProvider ipDatabaseProvider; private final String databaseFile; private final String databaseType; + private final ProjectId projectId; - public DatabaseVerifyingSupplier(IpDatabaseProvider ipDatabaseProvider, String databaseFile, String databaseType) { + public DatabaseVerifyingSupplier( + IpDatabaseProvider ipDatabaseProvider, + String databaseFile, + String databaseType, + ProjectId projectId + ) { this.ipDatabaseProvider = ipDatabaseProvider; this.databaseFile = databaseFile; this.databaseType = databaseType; + this.projectId = projectId; } @Override public IpDatabase get() throws IOException { - IpDatabase loader = ipDatabaseProvider.getDatabase(databaseFile); + IpDatabase loader = ipDatabaseProvider.getDatabase(projectId, databaseFile); if (loader == null) { return null; } @@ -242,7 +249,7 @@ public final class GeoIpProcessor extends AbstractProcessor { readBooleanProperty(type, processorTag, config, "download_database_on_pipeline_creation", true); final String databaseType; - try (IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(databaseFile)) { + try (IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(projectId, databaseFile)) { if (ipDatabase == null) { // It's possible that the database could be downloaded via the GeoipDownloader process and could become available // at a later moment, so a processor impl is returned that tags documents instead. If a database cannot be sourced @@ -302,8 +309,8 @@ public final class GeoIpProcessor extends AbstractProcessor { processorTag, description, ipField, - new DatabaseVerifyingSupplier(ipDatabaseProvider, databaseFile, databaseType), - () -> ipDatabaseProvider.isValid(databaseFile), + new DatabaseVerifyingSupplier(ipDatabaseProvider, databaseFile, databaseType, projectId), + () -> ipDatabaseProvider.isValid(projectId, databaseFile), targetField, ipDataLookup, ignoreMissing, diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java index c2858df268a0..4014291cfaf6 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java @@ -11,7 +11,7 @@ package org.elasticsearch.ingest.geoip; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.VersionedNamedWriteable; @@ -253,15 +253,16 @@ public class GeoIpTaskState implements PersistentTaskState, VersionedNamedWritea } /** - * Retrieves the geoip downloader's task state from the cluster state. This may return null in some circumstances, + * Retrieves the geoip downloader's task state from the project metadata. This may return null in some circumstances, * for example if the geoip downloader task hasn't been created yet (which it wouldn't be if it's disabled). * - * @param state the cluster state to read the task state from + * @param projectMetadata the project metatdata to read the task state from. + * @param taskId the task ID of the geoip downloader task to read the state for. * @return the geoip downloader's task state or null if there is not a state to read */ @Nullable - static GeoIpTaskState getGeoIpTaskState(ClusterState state) { - PersistentTasksCustomMetadata.PersistentTask task = getTaskWithId(state, GeoIpDownloader.GEOIP_DOWNLOADER); + static GeoIpTaskState getGeoIpTaskState(ProjectMetadata projectMetadata, String taskId) { + PersistentTasksCustomMetadata.PersistentTask task = getTaskWithId(projectMetadata, taskId); return (task == null) ? null : (GeoIpTaskState) task.getState(); } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 68aba5445e2a..7996faa68486 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -126,7 +126,9 @@ public class IngestGeoIpPlugin extends Plugin parameters.client, geoIpCache, parameters.genericExecutor, - parameters.ingestService.getClusterService() + parameters.ingestService.getClusterService(), + parameters.ingestService, + parameters.client.projectResolver() ); databaseRegistry.set(registry); return Map.ofEntries( @@ -139,7 +141,7 @@ public class IngestGeoIpPlugin extends Plugin public Collection createComponents(PluginServices services) { try { String nodeId = services.nodeEnvironment().nodeId(); - databaseRegistry.get().initialize(nodeId, services.resourceWatcherService(), ingestService.get()); + databaseRegistry.get().initialize(nodeId, services.resourceWatcherService()); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpDatabaseProvider.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpDatabaseProvider.java index 9438bf74f8c1..068c4db8ed27 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpDatabaseProvider.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpDatabaseProvider.java @@ -9,6 +9,8 @@ package org.elasticsearch.ingest.geoip; +import org.elasticsearch.cluster.metadata.ProjectId; + /** * Provides construction and initialization logic for {@link IpDatabase} instances. */ @@ -20,15 +22,17 @@ public interface IpDatabaseProvider { * Verifying database expiration is left to each provider implementation to determine. A return value of false does not * preclude the possibility of a provider returning true in the future. * + * @param projectId projectId to look for database. * @param name the name of the database to provide. * @return false IFF the requested database file is expired, * true for all other cases (including unknown file name, file missing, wrong database type, etc). */ - Boolean isValid(String name); + Boolean isValid(ProjectId projectId, String name); /** + * @param projectId projectId to look for database. * @param name the name of the database to provide. * @return a ready-to-use database instance, or null if no database could be loaded. */ - IpDatabase getDatabase(String name); + IpDatabase getDatabase(ProjectId projectId, String name); } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java index e11566d409c3..275c65521c14 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java @@ -12,6 +12,7 @@ package org.elasticsearch.ingest.geoip.stats; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -77,15 +78,16 @@ public class GeoIpStatsTransportAction extends TransportNodesAction toRelease = new CopyOnWriteArrayList<>(); @Before public void setup() throws IOException { + // cover for multi-project enable/disabled + boolean multiProject = randomBoolean(); + projectId = multiProject ? randomProjectIdOrDefault() : ProjectId.DEFAULT; + projectResolver = multiProject ? TestProjectResolvers.singleProject(projectId) : TestProjectResolvers.DEFAULT_PROJECT_ONLY; final Path geoIpConfigDir = createTempDir(); GeoIpCache cache = new GeoIpCache(1000); ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache); @@ -134,8 +142,17 @@ public class DatabaseNodeServiceTests extends ESTestCase { ingestService = mock(IngestService.class); clusterService = mock(ClusterService.class); geoIpTmpDir = createTempDir(); - databaseNodeService = new DatabaseNodeService(geoIpTmpDir, client, cache, configDatabases, Runnable::run, clusterService); - databaseNodeService.initialize("nodeId", resourceWatcherService, ingestService); + databaseNodeService = new DatabaseNodeService( + geoIpTmpDir, + client, + cache, + configDatabases, + Runnable::run, + clusterService, + ingestService, + projectResolver + ); + databaseNodeService.initialize("nodeId", resourceWatcherService); } @After @@ -148,21 +165,21 @@ public class DatabaseNodeServiceTests extends ESTestCase { public void testCheckDatabases() throws Exception { String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14); - String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; + String taskId = getTaskId(projectId, projectResolver.supportsMultipleProjects()); PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(10, 5, 14, md5, 10)))); PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); - ClusterState state = createClusterState(tasksCustomMetadata); + ClusterState state = createClusterState(projectId, tasksCustomMetadata); int numPipelinesToBeReloaded = randomInt(4); List pipelineIds = IntStream.range(0, numPipelinesToBeReloaded).mapToObj(String::valueOf).toList(); when(ingestService.getPipelineWithProcessorType(any(), any(), any())).thenReturn(pipelineIds); - assertThat(databaseNodeService.getDatabase("GeoIP2-City.mmdb"), nullValue()); + assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue()); // Nothing should be downloaded, since the database is no longer valid (older than 30 days) databaseNodeService.checkDatabases(state); - DatabaseReaderLazyLoader database = databaseNodeService.getDatabaseReaderLazyLoader("GeoIP2-City.mmdb"); + DatabaseReaderLazyLoader database = databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb"); assertThat(database, nullValue()); verify(client, times(0)).search(any()); verify(ingestService, times(0)).reloadPipeline(any(), anyString()); @@ -176,11 +193,11 @@ public class DatabaseNodeServiceTests extends ESTestCase { ); tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); - state = createClusterState(tasksCustomMetadata); + state = createClusterState(projectId, tasksCustomMetadata); // Database should be downloaded databaseNodeService.checkDatabases(state); - database = databaseNodeService.getDatabaseReaderLazyLoader("GeoIP2-City.mmdb"); + database = databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb"); assertThat(database, notNullValue()); verify(client, times(10)).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { @@ -194,12 +211,12 @@ public class DatabaseNodeServiceTests extends ESTestCase { public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Exception { String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9); - String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; + String taskId = getTaskId(projectId, projectResolver.supportsMultipleProjects()); PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10)))); PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); - ClusterState state = ClusterState.builder(createClusterState(tasksCustomMetadata)) + ClusterState state = ClusterState.builder(createClusterState(projectId, tasksCustomMetadata)) .nodes( new DiscoveryNodes.Builder().add( DiscoveryNodeUtils.builder("_id1").name("_name1").roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)).build() @@ -208,7 +225,7 @@ public class DatabaseNodeServiceTests extends ESTestCase { .build(); databaseNodeService.checkDatabases(state); - assertThat(databaseNodeService.getDatabase("GeoIP2-City.mmdb"), nullValue()); + assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue()); verify(client, never()).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { assertThat(files.toList(), empty()); @@ -217,18 +234,18 @@ public class DatabaseNodeServiceTests extends ESTestCase { public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Exception { String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9); - String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; + String taskId = getTaskId(projectId, projectResolver.supportsMultipleProjects()); PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10)))); PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); ClusterState state = ClusterState.builder(new ClusterName("name")) - .metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).build()) + .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(TYPE, tasksCustomMetadata)) .nodes(new DiscoveryNodes.Builder().add(DiscoveryNodeUtils.create("_id1")).localNodeId("_id1")) .build(); databaseNodeService.checkDatabases(state); - assertThat(databaseNodeService.getDatabase("GeoIP2-City.mmdb"), nullValue()); + assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue()); verify(client, never()).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { assertThat(files.toList(), empty()); @@ -238,12 +255,12 @@ public class DatabaseNodeServiceTests extends ESTestCase { public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws Exception { PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(0L, Map.of()); - ClusterState state = createClusterState(tasksCustomMetadata); + ClusterState state = createClusterState(projectId, tasksCustomMetadata); mockSearches("GeoIP2-City.mmdb", 0, 9); databaseNodeService.checkDatabases(state); - assertThat(databaseNodeService.getDatabase("GeoIP2-City.mmdb"), nullValue()); + assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue()); verify(client, never()).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { assertThat(files.toList(), empty()); @@ -260,7 +277,7 @@ public class DatabaseNodeServiceTests extends ESTestCase { CheckedRunnable completedHandler = mock(CheckedRunnable.class); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); - databaseNodeService.retrieveDatabase("_name", md5, metadata, chunkConsumer, completedHandler, failureHandler); + databaseNodeService.retrieveDatabase(projectId, "_name", md5, metadata, chunkConsumer, completedHandler, failureHandler); verify(failureHandler, never()).accept(any()); verify(chunkConsumer, times(30)).accept(any()); verify(completedHandler, times(1)).run(); @@ -278,7 +295,7 @@ public class DatabaseNodeServiceTests extends ESTestCase { CheckedRunnable completedHandler = mock(CheckedRunnable.class); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); - databaseNodeService.retrieveDatabase("_name", incorrectMd5, metadata, chunkConsumer, completedHandler, failureHandler); + databaseNodeService.retrieveDatabase(projectId, "_name", incorrectMd5, metadata, chunkConsumer, completedHandler, failureHandler); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(failureHandler, times(1)).accept(exceptionCaptor.capture()); assertThat(exceptionCaptor.getAllValues().size(), equalTo(1)); @@ -296,7 +313,7 @@ public class DatabaseNodeServiceTests extends ESTestCase { List pipelineIds = IntStream.range(0, numPipelinesToBeReloaded).mapToObj(String::valueOf).toList(); when(ingestService.getPipelineWithProcessorType(any(), any(), any())).thenReturn(pipelineIds); - databaseNodeService.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file")); + databaseNodeService.updateDatabase(projectId, "_name", "_md5", geoIpTmpDir.resolve("some-file")); // Updating the first time may trigger a reload. verify(clusterService, times(1)).addListener(any()); @@ -308,7 +325,7 @@ public class DatabaseNodeServiceTests extends ESTestCase { reset(ingestService); // Subsequent updates shouldn't trigger a reload. - databaseNodeService.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file")); + databaseNodeService.updateDatabase(projectId, "_name", "_md5", geoIpTmpDir.resolve("some-file")); verifyNoMoreInteractions(clusterService); verifyNoMoreInteractions(ingestService); } @@ -354,6 +371,7 @@ public class DatabaseNodeServiceTests extends ESTestCase { }); requestMap.put(databaseName + "_" + i, actionFuture); } + when(client.projectClient(any())).thenReturn(client); when(client.search(any())).thenAnswer(invocationOnMock -> { SearchRequest req = (SearchRequest) invocationOnMock.getArguments()[0]; TermQueryBuilder term = (TermQueryBuilder) req.source().query(); @@ -366,18 +384,10 @@ public class DatabaseNodeServiceTests extends ESTestCase { return MessageDigests.toHexString(md.digest()); } - static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata) { - return createClusterState(Metadata.DEFAULT_PROJECT_ID, tasksCustomMetadata, false); - } - static ClusterState createClusterState(ProjectId projectId, PersistentTasksCustomMetadata tasksCustomMetadata) { return createClusterState(projectId, tasksCustomMetadata, false); } - static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata, boolean noStartedShards) { - return createClusterState(Metadata.DEFAULT_PROJECT_ID, tasksCustomMetadata, noStartedShards); - } - static ClusterState createClusterState( ProjectId projectId, PersistentTasksCustomMetadata tasksCustomMetadata, diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java index eee0de733f5f..aaad44bfe8fc 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.settings.ClusterSettings; @@ -76,9 +77,12 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase { private ThreadPool threadPool; private MockClient client; private EnterpriseGeoIpDownloader geoIpDownloader; + private ProjectId projectId; @Before public void setup() throws IOException { + // TODO: change to random projectId + projectId = ProjectId.DEFAULT; httpClient = mock(HttpClient.class); when(httpClient.getBytes(any(), anyString())).thenReturn( "e4a3411cdd7b21eaf18675da5a7f9f360d33c6882363b2c19c38715834c9e836 GeoIP2-City_20240709.tar.gz".getBytes(StandardCharsets.UTF_8) @@ -92,7 +96,7 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase { when(clusterService.getClusterSettings()).thenReturn( new ClusterSettings(Settings.EMPTY, Set.of(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING)) ); - ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of())); + ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of())); when(clusterService.state()).thenReturn(state); client = new MockClient(threadPool); geoIpDownloader = new EnterpriseGeoIpDownloader( @@ -440,15 +444,15 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase { } public void testUpdateDatabasesWriteBlock() { - ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of())); + ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of())); var geoIpIndex = state.getMetadata() - .getProject() + .getProject(projectId) .getIndicesLookup() .get(EnterpriseGeoIpDownloader.DATABASES_INDEX) .getWriteIndex() .getName(); state = ClusterState.builder(state) - .blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) + .blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) .build(); when(clusterService.state()).thenReturn(state); var e = expectThrows(ClusterBlockException.class, () -> geoIpDownloader.updateDatabases()); @@ -467,15 +471,15 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase { } public void testUpdateDatabasesIndexNotReady() throws IOException { - ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()), true); + ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()), true); var geoIpIndex = state.getMetadata() - .getProject() + .getProject(projectId) .getIndicesLookup() .get(EnterpriseGeoIpDownloader.DATABASES_INDEX) .getWriteIndex() .getName(); state = ClusterState.builder(state) - .blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) + .blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) .build(); when(clusterService.state()).thenReturn(state); geoIpDownloader.updateDatabases(); diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 65b4c52a29e0..ff3ea6d6bfd9 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -13,8 +13,12 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; @@ -54,6 +58,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -64,9 +69,16 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { private ConfigDatabases configDatabases; private DatabaseNodeService databaseNodeService; private ClusterService clusterService; + private ProjectId projectId; + private ProjectResolver projectResolver; @Before public void loadDatabaseReaders() throws IOException { + // cover for multi-project enable/disabled + boolean multiProject = randomBoolean(); + projectId = multiProject ? randomProjectIdOrDefault() : ProjectId.DEFAULT; + projectResolver = multiProject ? TestProjectResolvers.singleProject(projectId) : TestProjectResolvers.DEFAULT_PROJECT_ONLY; + final Path configDir = createTempDir(); geoIpConfigDir = configDir.resolve("ingest-geoip"); Files.createDirectories(geoIpConfigDir); @@ -77,9 +89,21 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { copyDefaultDatabases(geoIpConfigDir, configDatabases); geoipTmpDir = createTempDir(); clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - databaseNodeService = new DatabaseNodeService(geoipTmpDir, client, cache, configDatabases, Runnable::run, clusterService); - databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class)); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(ProjectMetadata.builder(projectId).build()) + .build(); + when(clusterService.state()).thenReturn(state); + databaseNodeService = new DatabaseNodeService( + geoipTmpDir, + client, + cache, + configDatabases, + Runnable::run, + clusterService, + mock(IngestService.class), + projectResolver + ); + databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class)); } @After @@ -95,7 +119,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config.put("field", "_field"); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); @@ -112,7 +136,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config.put("ignore_missing", true); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); @@ -129,7 +153,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config.put("database_file", "GeoLite2-Country.mmdb"); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); @@ -147,7 +171,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config.put("database_file", "GeoLite2-ASN.mmdb"); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); @@ -162,7 +186,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("target_field", "_field"); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("_field")); assertFalse(processor.isIgnoreMissing()); @@ -173,7 +197,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); assertThat(processor.getDatabaseType(), equalTo("GeoLite2-Country")); @@ -190,7 +214,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { asnOnlyProperties.remove(Property.IP); String asnProperty = RandomPicks.randomFrom(Randomness.get(), asnOnlyProperties).toString(); config.put("properties", List.of(asnProperty)); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config, null)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config, projectId)); assertThat( e.getMessage(), equalTo( @@ -211,7 +235,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { cityOnlyProperties.remove(Property.IP); String cityProperty = RandomPicks.randomFrom(Randomness.get(), cityOnlyProperties).toString(); config.put("properties", List.of(cityProperty)); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config, null)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config, projectId)); assertThat( e.getMessage(), equalTo("[properties] illegal property value [" + cityProperty + "]. valid values are [IP, ASN, ORGANIZATION_NAME, NETWORK]") @@ -219,14 +243,15 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } public void testBuildNonExistingDbFile() throws Exception { + ProjectId projectId = randomProjectIdOrDefault(); copyDatabase("GeoLite2-City-Test.mmdb", geoipTmpDir.resolve("GeoLite2-City.mmdb")); - databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City.mmdb")); + databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City.mmdb")); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "does-not-exist.mmdb"); - Processor processor = factory.create(null, null, null, config, null); + Processor processor = factory.create(null, null, null, config, projectId); assertThat(processor, instanceOf(GeoIpProcessor.DatabaseUnavailableProcessor.class)); } @@ -237,7 +262,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", randomFrom(DEFAULT_DATABASES)); - Processor processor = factory.create(null, null, null, config, null); + Processor processor = factory.create(null, null, null, config, projectId); assertThat(processor, instanceOf(GeoIpProcessor.DatabaseUnavailableProcessor.class)); } @@ -259,7 +284,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("properties", fieldNames); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getProperties(), equalTo(properties)); assertFalse(processor.isIgnoreMissing()); @@ -271,7 +296,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config1 = new HashMap<>(); config1.put("field", "_field"); config1.put("properties", List.of("invalid")); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, null)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, projectId)); assertThat( e.getMessage(), equalTo( @@ -285,7 +310,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config2 = new HashMap<>(); config2.put("field", "_field"); config2.put("properties", "invalid"); - e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config2, null)); + e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config2, projectId)); assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]")); } @@ -294,14 +319,14 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { IpDatabase database = mock(IpDatabase.class); when(database.getDatabaseType()).thenReturn("some-unsupported-database"); IpDatabaseProvider provider = mock(IpDatabaseProvider.class); - when(provider.getDatabase(anyString())).thenReturn(database); + when(provider.getDatabase(eq(projectId), anyString())).thenReturn(database); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, provider); Map config1 = new HashMap<>(); config1.put("field", "_field"); config1.put("properties", List.of("ip")); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, null)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, projectId)); assertThat( e.getMessage(), equalTo("[database_file] Unsupported database type [some-unsupported-database] for file [GeoLite2-City.mmdb]") @@ -313,14 +338,14 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { IpDatabase database = mock(IpDatabase.class); when(database.getDatabaseType()).thenReturn(null); IpDatabaseProvider provider = mock(IpDatabaseProvider.class); - when(provider.getDatabase(anyString())).thenReturn(database); + when(provider.getDatabase(eq(projectId), anyString())).thenReturn(database); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, provider); Map config1 = new HashMap<>(); config1.put("field", "_field"); config1.put("properties", List.of("ip")); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, null)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, projectId)); assertThat(e.getMessage(), equalTo("[database_file] Unsupported database type [null] for file [GeoLite2-City.mmdb]")); } @@ -328,7 +353,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { IpDatabase database = mock(IpDatabase.class); when(database.getDatabaseType()).thenReturn("ipinfo some_ipinfo_database.mmdb-City"); IpDatabaseProvider provider = mock(IpDatabaseProvider.class); - when(provider.getDatabase(anyString())).thenReturn(database); + when(provider.getDatabase(eq(projectId), anyString())).thenReturn(database); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, provider); @@ -336,7 +361,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config1.put("database_file", "some-ipinfo-database.mmdb"); config1.put("field", "_field"); config1.put("properties", List.of("ip")); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, null)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, projectId)); assertThat( e.getMessage(), equalTo( @@ -350,7 +375,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { IpDatabase database = mock(IpDatabase.class); when(database.getDatabaseType()).thenReturn("some_custom_database.mmdb-City"); IpDatabaseProvider provider = mock(IpDatabaseProvider.class); - when(provider.getDatabase(anyString())).thenReturn(database); + when(provider.getDatabase(eq(projectId), anyString())).thenReturn(database); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, provider); @@ -358,7 +383,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config1.put("database_file", "some-custom-database.mmdb"); config1.put("field", "_field"); config1.put("properties", List.of("ip")); - factory.create(null, null, null, config1, null); + factory.create(null, null, null, config1, projectId); assertWarnings(GeoIpProcessor.UNSUPPORTED_DATABASE_DEPRECATION_MESSAGE.replaceAll("\\{}", "some_custom_database.mmdb-City")); } @@ -374,14 +399,19 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { // database readers used at class level are reused between tests. (we want to keep that otherwise running this // test will take roughly 4 times more time) Client client = mock(Client.class); + ThreadPool threadPool = new TestThreadPool("test"); + ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool); DatabaseNodeService databaseNodeService = new DatabaseNodeService( createTempDir(), client, cache, configDatabases, Runnable::run, - clusterService + clusterService, + mock(IngestService.class), + projectResolver ); + databaseNodeService.initialize("nodeId", resourceWatcherService); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService); for (DatabaseReaderLazyLoader lazyLoader : configDatabases.getConfigDatabases().values()) { assertNull(lazyLoader.databaseReader.get()); @@ -393,35 +423,37 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-City.mmdb"); - final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config, null); + final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config, projectId); // these are lazy loaded until first use, so we expect null here - assertNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-City.mmdb").databaseReader.get()); + assertNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-City.mmdb").databaseReader.get()); city.execute(document); // the first ingest should trigger a database load - assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-City.mmdb").databaseReader.get()); + assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-City.mmdb").databaseReader.get()); config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); - final GeoIpProcessor country = (GeoIpProcessor) factory.create(null, "_tag", null, config, null); + final GeoIpProcessor country = (GeoIpProcessor) factory.create(null, "_tag", null, config, projectId); // these are lazy loaded until first use, so we expect null here - assertNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-Country.mmdb").databaseReader.get()); + assertNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-Country.mmdb").databaseReader.get()); country.execute(document); // the first ingest should trigger a database load - assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-Country.mmdb").databaseReader.get()); + assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-Country.mmdb").databaseReader.get()); config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-ASN.mmdb"); - final GeoIpProcessor asn = (GeoIpProcessor) factory.create(null, "_tag", null, config, null); + final GeoIpProcessor asn = (GeoIpProcessor) factory.create(null, "_tag", null, config, projectId); // these are lazy loaded until first use, so we expect null here - assertNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-ASN.mmdb").databaseReader.get()); + assertNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-ASN.mmdb").databaseReader.get()); asn.execute(document); // the first ingest should trigger a database load - assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-ASN.mmdb").databaseReader.get()); + assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-ASN.mmdb").databaseReader.get()); + resourceWatcherService.close(); + threadPool.shutdown(); } public void testLoadingCustomDatabase() throws IOException { @@ -448,9 +480,11 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { cache, configDatabases, Runnable::run, - clusterService + clusterService, + mock(IngestService.class), + projectResolver ); - databaseNodeService.initialize("nodeId", resourceWatcherService, mock(IngestService.class)); + databaseNodeService.initialize("nodeId", resourceWatcherService); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService); for (DatabaseReaderLazyLoader lazyLoader : configDatabases.getConfigDatabases().values()) { assertNull(lazyLoader.databaseReader.get()); @@ -462,13 +496,13 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoIP2-City.mmdb"); - final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config, null); + final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config, projectId); // these are lazy loaded until first use, so we expect null here - assertNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoIP2-City.mmdb").databaseReader.get()); + assertNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb").databaseReader.get()); city.execute(document); // the first ingest should trigger a database load - assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoIP2-City.mmdb").databaseReader.get()); + assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb").databaseReader.get()); resourceWatcherService.close(); threadPool.shutdown(); } @@ -478,7 +512,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", randomIdentifier()); config.put("download_database_on_pipeline_creation", randomBoolean()); - factory.create(null, null, null, config, null); + factory.create(null, null, null, config, projectId); // Check all the config params were consumed. assertThat(config, anEmptyMap()); } @@ -489,7 +523,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { .updateTaskState(GeoIpDownloader.GEOIP_DOWNLOADER, GeoIpTaskState.EMPTY) .build(); ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) - .metadata(Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasks)) + .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(PersistentTasksCustomMetadata.TYPE, tasks)) .build(); when(clusterService.state()).thenReturn(clusterState); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService); @@ -498,7 +532,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config.put("field", "_field"); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId); processor.execute(RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("_field", "89.160.20.128")))); } @@ -507,7 +541,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService); Map config = new HashMap<>(); config.put("field", "source_field"); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId); Map document = Map.of("source_field", "89.160.20.128"); { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document)); @@ -518,7 +552,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { { copyDatabase("GeoLite2-City-Test.mmdb", geoipTmpDir); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document)); - databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); + databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); processor.execute(ingestDocument); Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); assertThat(geoData.get("city_name"), equalTo("Linköping")); @@ -526,7 +560,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { { // No databases are available, so assume that databases still need to be downloaded and therefore not fail: IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document)); - databaseNodeService.removeStaleEntries(List.of("GeoLite2-City.mmdb")); + databaseNodeService.removeStaleEntries(projectId, List.of("GeoLite2-City.mmdb")); configDatabases.updateDatabase(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), false); processor.execute(ingestDocument); Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); @@ -534,7 +568,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } { // There are databases available, but not the right one, so tag: - databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); + databaseNodeService.updateDatabase(projectId, "GeoLite2-City-Test.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document)); processor.execute(ingestDocument); assertThat(ingestDocument.getSourceAndMetadata(), hasEntry("tags", List.of("_geoip_database_unavailable_GeoLite2-City.mmdb"))); @@ -559,7 +593,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { null, null, config, - null + projectId ); processor.execute(ingestDocument); assertThat(ingestDocument.getSourceAndMetadata().get("geoip"), nullValue()); @@ -570,7 +604,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } copyDatabase("GeoLite2-City-Test.mmdb", geoipTmpDir); - databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); + databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); { Map config = new HashMap<>(); @@ -581,7 +615,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { document.put("source_field", "89.160.20.128"); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); - GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId); processor.execute(ingestDocument); assertThat(ingestDocument.getSourceAndMetadata().get("tags"), nullValue()); Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 4548e92239ce..8e34bff46539 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -9,7 +9,9 @@ package org.elasticsearch.ingest.geoip; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; @@ -484,13 +486,14 @@ public class GeoIpProcessorTests extends ESTestCase { return () -> loader; } + @FixForMultiProject(description = "Replace DEFAULT project") private DatabaseReaderLazyLoader loader(final String databaseName, final AtomicBoolean closed) { int last = databaseName.lastIndexOf("/"); final Path path = tmpDir.resolve(last == -1 ? databaseName : databaseName.substring(last + 1)); copyDatabase(databaseName, path); final GeoIpCache cache = new GeoIpCache(1000); - return new DatabaseReaderLazyLoader(cache, path, null) { + return new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, path, null) { @Override protected void doShutdown() throws IOException { if (closed != null) { diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IpinfoIpDataLookupsTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IpinfoIpDataLookupsTests.java index 5d45ff5d5585..ffcaa3960476 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IpinfoIpDataLookupsTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IpinfoIpDataLookupsTests.java @@ -14,8 +14,10 @@ import com.maxmind.db.Networks; import com.maxmind.db.Reader; import org.apache.lucene.util.Constants; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.network.NetworkAddress; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Strings; import org.elasticsearch.core.SuppressForbidden; @@ -555,10 +557,11 @@ public class IpinfoIpDataLookupsTests extends ESTestCase { } } + @FixForMultiProject(description = "Replace DEFAULT project") private DatabaseReaderLazyLoader loader(final String databaseName) { Path path = tmpDir.resolve(databaseName); copyDatabase("ipinfo/" + databaseName, path); // the ipinfo databases are prefixed on the test classpath final GeoIpCache cache = new GeoIpCache(1000); - return new DatabaseReaderLazyLoader(cache, path, null); + return new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, path, null); } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/MaxmindIpDataLookupsTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/MaxmindIpDataLookupsTests.java index 88cff23cd9b7..7da621c1dacc 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/MaxmindIpDataLookupsTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/MaxmindIpDataLookupsTests.java @@ -10,6 +10,8 @@ package org.elasticsearch.ingest.geoip; import org.apache.lucene.util.Constants; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.test.ESTestCase; import org.junit.After; @@ -330,10 +332,11 @@ public class MaxmindIpDataLookupsTests extends ESTestCase { } } + @FixForMultiProject(description = "Replace DEFAULT project") private DatabaseReaderLazyLoader loader(final String databaseName) { Path path = tmpDir.resolve(databaseName); copyDatabase(databaseName, path); final GeoIpCache cache = new GeoIpCache(1000); - return new DatabaseReaderLazyLoader(cache, path, null); + return new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, path, null); } }