diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index fb4fadf043b0..e318beeb3ae6 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -16,9 +16,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.core.Booleans; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -105,8 +107,9 @@ public class DatabaseReaderLazyLoader implements IpDatabase { @Override @Nullable + @FixForMultiProject // do not use ProjectId.DEFAULT public RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { - return cache.putIfAbsent(ipAddress, cachedDatabasePathToString, ip -> { + return cache.putIfAbsent(ProjectId.DEFAULT, ipAddress, cachedDatabasePathToString, ip -> { try { return responseProvider.apply(get(), ipAddress); } catch (Exception e) { @@ -143,9 +146,10 @@ public class DatabaseReaderLazyLoader implements IpDatabase { } // Visible for Testing + @FixForMultiProject // do not use ProjectId.DEFAULT protected void doShutdown() throws IOException { IOUtils.close(databaseReader.get()); - int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(databasePath); + int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(ProjectId.DEFAULT, databasePath); logger.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath); if (deleteDatabaseFileOnShutdown) { logger.info("deleting [{}]", databasePath); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java index d9c9c3aaf326..47dd3d86e998 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java @@ -10,6 +10,7 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.db.NodeCache; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.core.TimeValue; @@ -60,9 +61,9 @@ public final class GeoIpCache { } @SuppressWarnings("unchecked") - RESPONSE putIfAbsent(String ip, String databasePath, Function retrieveFunction) { + RESPONSE putIfAbsent(ProjectId projectId, String ip, String databasePath, Function retrieveFunction) { // can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader) - CacheKey cacheKey = new CacheKey(ip, databasePath); + CacheKey cacheKey = new CacheKey(projectId, ip, databasePath); long cacheStart = relativeNanoTimeProvider.getAsLong(); // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. Object response = cache.get(cacheKey); @@ -92,16 +93,16 @@ public final class GeoIpCache { } // only useful for testing - Object get(String ip, String databasePath) { - CacheKey cacheKey = new CacheKey(ip, databasePath); + Object get(ProjectId projectId, String ip, String databasePath) { + CacheKey cacheKey = new CacheKey(projectId, ip, databasePath); return cache.get(cacheKey); } - public int purgeCacheEntriesForDatabase(Path databaseFile) { + public int purgeCacheEntriesForDatabase(ProjectId projectId, Path databaseFile) { String databasePath = databaseFile.toString(); int counter = 0; for (CacheKey key : cache.keys()) { - if (key.databasePath.equals(databasePath)) { + if (key.projectId.equals(projectId) && key.databasePath.equals(databasePath)) { cache.invalidate(key); counter++; } @@ -135,5 +136,5 @@ public final class GeoIpCache { * path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same * IP may be in both with different values and we need to cache both. */ - private record CacheKey(String ip, String databasePath) {} + private record CacheKey(ProjectId projectId, String ip, String databasePath) {} } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java index 0c92aca88291..f8a67b3e35bc 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java @@ -11,6 +11,8 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.model.AbstractResponse; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.PathUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.ingest.geoip.stats.CacheStats; import org.elasticsearch.test.ESTestCase; @@ -26,21 +28,22 @@ public class GeoIpCacheTests extends ESTestCase { public void testCachesAndEvictsResults() { GeoIpCache cache = new GeoIpCache(1); + ProjectId projectId = randomProjectIdOrDefault(); AbstractResponse response1 = mock(AbstractResponse.class); AbstractResponse response2 = mock(AbstractResponse.class); // add a key - AbstractResponse cachedResponse = cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> response1); + AbstractResponse cachedResponse = cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> response1); assertSame(cachedResponse, response1); - assertSame(cachedResponse, cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> response1)); - assertSame(cachedResponse, cache.get("127.0.0.1", "path/to/db")); + assertSame(cachedResponse, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> response1)); + assertSame(cachedResponse, cache.get(projectId, "127.0.0.1", "path/to/db")); // evict old key by adding another value - cachedResponse = cache.putIfAbsent("127.0.0.2", "path/to/db", ip -> response2); + cachedResponse = cache.putIfAbsent(projectId, "127.0.0.2", "path/to/db", ip -> response2); assertSame(cachedResponse, response2); - assertSame(cachedResponse, cache.putIfAbsent("127.0.0.2", "path/to/db", ip -> response2)); - assertSame(cachedResponse, cache.get("127.0.0.2", "path/to/db")); - assertNotSame(response1, cache.get("127.0.0.1", "path/to/db")); + assertSame(cachedResponse, cache.putIfAbsent(projectId, "127.0.0.2", "path/to/db", ip -> response2)); + assertSame(cachedResponse, cache.get(projectId, "127.0.0.2", "path/to/db")); + assertNotSame(response1, cache.get(projectId, "127.0.0.1", "path/to/db")); } public void testCachesNoResult() { @@ -51,31 +54,47 @@ public class GeoIpCacheTests extends ESTestCase { return null; }; - AbstractResponse response = cache.putIfAbsent("127.0.0.1", "path/to/db", countAndReturnNull); + ProjectId projectId = randomProjectIdOrDefault(); + AbstractResponse response = cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", countAndReturnNull); assertNull(response); - assertNull(cache.putIfAbsent("127.0.0.1", "path/to/db", countAndReturnNull)); + assertNull(cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", countAndReturnNull)); assertEquals(1, count.get()); // the cached value is not actually *null*, it's the NO_RESULT sentinel - assertSame(GeoIpCache.NO_RESULT, cache.get("127.0.0.1", "path/to/db")); + assertSame(GeoIpCache.NO_RESULT, cache.get(projectId, "127.0.0.1", "path/to/db")); } - public void testCacheKey() { + public void testCacheDoesNotCollideForDifferentDatabases() { GeoIpCache cache = new GeoIpCache(2); AbstractResponse response1 = mock(AbstractResponse.class); AbstractResponse response2 = mock(AbstractResponse.class); - assertSame(response1, cache.putIfAbsent("127.0.0.1", "path/to/db1", ip -> response1)); - assertSame(response2, cache.putIfAbsent("127.0.0.1", "path/to/db2", ip -> response2)); - assertSame(response1, cache.get("127.0.0.1", "path/to/db1")); - assertSame(response2, cache.get("127.0.0.1", "path/to/db2")); + ProjectId projectId = randomProjectIdOrDefault(); + assertSame(response1, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db1", ip -> response1)); + assertSame(response2, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db2", ip -> response2)); + assertSame(response1, cache.get(projectId, "127.0.0.1", "path/to/db1")); + assertSame(response2, cache.get(projectId, "127.0.0.1", "path/to/db2")); + } + + public void testCacheDoesNotCollideForDifferentProjects() { + GeoIpCache cache = new GeoIpCache(2); + AbstractResponse response1 = mock(AbstractResponse.class); + AbstractResponse response2 = mock(AbstractResponse.class); + + ProjectId projectId1 = randomUniqueProjectId(); + ProjectId projectId2 = randomUniqueProjectId(); + assertSame(response1, cache.putIfAbsent(projectId1, "127.0.0.1", "path/to/db1", ip -> response1)); + assertSame(response2, cache.putIfAbsent(projectId2, "127.0.0.1", "path/to/db1", ip -> response2)); + assertSame(response1, cache.get(projectId1, "127.0.0.1", "path/to/db1")); + assertSame(response2, cache.get(projectId2, "127.0.0.1", "path/to/db1")); } public void testThrowsFunctionsException() { GeoIpCache cache = new GeoIpCache(1); + ProjectId projectId = randomProjectIdOrDefault(); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> { + () -> cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> { throw new IllegalArgumentException("bad"); }) ); @@ -92,19 +111,20 @@ public class GeoIpCacheTests extends ESTestCase { final AtomicLong testNanoTime = new AtomicLong(0); // We use a relative time provider that increments 1ms every time it is called. So each operation appears to take 1ms GeoIpCache cache = new GeoIpCache(maxCacheSize, () -> testNanoTime.addAndGet(TimeValue.timeValueMillis(1).getNanos())); + ProjectId projectId = randomProjectIdOrDefault(); AbstractResponse response = mock(AbstractResponse.class); String databasePath = "path/to/db1"; String key1 = "127.0.0.1"; String key2 = "127.0.0.2"; String key3 = "127.0.0.3"; - cache.putIfAbsent(key1, databasePath, ip -> response); // cache miss - cache.putIfAbsent(key2, databasePath, ip -> response); // cache miss - cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit - cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit - cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit - cache.putIfAbsent(key3, databasePath, ip -> response); // cache miss, key2 will be evicted - cache.putIfAbsent(key2, databasePath, ip -> response); // cache miss, key1 will be evicted + cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache miss + cache.putIfAbsent(projectId, key2, databasePath, ip -> response); // cache miss + cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit + cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit + cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit + cache.putIfAbsent(projectId, key3, databasePath, ip -> response); // cache miss, key2 will be evicted + cache.putIfAbsent(projectId, key2, databasePath, ip -> response); // cache miss, key1 will be evicted CacheStats cacheStats = cache.getCacheStats(); assertThat(cacheStats.count(), equalTo(maxCacheSize)); assertThat(cacheStats.hits(), equalTo(3L)); @@ -115,4 +135,28 @@ public class GeoIpCacheTests extends ESTestCase { // There are 4 misses. Each is made up of a cache query, and a database query, each being 1ms: assertThat(cacheStats.missesTimeInMillis(), equalTo(8L)); } + + public void testPurgeCacheEntriesForDatabase() { + GeoIpCache cache = new GeoIpCache(100); + ProjectId projectId1 = randomUniqueProjectId(); + ProjectId projectId2 = randomUniqueProjectId(); + String databasePath1 = "path/to/db1"; + String databasePath2 = "path/to/db2"; + String ip1 = "127.0.0.1"; + String ip2 = "127.0.0.2"; + + AbstractResponse response = mock(AbstractResponse.class); + cache.putIfAbsent(projectId1, ip1, databasePath1, ip -> response); // cache miss + cache.putIfAbsent(projectId1, ip2, databasePath1, ip -> response); // cache miss + cache.putIfAbsent(projectId2, ip1, databasePath1, ip -> response); // cache miss + cache.putIfAbsent(projectId1, ip1, databasePath2, ip -> response); // cache miss + cache.purgeCacheEntriesForDatabase(projectId1, PathUtils.get(databasePath1)); + // should have purged entries for projectId1 and databasePath1... + assertNull(cache.get(projectId1, ip1, databasePath1)); + assertNull(cache.get(projectId1, ip2, databasePath1)); + // ...but left the one for projectId2... + assertSame(response, cache.get(projectId2, ip1, databasePath1)); + // ...and for databasePath2: + assertSame(response, cache.get(projectId1, ip1, databasePath2)); + } }