mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 01:22:26 -04:00
ES-11372: Add project ID to key for geoip cache (#129572)
In this change, the lazy loader passes in the default project ID. That will be fixed in a later PR. ES-11372 #comment Project ID added to cache key in https://github.com/elastic/elasticsearch/pull/129572
This commit is contained in:
parent
b25d7b9471
commit
c4249a8596
3 changed files with 81 additions and 32 deletions
|
@ -16,9 +16,11 @@ import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.common.CheckedBiFunction;
|
import org.elasticsearch.common.CheckedBiFunction;
|
||||||
import org.elasticsearch.common.CheckedSupplier;
|
import org.elasticsearch.common.CheckedSupplier;
|
||||||
import org.elasticsearch.core.Booleans;
|
import org.elasticsearch.core.Booleans;
|
||||||
|
import org.elasticsearch.core.FixForMultiProject;
|
||||||
import org.elasticsearch.core.IOUtils;
|
import org.elasticsearch.core.IOUtils;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.core.SuppressForbidden;
|
import org.elasticsearch.core.SuppressForbidden;
|
||||||
|
@ -105,8 +107,9 @@ public class DatabaseReaderLazyLoader implements IpDatabase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Nullable
|
@Nullable
|
||||||
|
@FixForMultiProject // do not use ProjectId.DEFAULT
|
||||||
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider) {
|
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider) {
|
||||||
return cache.putIfAbsent(ipAddress, cachedDatabasePathToString, ip -> {
|
return cache.putIfAbsent(ProjectId.DEFAULT, ipAddress, cachedDatabasePathToString, ip -> {
|
||||||
try {
|
try {
|
||||||
return responseProvider.apply(get(), ipAddress);
|
return responseProvider.apply(get(), ipAddress);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -143,9 +146,10 @@ public class DatabaseReaderLazyLoader implements IpDatabase {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Visible for Testing
|
// Visible for Testing
|
||||||
|
@FixForMultiProject // do not use ProjectId.DEFAULT
|
||||||
protected void doShutdown() throws IOException {
|
protected void doShutdown() throws IOException {
|
||||||
IOUtils.close(databaseReader.get());
|
IOUtils.close(databaseReader.get());
|
||||||
int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(databasePath);
|
int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(ProjectId.DEFAULT, databasePath);
|
||||||
logger.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath);
|
logger.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath);
|
||||||
if (deleteDatabaseFileOnShutdown) {
|
if (deleteDatabaseFileOnShutdown) {
|
||||||
logger.info("deleting [{}]", databasePath);
|
logger.info("deleting [{}]", databasePath);
|
||||||
|
|
|
@ -10,6 +10,7 @@ package org.elasticsearch.ingest.geoip;
|
||||||
|
|
||||||
import com.maxmind.db.NodeCache;
|
import com.maxmind.db.NodeCache;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.common.cache.Cache;
|
import org.elasticsearch.common.cache.Cache;
|
||||||
import org.elasticsearch.common.cache.CacheBuilder;
|
import org.elasticsearch.common.cache.CacheBuilder;
|
||||||
import org.elasticsearch.core.TimeValue;
|
import org.elasticsearch.core.TimeValue;
|
||||||
|
@ -60,9 +61,9 @@ public final class GeoIpCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
<RESPONSE> RESPONSE putIfAbsent(String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
|
<RESPONSE> RESPONSE putIfAbsent(ProjectId projectId, String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
|
||||||
// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
|
// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
|
||||||
CacheKey cacheKey = new CacheKey(ip, databasePath);
|
CacheKey cacheKey = new CacheKey(projectId, ip, databasePath);
|
||||||
long cacheStart = relativeNanoTimeProvider.getAsLong();
|
long cacheStart = relativeNanoTimeProvider.getAsLong();
|
||||||
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
|
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
|
||||||
Object response = cache.get(cacheKey);
|
Object response = cache.get(cacheKey);
|
||||||
|
@ -92,16 +93,16 @@ public final class GeoIpCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
// only useful for testing
|
// only useful for testing
|
||||||
Object get(String ip, String databasePath) {
|
Object get(ProjectId projectId, String ip, String databasePath) {
|
||||||
CacheKey cacheKey = new CacheKey(ip, databasePath);
|
CacheKey cacheKey = new CacheKey(projectId, ip, databasePath);
|
||||||
return cache.get(cacheKey);
|
return cache.get(cacheKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int purgeCacheEntriesForDatabase(Path databaseFile) {
|
public int purgeCacheEntriesForDatabase(ProjectId projectId, Path databaseFile) {
|
||||||
String databasePath = databaseFile.toString();
|
String databasePath = databaseFile.toString();
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
for (CacheKey key : cache.keys()) {
|
for (CacheKey key : cache.keys()) {
|
||||||
if (key.databasePath.equals(databasePath)) {
|
if (key.projectId.equals(projectId) && key.databasePath.equals(databasePath)) {
|
||||||
cache.invalidate(key);
|
cache.invalidate(key);
|
||||||
counter++;
|
counter++;
|
||||||
}
|
}
|
||||||
|
@ -135,5 +136,5 @@ public final class GeoIpCache {
|
||||||
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
|
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
|
||||||
* IP may be in both with different values and we need to cache both.
|
* IP may be in both with different values and we need to cache both.
|
||||||
*/
|
*/
|
||||||
private record CacheKey(String ip, String databasePath) {}
|
private record CacheKey(ProjectId projectId, String ip, String databasePath) {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,8 @@ package org.elasticsearch.ingest.geoip;
|
||||||
|
|
||||||
import com.maxmind.geoip2.model.AbstractResponse;
|
import com.maxmind.geoip2.model.AbstractResponse;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
|
import org.elasticsearch.core.PathUtils;
|
||||||
import org.elasticsearch.core.TimeValue;
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.ingest.geoip.stats.CacheStats;
|
import org.elasticsearch.ingest.geoip.stats.CacheStats;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -26,21 +28,22 @@ public class GeoIpCacheTests extends ESTestCase {
|
||||||
|
|
||||||
public void testCachesAndEvictsResults() {
|
public void testCachesAndEvictsResults() {
|
||||||
GeoIpCache cache = new GeoIpCache(1);
|
GeoIpCache cache = new GeoIpCache(1);
|
||||||
|
ProjectId projectId = randomProjectIdOrDefault();
|
||||||
AbstractResponse response1 = mock(AbstractResponse.class);
|
AbstractResponse response1 = mock(AbstractResponse.class);
|
||||||
AbstractResponse response2 = mock(AbstractResponse.class);
|
AbstractResponse response2 = mock(AbstractResponse.class);
|
||||||
|
|
||||||
// add a key
|
// add a key
|
||||||
AbstractResponse cachedResponse = cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> response1);
|
AbstractResponse cachedResponse = cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> response1);
|
||||||
assertSame(cachedResponse, response1);
|
assertSame(cachedResponse, response1);
|
||||||
assertSame(cachedResponse, cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> response1));
|
assertSame(cachedResponse, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> response1));
|
||||||
assertSame(cachedResponse, cache.get("127.0.0.1", "path/to/db"));
|
assertSame(cachedResponse, cache.get(projectId, "127.0.0.1", "path/to/db"));
|
||||||
|
|
||||||
// evict old key by adding another value
|
// evict old key by adding another value
|
||||||
cachedResponse = cache.putIfAbsent("127.0.0.2", "path/to/db", ip -> response2);
|
cachedResponse = cache.putIfAbsent(projectId, "127.0.0.2", "path/to/db", ip -> response2);
|
||||||
assertSame(cachedResponse, response2);
|
assertSame(cachedResponse, response2);
|
||||||
assertSame(cachedResponse, cache.putIfAbsent("127.0.0.2", "path/to/db", ip -> response2));
|
assertSame(cachedResponse, cache.putIfAbsent(projectId, "127.0.0.2", "path/to/db", ip -> response2));
|
||||||
assertSame(cachedResponse, cache.get("127.0.0.2", "path/to/db"));
|
assertSame(cachedResponse, cache.get(projectId, "127.0.0.2", "path/to/db"));
|
||||||
assertNotSame(response1, cache.get("127.0.0.1", "path/to/db"));
|
assertNotSame(response1, cache.get(projectId, "127.0.0.1", "path/to/db"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCachesNoResult() {
|
public void testCachesNoResult() {
|
||||||
|
@ -51,31 +54,47 @@ public class GeoIpCacheTests extends ESTestCase {
|
||||||
return null;
|
return null;
|
||||||
};
|
};
|
||||||
|
|
||||||
AbstractResponse response = cache.putIfAbsent("127.0.0.1", "path/to/db", countAndReturnNull);
|
ProjectId projectId = randomProjectIdOrDefault();
|
||||||
|
AbstractResponse response = cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", countAndReturnNull);
|
||||||
assertNull(response);
|
assertNull(response);
|
||||||
assertNull(cache.putIfAbsent("127.0.0.1", "path/to/db", countAndReturnNull));
|
assertNull(cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", countAndReturnNull));
|
||||||
assertEquals(1, count.get());
|
assertEquals(1, count.get());
|
||||||
|
|
||||||
// the cached value is not actually *null*, it's the NO_RESULT sentinel
|
// the cached value is not actually *null*, it's the NO_RESULT sentinel
|
||||||
assertSame(GeoIpCache.NO_RESULT, cache.get("127.0.0.1", "path/to/db"));
|
assertSame(GeoIpCache.NO_RESULT, cache.get(projectId, "127.0.0.1", "path/to/db"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCacheKey() {
|
public void testCacheDoesNotCollideForDifferentDatabases() {
|
||||||
GeoIpCache cache = new GeoIpCache(2);
|
GeoIpCache cache = new GeoIpCache(2);
|
||||||
AbstractResponse response1 = mock(AbstractResponse.class);
|
AbstractResponse response1 = mock(AbstractResponse.class);
|
||||||
AbstractResponse response2 = mock(AbstractResponse.class);
|
AbstractResponse response2 = mock(AbstractResponse.class);
|
||||||
|
|
||||||
assertSame(response1, cache.putIfAbsent("127.0.0.1", "path/to/db1", ip -> response1));
|
ProjectId projectId = randomProjectIdOrDefault();
|
||||||
assertSame(response2, cache.putIfAbsent("127.0.0.1", "path/to/db2", ip -> response2));
|
assertSame(response1, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db1", ip -> response1));
|
||||||
assertSame(response1, cache.get("127.0.0.1", "path/to/db1"));
|
assertSame(response2, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db2", ip -> response2));
|
||||||
assertSame(response2, cache.get("127.0.0.1", "path/to/db2"));
|
assertSame(response1, cache.get(projectId, "127.0.0.1", "path/to/db1"));
|
||||||
|
assertSame(response2, cache.get(projectId, "127.0.0.1", "path/to/db2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCacheDoesNotCollideForDifferentProjects() {
|
||||||
|
GeoIpCache cache = new GeoIpCache(2);
|
||||||
|
AbstractResponse response1 = mock(AbstractResponse.class);
|
||||||
|
AbstractResponse response2 = mock(AbstractResponse.class);
|
||||||
|
|
||||||
|
ProjectId projectId1 = randomUniqueProjectId();
|
||||||
|
ProjectId projectId2 = randomUniqueProjectId();
|
||||||
|
assertSame(response1, cache.putIfAbsent(projectId1, "127.0.0.1", "path/to/db1", ip -> response1));
|
||||||
|
assertSame(response2, cache.putIfAbsent(projectId2, "127.0.0.1", "path/to/db1", ip -> response2));
|
||||||
|
assertSame(response1, cache.get(projectId1, "127.0.0.1", "path/to/db1"));
|
||||||
|
assertSame(response2, cache.get(projectId2, "127.0.0.1", "path/to/db1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testThrowsFunctionsException() {
|
public void testThrowsFunctionsException() {
|
||||||
GeoIpCache cache = new GeoIpCache(1);
|
GeoIpCache cache = new GeoIpCache(1);
|
||||||
|
ProjectId projectId = randomProjectIdOrDefault();
|
||||||
IllegalArgumentException ex = expectThrows(
|
IllegalArgumentException ex = expectThrows(
|
||||||
IllegalArgumentException.class,
|
IllegalArgumentException.class,
|
||||||
() -> cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> {
|
() -> cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> {
|
||||||
throw new IllegalArgumentException("bad");
|
throw new IllegalArgumentException("bad");
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
@ -92,19 +111,20 @@ public class GeoIpCacheTests extends ESTestCase {
|
||||||
final AtomicLong testNanoTime = new AtomicLong(0);
|
final AtomicLong testNanoTime = new AtomicLong(0);
|
||||||
// We use a relative time provider that increments 1ms every time it is called. So each operation appears to take 1ms
|
// We use a relative time provider that increments 1ms every time it is called. So each operation appears to take 1ms
|
||||||
GeoIpCache cache = new GeoIpCache(maxCacheSize, () -> testNanoTime.addAndGet(TimeValue.timeValueMillis(1).getNanos()));
|
GeoIpCache cache = new GeoIpCache(maxCacheSize, () -> testNanoTime.addAndGet(TimeValue.timeValueMillis(1).getNanos()));
|
||||||
|
ProjectId projectId = randomProjectIdOrDefault();
|
||||||
AbstractResponse response = mock(AbstractResponse.class);
|
AbstractResponse response = mock(AbstractResponse.class);
|
||||||
String databasePath = "path/to/db1";
|
String databasePath = "path/to/db1";
|
||||||
String key1 = "127.0.0.1";
|
String key1 = "127.0.0.1";
|
||||||
String key2 = "127.0.0.2";
|
String key2 = "127.0.0.2";
|
||||||
String key3 = "127.0.0.3";
|
String key3 = "127.0.0.3";
|
||||||
|
|
||||||
cache.putIfAbsent(key1, databasePath, ip -> response); // cache miss
|
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache miss
|
||||||
cache.putIfAbsent(key2, databasePath, ip -> response); // cache miss
|
cache.putIfAbsent(projectId, key2, databasePath, ip -> response); // cache miss
|
||||||
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
|
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
|
||||||
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
|
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
|
||||||
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
|
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
|
||||||
cache.putIfAbsent(key3, databasePath, ip -> response); // cache miss, key2 will be evicted
|
cache.putIfAbsent(projectId, key3, databasePath, ip -> response); // cache miss, key2 will be evicted
|
||||||
cache.putIfAbsent(key2, databasePath, ip -> response); // cache miss, key1 will be evicted
|
cache.putIfAbsent(projectId, key2, databasePath, ip -> response); // cache miss, key1 will be evicted
|
||||||
CacheStats cacheStats = cache.getCacheStats();
|
CacheStats cacheStats = cache.getCacheStats();
|
||||||
assertThat(cacheStats.count(), equalTo(maxCacheSize));
|
assertThat(cacheStats.count(), equalTo(maxCacheSize));
|
||||||
assertThat(cacheStats.hits(), equalTo(3L));
|
assertThat(cacheStats.hits(), equalTo(3L));
|
||||||
|
@ -115,4 +135,28 @@ public class GeoIpCacheTests extends ESTestCase {
|
||||||
// There are 4 misses. Each is made up of a cache query, and a database query, each being 1ms:
|
// There are 4 misses. Each is made up of a cache query, and a database query, each being 1ms:
|
||||||
assertThat(cacheStats.missesTimeInMillis(), equalTo(8L));
|
assertThat(cacheStats.missesTimeInMillis(), equalTo(8L));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPurgeCacheEntriesForDatabase() {
|
||||||
|
GeoIpCache cache = new GeoIpCache(100);
|
||||||
|
ProjectId projectId1 = randomUniqueProjectId();
|
||||||
|
ProjectId projectId2 = randomUniqueProjectId();
|
||||||
|
String databasePath1 = "path/to/db1";
|
||||||
|
String databasePath2 = "path/to/db2";
|
||||||
|
String ip1 = "127.0.0.1";
|
||||||
|
String ip2 = "127.0.0.2";
|
||||||
|
|
||||||
|
AbstractResponse response = mock(AbstractResponse.class);
|
||||||
|
cache.putIfAbsent(projectId1, ip1, databasePath1, ip -> response); // cache miss
|
||||||
|
cache.putIfAbsent(projectId1, ip2, databasePath1, ip -> response); // cache miss
|
||||||
|
cache.putIfAbsent(projectId2, ip1, databasePath1, ip -> response); // cache miss
|
||||||
|
cache.putIfAbsent(projectId1, ip1, databasePath2, ip -> response); // cache miss
|
||||||
|
cache.purgeCacheEntriesForDatabase(projectId1, PathUtils.get(databasePath1));
|
||||||
|
// should have purged entries for projectId1 and databasePath1...
|
||||||
|
assertNull(cache.get(projectId1, ip1, databasePath1));
|
||||||
|
assertNull(cache.get(projectId1, ip2, databasePath1));
|
||||||
|
// ...but left the one for projectId2...
|
||||||
|
assertSame(response, cache.get(projectId2, ip1, databasePath1));
|
||||||
|
// ...and for databasePath2:
|
||||||
|
assertSame(response, cache.get(projectId1, ip1, databasePath2));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue