Make GeoIP database node loader project-aware (#129829)

- loads the downloaded GeoIP databases from the system index to the ingest node's file system for each project
- each project's databases are loaded into the directory `tmp/geoip-databases/{nodeId}/{projectId}`
This commit is contained in:
Sam Xiao 2025-06-25 12:46:47 +08:00 committed by GitHub
parent 73d2e30c39
commit 41a47c24b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 502 additions and 310 deletions

View file

@ -15,7 +15,9 @@ import com.maxmind.geoip2.record.Country;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.xcontent.XContentType;
import java.io.BufferedInputStream;
@ -40,6 +42,10 @@ import java.util.zip.GZIPOutputStream;
import static org.hamcrest.Matchers.equalTo;
public class DatabaseNodeServiceIT extends AbstractGeoIpIT {
@FixForMultiProject(description = "Use random project ID after ESIntegTestCase is MP enabled")
private final ProjectId projectId = ProjectId.DEFAULT;
/*
* This test makes sure that if we index an ordinary mmdb file into the .geoip_databases index, it is correctly handled upon retrieval.
*/
@ -50,7 +56,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT {
String databaseName = randomAlphaOfLength(20) + "-" + databaseFileName;
byte[] mmdbBytes = getBytesForFile(databaseFileName);
final DatabaseNodeService databaseNodeService = internalCluster().getInstance(DatabaseNodeService.class);
assertNull(databaseNodeService.getDatabase(databaseName));
assertNull(databaseNodeService.getDatabase(projectId, databaseName));
int numChunks = indexData(databaseName, mmdbBytes);
/*
* If DatabaseNodeService::checkDatabases runs it will sometimes (rarely) remove the database we are using in this test while we
@ -58,7 +64,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT {
*/
assertBusy(() -> {
retrieveDatabase(databaseNodeService, databaseName, mmdbBytes, numChunks);
assertNotNull(databaseNodeService.getDatabase(databaseName));
assertNotNull(databaseNodeService.getDatabase(projectId, databaseName));
assertValidDatabase(databaseNodeService, databaseName, databaseType);
});
}
@ -75,7 +81,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT {
byte[] mmdbBytes = getBytesForFile(databaseFileName);
byte[] gzipBytes = gzipFileBytes(databaseName, mmdbBytes);
final DatabaseNodeService databaseNodeService = internalCluster().getInstance(DatabaseNodeService.class);
assertNull(databaseNodeService.getDatabase(databaseName));
assertNull(databaseNodeService.getDatabase(projectId, databaseName));
int numChunks = indexData(databaseName, gzipBytes);
/*
* If DatabaseNodeService::checkDatabases runs it will sometimes (rarely) remove the database we are using in this test while we
@ -83,7 +89,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT {
*/
assertBusy(() -> {
retrieveDatabase(databaseNodeService, databaseName, gzipBytes, numChunks);
assertNotNull(databaseNodeService.getDatabase(databaseName));
assertNotNull(databaseNodeService.getDatabase(projectId, databaseName));
assertValidDatabase(databaseNodeService, databaseName, databaseType);
});
}
@ -93,7 +99,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT {
*/
private void assertValidDatabase(DatabaseNodeService databaseNodeService, String databaseFileName, String databaseType)
throws IOException {
IpDatabase database = databaseNodeService.getDatabase(databaseFileName);
IpDatabase database = databaseNodeService.getDatabase(projectId, databaseFileName);
assertNotNull(database);
assertThat(database.getDatabaseType(), equalTo(databaseType));
CountryResponse countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry);
@ -110,7 +116,7 @@ public class DatabaseNodeServiceIT extends AbstractGeoIpIT {
private void retrieveDatabase(DatabaseNodeService databaseNodeService, String databaseFileName, byte[] expectedBytes, int numChunks)
throws IOException {
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(1, 0, numChunks - 1, getMd5(expectedBytes), 1);
databaseNodeService.retrieveAndUpdateDatabase(databaseFileName, metadata);
databaseNodeService.retrieveAndUpdateDatabase(projectId, databaseFileName, metadata);
}
private String getMd5(byte[] bytes) {

View file

@ -11,7 +11,11 @@ package org.elasticsearch.ingest.geoip;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.IOUtils;
@ -32,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.ClusterState.builder;
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.GEOIP_TYPE;
import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDatabase;
import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDefaultDatabases;
@ -62,31 +67,39 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
* geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance
*/
public void test() throws Exception {
ProjectId projectId = randomProjectIdOrDefault();
Path geoIpConfigDir = createTempDir();
Path geoIpTmpDir = createTempDir();
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
DatabaseNodeService databaseNodeService = createRegistry(geoIpConfigDir, geoIpTmpDir, clusterService);
when(clusterService.state()).thenReturn(
builder(ClusterName.DEFAULT).putProjectMetadata(ProjectMetadata.builder(projectId).build()).build()
);
DatabaseNodeService databaseNodeService = createRegistry(
geoIpConfigDir,
geoIpTmpDir,
clusterService,
TestProjectResolvers.singleProject(projectId)
);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService);
copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(databaseNodeService);
databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase(projectId, "GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(projectId, databaseNodeService);
final GeoIpProcessor processor1 = (GeoIpProcessor) factory.create(
null,
"_tag",
null,
new HashMap<>(Map.of("field", "_field")),
null
projectId
);
final GeoIpProcessor processor2 = (GeoIpProcessor) factory.create(
null,
"_tag",
null,
new HashMap<>(Map.of("field", "_field", "database_file", "GeoLite2-City-Test.mmdb")),
null
projectId
);
final AtomicBoolean completed = new AtomicBoolean(false);
@ -134,9 +147,9 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
Thread updateDatabaseThread = new Thread(() -> {
for (int i = 0; i < numberOfDatabaseUpdates; i++) {
try {
DatabaseReaderLazyLoader previous1 = databaseNodeService.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader previous1 = databaseNodeService.get(projectId, "GeoLite2-City.mmdb");
if (Files.exists(geoIpTmpDir.resolve("GeoLite2-City.mmdb"))) {
databaseNodeService.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
databaseNodeService.removeStaleEntries(projectId, List.of("GeoLite2-City.mmdb"));
assertBusy(() -> {
// lazy loader may still be in use by an ingest thread,
// wait for any potential ingest thread to release the lazy loader (DatabaseReaderLazyLoader#postLookup(...)),
@ -146,22 +159,32 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
});
} else {
copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase(
projectId,
"GeoLite2-City.mmdb",
"md5",
geoIpTmpDir.resolve("GeoLite2-City.mmdb")
);
}
DatabaseReaderLazyLoader previous2 = databaseNodeService.get("GeoLite2-City-Test.mmdb");
DatabaseReaderLazyLoader previous2 = databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb");
copyDatabase(
i % 2 == 0 ? "GeoIP2-City-Test.mmdb" : "GeoLite2-City-Test.mmdb",
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")
);
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase(
projectId,
"GeoLite2-City-Test.mmdb",
"md5",
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")
);
DatabaseReaderLazyLoader current1 = databaseNodeService.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseNodeService.get("GeoLite2-City-Test.mmdb");
DatabaseReaderLazyLoader current1 = databaseNodeService.get(projectId, "GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb");
assertThat(current1, not(sameInstance(previous1)));
assertThat(current2, not(sameInstance(previous2)));
// lazy load type and reader:
lazyLoadReaders(databaseNodeService);
lazyLoadReaders(projectId, databaseNodeService);
} catch (Exception | AssertionError e) {
logger.error("error in update databases thread after run [" + i + "]", e);
failureHolder2.set(e);
@ -193,8 +216,12 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
IOUtils.rm(geoIpConfigDir, geoIpTmpDir);
}
private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoIpTmpDir, ClusterService clusterService)
throws IOException {
private static DatabaseNodeService createRegistry(
Path geoIpConfigDir,
Path geoIpTmpDir,
ClusterService clusterService,
ProjectResolver projectResolver
) throws IOException {
GeoIpCache cache = new GeoIpCache(0);
ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache);
copyDefaultDatabases(geoIpConfigDir, configDatabases);
@ -204,19 +231,21 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
cache,
configDatabases,
Runnable::run,
clusterService
clusterService,
mock(IngestService.class),
projectResolver
);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class));
return databaseNodeService;
}
private static void lazyLoadReaders(DatabaseNodeService databaseNodeService) throws IOException {
if (databaseNodeService.get("GeoLite2-City.mmdb") != null) {
databaseNodeService.get("GeoLite2-City.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
private static void lazyLoadReaders(ProjectId projectId, DatabaseNodeService databaseNodeService) throws IOException {
if (databaseNodeService.get(projectId, "GeoLite2-City.mmdb") != null) {
databaseNodeService.get(projectId, "GeoLite2-City.mmdb").getDatabaseType();
databaseNodeService.get(projectId, "GeoLite2-City.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
}
databaseNodeService.get("GeoLite2-City-Test.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City-Test.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb").getDatabaseType();
databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
}
}

View file

@ -10,6 +10,8 @@ package org.elasticsearch.ingest.geoip;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.env.Environment;
import org.elasticsearch.watcher.FileChangesListener;
import org.elasticsearch.watcher.FileWatcher;
@ -69,12 +71,13 @@ final class ConfigDatabases implements Closeable {
return configDatabases;
}
@FixForMultiProject(description = "Replace DEFAULT project")
void updateDatabase(Path file, boolean update) {
String databaseFileName = file.getFileName().toString();
try {
if (update) {
logger.info("database file changed [{}], reloading database...", file);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, null);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, file, null);
DatabaseReaderLazyLoader existing = configDatabases.put(databaseFileName, loader);
if (existing != null) {
existing.shutdown();
@ -90,6 +93,7 @@ final class ConfigDatabases implements Closeable {
}
}
@FixForMultiProject(description = "Replace DEFAULT project")
Map<String, DatabaseReaderLazyLoader> initConfigDatabases() throws IOException {
Map<String, DatabaseReaderLazyLoader> databases = new HashMap<>();
@ -103,7 +107,7 @@ final class ConfigDatabases implements Closeable {
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
assert Files.exists(databasePath);
String databaseFileName = databasePath.getFileName().toString();
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, databasePath, null);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, databasePath, null);
databases.put(databaseFileName, loader);
}
}

View file

@ -17,8 +17,10 @@ import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -26,7 +28,6 @@ import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
@ -57,6 +58,7 @@ import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -72,6 +74,7 @@ import java.util.zip.GZIPInputStream;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpTaskState.getEnterpriseGeoIpTaskState;
import static org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor.getTaskId;
import static org.elasticsearch.ingest.geoip.GeoIpTaskState.getGeoIpTaskState;
/**
@ -104,16 +107,19 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
private final ConfigDatabases configDatabases;
private final Consumer<Runnable> genericExecutor;
private final ClusterService clusterService;
private IngestService ingestService;
private final IngestService ingestService;
private final ProjectResolver projectResolver;
private final ConcurrentMap<String, DatabaseReaderLazyLoader> databases = new ConcurrentHashMap<>();
private final ConcurrentMap<ProjectId, ConcurrentMap<String, DatabaseReaderLazyLoader>> databases = new ConcurrentHashMap<>();
DatabaseNodeService(
Environment environment,
Client client,
GeoIpCache cache,
Consumer<Runnable> genericExecutor,
ClusterService clusterService
ClusterService clusterService,
IngestService ingestService,
ProjectResolver projectResolver
) {
this(
environment.tmpDir(),
@ -121,7 +127,9 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
cache,
new ConfigDatabases(environment, cache),
genericExecutor,
clusterService
clusterService,
ingestService,
projectResolver
);
}
@ -131,7 +139,9 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
GeoIpCache cache,
ConfigDatabases configDatabases,
Consumer<Runnable> genericExecutor,
ClusterService clusterService
ClusterService clusterService,
IngestService ingestService,
ProjectResolver projectResolver
) {
this.client = client;
this.cache = cache;
@ -139,11 +149,14 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
this.configDatabases = configDatabases;
this.genericExecutor = genericExecutor;
this.clusterService = clusterService;
this.ingestService = ingestService;
this.projectResolver = projectResolver;
}
public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestServiceArg) throws IOException {
public void initialize(String nodeId, ResourceWatcherService resourceWatcher) throws IOException {
configDatabases.initialize(resourceWatcher);
geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
// delete all stale files in the geoip tmp directory
Files.walkFileTree(geoipTmpDirectory, new FileVisitor<>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
@ -164,6 +177,7 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
@Override
public FileVisitResult visitFileFailed(Path file, IOException e) {
if (e instanceof NoSuchFileException == false) {
// parameterized log fails logger check, see https://github.com/elastic/elasticsearch/issues/104782
logger.warn("can't delete stale file [" + file + "]", e);
}
return FileVisitResult.CONTINUE;
@ -178,16 +192,15 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
Files.createDirectories(geoipTmpDirectory);
}
logger.debug("initialized database node service, using geoip-databases directory [{}]", geoipTmpDirectory);
this.ingestService = ingestServiceArg;
clusterService.addListener(event -> checkDatabases(event.state()));
}
@Override
public Boolean isValid(String databaseFile) {
public Boolean isValid(ProjectId projectId, String databaseFile) {
ClusterState currentState = clusterService.state();
assert currentState != null;
ProjectMetadata projectMetadata = currentState.metadata().getProject(projectId);
GeoIpTaskState state = getGeoIpTaskState(currentState);
GeoIpTaskState state = getGeoIpTaskState(projectMetadata, getTaskId(projectId, projectResolver.supportsMultipleProjects()));
if (state == null) {
return true;
}
@ -210,11 +223,11 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
}
// for testing only:
DatabaseReaderLazyLoader getDatabaseReaderLazyLoader(String name) {
DatabaseReaderLazyLoader getDatabaseReaderLazyLoader(ProjectId projectId, String name) {
// There is a need for reference counting in order to avoid using an instance
// that gets closed while using it. (this can happen during a database update)
while (true) {
DatabaseReaderLazyLoader instance = databases.get(name);
DatabaseReaderLazyLoader instance = getProjectLazyLoader(projectId, name);
if (instance == null) {
instance = configDatabases.getDatabase(name);
}
@ -227,25 +240,29 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
}
@Override
public IpDatabase getDatabase(String name) {
return getDatabaseReaderLazyLoader(name);
public IpDatabase getDatabase(ProjectId projectId, String name) {
return getDatabaseReaderLazyLoader(projectId, name);
}
List<DatabaseReaderLazyLoader> getAllDatabases() {
List<DatabaseReaderLazyLoader> all = new ArrayList<>(configDatabases.getConfigDatabases().values());
this.databases.forEach((key, value) -> all.add(value));
this.databases.forEach((key, value) -> all.addAll(value.values()));
return all;
}
// for testing only:
DatabaseReaderLazyLoader get(String key) {
return databases.get(key);
DatabaseReaderLazyLoader get(ProjectId projectId, String key) {
return databases.computeIfAbsent(projectId, (k) -> new ConcurrentHashMap<>()).get(key);
}
public void shutdown() throws IOException {
// this is a little 'fun' looking, but it's just adapting IOUtils.close() into something
// that can call a bunch of shutdown methods (rather than close methods)
final var loadersToShutdown = databases.values().stream().map(ShutdownCloseable::new).toList();
final var loadersToShutdown = databases.values()
.stream()
.flatMap(map -> map.values().stream())
.map(ShutdownCloseable::new)
.toList();
databases.clear();
IOUtils.close(loadersToShutdown);
}
@ -270,103 +287,120 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
return;
}
PersistentTasksCustomMetadata persistentTasks = state.metadata().getProject().custom(PersistentTasksCustomMetadata.TYPE);
if (persistentTasks == null) {
logger.trace("Not checking databases because persistent tasks are null");
return;
}
// Optimization: only load the .geoip_databases for projects that are allocated to this node
for (ProjectMetadata projectMetadata : state.metadata().projects().values()) {
ProjectId projectId = projectMetadata.id();
IndexAbstraction databasesAbstraction = state.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
if (databasesAbstraction == null) {
logger.trace("Not checking databases because geoip databases index does not exist");
return;
} else {
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
Index databasesIndex = databasesAbstraction.getWriteIndex();
IndexRoutingTable databasesIndexRT = state.getRoutingTable().index(databasesIndex);
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
logger.trace("Not checking databases because geoip databases index does not have all active primary shards");
PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE);
if (persistentTasks == null) {
logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId);
continue;
}
IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
if (databasesAbstraction == null) {
logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId);
return;
} else {
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
Index databasesIndex = databasesAbstraction.getWriteIndex();
IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex);
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
logger.trace(
"Not checking databases because geoip databases index does not have all active primary shards for"
+ " project [{}]",
projectId
);
return;
}
}
// we'll consult each of the geoip downloaders to build up a list of database metadatas to work with
List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>();
// process the geoip task state for the (ordinary) geoip downloader
{
GeoIpTaskState taskState = getGeoIpTaskState(
projectMetadata,
getTaskId(projectId, projectResolver.supportsMultipleProjects())
);
if (taskState == null) {
// Note: an empty state will purge stale entries in databases map
taskState = GeoIpTaskState.EMPTY;
}
validMetadatas.addAll(
taskState.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList()
);
}
// process the geoip task state for the enterprise geoip downloader
{
EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state);
if (taskState == null) {
// Note: an empty state will purge stale entries in databases map
taskState = EnterpriseGeoIpTaskState.EMPTY;
}
validMetadatas.addAll(
taskState.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList()
);
}
// run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task
// has downloaded a new version of the databases
validMetadatas.forEach(e -> {
String name = e.v1();
GeoIpTaskState.Metadata metadata = e.v2();
DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name);
String remoteMd5 = metadata.md5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5);
return;
}
try {
retrieveAndUpdateDatabase(projectId, name, metadata);
} catch (Exception ex) {
logger.error(() -> "failed to retrieve database [" + name + "]", ex);
}
});
// TODO perhaps we need to handle the license flap persistent task state better than we do
// i think the ideal end state is that we *do not* drop the files that the enterprise downloader
// handled if they fall out -- which means we need to track that in the databases map itself
// start with the list of all databases we currently know about in this service,
// then drop the ones that didn't check out as valid from the task states
if (databases.containsKey(projectId)) {
Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet());
staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet()));
removeStaleEntries(projectId, staleDatabases);
}
}
// we'll consult each of the geoip downloaders to build up a list of database metadatas to work with
List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>();
// process the geoip task state for the (ordinary) geoip downloader
{
GeoIpTaskState taskState = getGeoIpTaskState(state);
if (taskState == null) {
// Note: an empty state will purge stale entries in databases map
taskState = GeoIpTaskState.EMPTY;
}
validMetadatas.addAll(
taskState.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList()
);
}
// process the geoip task state for the enterprise geoip downloader
{
EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state);
if (taskState == null) {
// Note: an empty state will purge stale entries in databases map
taskState = EnterpriseGeoIpTaskState.EMPTY;
}
validMetadatas.addAll(
taskState.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList()
);
}
// run through all the valid metadatas, regardless of source, and retrieve them
validMetadatas.forEach(e -> {
String name = e.v1();
GeoIpTaskState.Metadata metadata = e.v2();
DatabaseReaderLazyLoader reference = databases.get(name);
String remoteMd5 = metadata.md5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5);
return;
}
try {
retrieveAndUpdateDatabase(name, metadata);
} catch (Exception ex) {
logger.error(() -> "failed to retrieve database [" + name + "]", ex);
}
});
// TODO perhaps we need to handle the license flap persistent task state better than we do
// i think the ideal end state is that we *do not* drop the files that the enterprise downloader
// handled if they fall out -- which means we need to track that in the databases map itself
// start with the list of all databases we currently know about in this service,
// then drop the ones that didn't check out as valid from the task states
List<String> staleEntries = new ArrayList<>(databases.keySet());
staleEntries.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet()));
removeStaleEntries(staleEntries);
}
void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata metadata) throws IOException {
void retrieveAndUpdateDatabase(ProjectId projectId, String databaseName, GeoIpTaskState.Metadata metadata) throws IOException {
logger.trace("retrieving database [{}]", databaseName);
final String recordedMd5 = metadata.md5();
// This acts as a lock, if this method for a specific db is executed later and downloaded for this db is still ongoing then
// FileAlreadyExistsException is thrown and this method silently returns.
Path databaseTmpDirectory = getDatabaseTmpDirectory(projectId);
// This acts as a lock to avoid multiple retrievals of the same database at the same time. If this method for a specific db is
        // executed later again while a previous retrieval of this db is still ongoing, then FileAlreadyExistsException is thrown and
// this method silently returns.
// (this method is never invoked concurrently and is invoked by a cluster state applier thread)
final Path retrievedFile;
try {
retrievedFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp.retrieved"));
retrievedFile = Files.createFile(databaseTmpDirectory.resolve(databaseName + ".tmp.retrieved"));
} catch (FileAlreadyExistsException e) {
logger.debug("database update [{}] already in progress, skipping...", databaseName);
return;
@ -378,75 +412,86 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
// Thread 2 may have updated the databases map after thread 1 detects that there is no entry (or md5 mismatch) for a database.
// If thread 2 then also removes the tmp file before thread 1 attempts to create it then we're about to retrieve the same database
// twice. This check is here to avoid this:
DatabaseReaderLazyLoader lazyLoader = databases.get(databaseName);
DatabaseReaderLazyLoader lazyLoader = getProjectLazyLoader(projectId, databaseName);
if (lazyLoader != null && recordedMd5.equals(lazyLoader.getMd5())) {
logger.debug("deleting tmp file because database [{}] has already been updated.", databaseName);
Files.delete(retrievedFile);
return;
}
final Path databaseTmpFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp"));
final Path databaseTmpFile = Files.createFile(databaseTmpDirectory.resolve(databaseName + ".tmp"));
logger.debug("retrieving database [{}] from [{}] to [{}]", databaseName, GeoIpDownloader.DATABASES_INDEX, retrievedFile);
retrieveDatabase(databaseName, recordedMd5, metadata, bytes -> Files.write(retrievedFile, bytes, StandardOpenOption.APPEND), () -> {
final Path databaseFile = geoipTmpDirectory.resolve(databaseName);
retrieveDatabase(
projectId,
databaseName,
recordedMd5,
metadata,
bytes -> Files.write(retrievedFile, bytes, StandardOpenOption.APPEND),
() -> {
final Path databaseFile = databaseTmpDirectory.resolve(databaseName);
boolean isTarGz = MMDBUtil.isGzip(retrievedFile);
if (isTarGz) {
// tarball contains <database_name>.mmdb, LICENSE.txt, COPYRIGHTS.txt and optional README.txt files.
// we store mmdb file as is and prepend database name to all other entries to avoid conflicts
logger.debug("decompressing [{}]", retrievedFile.getFileName());
try (TarInputStream is = new TarInputStream(new GZIPInputStream(Files.newInputStream(retrievedFile), 8192))) {
TarInputStream.TarEntry entry;
while ((entry = is.getNextEntry()) != null) {
// there might be ./ entry in tar, we should skip it
if (entry.notFile()) {
continue;
}
// flatten structure, remove any directories present from the path (should be ./ only)
String name = entry.name().substring(entry.name().lastIndexOf('/') + 1);
if (name.startsWith(databaseName)) {
Files.copy(is, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING);
} else {
Files.copy(is, geoipTmpDirectory.resolve(databaseName + "_" + name), StandardCopyOption.REPLACE_EXISTING);
boolean isTarGz = MMDBUtil.isGzip(retrievedFile);
if (isTarGz) {
// tarball contains <database_name>.mmdb, LICENSE.txt, COPYRIGHTS.txt and optional README.txt files.
// we store mmdb file as is and prepend database name to all other entries to avoid conflicts
logger.debug("decompressing [{}]", retrievedFile.getFileName());
try (TarInputStream is = new TarInputStream(new GZIPInputStream(Files.newInputStream(retrievedFile), 8192))) {
TarInputStream.TarEntry entry;
while ((entry = is.getNextEntry()) != null) {
// there might be ./ entry in tar, we should skip it
if (entry.notFile()) {
continue;
}
// flatten structure, remove any directories present from the path (should be ./ only)
String name = entry.name().substring(entry.name().lastIndexOf('/') + 1);
if (name.startsWith(databaseName)) {
Files.copy(is, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING);
} else {
Files.copy(
is,
databaseTmpDirectory.resolve(databaseName + "_" + name),
StandardCopyOption.REPLACE_EXISTING
);
}
}
}
} else {
/*
* Given that this is not code that will be called extremely frequently, we copy the file to the
* expected location here in order to avoid making the rest of the code more complex to avoid this.
*/
Files.copy(retrievedFile, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING);
}
// finally, atomically move some-database.mmdb.tmp to some-database.mmdb
logger.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
updateDatabase(projectId, databaseName, recordedMd5, databaseFile);
Files.delete(retrievedFile);
},
failure -> {
logger.error(() -> "failed to retrieve database [" + databaseName + "]", failure);
try {
Files.deleteIfExists(databaseTmpFile);
Files.deleteIfExists(retrievedFile);
} catch (IOException ioe) {
ioe.addSuppressed(failure);
logger.error("unable to delete tmp database file after failure", ioe);
}
} else {
/*
* Given that this is not code that will be called extremely frequently, we copy the file to the expected location here in
* order to avoid making the rest of the code more complex to avoid this.
*/
Files.copy(retrievedFile, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING);
}
// finally, atomically move some-database.mmdb.tmp to some-database.mmdb
logger.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
updateDatabase(databaseName, recordedMd5, databaseFile);
Files.delete(retrievedFile);
}, failure -> {
logger.error(() -> "failed to retrieve database [" + databaseName + "]", failure);
try {
Files.deleteIfExists(databaseTmpFile);
Files.deleteIfExists(retrievedFile);
} catch (IOException ioe) {
ioe.addSuppressed(failure);
logger.error("unable to delete tmp database file after failure", ioe);
}
});
);
}
@FixForMultiProject // Don't use default project id
void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
void updateDatabase(ProjectId projectId, String databaseFileName, String recordedMd5, Path file) {
try {
logger.debug("starting reload of changed database file [{}]", file);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(projectId, cache, file, recordedMd5);
DatabaseReaderLazyLoader existing = databases.computeIfAbsent(projectId, (k) -> new ConcurrentHashMap<>())
.put(databaseFileName, loader);
if (existing != null) {
existing.shutdown();
} else {
// Loaded a database for the first time, so reload pipelines for which a database was not available:
Predicate<GeoIpProcessor.DatabaseUnavailableProcessor> predicate = p -> databaseFileName.equals(p.getDatabaseName());
var projectId = Metadata.DEFAULT_PROJECT_ID;
var ids = ingestService.getPipelineWithProcessorType(
projectId,
GeoIpProcessor.DatabaseUnavailableProcessor.class,
@ -479,20 +524,25 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
}
}
void removeStaleEntries(Collection<String> staleEntries) {
void removeStaleEntries(ProjectId projectId, Collection<String> staleEntries) {
ConcurrentMap<String, DatabaseReaderLazyLoader> projectLoaders = databases.get(projectId);
assert projectLoaders != null;
for (String staleEntry : staleEntries) {
try {
logger.debug("database [{}] no longer exists, cleaning up...", staleEntry);
DatabaseReaderLazyLoader existing = databases.remove(staleEntry);
logger.debug("database [{}] for project [{}] no longer exists, cleaning up...", staleEntry, projectId);
DatabaseReaderLazyLoader existing = projectLoaders.remove(staleEntry);
assert existing != null;
existing.shutdown(true);
} catch (Exception e) {
logger.error(() -> "failed to clean database [" + staleEntry + "]", e);
logger.error(() -> "failed to clean database [" + staleEntry + "] for project [" + projectId + "]", e);
}
}
}
// This method issues a search request to retrieve the database chunks from the .geoip_databases index and passes
// them to the chunkConsumer (which appends the data to a tmp file). This method forks to the generic thread pool to do the search.
void retrieveDatabase(
ProjectId projectId,
String databaseName,
String expectedMd5,
GeoIpTaskState.Metadata metadata,
@ -500,6 +550,7 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
CheckedRunnable<Exception> completedHandler,
Consumer<Exception> failureHandler
) {
// Search in the project specific .geoip_databases
// Need to run the search from a different thread, since this is executed from cluster state applier thread:
genericExecutor.accept(() -> {
MessageDigest md = MessageDigests.md5();
@ -517,7 +568,8 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
// At most once a day a few searches may be executed to fetch the new files,
// so it is ok if this happens in a blocking manner on a thread from generic thread pool.
// This makes the code easier to understand and maintain.
SearchResponse searchResponse = client.search(searchRequest).actionGet();
// Probably revisit if blocking the generic thread pool is acceptable in multi-project if we see performance issue
SearchResponse searchResponse = client.projectClient(projectId).search(searchRequest).actionGet();
try {
SearchHit[] hits = searchResponse.getHits().getHits();
@ -546,8 +598,9 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
});
}
public Set<String> getAvailableDatabases() {
return Set.copyOf(databases.keySet());
public Set<String> getAvailableDatabases(ProjectId projectId) {
var loaders = databases.get(projectId);
return loaders == null ? Set.of() : Set.copyOf(loaders.keySet());
}
public Set<String> getConfigDatabases() {
@ -583,8 +636,8 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
public record ConfigDatabaseDetail(String name, @Nullable String md5, @Nullable Long buildDateInMillis, @Nullable String type) {}
public Set<String> getFilesInTemp() {
try (Stream<Path> files = Files.list(geoipTmpDirectory)) {
public Set<String> getFilesInTemp(ProjectId projectId) {
try (Stream<Path> files = Files.list(getDatabaseTmpDirectory(projectId))) {
return files.map(Path::getFileName).map(Path::toString).collect(Collectors.toSet());
} catch (IOException e) {
throw new UncheckedIOException(e);
@ -595,4 +648,19 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
return cache.getCacheStats();
}
// Looks up the lazy loader for the given database name within the project's loader map.
// The per-project map is created on first access so subsequent writers can reuse it;
// returns null when no loader has been registered for the database yet.
private DatabaseReaderLazyLoader getProjectLazyLoader(ProjectId projectId, String databaseName) {
    var projectLoaders = databases.computeIfAbsent(projectId, unused -> new ConcurrentHashMap<>());
    return projectLoaders.get(databaseName);
}
/**
 * Returns the node's tmp directory for the given project's GeoIP databases, creating it if necessary.
 * When multiple projects are supported the databases are isolated under a per-project subdirectory
 * ({@code tmp/geoip-databases/{nodeId}/{projectId}}); otherwise the shared geoip tmp directory is used.
 *
 * @param projectId the project to resolve the tmp directory for
 * @throws UncheckedIOException if the directory cannot be created
 */
private Path getDatabaseTmpDirectory(ProjectId projectId) {
    Path path = projectResolver.supportsMultipleProjects() ? geoipTmpDirectory.resolve(projectId.toString()) : geoipTmpDirectory;
    try {
        // createDirectories is a no-op for an existing directory, so no exists() pre-check is
        // needed; dropping the check also avoids a TOCTOU race between check and creation.
        Files.createDirectories(path);
    } catch (IOException e) {
        throw new UncheckedIOException("Failed to create geoip tmp directory for project [" + projectId + "]", e);
    }
    return path;
}
}

View file

@ -20,7 +20,6 @@ import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
@ -46,6 +45,7 @@ public class DatabaseReaderLazyLoader implements IpDatabase {
private final GeoIpCache cache;
private final Path databasePath;
private final CheckedSupplier<Reader, IOException> loader;
private final ProjectId projectId;
final SetOnce<Reader> databaseReader;
// cache the database type so that we do not re-read it on every pipeline execution
@ -59,7 +59,7 @@ public class DatabaseReaderLazyLoader implements IpDatabase {
// than calling it on every call to cache.putIfAbsent that it makes the slight additional internal complication worth it
private final String cachedDatabasePathToString;
DatabaseReaderLazyLoader(GeoIpCache cache, Path databasePath, String md5) {
DatabaseReaderLazyLoader(ProjectId projectId, GeoIpCache cache, Path databasePath, String md5) {
this.cache = cache;
this.databasePath = Objects.requireNonNull(databasePath);
this.md5 = md5;
@ -67,6 +67,7 @@ public class DatabaseReaderLazyLoader implements IpDatabase {
this.databaseReader = new SetOnce<>();
this.databaseType = new SetOnce<>();
this.buildDate = new SetOnce<>();
this.projectId = projectId;
// cache the toString on construction
this.cachedDatabasePathToString = databasePath.toString();
@ -90,6 +91,13 @@ public class DatabaseReaderLazyLoader implements IpDatabase {
return databaseType.get();
}
/**
 * Prepares the database for lookup by incrementing the usage count.
 * If the usage count is already negative, it indicates that the database is being closed,
 * and this method will return false to indicate that no lookup should be performed.
 *
 * @return true if the database is ready for lookup, false if it is being closed
 */
boolean preLookup() {
    // a negative count is a closing sentinel and must never be incremented back towards zero
    int usages = currentUsages.updateAndGet(count -> count < 0 ? count : count + 1);
    return usages > 0;
}
@ -107,9 +115,8 @@ public class DatabaseReaderLazyLoader implements IpDatabase {
@Override
@Nullable
@FixForMultiProject // do not use ProjectId.DEFAULT
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider) {
return cache.putIfAbsent(ProjectId.DEFAULT, ipAddress, cachedDatabasePathToString, ip -> {
return cache.putIfAbsent(projectId, ipAddress, cachedDatabasePathToString, ip -> {
try {
return responseProvider.apply(get(), ipAddress);
} catch (Exception e) {
@ -146,10 +153,9 @@ public class DatabaseReaderLazyLoader implements IpDatabase {
}
// Visible for Testing
@FixForMultiProject // do not use ProjectId.DEFAULT
protected void doShutdown() throws IOException {
IOUtils.close(databaseReader.get());
int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(ProjectId.DEFAULT, databasePath);
int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(projectId, databasePath);
logger.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath);
if (deleteDatabaseFileOnShutdown) {
logger.info("deleting [{}]", databasePath);

View file

@ -12,9 +12,11 @@ package org.elasticsearch.ingest.geoip;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.ingest.EnterpriseGeoIpTask;
@ -146,8 +148,12 @@ class EnterpriseGeoIpTaskState implements PersistentTaskState, VersionedNamedWri
* @return the geoip downloader's task state or null if there is not a state to read
*/
@Nullable
@FixForMultiProject(description = "Replace ProjectId.DEFAULT")
static EnterpriseGeoIpTaskState getEnterpriseGeoIpTaskState(ClusterState state) {
    // the enterprise downloader is not project-aware yet, so always read from the default project
    PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(
        state.projectState(ProjectId.DEFAULT).metadata(),
        EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER
    );
    return (task == null) ? null : (EnterpriseGeoIpTaskState) task.getState();
}

View file

@ -180,16 +180,23 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final IpDatabaseProvider ipDatabaseProvider;
private final String databaseFile;
private final String databaseType;
private final ProjectId projectId;
/**
 * Creates a supplier that resolves the named database for the given project and
 * verifies its type before handing it to the processor.
 */
public DatabaseVerifyingSupplier(
    IpDatabaseProvider ipDatabaseProvider,
    String databaseFile,
    String databaseType,
    ProjectId projectId
) {
    this.ipDatabaseProvider = ipDatabaseProvider;
    this.databaseFile = databaseFile;
    this.databaseType = databaseType;
    this.projectId = projectId;
}
@Override
public IpDatabase get() throws IOException {
IpDatabase loader = ipDatabaseProvider.getDatabase(databaseFile);
IpDatabase loader = ipDatabaseProvider.getDatabase(projectId, databaseFile);
if (loader == null) {
return null;
}
@ -242,7 +249,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
readBooleanProperty(type, processorTag, config, "download_database_on_pipeline_creation", true);
final String databaseType;
try (IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(databaseFile)) {
try (IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(projectId, databaseFile)) {
if (ipDatabase == null) {
// It's possible that the database could be downloaded via the GeoipDownloader process and could become available
// at a later moment, so a processor impl is returned that tags documents instead. If a database cannot be sourced
@ -302,8 +309,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
processorTag,
description,
ipField,
new DatabaseVerifyingSupplier(ipDatabaseProvider, databaseFile, databaseType),
() -> ipDatabaseProvider.isValid(databaseFile),
new DatabaseVerifyingSupplier(ipDatabaseProvider, databaseFile, databaseType, projectId),
() -> ipDatabaseProvider.isValid(projectId, databaseFile),
targetField,
ipDataLookup,
ignoreMissing,

View file

@ -11,7 +11,7 @@ package org.elasticsearch.ingest.geoip;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
@ -253,15 +253,16 @@ public class GeoIpTaskState implements PersistentTaskState, VersionedNamedWritea
}
/**
 * Retrieves the geoip downloader's task state from the project metadata. This may return null in some circumstances,
* for example if the geoip downloader task hasn't been created yet (which it wouldn't be if it's disabled).
*
 * @param projectMetadata the project metadata to read the task state from.
 * @param taskId the task ID of the geoip downloader task to read the state for.
* @return the geoip downloader's task state or null if there is not a state to read
*/
@Nullable
static GeoIpTaskState getGeoIpTaskState(ClusterState state) {
PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(state, GeoIpDownloader.GEOIP_DOWNLOADER);
static GeoIpTaskState getGeoIpTaskState(ProjectMetadata projectMetadata, String taskId) {
PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(projectMetadata, taskId);
return (task == null) ? null : (GeoIpTaskState) task.getState();
}

View file

@ -126,7 +126,9 @@ public class IngestGeoIpPlugin extends Plugin
parameters.client,
geoIpCache,
parameters.genericExecutor,
parameters.ingestService.getClusterService()
parameters.ingestService.getClusterService(),
parameters.ingestService,
parameters.client.projectResolver()
);
databaseRegistry.set(registry);
return Map.ofEntries(
@ -139,7 +141,7 @@ public class IngestGeoIpPlugin extends Plugin
public Collection<?> createComponents(PluginServices services) {
try {
String nodeId = services.nodeEnvironment().nodeId();
databaseRegistry.get().initialize(nodeId, services.resourceWatcherService(), ingestService.get());
databaseRegistry.get().initialize(nodeId, services.resourceWatcherService());
} catch (IOException e) {
throw new UncheckedIOException(e);
}

View file

@ -9,6 +9,8 @@
package org.elasticsearch.ingest.geoip;
import org.elasticsearch.cluster.metadata.ProjectId;
/**
* Provides construction and initialization logic for {@link IpDatabase} instances.
*/
@ -20,15 +22,17 @@ public interface IpDatabaseProvider {
* Verifying database expiration is left to each provider implementation to determine. A return value of <code>false</code> does not
* preclude the possibility of a provider returning <code>true</code> in the future.
*
* @param projectId projectId to look for database.
* @param name the name of the database to provide.
* @return <code>false</code> IFF the requested database file is expired,
* <code>true</code> for all other cases (including unknown file name, file missing, wrong database type, etc).
*/
Boolean isValid(String name);
Boolean isValid(ProjectId projectId, String name);
/**
* @param projectId projectId to look for database.
* @param name the name of the database to provide.
* @return a ready-to-use database instance, or <code>null</code> if no database could be loaded.
*/
IpDatabase getDatabase(String name);
IpDatabase getDatabase(ProjectId projectId, String name);
}

View file

@ -12,6 +12,7 @@ package org.elasticsearch.ingest.geoip.stats;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
@ -77,15 +78,16 @@ public class GeoIpStatsTransportAction extends TransportNodesAction<Request, Res
@Override
protected NodeResponse nodeOperation(NodeRequest request, Task task) {
    // resolve the project once and use it for both the downloader task lookup and the registry queries
    ProjectId projectId = projectResolver.getProjectId();
    GeoIpDownloader geoIpTask = geoIpDownloaderTaskExecutor.getTask(projectId);
    GeoIpDownloaderStats downloaderStats = geoIpTask == null || geoIpTask.getStatus() == null ? null : geoIpTask.getStatus();
    CacheStats cacheStats = registry.getCacheStats();
    return new NodeResponse(
        transportService.getLocalNode(),
        downloaderStats,
        cacheStats,
        registry.getAvailableDatabases(projectId),
        registry.getFilesInTemp(projectId),
        registry.getConfigDatabases()
    );
}

View file

@ -21,12 +21,13 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
@ -88,6 +89,7 @@ import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import static org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor.getTaskId;
import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDefaultDatabases;
import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.TYPE;
@ -116,11 +118,17 @@ public class DatabaseNodeServiceTests extends ESTestCase {
private ResourceWatcherService resourceWatcherService;
private IngestService ingestService;
private ClusterService clusterService;
private ProjectId projectId;
private ProjectResolver projectResolver;
private final Collection<Releasable> toRelease = new CopyOnWriteArrayList<>();
@Before
public void setup() throws IOException {
// cover for multi-project enable/disabled
boolean multiProject = randomBoolean();
projectId = multiProject ? randomProjectIdOrDefault() : ProjectId.DEFAULT;
projectResolver = multiProject ? TestProjectResolvers.singleProject(projectId) : TestProjectResolvers.DEFAULT_PROJECT_ONLY;
final Path geoIpConfigDir = createTempDir();
GeoIpCache cache = new GeoIpCache(1000);
ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache);
@ -134,8 +142,17 @@ public class DatabaseNodeServiceTests extends ESTestCase {
ingestService = mock(IngestService.class);
clusterService = mock(ClusterService.class);
geoIpTmpDir = createTempDir();
databaseNodeService = new DatabaseNodeService(geoIpTmpDir, client, cache, configDatabases, Runnable::run, clusterService);
databaseNodeService.initialize("nodeId", resourceWatcherService, ingestService);
databaseNodeService = new DatabaseNodeService(
geoIpTmpDir,
client,
cache,
configDatabases,
Runnable::run,
clusterService,
ingestService,
projectResolver
);
databaseNodeService.initialize("nodeId", resourceWatcherService);
}
@After
@ -148,21 +165,21 @@ public class DatabaseNodeServiceTests extends ESTestCase {
public void testCheckDatabases() throws Exception {
String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14);
String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
String taskId = getTaskId(projectId, projectResolver.supportsMultipleProjects());
PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(10, 5, 14, md5, 10))));
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
ClusterState state = createClusterState(tasksCustomMetadata);
ClusterState state = createClusterState(projectId, tasksCustomMetadata);
int numPipelinesToBeReloaded = randomInt(4);
List<String> pipelineIds = IntStream.range(0, numPipelinesToBeReloaded).mapToObj(String::valueOf).toList();
when(ingestService.getPipelineWithProcessorType(any(), any(), any())).thenReturn(pipelineIds);
assertThat(databaseNodeService.getDatabase("GeoIP2-City.mmdb"), nullValue());
assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue());
// Nothing should be downloaded, since the database is no longer valid (older than 30 days)
databaseNodeService.checkDatabases(state);
DatabaseReaderLazyLoader database = databaseNodeService.getDatabaseReaderLazyLoader("GeoIP2-City.mmdb");
DatabaseReaderLazyLoader database = databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb");
assertThat(database, nullValue());
verify(client, times(0)).search(any());
verify(ingestService, times(0)).reloadPipeline(any(), anyString());
@ -176,11 +193,11 @@ public class DatabaseNodeServiceTests extends ESTestCase {
);
tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
state = createClusterState(tasksCustomMetadata);
state = createClusterState(projectId, tasksCustomMetadata);
// Database should be downloaded
databaseNodeService.checkDatabases(state);
database = databaseNodeService.getDatabaseReaderLazyLoader("GeoIP2-City.mmdb");
database = databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb");
assertThat(database, notNullValue());
verify(client, times(10)).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
@ -194,12 +211,12 @@ public class DatabaseNodeServiceTests extends ESTestCase {
public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Exception {
String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9);
String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
String taskId = getTaskId(projectId, projectResolver.supportsMultipleProjects());
PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10))));
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
ClusterState state = ClusterState.builder(createClusterState(tasksCustomMetadata))
ClusterState state = ClusterState.builder(createClusterState(projectId, tasksCustomMetadata))
.nodes(
new DiscoveryNodes.Builder().add(
DiscoveryNodeUtils.builder("_id1").name("_name1").roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)).build()
@ -208,7 +225,7 @@ public class DatabaseNodeServiceTests extends ESTestCase {
.build();
databaseNodeService.checkDatabases(state);
assertThat(databaseNodeService.getDatabase("GeoIP2-City.mmdb"), nullValue());
assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.toList(), empty());
@ -217,18 +234,18 @@ public class DatabaseNodeServiceTests extends ESTestCase {
public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Exception {
String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9);
String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
String taskId = getTaskId(projectId, projectResolver.supportsMultipleProjects());
PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10))));
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
ClusterState state = ClusterState.builder(new ClusterName("name"))
.metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).build())
.putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(TYPE, tasksCustomMetadata))
.nodes(new DiscoveryNodes.Builder().add(DiscoveryNodeUtils.create("_id1")).localNodeId("_id1"))
.build();
databaseNodeService.checkDatabases(state);
assertThat(databaseNodeService.getDatabase("GeoIP2-City.mmdb"), nullValue());
assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.toList(), empty());
@ -238,12 +255,12 @@ public class DatabaseNodeServiceTests extends ESTestCase {
public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws Exception {
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(0L, Map.of());
ClusterState state = createClusterState(tasksCustomMetadata);
ClusterState state = createClusterState(projectId, tasksCustomMetadata);
mockSearches("GeoIP2-City.mmdb", 0, 9);
databaseNodeService.checkDatabases(state);
assertThat(databaseNodeService.getDatabase("GeoIP2-City.mmdb"), nullValue());
assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue());
verify(client, never()).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.toList(), empty());
@ -260,7 +277,7 @@ public class DatabaseNodeServiceTests extends ESTestCase {
CheckedRunnable<Exception> completedHandler = mock(CheckedRunnable.class);
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
databaseNodeService.retrieveDatabase("_name", md5, metadata, chunkConsumer, completedHandler, failureHandler);
databaseNodeService.retrieveDatabase(projectId, "_name", md5, metadata, chunkConsumer, completedHandler, failureHandler);
verify(failureHandler, never()).accept(any());
verify(chunkConsumer, times(30)).accept(any());
verify(completedHandler, times(1)).run();
@ -278,7 +295,7 @@ public class DatabaseNodeServiceTests extends ESTestCase {
CheckedRunnable<Exception> completedHandler = mock(CheckedRunnable.class);
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
databaseNodeService.retrieveDatabase("_name", incorrectMd5, metadata, chunkConsumer, completedHandler, failureHandler);
databaseNodeService.retrieveDatabase(projectId, "_name", incorrectMd5, metadata, chunkConsumer, completedHandler, failureHandler);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(failureHandler, times(1)).accept(exceptionCaptor.capture());
assertThat(exceptionCaptor.getAllValues().size(), equalTo(1));
@ -296,7 +313,7 @@ public class DatabaseNodeServiceTests extends ESTestCase {
List<String> pipelineIds = IntStream.range(0, numPipelinesToBeReloaded).mapToObj(String::valueOf).toList();
when(ingestService.getPipelineWithProcessorType(any(), any(), any())).thenReturn(pipelineIds);
databaseNodeService.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file"));
databaseNodeService.updateDatabase(projectId, "_name", "_md5", geoIpTmpDir.resolve("some-file"));
// Updating the first time may trigger a reload.
verify(clusterService, times(1)).addListener(any());
@ -308,7 +325,7 @@ public class DatabaseNodeServiceTests extends ESTestCase {
reset(ingestService);
// Subsequent updates shouldn't trigger a reload.
databaseNodeService.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file"));
databaseNodeService.updateDatabase(projectId, "_name", "_md5", geoIpTmpDir.resolve("some-file"));
verifyNoMoreInteractions(clusterService);
verifyNoMoreInteractions(ingestService);
}
@ -354,6 +371,7 @@ public class DatabaseNodeServiceTests extends ESTestCase {
});
requestMap.put(databaseName + "_" + i, actionFuture);
}
when(client.projectClient(any())).thenReturn(client);
when(client.search(any())).thenAnswer(invocationOnMock -> {
SearchRequest req = (SearchRequest) invocationOnMock.getArguments()[0];
TermQueryBuilder term = (TermQueryBuilder) req.source().query();
@ -366,18 +384,10 @@ public class DatabaseNodeServiceTests extends ESTestCase {
return MessageDigests.toHexString(md.digest());
}
static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata) {
return createClusterState(Metadata.DEFAULT_PROJECT_ID, tasksCustomMetadata, false);
}
static ClusterState createClusterState(ProjectId projectId, PersistentTasksCustomMetadata tasksCustomMetadata) {
return createClusterState(projectId, tasksCustomMetadata, false);
}
static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata, boolean noStartedShards) {
return createClusterState(Metadata.DEFAULT_PROJECT_ID, tasksCustomMetadata, noStartedShards);
}
static ClusterState createClusterState(
ProjectId projectId,
PersistentTasksCustomMetadata tasksCustomMetadata,

View file

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.settings.ClusterSettings;
@ -76,9 +77,12 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
private ThreadPool threadPool;
private MockClient client;
private EnterpriseGeoIpDownloader geoIpDownloader;
private ProjectId projectId;
@Before
public void setup() throws IOException {
// TODO: change to random projectId
projectId = ProjectId.DEFAULT;
httpClient = mock(HttpClient.class);
when(httpClient.getBytes(any(), anyString())).thenReturn(
"e4a3411cdd7b21eaf18675da5a7f9f360d33c6882363b2c19c38715834c9e836 GeoIP2-City_20240709.tar.gz".getBytes(StandardCharsets.UTF_8)
@ -92,7 +96,7 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(Settings.EMPTY, Set.of(GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING))
);
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()));
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
when(clusterService.state()).thenReturn(state);
client = new MockClient(threadPool);
geoIpDownloader = new EnterpriseGeoIpDownloader(
@ -440,15 +444,15 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
}
public void testUpdateDatabasesWriteBlock() {
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()));
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
var geoIpIndex = state.getMetadata()
.getProject()
.getProject(projectId)
.getIndicesLookup()
.get(EnterpriseGeoIpDownloader.DATABASES_INDEX)
.getWriteIndex()
.getName();
state = ClusterState.builder(state)
.blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.build();
when(clusterService.state()).thenReturn(state);
var e = expectThrows(ClusterBlockException.class, () -> geoIpDownloader.updateDatabases());
@ -467,15 +471,15 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
}
public void testUpdateDatabasesIndexNotReady() throws IOException {
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()), true);
ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()), true);
var geoIpIndex = state.getMetadata()
.getProject()
.getProject(projectId)
.getIndicesLookup()
.get(EnterpriseGeoIpDownloader.DATABASES_INDEX)
.getWriteIndex()
.getName();
state = ClusterState.builder(state)
.blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.build();
when(clusterService.state()).thenReturn(state);
geoIpDownloader.updateDatabases();

View file

@ -13,8 +13,12 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
@ -54,6 +58,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -64,9 +69,16 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
private ConfigDatabases configDatabases;
private DatabaseNodeService databaseNodeService;
private ClusterService clusterService;
private ProjectId projectId;
private ProjectResolver projectResolver;
@Before
public void loadDatabaseReaders() throws IOException {
// cover for multi-project enable/disabled
boolean multiProject = randomBoolean();
projectId = multiProject ? randomProjectIdOrDefault() : ProjectId.DEFAULT;
projectResolver = multiProject ? TestProjectResolvers.singleProject(projectId) : TestProjectResolvers.DEFAULT_PROJECT_ONLY;
final Path configDir = createTempDir();
geoIpConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(geoIpConfigDir);
@ -77,9 +89,21 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
copyDefaultDatabases(geoIpConfigDir, configDatabases);
geoipTmpDir = createTempDir();
clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
databaseNodeService = new DatabaseNodeService(geoipTmpDir, client, cache, configDatabases, Runnable::run, clusterService);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.putProjectMetadata(ProjectMetadata.builder(projectId).build())
.build();
when(clusterService.state()).thenReturn(state);
databaseNodeService = new DatabaseNodeService(
geoipTmpDir,
client,
cache,
configDatabases,
Runnable::run,
clusterService,
mock(IngestService.class),
projectResolver
);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class));
}
@After
@ -95,7 +119,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
String processorTag = randomAlphaOfLength(10);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
@ -112,7 +136,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("ignore_missing", true);
String processorTag = randomAlphaOfLength(10);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
@ -129,7 +153,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("database_file", "GeoLite2-Country.mmdb");
String processorTag = randomAlphaOfLength(10);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
@ -147,7 +171,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("database_file", "GeoLite2-ASN.mmdb");
String processorTag = randomAlphaOfLength(10);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
@ -162,7 +186,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("target_field", "_field");
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("_field"));
assertFalse(processor.isIgnoreMissing());
@ -173,7 +197,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDatabaseType(), equalTo("GeoLite2-Country"));
@ -190,7 +214,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
asnOnlyProperties.remove(Property.IP);
String asnProperty = RandomPicks.randomFrom(Randomness.get(), asnOnlyProperties).toString();
config.put("properties", List.of(asnProperty));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config, null));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config, projectId));
assertThat(
e.getMessage(),
equalTo(
@ -211,7 +235,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
cityOnlyProperties.remove(Property.IP);
String cityProperty = RandomPicks.randomFrom(Randomness.get(), cityOnlyProperties).toString();
config.put("properties", List.of(cityProperty));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config, null));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config, projectId));
assertThat(
e.getMessage(),
equalTo("[properties] illegal property value [" + cityProperty + "]. valid values are [IP, ASN, ORGANIZATION_NAME, NETWORK]")
@ -219,14 +243,15 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuildNonExistingDbFile() throws Exception {
ProjectId projectId = randomProjectIdOrDefault();
copyDatabase("GeoLite2-City-Test.mmdb", geoipTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City.mmdb"));
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "does-not-exist.mmdb");
Processor processor = factory.create(null, null, null, config, null);
Processor processor = factory.create(null, null, null, config, projectId);
assertThat(processor, instanceOf(GeoIpProcessor.DatabaseUnavailableProcessor.class));
}
@ -237,7 +262,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", randomFrom(DEFAULT_DATABASES));
Processor processor = factory.create(null, null, null, config, null);
Processor processor = factory.create(null, null, null, config, projectId);
assertThat(processor, instanceOf(GeoIpProcessor.DatabaseUnavailableProcessor.class));
}
@ -259,7 +284,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("properties", fieldNames);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getProperties(), equalTo(properties));
assertFalse(processor.isIgnoreMissing());
@ -271,7 +296,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config1 = new HashMap<>();
config1.put("field", "_field");
config1.put("properties", List.of("invalid"));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, null));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, projectId));
assertThat(
e.getMessage(),
equalTo(
@ -285,7 +310,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config2 = new HashMap<>();
config2.put("field", "_field");
config2.put("properties", "invalid");
e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config2, null));
e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config2, projectId));
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));
}
@ -294,14 +319,14 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
IpDatabase database = mock(IpDatabase.class);
when(database.getDatabaseType()).thenReturn("some-unsupported-database");
IpDatabaseProvider provider = mock(IpDatabaseProvider.class);
when(provider.getDatabase(anyString())).thenReturn(database);
when(provider.getDatabase(eq(projectId), anyString())).thenReturn(database);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, provider);
Map<String, Object> config1 = new HashMap<>();
config1.put("field", "_field");
config1.put("properties", List.of("ip"));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, null));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, projectId));
assertThat(
e.getMessage(),
equalTo("[database_file] Unsupported database type [some-unsupported-database] for file [GeoLite2-City.mmdb]")
@ -313,14 +338,14 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
IpDatabase database = mock(IpDatabase.class);
when(database.getDatabaseType()).thenReturn(null);
IpDatabaseProvider provider = mock(IpDatabaseProvider.class);
when(provider.getDatabase(anyString())).thenReturn(database);
when(provider.getDatabase(eq(projectId), anyString())).thenReturn(database);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, provider);
Map<String, Object> config1 = new HashMap<>();
config1.put("field", "_field");
config1.put("properties", List.of("ip"));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, null));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, projectId));
assertThat(e.getMessage(), equalTo("[database_file] Unsupported database type [null] for file [GeoLite2-City.mmdb]"));
}
@ -328,7 +353,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
IpDatabase database = mock(IpDatabase.class);
when(database.getDatabaseType()).thenReturn("ipinfo some_ipinfo_database.mmdb-City");
IpDatabaseProvider provider = mock(IpDatabaseProvider.class);
when(provider.getDatabase(anyString())).thenReturn(database);
when(provider.getDatabase(eq(projectId), anyString())).thenReturn(database);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, provider);
@ -336,7 +361,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config1.put("database_file", "some-ipinfo-database.mmdb");
config1.put("field", "_field");
config1.put("properties", List.of("ip"));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, null));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config1, projectId));
assertThat(
e.getMessage(),
equalTo(
@ -350,7 +375,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
IpDatabase database = mock(IpDatabase.class);
when(database.getDatabaseType()).thenReturn("some_custom_database.mmdb-City");
IpDatabaseProvider provider = mock(IpDatabaseProvider.class);
when(provider.getDatabase(anyString())).thenReturn(database);
when(provider.getDatabase(eq(projectId), anyString())).thenReturn(database);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, provider);
@ -358,7 +383,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config1.put("database_file", "some-custom-database.mmdb");
config1.put("field", "_field");
config1.put("properties", List.of("ip"));
factory.create(null, null, null, config1, null);
factory.create(null, null, null, config1, projectId);
assertWarnings(GeoIpProcessor.UNSUPPORTED_DATABASE_DEPRECATION_MESSAGE.replaceAll("\\{}", "some_custom_database.mmdb-City"));
}
@ -374,14 +399,19 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// database readers used at class level are reused between tests. (we want to keep that otherwise running this
// test will take roughly 4 times more time)
Client client = mock(Client.class);
ThreadPool threadPool = new TestThreadPool("test");
ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool);
DatabaseNodeService databaseNodeService = new DatabaseNodeService(
createTempDir(),
client,
cache,
configDatabases,
Runnable::run,
clusterService
clusterService,
mock(IngestService.class),
projectResolver
);
databaseNodeService.initialize("nodeId", resourceWatcherService);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService);
for (DatabaseReaderLazyLoader lazyLoader : configDatabases.getConfigDatabases().values()) {
assertNull(lazyLoader.databaseReader.get());
@ -393,35 +423,37 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-City.mmdb");
final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config, null);
final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config, projectId);
// these are lazy loaded until first use, so we expect null here
assertNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-City.mmdb").databaseReader.get());
assertNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-City.mmdb").databaseReader.get());
city.execute(document);
// the first ingest should trigger a database load
assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-City.mmdb").databaseReader.get());
assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-City.mmdb").databaseReader.get());
config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
final GeoIpProcessor country = (GeoIpProcessor) factory.create(null, "_tag", null, config, null);
final GeoIpProcessor country = (GeoIpProcessor) factory.create(null, "_tag", null, config, projectId);
// these are lazy loaded until first use, so we expect null here
assertNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-Country.mmdb").databaseReader.get());
assertNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-Country.mmdb").databaseReader.get());
country.execute(document);
// the first ingest should trigger a database load
assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-Country.mmdb").databaseReader.get());
assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-Country.mmdb").databaseReader.get());
config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb");
final GeoIpProcessor asn = (GeoIpProcessor) factory.create(null, "_tag", null, config, null);
final GeoIpProcessor asn = (GeoIpProcessor) factory.create(null, "_tag", null, config, projectId);
// these are lazy loaded until first use, so we expect null here
assertNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-ASN.mmdb").databaseReader.get());
assertNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-ASN.mmdb").databaseReader.get());
asn.execute(document);
// the first ingest should trigger a database load
assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoLite2-ASN.mmdb").databaseReader.get());
assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoLite2-ASN.mmdb").databaseReader.get());
resourceWatcherService.close();
threadPool.shutdown();
}
public void testLoadingCustomDatabase() throws IOException {
@ -448,9 +480,11 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
cache,
configDatabases,
Runnable::run,
clusterService
clusterService,
mock(IngestService.class),
projectResolver
);
databaseNodeService.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
databaseNodeService.initialize("nodeId", resourceWatcherService);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService);
for (DatabaseReaderLazyLoader lazyLoader : configDatabases.getConfigDatabases().values()) {
assertNull(lazyLoader.databaseReader.get());
@ -462,13 +496,13 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoIP2-City.mmdb");
final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config, null);
final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config, projectId);
// these are lazy loaded until first use, so we expect null here
assertNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoIP2-City.mmdb").databaseReader.get());
assertNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb").databaseReader.get());
city.execute(document);
// the first ingest should trigger a database load
assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader("GeoIP2-City.mmdb").databaseReader.get());
assertNotNull(databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb").databaseReader.get());
resourceWatcherService.close();
threadPool.shutdown();
}
@ -478,7 +512,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", randomIdentifier());
config.put("download_database_on_pipeline_creation", randomBoolean());
factory.create(null, null, null, config, null);
factory.create(null, null, null, config, projectId);
// Check all the config params were consumed.
assertThat(config, anEmptyMap());
}
@ -489,7 +523,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
.updateTaskState(GeoIpDownloader.GEOIP_DOWNLOADER, GeoIpTaskState.EMPTY)
.build();
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasks))
.putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(PersistentTasksCustomMetadata.TYPE, tasks))
.build();
when(clusterService.state()).thenReturn(clusterState);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService);
@ -498,7 +532,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("field", "_field");
String processorTag = randomAlphaOfLength(10);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config, projectId);
processor.execute(RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("_field", "89.160.20.128"))));
}
@ -507,7 +541,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService);
Map<String, Object> config = new HashMap<>();
config.put("field", "source_field");
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId);
Map<String, Object> document = Map.of("source_field", "89.160.20.128");
{
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document));
@ -518,7 +552,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
{
copyDatabase("GeoLite2-City-Test.mmdb", geoipTmpDir);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
processor.execute(ingestDocument);
Map<?, ?> geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
assertThat(geoData.get("city_name"), equalTo("Linköping"));
@ -526,7 +560,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
{
// No databases are available, so assume that databases still need to be downloaded and therefore not fail:
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document));
databaseNodeService.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
databaseNodeService.removeStaleEntries(projectId, List.of("GeoLite2-City.mmdb"));
configDatabases.updateDatabase(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), false);
processor.execute(ingestDocument);
Map<?, ?> geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
@ -534,7 +568,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
{
// There are databases available, but not the right one, so tag:
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase(projectId, "GeoLite2-City-Test.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document));
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata(), hasEntry("tags", List.of("_geoip_database_unavailable_GeoLite2-City.mmdb")));
@ -559,7 +593,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
null,
null,
config,
null
projectId
);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get("geoip"), nullValue());
@ -570,7 +604,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
copyDatabase("GeoLite2-City-Test.mmdb", geoipTmpDir);
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
{
Map<String, Object> config = new HashMap<>();
@ -581,7 +615,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
document.put("source_field", "89.160.20.128");
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, null);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config, projectId);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get("tags"), nullValue());
Map<?, ?> geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");

View file

@ -9,7 +9,9 @@
package org.elasticsearch.ingest.geoip;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
@ -484,13 +486,14 @@ public class GeoIpProcessorTests extends ESTestCase {
return () -> loader;
}
@FixForMultiProject(description = "Replace DEFAULT project")
private DatabaseReaderLazyLoader loader(final String databaseName, final AtomicBoolean closed) {
int last = databaseName.lastIndexOf("/");
final Path path = tmpDir.resolve(last == -1 ? databaseName : databaseName.substring(last + 1));
copyDatabase(databaseName, path);
final GeoIpCache cache = new GeoIpCache(1000);
return new DatabaseReaderLazyLoader(cache, path, null) {
return new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, path, null) {
@Override
protected void doShutdown() throws IOException {
if (closed != null) {

View file

@ -14,8 +14,10 @@ import com.maxmind.db.Networks;
import com.maxmind.db.Reader;
import org.apache.lucene.util.Constants;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.SuppressForbidden;
@ -555,10 +557,11 @@ public class IpinfoIpDataLookupsTests extends ESTestCase {
}
}
@FixForMultiProject(description = "Replace DEFAULT project")
private DatabaseReaderLazyLoader loader(final String databaseName) {
Path path = tmpDir.resolve(databaseName);
copyDatabase("ipinfo/" + databaseName, path); // the ipinfo databases are prefixed on the test classpath
final GeoIpCache cache = new GeoIpCache(1000);
return new DatabaseReaderLazyLoader(cache, path, null);
return new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, path, null);
}
}

View file

@ -10,6 +10,8 @@
package org.elasticsearch.ingest.geoip;
import org.apache.lucene.util.Constants;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
@ -330,10 +332,11 @@ public class MaxmindIpDataLookupsTests extends ESTestCase {
}
}
@FixForMultiProject(description = "Replace DEFAULT project")
private DatabaseReaderLazyLoader loader(final String databaseName) {
Path path = tmpDir.resolve(databaseName);
copyDatabase(databaseName, path);
final GeoIpCache cache = new GeoIpCache(1000);
return new DatabaseReaderLazyLoader(cache, path, null);
return new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, path, null);
}
}