diff --git a/docs/changelog/96624.yaml b/docs/changelog/96624.yaml new file mode 100644 index 000000000000..a9e8b5e9ab6b --- /dev/null +++ b/docs/changelog/96624.yaml @@ -0,0 +1,5 @@ +pr: 96624 +summary: Enable analytics geoip in behavioral analytics +area: "Application" +type: feature +issues: [] diff --git a/docs/reference/ingest/processors/geoip.asciidoc b/docs/reference/ingest/processors/geoip.asciidoc index fac1c943b8a7..ab2e3f7285ee 100644 --- a/docs/reference/ingest/processors/geoip.asciidoc +++ b/docs/reference/ingest/processors/geoip.asciidoc @@ -14,7 +14,7 @@ CC BY-SA 4.0 license. It automatically downloads these databases if your nodes c * `ingest.geoip.downloader.eager.download` is set to true * your cluster has at least one pipeline with a `geoip` processor - + {es} automatically downloads updates for these databases from the Elastic GeoIP endpoint: https://geoip.elastic.co/v1/database. To get download statistics for these updates, use the <>. @@ -33,13 +33,14 @@ field instead. .`geoip` options [options="header"] |====== -| Name | Required | Default | Description -| `field` | yes | - | The field to get the ip address from for the geographical lookup. -| `target_field` | no | geoip | The field that will hold the geographical information looked up from the MaxMind database. -| `database_file` | no | GeoLite2-City.mmdb | The database filename referring to a database the module ships with (GeoLite2-City.mmdb, GeoLite2-Country.mmdb, or GeoLite2-ASN.mmdb) or a custom database in the `ingest-geoip` config directory. -| `properties` | no | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup. -| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document -| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array +| Name | Required | Default | Description +| `field` | yes | - | The field to get the ip address from for the geographical lookup. +| `target_field` | no | geoip | The field that will hold the geographical information looked up from the MaxMind database. +| `database_file` | no | GeoLite2-City.mmdb | The database filename referring to a database the module ships with (GeoLite2-City.mmdb, GeoLite2-Country.mmdb, or GeoLite2-ASN.mmdb) or a custom database in the `ingest-geoip` config directory. +| `properties` | no | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup. +| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document +| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array +| `download_database_on_pipeline_creation` | no | `true` | If `true` (and if `ingest.geoip.downloader.eager.download` is `false`), the missing database is downloaded when the pipeline is created. Else, the download is triggered by when the pipeline is used as the `default_pipeline` or `final_pipeline` in an index. |====== *Depends on what is available in `database_file`: diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java index 1014ffabf042..6b6188cceec2 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -18,11 +18,13 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; @@ -150,6 +152,7 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT { @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/75221") public void testInvalidTimestamp() throws Exception { assumeTrue("only test with fixture to have stable results", getEndpoint() != null); + setupDatabasesInConfigDirectory(); putGeoIpPipeline(); updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)); assertBusy(() -> { @@ -283,6 +286,42 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT { }, 2, TimeUnit.MINUTES); } + public void testDoNotDownloadDatabaseOnPipelineCreation() throws Exception { + assumeTrue("only test with fixture to have stable results", getEndpoint() != null); + String pipelineId = randomIdentifier(); + + // Removing databases from tmp dir. So we can test the downloader. + deleteDatabasesInConfigDirectory(); + + // Enabling the downloader. + putGeoIpPipeline("_id", false); + updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)); + assertBusy(() -> assertNotNull(getTask())); + + // Creating a pipeline containing a geo processor with lazy download enable. + // Download should not be triggered and task state should stay null. + putGeoIpPipeline(pipelineId, false); + assertNull(getTask().getState()); + + // Creating an index which does not reference the pipeline should not trigger the database download. + String indexIdentifier = randomIdentifier(); + assertAcked(client().admin().indices().prepareCreate(indexIdentifier).get()); + assertNull(getTask().getState()); + + // Set the pipeline as default_pipeline or final_pipeline for the index. + // This should trigger the database download. + Setting pipelineSetting = randomFrom(IndexSettings.FINAL_PIPELINE, IndexSettings.DEFAULT_PIPELINE); + Settings indexSettings = Settings.builder().put(pipelineSetting.getKey(), pipelineId).build(); + assertAcked(client().admin().indices().prepareUpdateSettings(indexIdentifier).setSettings(indexSettings).get()); + assertBusy(() -> { + GeoIpTaskState state = getGeoIpTaskState(); + assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet()); + }, 2, TimeUnit.MINUTES); + + // Remove the created index. + assertAcked(client().admin().indices().prepareDelete(indexIdentifier).get()); + } + @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972") public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { assumeTrue("only test with fixture to have stable results", getEndpoint() != null); @@ -450,6 +489,17 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT { * @throws IOException */ private void putGeoIpPipeline(String pipelineId) throws IOException { + putGeoIpPipeline(pipelineId, true); + } + + /** + * This creates a pipeline named pipelineId with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is + * enabled). + * @param pipelineId The name of the new pipeline with a geoip processor + * @param downloadDatabaseOnPipelineCreation Indicates whether the pipeline creation should trigger database download or not. + * @throws IOException + */ + private void putGeoIpPipeline(String pipelineId, boolean downloadDatabaseOnPipelineCreation) throws IOException { BytesReference bytes; try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.startObject(); @@ -463,6 +513,9 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT { builder.field("field", "ip"); builder.field("target_field", "ip-city"); builder.field("database_file", "GeoLite2-City.mmdb"); + if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { + builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); + } } builder.endObject(); } @@ -474,6 +527,9 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT { builder.field("field", "ip"); builder.field("target_field", "ip-country"); builder.field("database_file", "GeoLite2-Country.mmdb"); + if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { + builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); + } } builder.endObject(); } @@ -485,6 +541,9 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT { builder.field("field", "ip"); builder.field("target_field", "ip-asn"); builder.field("database_file", "GeoLite2-ASN.mmdb"); + if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { + builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); + } } builder.endObject(); } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 7457738b7530..8534749cace6 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; @@ -43,11 +44,14 @@ import org.elasticsearch.transport.RemoteTransportException; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; +import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation; /** * Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node. @@ -207,7 +211,14 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor pipelineDefinitions = IngestService.getPipelines(clusterState); - return pipelineDefinitions.stream().anyMatch(pipelineDefinition -> { - Map pipelineMap = pipelineDefinition.getConfigAsMap(); - return hasAtLeastOneGeoipProcessor((List>) pipelineMap.get(Pipeline.PROCESSORS_KEY)); + if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) { + return true; + } + + Set checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream() + .map(PipelineConfiguration::getId) + .collect(Collectors.toSet()); + + if (checkReferencedPipelines.isEmpty()) { + return false; + } + + return clusterState.getMetadata().indices().values().stream().anyMatch(indexMetadata -> { + String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings()); + String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings()); + return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline); }); } - private static boolean hasAtLeastOneGeoipProcessor(List> processors) { - return processors != null && processors.stream().anyMatch(GeoIpDownloaderTaskExecutor::hasAtLeastOneGeoipProcessor); - } - - private static boolean hasAtLeastOneGeoipProcessor(Map processor) { - return processor != null - && (processor.containsKey(GeoIpProcessor.TYPE) - || isProcessorWithOnFailureGeoIpProcessor(processor) - || isForeachProcessorWithGeoipProcessor(processor)); - } - + /** + * Retrieve list of pipelines that have at least one geoip processor. + * @param clusterState Cluster state. + * @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation + * matching the param. + * @return A list of {@link PipelineConfiguration} matching criteria. + */ @SuppressWarnings("unchecked") - private static boolean isProcessorWithOnFailureGeoIpProcessor(Map processor) { + private static List pipelineConfigurationsWithGeoIpProcessor( + ClusterState clusterState, + boolean downloadDatabaseOnPipelineCreation + ) { + List pipelineDefinitions = IngestService.getPipelines(clusterState); + return pipelineDefinitions.stream().filter(pipelineConfig -> { + List> processors = (List>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY); + return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation); + }).collect(Collectors.toList()); + } + + /** + * Check if a list of processor contains at least a geoip processor. + * @param processors List of processors. + * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @return true if a geoip processor is found in the processor list. + */ + private static boolean hasAtLeastOneGeoipProcessor(List> processors, boolean downloadDatabaseOnPipelineCreation) { + return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation)); + } + + /** + * Check if a processor config is a geoip processor or contains at least a geoip processor. + * @param processor Processor config. + * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @return true if a geoip processor is found in the processor list. + */ + @SuppressWarnings("unchecked") + private static boolean hasAtLeastOneGeoipProcessor(Map processor, boolean downloadDatabaseOnPipelineCreation) { + if (processor == null) { + return false; + } + + if (processor.containsKey(GeoIpProcessor.TYPE)) { + Map processorConfig = (Map) processor.get(GeoIpProcessor.TYPE); + return downloadDatabaseOnPipelineCreation(processorConfig) == downloadDatabaseOnPipelineCreation; + } + + return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation) + || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation); + } + + /** + * Check if a processor config is has an on_failure clause containing at least a geoip processor. + * @param processor Processor config. + * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @return true if a geoip processor is found in the processor list. + */ + @SuppressWarnings("unchecked") + private static boolean isProcessorWithOnFailureGeoIpProcessor( + Map processor, + boolean downloadDatabaseOnPipelineCreation + ) { return processor != null && processor.values() .stream() .anyMatch( value -> value instanceof Map - && hasAtLeastOneGeoipProcessor(((Map>>) value).get("on_failure")) + && hasAtLeastOneGeoipProcessor( + ((Map>>) value).get("on_failure"), + downloadDatabaseOnPipelineCreation + ) ); } + /** + * Check if a processor is a foreach processor containing at least a geoip processor. + * @param processor Processor config. + * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @return true if a geoip processor is found in the processor list. + */ @SuppressWarnings("unchecked") - private static boolean isForeachProcessorWithGeoipProcessor(Map processor) { + private static boolean isForeachProcessorWithGeoipProcessor(Map processor, boolean downloadDatabaseOnPipelineCreation) { return processor.containsKey("foreach") - && hasAtLeastOneGeoipProcessor(((Map>) processor.get("foreach")).get("processor")); + && hasAtLeastOneGeoipProcessor( + ((Map>) processor.get("foreach")).get("processor"), + downloadDatabaseOnPipelineCreation + ); } private void startTask(Runnable onFailure) { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index d57124670db0..5acdf38f3b89 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -427,6 +427,10 @@ public final class GeoIpProcessor extends AbstractProcessor { boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true); + // Validating the download_database_on_pipeline_creation even if the result + // is not used directly by the factory. + downloadDatabaseOnPipelineCreation(config, processorTag); + // noop, should be removed in 9.0 Object value = config.remove("fallback_to_default_databases"); if (value != null) { @@ -487,6 +491,14 @@ public final class GeoIpProcessor extends AbstractProcessor { ); } + public static boolean downloadDatabaseOnPipelineCreation(Map config) { + return downloadDatabaseOnPipelineCreation(config, null); + } + + public static boolean downloadDatabaseOnPipelineCreation(Map config, String processorTag) { + return readBooleanProperty(GeoIpProcessor.TYPE, processorTag, config, "download_database_on_pipeline_creation", true); + } + private static boolean useDatabaseUnavailableProcessor(GeoIpDatabase database, String databaseName) { // If there is no instance for a database we should fail with a config error, but // if there is no instance for a builtin database that we manage via GeoipDownloader then don't fail. diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java index 7d90130bb64b..b33b66696cfc 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java @@ -9,13 +9,20 @@ package org.elasticsearch.ingest.geoip; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -24,13 +31,46 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { - public void testHasAtLeastOneGeoipProcessor() { + + public void testHasAtLeastOneGeoipProcessorWhenDownloadDatabaseOnPipelineCreationIsFalse() throws IOException { + ClusterState clusterState = mock(ClusterState.class); + Metadata metadata = mock(Metadata.class); + when(clusterState.getMetadata()).thenReturn(metadata); + + final IngestMetadata[] ingestMetadata = new IngestMetadata[1]; + when(metadata.custom(IngestMetadata.TYPE)).thenAnswer(invocationOnmock -> ingestMetadata[0]); + + final Settings[] indexSettings = new Settings[1]; + IndexMetadata indexMetadata = mock(IndexMetadata.class); + when(indexMetadata.getSettings()).thenAnswer(invocationMock -> indexSettings[0]); + when(metadata.indices()).thenReturn(Map.of("index", indexMetadata)); + + for (String pipelineConfigJson : getPipelinesWithGeoIpProcessors(false)) { + ingestMetadata[0] = new IngestMetadata( + Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipelineConfigJson), XContentType.JSON)) + ); + // The pipeline is not used in any index, expected to return false. + indexSettings[0] = Settings.EMPTY; + assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + + // The pipeline is set as default pipeline in an index, expected to return true. + indexSettings[0] = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id1").build(); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + + // The pipeline is set as final pipeline in an index, expected to return true. + indexSettings[0] = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "_id1").build(); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState)); + } + + } + + public void testHasAtLeastOneGeoipProcessor() throws IOException { final IngestMetadata[] ingestMetadata = new IngestMetadata[1]; ClusterState clusterState = mock(ClusterState.class); Metadata metadata = mock(Metadata.class); when(metadata.custom(IngestMetadata.TYPE)).thenAnswer(invocationOnmock -> ingestMetadata[0]); when(clusterState.getMetadata()).thenReturn(metadata); - List expectHitsInputs = getPipelinesWithGeoIpProcessors(); + List expectHitsInputs = getPipelinesWithGeoIpProcessors(true); List expectMissesInputs = getPipelinesWithoutGeoIpProcessors(); { // Test that hasAtLeastOneGeoipProcessor returns true for any pipeline with a geoip processor: @@ -73,15 +113,10 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { * This method returns an assorted list of pipelines that have geoip processors -- ones that ought to cause hasAtLeastOneGeoipProcessor * to return true. */ - private List getPipelinesWithGeoIpProcessors() { + private List getPipelinesWithGeoIpProcessors(boolean downloadDatabaseOnPipelineCreation) throws IOException { String simpleGeoIpProcessor = """ { - "processors":[ - { - "geoip":{ - "field":"provider" - } - } + "processors":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ ] } """; @@ -92,12 +127,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { "rename":{ "field":"provider", "target_field":"cloud.provider", - "on_failure":[ - { - "geoip":{ - "field":"error.message" - } - } + "on_failure":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ ] } } @@ -110,12 +140,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { { "foreach":{ "field":"values", - "processor": - { - "geoip":{ - "field":"someField" - } - } + "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ } } ] @@ -131,12 +156,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { { "foreach":{ "field":"someField", - "processor": - { - "geoip":{ - "field":"someField" - } - } + "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ } } } @@ -159,12 +179,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { "rename":{ "field":"provider", "target_field":"cloud.provider", - "on_failure":[ - { - "geoip":{ - "field":"error.message" - } - } + "on_failure":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ ] } } @@ -186,12 +201,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { { "foreach":{ "field":"values", - "processor": - { - "geoip":{ - "field":"someField" - } - } + "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ } } ] @@ -252,4 +262,23 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase { """; return List.of(empty, noProcessors, onFailureWithForeachWithSet); } + + private String getGeoIpProcessor(boolean downloadDatabaseOnPipelineCreation) throws IOException { + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", randomIdentifier()); + if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { + builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); + } + } + builder.endObject(); + } + builder.endObject(); + + return Strings.toString(builder); + } + } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index d9ee6cda883e..f3e8a5028a83 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -45,6 +45,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.instanceOf; @@ -407,6 +408,16 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { assertWarnings(GeoIpProcessor.DEFAULT_DATABASES_DEPRECATION_MESSAGE); } + public void testDownloadDatabaseOnPipelineCreation() throws IOException { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService); + Map config = new HashMap<>(); + config.put("field", randomIdentifier()); + config.put("download_database_on_pipeline_creation", randomBoolean()); + factory.create(null, null, null, config); + // Check all the config params were consumed. + assertThat(config, anEmptyMap()); + } + public void testDefaultDatabaseWithTaskPresent() throws Exception { PersistentTasksCustomMetadata tasks = PersistentTasksCustomMetadata.builder() .addTask(GeoIpDownloader.GEOIP_DOWNLOADER, GeoIpDownloader.GEOIP_DOWNLOADER, null, null) diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-final_pipeline.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-final_pipeline.json index 39655a9ea8a6..cc9d7037700e 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-final_pipeline.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-final_pipeline.json @@ -57,6 +57,14 @@ "ignore_missing": true } }, + { + "geoip": { + "field": "session.ip", + "target_field": "session.location", + "ignore_missing": true, + "download_database_on_pipeline_creation": false + } + }, { "remove": { "field": "session.ip", diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-mappings.json index d4ef8d24b693..c7193472bdcd 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-mappings.json @@ -259,6 +259,10 @@ } } } + }, + "tags": { + "ignore_above": 1024, + "type": "keyword" } } } diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java index b7ed31b82e82..eb0e97e1ecce 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java @@ -8,7 +8,6 @@ package org.elasticsearch.upgrades; import org.apache.http.util.EntityUtils; -import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.hamcrest.Matchers; @@ -17,10 +16,7 @@ import java.nio.charset.StandardCharsets; public class GeoIpUpgradeIT extends AbstractUpgradeTestCase { - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/96129") public void testGeoIpDownloader() throws Exception { - assumeTrue("Disabled until PR #95621 is backported to branch " + Version.V_8_8_0, UPGRADE_FROM_VERSION.onOrBefore(Version.V_8_7_0)); - if (CLUSTER_TYPE == ClusterType.UPGRADED) { assertBusy(() -> { Response response = client().performRequest(new Request("GET", "_cat/tasks"));