Mirror of https://github.com/elastic/elasticsearch.git (synced 2025-06-27 17:10:22 -04:00)
Make GeoIp downloader multi-project aware (#128282)
This change makes the GeoIp persistent task executor/downloader multi-project aware:
- The database downloader persistent task is created at the project level, meaning there is a downloader instance per project.
- The persistent task id is prefixed with the project id, namely `<project-id>/geoip-downloader`, for clusters in MP mode.
This commit is contained in:
parent
41f69810df
commit
e3838a4b9c
11 changed files with 498 additions and 119 deletions
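The task-id convention is easiest to see in isolation. Below is a minimal sketch: the ternary mirrors the getTaskId helper added to GeoIpDownloaderTaskExecutor in this commit, while the wrapper class and main method are illustrative only, not part of the change.

// Minimal sketch of the task-id convention described above; the logic mirrors
// GeoIpDownloaderTaskExecutor.getTaskId(...) added in this commit. The wrapper
// class and main(...) are illustrative assumptions for a runnable example.
public class GeoIpTaskIdSketch {
    static final String GEOIP_DOWNLOADER = "geoip-downloader";

    static String getTaskId(String projectId, boolean supportsMultipleProjects) {
        // In multi-project (MP) mode the persistent task id is "<project-id>/geoip-downloader";
        // in single-project mode the legacy id is kept so existing tasks keep working.
        return supportsMultipleProjects ? projectId + "/" + GEOIP_DOWNLOADER : GEOIP_DOWNLOADER;
    }

    public static void main(String[] args) {
        System.out.println(getTaskId("my-project", true));  // my-project/geoip-downloader
        System.out.println(getTaskId("my-project", false)); // geoip-downloader
    }
}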
modules/ingest-geoip/qa/multi-project/build.gradle (new file, 24 additions)

@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+apply plugin: 'elasticsearch.internal-java-rest-test'
+
+dependencies {
+    javaRestTestImplementation project(':modules:ingest-geoip')
+    javaRestTestImplementation project(':test:external-modules:test-multi-project')
+    javaRestTestImplementation project(':test:fixtures:geoip-fixture')
+
+    clusterModules project(':modules:ingest-geoip')
+    clusterModules project(':modules:reindex') // needed for database cleanup
+    clusterModules project(':test:external-modules:test-multi-project')
+}
+
+tasks.withType(Test).configureEach {
+    it.systemProperty "tests.multi_project.enabled", true
+}
geoip/GeoIpMultiProjectIT.java (new file, 150 additions)

@@ -0,0 +1,150 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package geoip;
+
+import fixture.geoip.GeoIpHttpFixture;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.core.Booleans;
+import org.elasticsearch.ingest.geoip.GeoIpDownloader;
+import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.test.rest.ObjectPath;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class GeoIpMultiProjectIT extends ESRestTestCase {
+    // default true
+    private static final boolean useFixture = Booleans.parseBoolean(System.getProperty("geoip_use_service", "false")) == false;
+
+    public static final GeoIpHttpFixture fixture = new GeoIpHttpFixture(useFixture);
+
+    public static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
+        .module("ingest-geoip")
+        .module("reindex") // for database cleanup
+        .module("test-multi-project")
+        .setting("test.multi_project.enabled", "true")
+        .setting(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), "true")
+        .setting(GeoIpDownloader.ENDPOINT_SETTING.getKey(), fixture::getAddress, (k) -> useFixture)
+        .build();
+
+    @ClassRule
+    public static TestRule ruleChain = RuleChain.outerRule(fixture).around(cluster);
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+
+    @Override
+    protected boolean shouldConfigureProjects() {
+        return false;
+    }
+
+    public void testGeoIpDownloader() throws Exception {
+        String project1 = randomUniqueProjectId().id();
+        String project2 = randomUniqueProjectId().id();
+        createProject(project1);
+        createProject(project2);
+
+        // download databases for project1
+        putGeoIpPipeline(project1);
+        assertBusy(() -> assertDatabases(project1, true), 30, TimeUnit.SECONDS);
+        assertBusy(() -> assertDatabases(project2, false), 30, TimeUnit.SECONDS);
+
+        // download databases for project2
+        putGeoIpPipeline(project2);
+        assertBusy(() -> assertDatabases(project2, true), 30, TimeUnit.SECONDS);
+    }
+
+    private void putGeoIpPipeline(String projectId) throws IOException {
+        Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/geoip-pipeline");
+        putPipelineRequest.setJsonEntity("""
+            {
+                "processors" : [
+                    {
+                        "geoip" : {
+                            "field" : "ip",
+                            "target_field" : "geo",
+                            "database_file" : "GeoLite2-Country.mmdb"
+                        }
+                    }
+                ]
+            }
+            """);
+        setRequestProjectId(projectId, putPipelineRequest);
+        assertOK(client().performRequest(putPipelineRequest));
+    }
+
+    private static Request setRequestProjectId(String projectId, Request request) {
+        RequestOptions.Builder options = request.getOptions().toBuilder();
+        options.removeHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER);
+        options.addHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, projectId);
+        request.setOptions(options);
+        return request;
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertDatabases(String projectId, boolean shouldDownload) throws IOException {
+        Request getTaskState = new Request("GET", "/_cluster/state");
+        setRequestProjectId(projectId, getTaskState);
+
+        ObjectPath state = ObjectPath.createFromResponse(assertOK(client().performRequest(getTaskState)));
+
+        List<Map<String, ?>> tasks = state.evaluate("metadata.persistent_tasks.tasks");
+        // Short-circuit to avoid using streams if the list is empty
+        if (tasks.isEmpty()) {
+            fail("persistent tasks list is empty, expected at least one task for geoip-downloader");
+        }
+
+        // verify project task id
+        Set<Map<String, ?>> id = tasks.stream()
+            .filter(task -> String.format("%s/geoip-downloader", projectId).equals(task.get("id")))
+            .collect(Collectors.toSet());
+        assertThat(id.size(), equalTo(1));
+
+        // verify database download
+        Map<String, Object> databases = (Map<String, Object>) tasks.stream().map(task -> {
+            try {
+                return ObjectPath.evaluate(task, "task.geoip-downloader.state.databases");
+            } catch (IOException e) {
+                return null;
+            }
+        }).filter(Objects::nonNull).findFirst().orElse(null);
+
+        if (shouldDownload) {
+            // verify database downloaded
+            assertNotNull(databases);
+            for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
+                Object database = databases.get(name);
+                assertNotNull(database);
+                assertNotNull(ObjectPath.evaluate(database, "md5"));
+            }
+        } else {
+            // verify database not downloaded
+            assertNull(databases);
+        }
+
+    }
+}
GeoIpDownloader.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.hash.MessageDigests;
 import org.elasticsearch.common.settings.Setting;

@@ -95,6 +96,8 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
      */
     private final Supplier<Boolean> atLeastOneGeoipProcessorSupplier;
 
+    private final ProjectId projectId;
+
     GeoIpDownloader(
         Client client,
         HttpClient httpClient,

@@ -109,10 +112,11 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
         Map<String, String> headers,
         Supplier<TimeValue> pollIntervalSupplier,
         Supplier<Boolean> eagerDownloadSupplier,
-        Supplier<Boolean> atLeastOneGeoipProcessorSupplier
+        Supplier<Boolean> atLeastOneGeoipProcessorSupplier,
+        ProjectId projectId
     ) {
         super(id, type, action, description, parentTask, headers);
-        this.client = client;
+        this.client = client.projectClient(projectId);
         this.httpClient = httpClient;
         this.clusterService = clusterService;
         this.threadPool = threadPool;

@@ -120,6 +124,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
         this.pollIntervalSupplier = pollIntervalSupplier;
         this.eagerDownloadSupplier = eagerDownloadSupplier;
         this.atLeastOneGeoipProcessorSupplier = atLeastOneGeoipProcessorSupplier;
+        this.projectId = projectId;
     }
 
     void setState(GeoIpTaskState state) {

@@ -134,16 +139,17 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
     // visible for testing
     void updateDatabases() throws IOException {
         var clusterState = clusterService.state();
-        var geoipIndex = clusterState.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
+        var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
         if (geoipIndex != null) {
             logger.trace("The {} index is not null", GeoIpDownloader.DATABASES_INDEX);
-            if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
+            if (clusterState.routingTable(projectId).index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
                 logger.debug(
                     "Not updating geoip database because not all primary shards of the [" + DATABASES_INDEX + "] index are active."
                 );
                 return;
             }
-            var blockException = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
+            var blockException = clusterState.blocks()
+                .indexBlockedException(projectId, ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
             if (blockException != null) {
                 logger.debug(
                     "Not updating geoip database because there is a write block on the " + geoipIndex.getWriteIndex().getName() + " index",

@@ -196,7 +202,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
             updateTimestamp(name, metadata);
             return;
         }
-        logger.debug("downloading geoip database [{}]", name);
+        logger.debug("downloading geoip database [{}] for project [{}]", name, projectId);
         long start = System.currentTimeMillis();
         try (InputStream is = httpClient.get(url)) {
             int firstChunk = metadata.lastChunk() + 1; // if there is no metadata, then Metadata.EMPTY.lastChunk() + 1 = 0

@@ -205,12 +211,12 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
                 state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start));
                 updateTaskState();
                 stats = stats.successfulDownload(System.currentTimeMillis() - start).databasesCount(state.getDatabases().size());
-                logger.info("successfully downloaded geoip database [{}]", name);
+                logger.info("successfully downloaded geoip database [{}] for project [{}]", name, projectId);
                 deleteOldChunks(name, firstChunk);
             }
         } catch (Exception e) {
             stats = stats.failedDownload();
-            logger.error(() -> "error downloading geoip database [" + name + "]", e);
+            logger.error(() -> "error downloading geoip database [" + name + "] for project [" + projectId + "]", e);
         }
     }
 

@@ -230,7 +236,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
 
     // visible for testing
     protected void updateTimestamp(String name, Metadata old) {
-        logger.debug("geoip database [{}] is up to date, updated timestamp", name);
+        logger.debug("geoip database [{}] is up to date for project [{}], updated timestamp", name, projectId);
         state = state.put(name, new Metadata(old.lastUpdate(), old.firstChunk(), old.lastChunk(), old.md5(), System.currentTimeMillis()));
         stats = stats.skippedDownload();
         updateTaskState();

@@ -238,7 +244,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
 
     void updateTaskState() {
         PlainActionFuture<PersistentTask<?>> future = new PlainActionFuture<>();
-        updatePersistentTaskState(state, future);
+        updateProjectPersistentTaskState(projectId, state, future);
         state = ((GeoIpTaskState) future.actionGet().getState());
     }
 

@@ -360,5 +366,4 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
             scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
         }
     }
-
 }
GeoIpDownloaderTaskExecutor.java

@@ -19,13 +19,16 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.OriginSettingClient;
 import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.Index;

@@ -49,8 +52,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
 import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;

@@ -97,11 +100,14 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
     private final ThreadPool threadPool;
     private final Settings settings;
     private final PersistentTasksService persistentTasksService;
-    private final AtomicReference<GeoIpDownloader> currentTask = new AtomicReference<>();
+    @FixForMultiProject(description = "These settings need to be project-scoped")
     private volatile TimeValue pollInterval;
     private volatile boolean eagerDownload;
-    private volatile boolean atLeastOneGeoipProcessor;
-    private final AtomicBoolean taskIsBootstrapped = new AtomicBoolean(false);
+
+    private final ConcurrentHashMap<ProjectId, Boolean> atLeastOneGeoipProcessorByProject = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<ProjectId, AtomicBoolean> taskIsBootstrappedByProject = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<ProjectId, GeoIpDownloader> tasks = new ConcurrentHashMap<>();
+    private final ProjectResolver projectResolver;
 
     GeoIpDownloaderTaskExecutor(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool) {
         super(GEOIP_DOWNLOADER, threadPool.generic());

@@ -113,6 +119,7 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
         this.persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
         this.pollInterval = POLL_INTERVAL_SETTING.get(settings);
         this.eagerDownload = EAGER_DOWNLOAD_SETTING.get(settings);
+        this.projectResolver = client.projectResolver();
     }
 
     /**

@@ -125,32 +132,35 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
         clusterService.getClusterSettings().addSettingsUpdateConsumer(POLL_INTERVAL_SETTING, this::setPollInterval);
     }
 
+    @FixForMultiProject(description = "Should execute in the context of the current project after settings are project-aware")
     private void setEnabled(boolean enabled) {
         if (clusterService.state().nodes().isLocalNodeElectedMaster() == false) {
             // we should only start/stop task from single node, master is the best as it will go through it anyway
             return;
         }
         if (enabled) {
-            startTask(() -> {});
+            startTask(ProjectId.DEFAULT, () -> {});
         } else {
-            stopTask(() -> {});
+            stopTask(ProjectId.DEFAULT, () -> {});
         }
     }
 
+    @FixForMultiProject(description = "Should execute in the context of the current project after settings are project-aware")
     private void setEagerDownload(Boolean eagerDownload) {
         if (Objects.equals(this.eagerDownload, eagerDownload) == false) {
             this.eagerDownload = eagerDownload;
-            GeoIpDownloader currentDownloader = getCurrentTask();
+            GeoIpDownloader currentDownloader = getTask(ProjectId.DEFAULT);
             if (currentDownloader != null && Objects.equals(eagerDownload, Boolean.TRUE)) {
                 currentDownloader.requestReschedule();
             }
         }
     }
 
+    @FixForMultiProject(description = "Should execute in the context of the current project after settings are project-aware")
     private void setPollInterval(TimeValue pollInterval) {
         if (Objects.equals(this.pollInterval, pollInterval) == false) {
             this.pollInterval = pollInterval;
-            GeoIpDownloader currentDownloader = getCurrentTask();
+            GeoIpDownloader currentDownloader = getTask(ProjectId.DEFAULT);
             if (currentDownloader != null) {
                 currentDownloader.requestReschedule();
             }

@@ -162,7 +172,7 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
         GeoIpDownloader downloader = (GeoIpDownloader) task;
         GeoIpTaskState geoIpTaskState = (state == null) ? GeoIpTaskState.EMPTY : (GeoIpTaskState) state;
         downloader.setState(geoIpTaskState);
-        currentTask.set(downloader);
+        tasks.put(projectResolver.getProjectId(), downloader);
         if (ENABLED_SETTING.get(clusterService.state().metadata().settings(), settings)) {
             downloader.runDownloader();
         }

@@ -177,6 +187,7 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
         PersistentTasksCustomMetadata.PersistentTask<GeoIpTaskParams> taskInProgress,
         Map<String, String> headers
     ) {
+        ProjectId projectId = projectResolver.getProjectId();
         return new GeoIpDownloader(
             client,
             httpClient,

@@ -191,10 +202,12 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
             headers,
             () -> pollInterval,
             () -> eagerDownload,
-            () -> atLeastOneGeoipProcessor
+            () -> atLeastOneGeoipProcessorByProject.getOrDefault(projectId, false),
+            projectId
         );
     }
 
+    @FixForMultiProject(description = "Make sure removed project tasks are cancelled: https://elasticco.atlassian.net/browse/ES-12054")
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
         if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {

@@ -208,52 +221,66 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
             return;
         }
 
-        if (taskIsBootstrapped.getAndSet(true) == false) {
-            this.atLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(event.state());
-            if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) {
-                startTask(() -> taskIsBootstrapped.set(false));
-            } else {
-                stopTask(() -> taskIsBootstrapped.set(false));
-            }
-        }
-
-        if (event.metadataChanged() == false) {
-            return;
-        }
-
-        boolean hasIndicesChanges = event.previousState()
-            .metadata()
-            .getProject()
-            .indices()
-            .equals(event.state().metadata().getProject().indices()) == false;
-        boolean hasIngestPipelineChanges = event.metadataChanged() && event.changedCustomProjectMetadataSet().contains(IngestMetadata.TYPE);
-
-        if (hasIngestPipelineChanges || hasIndicesChanges) {
-            boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(event.state());
-            if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) {
-                atLeastOneGeoipProcessor = true;
-                logger.trace("Scheduling runDownloader because a geoip processor has been added");
-                GeoIpDownloader currentDownloader = getCurrentTask();
-                if (currentDownloader != null) {
-                    currentDownloader.requestReschedule();
-                }
-            } else {
-                atLeastOneGeoipProcessor = newAtLeastOneGeoipProcessor;
-            }
-        }
+        for (var projectMetadata : event.state().metadata().projects().values()) {
+            ProjectId projectId = projectMetadata.id();
+
+            // bootstrap task once iff it is not already bootstrapped
+            AtomicBoolean taskIsBootstrapped = taskIsBootstrappedByProject.computeIfAbsent(projectId, k -> new AtomicBoolean(false));
+            if (taskIsBootstrapped.getAndSet(true) == false) {
+                atLeastOneGeoipProcessorByProject.computeIfAbsent(projectId, k -> hasAtLeastOneGeoipProcessor(projectMetadata));
+                if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) {
+                    logger.debug("Bootstrapping geoip downloader task for project [{}]", projectId);
+                    startTask(projectId, () -> taskIsBootstrapped.set(false));
+                } else {
+                    logger.debug("Stopping geoip downloader task for project [{}]", projectId);
+                    stopTask(projectId, () -> taskIsBootstrapped.set(false));
+                }
+            }
+
+            boolean hasIngestPipelineChanges = event.customMetadataChanged(projectId, IngestMetadata.TYPE);
+            boolean hasIndicesChanges = false;
+            boolean projectExisted = event.previousState().metadata().hasProject(projectId);
+            if (projectExisted) {
+                hasIndicesChanges = event.previousState()
+                    .metadata()
+                    .getProject(projectId)
+                    .indices()
+                    .equals(projectMetadata.indices()) == false;
+            }
+
+            if (hasIngestPipelineChanges || hasIndicesChanges) {
+                boolean atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject.getOrDefault(projectId, false);
+                boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(projectMetadata);
+                // update if necessary
+                if (newAtLeastOneGeoipProcessor != atLeastOneGeoipProcessor) {
+                    atLeastOneGeoipProcessorByProject.put(projectId, newAtLeastOneGeoipProcessor);
+                }
+                if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) {
+                    logger.trace("Scheduling runDownloader for project [{}] because a geoip processor has been added", projectId);
+                    GeoIpDownloader currentDownloader = getTask(projectId);
+                    if (currentDownloader != null) {
+                        currentDownloader.requestReschedule();
+                    }
+                }
+            }
+        }
     }
 
-    static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
-        if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
+    static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) {
+        if (pipelinesWithGeoIpProcessor(projectMetadata, true).isEmpty() == false) {
             return true;
         }
 
-        final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false);
+        final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(projectMetadata, false);
         if (checkReferencedPipelines.isEmpty()) {
             return false;
         }
 
-        return clusterState.getMetadata().getProject().indices().values().stream().anyMatch(indexMetadata -> {
+        return projectMetadata.indices().values().stream().anyMatch(indexMetadata -> {
             String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
             String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
             return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);

@@ -262,14 +289,14 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
 
     /**
      * Retrieve the set of pipeline ids that have at least one geoip processor.
-     * @param clusterState Cluster state.
+     * @param projectMetadata project metadata
      * @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
      * matching the param.
      * @return A set of pipeline ids matching criteria.
      */
     @SuppressWarnings("unchecked")
-    private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
-        List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState.metadata().getProject());
+    private static Set<String> pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) {
+        List<PipelineConfiguration> configurations = IngestService.getPipelines(projectMetadata);
         Set<String> ids = new HashSet<>();
         // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
         for (PipelineConfiguration configuration : configurations) {

@@ -366,9 +393,11 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
             && hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
     }
 
-    private void startTask(Runnable onFailure) {
-        persistentTasksService.sendStartRequest(
-            GEOIP_DOWNLOADER,
+    // starts GeoIP downloader task for a single project
+    private void startTask(ProjectId projectId, Runnable onFailure) {
+        persistentTasksService.sendProjectStartRequest(
+            projectId,
+            getTaskId(projectId, projectResolver.supportsMultipleProjects()),
             GEOIP_DOWNLOADER,
             new GeoIpTaskParams(),
             MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,

@@ -382,7 +411,8 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
         );
     }
 
-    private void stopTask(Runnable onFailure) {
+    // stops GeoIP downloader task for a single project
+    private void stopTask(ProjectId projectId, Runnable onFailure) {
         ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(
             r -> logger.debug("Stopped geoip downloader task"),
             e -> {

@@ -393,30 +423,44 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
             }
         }
         );
-        persistentTasksService.sendRemoveRequest(
-            GEOIP_DOWNLOADER,
+        persistentTasksService.sendProjectRemoveRequest(
+            projectId,
+            getTaskId(projectId, projectResolver.supportsMultipleProjects()),
             MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
             ActionListener.runAfter(listener, () -> {
                 IndexAbstraction databasesAbstraction = clusterService.state()
                     .metadata()
-                    .getDefaultProject()
+                    .getProject(projectId)
                     .getIndicesLookup()
                     .get(DATABASES_INDEX);
                 if (databasesAbstraction != null) {
                     // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
                     Index databasesIndex = databasesAbstraction.getWriteIndex();
-                    client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> {
-                        Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e;
-                        if (t instanceof ResourceNotFoundException == false) {
-                            logger.warn("failed to remove " + databasesIndex, e);
-                        }
-                    }));
+                    client.projectClient(projectId)
+                        .admin()
+                        .indices()
+                        .prepareDelete(databasesIndex.getName())
+                        .execute(ActionListener.wrap(rr -> {
+                            // remove task reference in the map so it can be garbage collected
+                            tasks.remove(projectId);
+                            taskIsBootstrappedByProject.remove(projectId);
+                            atLeastOneGeoipProcessorByProject.remove(projectId);
+                        }, e -> {
+                            Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e;
+                            if (t instanceof ResourceNotFoundException == false) {
+                                logger.warn("failed to remove " + databasesIndex, e);
+                            }
+                        }));
                 }
             })
         );
     }
 
-    public GeoIpDownloader getCurrentTask() {
-        return currentTask.get();
+    public GeoIpDownloader getTask(ProjectId projectId) {
+        return tasks.get(projectId);
     }
+
+    public static String getTaskId(ProjectId projectId, boolean supportsMultipleProjects) {
+        return supportsMultipleProjects ? projectId + "/" + GEOIP_DOWNLOADER : GEOIP_DOWNLOADER;
+    }
 }
GeoIpStatsTransportAction.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.nodes.TransportNodesAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.ingest.geoip.DatabaseNodeService;

@@ -34,6 +35,7 @@ public class GeoIpStatsTransportAction extends TransportNodesAction<Request, Res
 
     private final DatabaseNodeService registry;
     private final GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor;
+    private final ProjectResolver projectResolver;
 
     @Inject
     public GeoIpStatsTransportAction(

@@ -42,7 +44,8 @@ public class GeoIpStatsTransportAction extends TransportNodesAction<Request, Res
         ThreadPool threadPool,
         ActionFilters actionFilters,
         DatabaseNodeService registry,
-        GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor
+        GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor,
+        ProjectResolver projectResolver
     ) {
         super(
             GeoIpStatsAction.INSTANCE.name(),

@@ -54,6 +57,7 @@ public class GeoIpStatsTransportAction extends TransportNodesAction<Request, Res
         );
         this.registry = registry;
         this.geoIpDownloaderTaskExecutor = geoIpDownloaderTaskExecutor;
+        this.projectResolver = projectResolver;
     }
 
     @Override

@@ -73,7 +77,7 @@ public class GeoIpStatsTransportAction extends TransportNodesAction<Request, Res
 
     @Override
     protected NodeResponse nodeOperation(NodeRequest request, Task task) {
-        GeoIpDownloader geoIpTask = geoIpDownloaderTaskExecutor.getCurrentTask();
+        GeoIpDownloader geoIpTask = geoIpDownloaderTaskExecutor.getTask(projectResolver.getProjectId());
         GeoIpDownloaderStats downloaderStats = geoIpTask == null || geoIpTask.getStatus() == null ? null : geoIpTask.getStatus();
         CacheStats cacheStats = registry.getCacheStats();
         return new NodeResponse(
DatabaseNodeServiceTests.java

@@ -22,6 +22,8 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.node.DiscoveryNodes;

@@ -365,10 +367,22 @@ public class DatabaseNodeServiceTests extends ESTestCase {
     }
 
     static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata) {
-        return createClusterState(tasksCustomMetadata, false);
+        return createClusterState(Metadata.DEFAULT_PROJECT_ID, tasksCustomMetadata, false);
+    }
+
+    static ClusterState createClusterState(ProjectId projectId, PersistentTasksCustomMetadata tasksCustomMetadata) {
+        return createClusterState(projectId, tasksCustomMetadata, false);
     }
 
     static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata, boolean noStartedShards) {
+        return createClusterState(Metadata.DEFAULT_PROJECT_ID, tasksCustomMetadata, noStartedShards);
+    }
+
+    static ClusterState createClusterState(
+        ProjectId projectId,
+        PersistentTasksCustomMetadata tasksCustomMetadata,
+        boolean noStartedShards
+    ) {
         boolean aliasGeoipDatabase = randomBoolean();
         String indexName = aliasGeoipDatabase
             ? GeoIpDownloader.DATABASES_INDEX + "-" + randomAlphaOfLength(5)

@@ -392,14 +406,16 @@ public class DatabaseNodeServiceTests extends ESTestCase {
             shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
         }
         return ClusterState.builder(new ClusterName("name"))
-            .metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).put(idxMeta))
+            .putProjectMetadata(ProjectMetadata.builder(projectId).put(idxMeta).putCustom(TYPE, tasksCustomMetadata))
             .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("_id1")).localNodeId("_id1"))
-            .routingTable(
+            .putRoutingTable(
+                projectId,
                 RoutingTable.builder()
                     .add(
                         IndexRoutingTable.builder(index)
                             .addIndexShard(IndexShardRoutingTable.builder(new ShardId(index, 0)).addShard(shardRouting))
                     )
                     .build()
             )
             .build();
     }
GeoIpDownloaderTaskExecutorTests.java

@@ -9,8 +9,6 @@
 
 package org.elasticsearch.ingest.geoip;
 
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;

@@ -40,16 +38,16 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
             Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipelineConfigJson), XContentType.JSON))
         );
         // The pipeline is not used in any index, expected to return false.
-        var clusterState = clusterStateWithIndex(b -> {}, ingestMetadata);
-        assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+        var projectMetadata = projectMetadataWithIndex(b -> {}, ingestMetadata);
+        assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));
 
         // The pipeline is set as default pipeline in an index, expected to return true.
-        clusterState = clusterStateWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id1"), ingestMetadata);
-        assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+        projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id1"), ingestMetadata);
+        assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));
 
         // The pipeline is set as final pipeline in an index, expected to return true.
-        clusterState = clusterStateWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "_id1"), ingestMetadata);
-        assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+        projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "_id1"), ingestMetadata);
+        assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));
     }
 
 }

@@ -64,10 +62,8 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
             var ingestMetadata = new IngestMetadata(
                 Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON))
             );
-            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
-                .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build())
-                .build();
-            assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+            ProjectMetadata projectMetadata = ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build();
+            assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));
         }
     }
     {

@@ -76,10 +72,8 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
             var ingestMetadata = new IngestMetadata(
                 Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON))
             );
-            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
-                .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build())
-                .build();
-            assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+            ProjectMetadata projectMetadata = ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build();
+            assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));
         }
     }
     {

@@ -97,10 +91,8 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
                 configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON));
             }
             var ingestMetadata = new IngestMetadata(configs);
-            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
-                .putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build())
-                .build();
-            assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+            ProjectMetadata projectMetadata = ProjectMetadata.builder(projectId).putCustom(IngestMetadata.TYPE, ingestMetadata).build();
+            assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));
         }
     }
 

@@ -277,14 +269,13 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
         }
     }
 
-    private ClusterState clusterStateWithIndex(Consumer<Settings.Builder> consumer, IngestMetadata ingestMetadata) {
+    private ProjectMetadata projectMetadataWithIndex(Consumer<Settings.Builder> consumer, IngestMetadata ingestMetadata) {
        var builder = indexSettings(IndexVersion.current(), 1, 1);
         consumer.accept(builder);
         var indexMetadata = new IndexMetadata.Builder("index").settings(builder.build()).build();
-        var project = ProjectMetadata.builder(Metadata.DEFAULT_PROJECT_ID)
+        return ProjectMetadata.builder(randomProjectIdOrDefault())
             .putCustom(IngestMetadata.TYPE, ingestMetadata)
             .put(indexMetadata, false)
             .build();
-        return ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build();
     }
 }
GeoIpDownloaderTests.java

@@ -26,6 +26,8 @@ import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.project.TestProjectResolvers;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;

@@ -85,9 +87,11 @@ public class GeoIpDownloaderTests extends ESTestCase {
     private ThreadPool threadPool;
     private MockClient client;
     private GeoIpDownloader geoIpDownloader;
+    private ProjectId projectId;
 
     @Before
     public void setup() throws IOException {
+        projectId = randomProjectIdOrDefault();
         httpClient = mock(HttpClient.class);
         when(httpClient.getBytes(anyString())).thenReturn("[]".getBytes(StandardCharsets.UTF_8));
         clusterService = mock(ClusterService.class);

@@ -107,9 +111,9 @@ public class GeoIpDownloaderTests extends ESTestCase {
                 )
             )
         );
-        ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()));
+        ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
         when(clusterService.state()).thenReturn(state);
-        client = new MockClient(threadPool);
+        client = new MockClient(threadPool, projectId);
         geoIpDownloader = new GeoIpDownloader(
             client,
             httpClient,

@@ -124,7 +128,8 @@ public class GeoIpDownloaderTests extends ESTestCase {
             Map.of(),
             () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
             () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
-            () -> true
+            () -> true,
+            projectId
         ) {
             {
                 GeoIpTaskParams geoIpTaskParams = mock(GeoIpTaskParams.class);

@@ -296,7 +301,8 @@ public class GeoIpDownloaderTests extends ESTestCase {
             Map.of(),
             () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
             () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
-            () -> true
+            () -> true,
+            projectId
         ) {
             @Override
             protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) {

@@ -347,7 +353,8 @@ public class GeoIpDownloaderTests extends ESTestCase {
             Map.of(),
             () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
             () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
-            () -> true
+            () -> true,
+            projectId
        ) {
             @Override
             protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) {

@@ -400,7 +407,8 @@ public class GeoIpDownloaderTests extends ESTestCase {
             Map.of(),
             () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
             () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
-            () -> true
+            () -> true,
+            projectId
         ) {
             @Override
             protected void updateTimestamp(String name, GeoIpTaskState.Metadata newMetadata) {

@@ -450,7 +458,8 @@ public class GeoIpDownloaderTests extends ESTestCase {
             Map.of(),
             () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
             () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
-            () -> true
+            () -> true,
+            projectId
         ) {
             @Override
             void updateDatabases() throws IOException {

@@ -495,10 +504,15 @@ public class GeoIpDownloaderTests extends ESTestCase {
             Map.of(),
             () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
             () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
-            () -> true
+            () -> true,
+            projectId
         ) {
             @Override
-            public void updatePersistentTaskState(PersistentTaskState state, ActionListener<PersistentTask<?>> listener) {
+            public void updateProjectPersistentTaskState(
+                ProjectId projectId,
+                PersistentTaskState state,
+                ActionListener<PersistentTask<?>> listener
+            ) {
                 assertSame(GeoIpTaskState.EMPTY, state);
                 PersistentTask<?> task = mock(PersistentTask.class);
                 when(task.getState()).thenReturn(GeoIpTaskState.EMPTY);

@@ -525,10 +539,15 @@ public class GeoIpDownloaderTests extends ESTestCase {
             Map.of(),
             () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
             () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
-            () -> true
+            () -> true,
+            projectId
         ) {
             @Override
-            public void updatePersistentTaskState(PersistentTaskState state, ActionListener<PersistentTask<?>> listener) {
+            public void updateProjectPersistentTaskState(
+                ProjectId projectId,
+                PersistentTaskState state,
+                ActionListener<PersistentTask<?>> listener
+            ) {
                 assertSame(GeoIpTaskState.EMPTY, state);
                 PersistentTask<?> task = mock(PersistentTask.class);
                 when(task.getState()).thenReturn(GeoIpTaskState.EMPTY);

@@ -566,7 +585,8 @@ public class GeoIpDownloaderTests extends ESTestCase {
             Map.of(),
             () -> GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING.getDefault(Settings.EMPTY),
             () -> GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING.getDefault(Settings.EMPTY),
-            atLeastOneGeoipProcessor::get
+            atLeastOneGeoipProcessor::get,
+            projectId
         ) {
             @Override
             void processDatabase(Map<String, Object> databaseInfo) {

@@ -584,10 +604,15 @@ public class GeoIpDownloaderTests extends ESTestCase {
         /*
         * Here we make sure that we bail out before making an httpClient request if there is write block on the .geoip_databases index
         */
-        ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()));
-        var geoIpIndex = state.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX).getWriteIndex().getName();
+        ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
+        var geoIpIndex = state.getMetadata()
+            .getProject(projectId)
+            .getIndicesLookup()
+            .get(GeoIpDownloader.DATABASES_INDEX)
+            .getWriteIndex()
+            .getName();
         state = ClusterState.builder(state)
-            .blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
+            .blocks(new ClusterBlocks.Builder().addIndexBlock(projectId, geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
             .build();
         when(clusterService.state()).thenReturn(state);
         geoIpDownloader.updateDatabases();

@@ -599,7 +624,7 @@ public class GeoIpDownloaderTests extends ESTestCase {
         * Here we make sure that we bail out before making an httpClient request if there are unallocated shards on the .geoip_databases
         * index
         */
-        ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()), true);
+        ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()), true);
         when(clusterService.state()).thenReturn(state);
         geoIpDownloader.updateDatabases();
         verifyNoInteractions(httpClient);

@@ -610,7 +635,7 @@ public class GeoIpDownloaderTests extends ESTestCase {
         * This test puts some expired databases and some non-expired ones into the GeoIpTaskState, and then calls runDownloader(), making
         * sure that the expired databases have been deleted.
         */
-        AtomicInteger updatePersistentTaskStateCount = new AtomicInteger(0);
+        AtomicInteger updateProjectPersistentTaskStateCount = new AtomicInteger(0);
         AtomicInteger deleteCount = new AtomicInteger(0);
         int expiredDatabasesCount = randomIntBetween(1, 100);
         int unexpiredDatabasesCount = randomIntBetween(0, 100);

@@ -634,7 +659,7 @@ public class GeoIpDownloaderTests extends ESTestCase {
                 request.getAllocationId(),
                 assignment
             );
-            updatePersistentTaskStateCount.incrementAndGet();
+            updateProjectPersistentTaskStateCount.incrementAndGet();
             taskResponseListener.onResponse(new PersistentTaskResponse(new PersistentTask<>(persistentTask, request.getState())));
         }
         );

@@ -657,14 +682,14 @@ public class GeoIpDownloaderTests extends ESTestCase {
             );
         }
         assertThat(deleteCount.get(), equalTo(expiredDatabasesCount));
-        assertThat(updatePersistentTaskStateCount.get(), equalTo(expiredDatabasesCount));
+        assertThat(updateProjectPersistentTaskStateCount.get(), equalTo(expiredDatabasesCount));
         geoIpDownloader.runDownloader();
         /*
         * The following two lines assert current behavior that might not be desirable -- we continue to delete expired databases every
         * time that runDownloader runs. This seems unnecessary.
         */
         assertThat(deleteCount.get(), equalTo(expiredDatabasesCount * 2));
-        assertThat(updatePersistentTaskStateCount.get(), equalTo(expiredDatabasesCount * 2));
+        assertThat(updateProjectPersistentTaskStateCount.get(), equalTo(expiredDatabasesCount * 2));
     }
 
     private GeoIpTaskState.Metadata newGeoIpTaskStateMetadata(boolean expired) {

@@ -681,8 +706,8 @@ public class GeoIpDownloaderTests extends ESTestCase {
 
         private final Map<ActionType<?>, BiConsumer<? extends ActionRequest, ? extends ActionListener<?>>> handlers = new HashMap<>();
 
-        private MockClient(ThreadPool threadPool) {
-            super(threadPool);
+        private MockClient(ThreadPool threadPool, ProjectId projectId) {
+            super(threadPool, TestProjectResolvers.singleProject(projectId));
         }
 
         public <Response extends ActionResponse, Request extends ActionRequest> void addHandler(
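As a usage note, a caller targets one project's pipelines, and therefore one project's downloader, through the X-Elastic-Project-Id header; that is the mechanism GeoIpMultiProjectIT#setRequestProjectId uses above. A hedged sketch follows, assuming a multi-project-enabled cluster and the low-level REST client; the pipeline id and request body are placeholders for illustration, not part of the change.

// Usage sketch: scoping a REST request to one project via the
// X-Elastic-Project-Id header, mirroring GeoIpMultiProjectIT#setRequestProjectId.
// Assumes a running MP-enabled cluster; pipeline id and body are placeholders.
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.tasks.Task;

public class ProjectScopedRequestSketch {
    static void putGeoIpPipeline(RestClient client, String projectId) throws Exception {
        Request request = new Request("PUT", "/_ingest/pipeline/geoip-pipeline");
        request.setJsonEntity("{\"processors\":[{\"geoip\":{\"field\":\"ip\"}}]}");
        RequestOptions.Builder options = request.getOptions().toBuilder();
        // Replace any existing header so the request is routed to exactly one project.
        options.removeHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER);
        options.addHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, projectId);
        request.setOptions(options);
        // Adding a geoip processor makes that project's downloader schedule a run.
        client.performRequest(request);
    }
}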