diff --git a/build-tools-internal/src/main/resources/changelog-schema.json b/build-tools-internal/src/main/resources/changelog-schema.json index 9692af7adc5e..7229571fc8bf 100644 --- a/build-tools-internal/src/main/resources/changelog-schema.json +++ b/build-tools-internal/src/main/resources/changelog-schema.json @@ -291,6 +291,7 @@ "JVM option", "Java API", "Logging", + "Logs", "Mapping", "Packaging", "Painless", diff --git a/docs/changelog/121049.yaml b/docs/changelog/121049.yaml new file mode 100644 index 000000000000..760deb62e149 --- /dev/null +++ b/docs/changelog/121049.yaml @@ -0,0 +1,19 @@ +pr: 121049 +summary: Conditionally enable logsdb by default for data streams matching with logs-*-* + pattern. +area: Logs +type: breaking +issues: + - 106489 +breaking: + title: Conditionally enable logsdb by default + area: Logs + details: |- + Logsdb will be enabled by default for data streams matching with logs-*-* pattern. + If upgrading from 8.x to 9.x and data streams matching with log-*-* do exist, + then Logsdb will not be enabled by default. + impact: |- + Logsdb reduce storage footprint in Elasticsearch for logs, but there are side effects + to be taken into account that are described in the Logsdb docs: + https://www.elastic.co/guide/en/elasticsearch/reference/current/logs-data-stream.html#upgrade-to-logsdb-notes + notable: true diff --git a/docs/reference/data-streams/logs.asciidoc b/docs/reference/data-streams/logs.asciidoc index 7058cfe51496..797efb7bef94 100644 --- a/docs/reference/data-streams/logs.asciidoc +++ b/docs/reference/data-streams/logs.asciidoc @@ -237,3 +237,9 @@ The `logsdb` index mode uses the following settings: * **`index.mapping.ignore_above`**: `8191` * **`index.mapping.total_fields.ignore_dynamic_beyond_limit`**: `true` + +[discrete] +[[upgrade-to-logsdb-notes]] +=== Notes about upgrading to Logsdb + +TODO: add notes. diff --git a/docs/reference/rest-api/info.asciidoc b/docs/reference/rest-api/info.asciidoc index 318170ab089b..e1a7246342a3 100644 --- a/docs/reference/rest-api/info.asciidoc +++ b/docs/reference/rest-api/info.asciidoc @@ -177,7 +177,7 @@ Example response: }, "logsdb": { "available": true, - "enabled": false + "enabled": true } }, "tagline" : "You know, for X" diff --git a/docs/reference/rest-api/usage.asciidoc b/docs/reference/rest-api/usage.asciidoc index bb46c41b4bcc..1c907ec63d8f 100644 --- a/docs/reference/rest-api/usage.asciidoc +++ b/docs/reference/rest-api/usage.asciidoc @@ -514,7 +514,7 @@ GET /_xpack/usage }, "logsdb": { "available": true, - "enabled": false, + "enabled": true, "indices_count": 0, "indices_with_synthetic_source": 0, "num_docs": 0, diff --git a/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/LogsIndexModeFullClusterRestartIT.java b/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/LogsIndexModeFullClusterRestartIT.java index 9866d94dccc3..ebf72b26a211 100644 --- a/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/LogsIndexModeFullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/LogsIndexModeFullClusterRestartIT.java @@ -120,6 +120,10 @@ public class LogsIndexModeFullClusterRestartIT extends ParameterizedFullClusterR }"""; public void testLogsIndexing() throws IOException { + assumeTrue( + "otherwise first backing index of logs-apache-production will be in logsdb mode", + getOldClusterTestVersion().before("9.0.0") + ); if (isRunningAgainstOldCluster()) { assertOK(client().performRequest(putTemplate(client(), "logs-template", STANDARD_TEMPLATE))); assertOK(client().performRequest(createDataStream("logs-apache-production"))); diff --git a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/LogsUsageRollingUpgradeIT.java b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/LogsUsageRollingUpgradeIT.java new file mode 100644 index 000000000000..ab9855b7398f --- /dev/null +++ b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/LogsUsageRollingUpgradeIT.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.upgrades; + +import com.carrotsearch.randomizedtesting.annotations.Name; + +import org.elasticsearch.client.Request; + +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.upgrades.LogsdbIndexingRollingUpgradeIT.bulkIndex; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; + +public class LogsUsageRollingUpgradeIT extends AbstractRollingUpgradeTestCase { + + public LogsUsageRollingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) { + super(upgradedNodes); + } + + public void testUsage() throws Exception { + assumeTrue("logsdb.prior_logs_usage only gets set in 8.x", getOldClusterTestVersion().before("9.0.0")); + String dataStreamName = "logs-mysql-error"; + if (isOldCluster()) { + bulkIndex(dataStreamName, 4, 256, Instant.now()); + ensureGreen(dataStreamName); + assertBusy(() -> { + var getClusterSettingsResponse = getClusterSettings(); + Map persistentSettings = (Map) getClusterSettingsResponse.get("persistent"); + assertThat(persistentSettings, hasEntry("logsdb.prior_logs_usage", "true")); + }, 2, TimeUnit.MINUTES); + } else { + String newIndex = rolloverDataStream(dataStreamName); + bulkIndex(dataStreamName, 4, 256, Instant.now()); + Map indexResponse = (Map) getIndexSettings(newIndex, true).get(newIndex); + Map settings = (Map) indexResponse.get("settings"); + Map defaults = (Map) indexResponse.get("defaults"); + assertThat(settings, not(hasKey("index.mode"))); + assertThat(defaults, hasEntry("index.mode", "standard")); + } + } + + static Map getClusterSettings() throws IOException { + var request = new Request("GET", "/_cluster/settings"); + request.addParameter("flat_settings", "true"); + request.addParameter("include_defaults", "true"); + var response = client().performRequest(request); + assertOK(response); + return entityAsMap(response); + } + + static String rolloverDataStream(String dataStreamName) throws IOException { + var request = new Request("POST", "/" + dataStreamName + "/_rollover"); + var response = client().performRequest(request); + assertOK(response); + var responseBody = entityAsMap(response); + return (String) responseBody.get("new_index"); + } + +} diff --git a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/LogsdbIndexingRollingUpgradeIT.java b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/LogsdbIndexingRollingUpgradeIT.java index 9cb91438e09c..6b2a889d3c1a 100644 --- a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/LogsdbIndexingRollingUpgradeIT.java +++ b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/LogsdbIndexingRollingUpgradeIT.java @@ -22,13 +22,18 @@ import org.elasticsearch.xcontent.XContentType; import java.io.IOException; import java.io.InputStream; import java.time.Instant; +import java.util.List; import java.util.Locale; import java.util.Map; import static org.elasticsearch.upgrades.LogsIndexModeRollingUpgradeIT.enableLogsdbByDefault; import static org.elasticsearch.upgrades.LogsIndexModeRollingUpgradeIT.getWriteBackingIndex; import static org.elasticsearch.upgrades.TsdbIT.formatInstant; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; public class LogsdbIndexingRollingUpgradeIT extends AbstractRollingUpgradeTestCase { @@ -122,7 +127,8 @@ public class LogsdbIndexingRollingUpgradeIT extends AbstractRollingUpgradeTestCa assertOK(client().performRequest(putIndexTemplateRequest)); } - static void bulkIndex(String dataStreamName, int numRequest, int numDocs, Instant startTime) throws Exception { + static String bulkIndex(String dataStreamName, int numRequest, int numDocs, Instant startTime) throws Exception { + String firstIndex = null; for (int i = 0; i < numRequest; i++) { var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk"); StringBuilder requestBody = new StringBuilder(); @@ -155,7 +161,11 @@ public class LogsdbIndexingRollingUpgradeIT extends AbstractRollingUpgradeTestCa assertOK(response); var responseBody = entityAsMap(response); assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false)); + if (firstIndex == null) { + firstIndex = (String) ((Map) ((Map) ((List) responseBody.get("items")).get(0)).get("create")).get("_index"); + } } + return firstIndex; } void search(String dataStreamName) throws Exception { diff --git a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/NoLogsUsageRollingUpgradeIT.java b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/NoLogsUsageRollingUpgradeIT.java new file mode 100644 index 000000000000..57e5655fda3b --- /dev/null +++ b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/NoLogsUsageRollingUpgradeIT.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.upgrades; + +import com.carrotsearch.randomizedtesting.annotations.Name; + +import java.time.Instant; +import java.util.Map; + +import static org.elasticsearch.upgrades.LogsUsageRollingUpgradeIT.getClusterSettings; +import static org.elasticsearch.upgrades.LogsdbIndexingRollingUpgradeIT.bulkIndex; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; + +public class NoLogsUsageRollingUpgradeIT extends AbstractRollingUpgradeTestCase { + + public NoLogsUsageRollingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) { + super(upgradedNodes); + } + + public void testUsage() throws Exception { + String dataStreamName = "logs-mysql-error"; + if (isOldCluster()) { + dataStreamName = dataStreamName.replace("logs-", "log-"); + bulkIndex(dataStreamName, 4, 256, Instant.now()); + ensureGreen(dataStreamName); + } else if (isUpgradedCluster()) { + String newIndex = bulkIndex(dataStreamName, 4, 256, Instant.now()); + ensureGreen(dataStreamName); + Map indexResponse = (Map) getIndexSettings(newIndex, true).get(newIndex); + Map settings = (Map) indexResponse.get("settings"); + assertThat(settings, hasEntry("index.mode", "logsdb")); + var getClusterSettingsResponse = getClusterSettings(); + Map defaults = (Map) getClusterSettingsResponse.get("defaults"); + Map persistentSettings = (Map) getClusterSettingsResponse.get("persistent"); + assertThat(persistentSettings, not(hasKey("logsdb.prior_logs_usage"))); + assertThat(defaults, hasEntry("cluster.logsdb.enabled", "true")); + } + } + +} diff --git a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeDisabledRestTestIT.java b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeDisabledRestTestIT.java index 40aab696dc9c..4ae1e9961a10 100644 --- a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeDisabledRestTestIT.java +++ b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsIndexModeDisabledRestTestIT.java @@ -63,14 +63,14 @@ public class LogsIndexModeDisabledRestTestIT extends LogsIndexModeRestTestIT { private RestClient client; - public void testLogsSettingsIndexModeDisabled() throws IOException { + public void testLogsSettingsIndexModeEnabledByDefault() throws IOException { assertOK(createDataStream(client, "logs-custom-dev")); final String indexMode = (String) getSetting( client, getDataStreamBackingIndex(client, "logs-custom-dev", 0), IndexSettings.MODE.getKey() ); - assertThat(indexMode, Matchers.not(equalTo(IndexMode.LOGSDB.getName()))); + assertThat(indexMode, equalTo(IndexMode.LOGSDB.getName())); } public void testTogglingLogsdb() throws IOException { @@ -81,29 +81,21 @@ public class LogsIndexModeDisabledRestTestIT extends LogsIndexModeRestTestIT { getDataStreamBackingIndex(client, "logs-custom-dev", 0), IndexSettings.MODE.getKey() ); - assertThat(indexModeBefore, Matchers.not(equalTo(IndexMode.LOGSDB.getName()))); - assertOK(putClusterSetting(client, "cluster.logsdb.enabled", "true")); + assertThat(indexModeBefore, equalTo(IndexMode.LOGSDB.getName())); + assertOK(putClusterSetting(client, "cluster.logsdb.enabled", "false")); final String indexModeAfter = (String) getSetting( client, getDataStreamBackingIndex(client, "logs-custom-dev", 0), IndexSettings.MODE.getKey() ); - assertThat(indexModeAfter, Matchers.not(equalTo(IndexMode.LOGSDB.getName()))); + assertThat(indexModeAfter, equalTo(IndexMode.LOGSDB.getName())); assertOK(rolloverDataStream(client, "logs-custom-dev")); final String indexModeLater = (String) getSetting( client, getDataStreamBackingIndex(client, "logs-custom-dev", 1), IndexSettings.MODE.getKey() ); - assertThat(indexModeLater, equalTo(IndexMode.LOGSDB.getName())); - assertOK(putClusterSetting(client, "cluster.logsdb.enabled", "false")); - assertOK(rolloverDataStream(client, "logs-custom-dev")); - final String indexModeFinal = (String) getSetting( - client, - getDataStreamBackingIndex(client, "logs-custom-dev", 2), - IndexSettings.MODE.getKey() - ); - assertThat(indexModeFinal, Matchers.not(equalTo(IndexMode.LOGSDB.getName()))); + assertThat(indexModeLater, Matchers.not(equalTo(IndexMode.LOGSDB.getName()))); } diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java index 4720ec87cb85..455e707cc0d2 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java @@ -11,10 +11,8 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -42,17 +40,21 @@ import java.util.List; import java.util.function.Predicate; import java.util.function.Supplier; -import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE; -import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD; import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING; public class LogsDBPlugin extends Plugin implements ActionPlugin { private final Settings settings; private final SyntheticSourceLicenseService licenseService; + private static final Setting LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting( + "logsdb.prior_logs_usage", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); public static final Setting CLUSTER_LOGSDB_ENABLED = Setting.boolSetting( "cluster.logsdb.enabled", - false, + settings -> Boolean.toString(LOGSDB_PRIOR_LOGS_USAGE.get(settings) == false), Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -81,18 +83,6 @@ public class LogsDBPlugin extends Plugin implements ActionPlugin { logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled ); - var clusterService = services.clusterService(); - Supplier metadataSupplier = () -> clusterService.state().metadata(); - var historicLogsUsageService = new LogsPatternUsageService(services.client(), settings, services.threadPool(), metadataSupplier); - clusterService.addLocalNodeMasterListener(historicLogsUsageService); - clusterService.addLifecycleListener(new LifecycleListener() { - - @Override - public void beforeStop() { - historicLogsUsageService.offMaster(); - } - }); - // Nothing to share here: return super.createComponents(services); } @@ -112,7 +102,7 @@ public class LogsDBPlugin extends Plugin implements ActionPlugin { @Override public List> getSettings() { - return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_MAX_PERIOD, LOGSDB_PRIOR_LOGS_USAGE); + return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, LOGSDB_PRIOR_LOGS_USAGE); } @Override diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java deleted file mode 100644 index 929db16a618a..000000000000 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.logsdb; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; -import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.LocalNodeMasterListener; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; -import org.elasticsearch.threadpool.Scheduler; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN; - -/** - * A component that checks in the background whether there are data streams that match log-*-* pattern and if so records this - * as persistent setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer - * run in the background. - *

- * After {@link #onMaster()} is invoked, the first check is scheduled to run after 1 minute. If no logs-*-* data streams are - * found, then the next check runs after 2 minutes. The schedule time will double if no data streams with logs-*-* pattern - * are found up until the maximum configured period in the {@link #USAGE_CHECK_MAX_PERIOD} setting (defaults to 24 hours). - *

- * If during a check one or more logs-*-* data streams are found, then the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting gets set - * as persistent cluster setting and this component will not schedule new checks. The mentioned setting is visible in persistent settings - * of cluster state and a signal that upon upgrading to 9.x logsdb will not be enabled by default for data streams matching the - * logs-*-* pattern. It isn't recommended to manually set the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting. - */ -final class LogsPatternUsageService implements LocalNodeMasterListener { - - private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class); - private static final TimeValue USAGE_CHECK_MINIMUM = TimeValue.timeValueSeconds(30); - static final Setting USAGE_CHECK_MAX_PERIOD = Setting.timeSetting( - "logsdb.usage_check.max_period", - new TimeValue(24, TimeUnit.HOURS), - Setting.Property.NodeScope - ); - static final Setting LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting( - "logsdb.prior_logs_usage", - false, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - private final Client client; - private final Settings nodeSettings; - private final ThreadPool threadPool; - private final Supplier metadataSupplier; - - // Initializing to 30s, so first time will run with a delay of 60s: - volatile TimeValue nextWaitTime = USAGE_CHECK_MINIMUM; - volatile boolean isMaster; - volatile boolean hasPriorLogsUsage; - volatile Scheduler.Cancellable cancellable; - - LogsPatternUsageService(Client client, Settings nodeSettings, ThreadPool threadPool, Supplier metadataSupplier) { - this.client = client; - this.nodeSettings = nodeSettings; - this.threadPool = threadPool; - this.metadataSupplier = metadataSupplier; - } - - @Override - public void onMaster() { - if (cancellable == null || cancellable.isCancelled()) { - isMaster = true; - nextWaitTime = USAGE_CHECK_MINIMUM; - scheduleNext(); - } - } - - @Override - public void offMaster() { - isMaster = false; - if (cancellable != null && cancellable.isCancelled() == false) { - cancellable.cancel(); - cancellable = null; - } - } - - void scheduleNext() { - TimeValue maxWaitTime = USAGE_CHECK_MAX_PERIOD.get(nodeSettings); - nextWaitTime = TimeValue.timeValueMillis(Math.min(nextWaitTime.millis() * 2, maxWaitTime.millis())); - scheduleNext(nextWaitTime); - } - - void scheduleNext(TimeValue waitTime) { - if (isMaster && hasPriorLogsUsage == false) { - try { - cancellable = threadPool.schedule(this::check, waitTime, threadPool.generic()); - } catch (EsRejectedExecutionException e) { - if (e.isExecutorShutdown()) { - LOGGER.debug("Failed to check; Shutting down", e); - } else { - throw e; - } - } - } else { - LOGGER.debug("Skipping check, because [{}]/[{}]", isMaster, hasPriorLogsUsage); - } - } - - void check() { - LOGGER.debug("Starting logs-*-* usage check"); - if (isMaster) { - var metadata = metadataSupplier.get(); - if (LOGSDB_PRIOR_LOGS_USAGE.exists(metadata.persistentSettings())) { - LOGGER.debug("Using persistent logs-*-* usage check"); - hasPriorLogsUsage = true; - return; - } - - if (hasLogsUsage(metadata)) { - updateSetting(); - } else { - LOGGER.debug("No usage found; Skipping check"); - scheduleNext(); - } - } else { - LOGGER.debug("No longer master; Skipping check"); - } - } - - static boolean hasLogsUsage(Metadata metadata) { - for (var dataStream : metadata.dataStreams().values()) { - if (Regex.simpleMatch(LOGS_PATTERN, dataStream.getName())) { - return true; - } - } - return false; - } - - void updateSetting() { - var settingsToUpdate = Settings.builder().put(LOGSDB_PRIOR_LOGS_USAGE.getKey(), true).build(); - var request = new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE); - request.persistentSettings(settingsToUpdate); - client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> { - if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) { - hasPriorLogsUsage = true; - cancellable = null; - } else { - LOGGER.debug(() -> "unexpected response [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]"); - scheduleNext(TimeValue.ONE_MINUTE); - } - }, e -> { - LOGGER.debug(() -> "Failed to update [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]", e); - scheduleNext(TimeValue.ONE_MINUTE); - })); - } -} diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java deleted file mode 100644 index fcd1d311df80..000000000000 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.logsdb; - -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction; -import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; -import org.elasticsearch.action.datastreams.DeleteDataStreamAction; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; -import org.elasticsearch.cluster.metadata.Template; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.datastreams.DataStreamsPlugin; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPoolStats; - -import java.util.Collection; -import java.util.List; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.nullValue; - -public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase { - - @Override - protected Collection> getPlugins() { - return List.of(LogsDBPlugin.class, DataStreamsPlugin.class); - } - - @Override - protected Settings nodeSettings() { - return Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); - } - - @Override - protected boolean resetNodeAfterTest() { - return true; - } - - public void testLogsPatternUsage() throws Exception { - var template = ComposableIndexTemplate.builder() - .indexPatterns(List.of("logs-*-*")) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) - .build(); - assertAcked( - client().execute( - TransportPutComposableIndexTemplateAction.TYPE, - new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template) - ).actionGet() - ); - - IndexRequest indexRequest = new IndexRequest("my-index").create(true).source("field", "value"); - var indexResponse = client().index(indexRequest).actionGet(); - assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); - - { - var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) - .actionGet(); - assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); - } - - indexRequest = new IndexRequest("logs-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00"); - indexResponse = client().index(indexRequest).actionGet(); - assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); - - assertBusy(() -> { - var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) - .actionGet(); - assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), equalTo("true")); - }); - } - - public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception { - var template = ComposableIndexTemplate.builder() - .indexPatterns(List.of("log-*-*")) - .template(new Template(Settings.builder().put("index.number_of_replicas", 0).build(), null, null)) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) - .build(); - assertAcked( - client().execute( - TransportPutComposableIndexTemplateAction.TYPE, - new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template) - ).actionGet() - ); - - var indexRequest = new IndexRequest("log-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00"); - var indexResponse = client().index(indexRequest).actionGet(); - assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); - - ensureGreen("log-myapp-prod"); - // Check that LogsPatternUsageService checked three times by checking generic threadpool stats. - // (the LogsPatternUsageService's check is scheduled via the generic threadpool) - var threadPool = getInstanceFromNode(ThreadPool.class); - var beforeStat = getGenericThreadpoolStat(threadPool); - assertBusy(() -> { - var stat = getGenericThreadpoolStat(threadPool); - assertThat(stat.completed(), greaterThanOrEqualTo(beforeStat.completed() + 3)); - }); - var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) - .actionGet(); - assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); - } - - private static ThreadPoolStats.Stats getGenericThreadpoolStat(ThreadPool threadPool) { - var result = threadPool.stats().stats().stream().filter(stats -> stats.name().equals(ThreadPool.Names.GENERIC)).toList(); - assertThat(result.size(), equalTo(1)); - return result.get(0); - } - - @Override - public void tearDown() throws Exception { - // Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave - // persistent cluster settings around. - - var deleteDataStreamsRequest = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*"); - deleteDataStreamsRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN); - assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest)); - - var settings = Settings.builder().put("logsdb.prior_logs_usage", (String) null).build(); - client().admin() - .cluster() - .updateSettings(new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE).persistentSettings(settings)) - .actionGet(); - - super.tearDown(); - } -} diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java deleted file mode 100644 index 2cd2f9216aba..000000000000 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.logsdb; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; -import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.DataStreamTestHelper; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.Scheduler; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.List; -import java.util.function.Supplier; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; - -public class LogsPatternUsageServiceTests extends ESTestCase { - - public void testOnMaster() throws Exception { - var nodeSettings = Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); - var client = mock(Client.class); - doAnswer(invocationOnMock -> { - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocationOnMock - .getArguments()[2]; - var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); - listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); - return null; - }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); - - try (var threadPool = new TestThreadPool(getTestName())) { - var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); - Supplier metadataSupplier = clusterState::metadata; - - var service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); - // pre-check: - assertFalse(service.isMaster); - assertFalse(service.hasPriorLogsUsage); - assertNull(service.cancellable); - // Trigger service: - service.onMaster(); - assertBusy(() -> { - assertTrue(service.isMaster); - assertTrue(service.hasPriorLogsUsage); - assertNull(service.cancellable); - }); - } - } - - public void testCheckHasUsage() { - var nodeSettings = Settings.EMPTY; - var client = mock(Client.class); - doAnswer(invocationOnMock -> { - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocationOnMock - .getArguments()[2]; - var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); - listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); - return null; - }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); - - var threadPool = mock(ThreadPool.class); - var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); - when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); - var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); - Supplier metadataSupplier = clusterState::metadata; - - LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); - service.onMaster(); - assertFalse(service.hasPriorLogsUsage); - assertNotNull(service.cancellable); - assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); - service.check(); - assertTrue(service.hasPriorLogsUsage); - assertNull(service.cancellable); - assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); - - verify(threadPool, times(1)).schedule(any(), any(), any()); - verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); - } - - public void testCheckHasUsageNoMatch() { - var nodeSettings = Settings.EMPTY; - var client = mock(Client.class); - - var threadPool = mock(ThreadPool.class); - var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); - when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); - var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); - Supplier metadataSupplier = clusterState::metadata; - - LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); - service.onMaster(); - assertFalse(service.hasPriorLogsUsage); - assertNotNull(service.cancellable); - assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); - service.check(); - assertFalse(service.hasPriorLogsUsage); - assertNotNull(service.cancellable); - assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(2)); - - verify(threadPool, times(2)).schedule(any(), any(), any()); - verifyNoInteractions(client); - } - - public void testCheckPriorLogsUsageAlreadySet() { - var nodeSettings = Settings.EMPTY; - var client = mock(Client.class); - - var threadPool = mock(ThreadPool.class); - var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); - when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); - var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); - clusterState = ClusterState.builder(clusterState) - .metadata( - Metadata.builder(clusterState.getMetadata()) - .persistentSettings(Settings.builder().put("logsdb.prior_logs_usage", true).build()) - .build() - ) - .build(); - Supplier metadataSupplier = clusterState::metadata; - - LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); - service.isMaster = true; - assertFalse(service.hasPriorLogsUsage); - assertNull(service.cancellable); - service.check(); - assertTrue(service.hasPriorLogsUsage); - assertNull(service.cancellable); - - verifyNoInteractions(client, threadPool); - } - - public void testCheckHasUsageUnexpectedResponse() { - var nodeSettings = Settings.EMPTY; - var client = mock(Client.class); - doAnswer(invocationOnMock -> { - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocationOnMock - .getArguments()[2]; - ClusterUpdateSettingsResponse response; - if (randomBoolean()) { - var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); - response = new ClusterUpdateSettingsResponse(false, Settings.EMPTY, persistentSettings); - } else { - response = new ClusterUpdateSettingsResponse(true, Settings.EMPTY, Settings.EMPTY); - } - listener.onResponse(response); - return null; - }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); - - var threadPool = mock(ThreadPool.class); - var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); - when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); - var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); - Supplier metadataSupplier = clusterState::metadata; - - LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); - service.isMaster = true; - assertFalse(service.hasPriorLogsUsage); - assertNull(service.cancellable); - service.check(); - assertFalse(service.hasPriorLogsUsage); - assertNotNull(service.cancellable); - - verify(threadPool, times(1)).schedule(any(), any(), any()); - verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); - } - - public void testHasLogsUsage() { - var metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(), List.of()).getMetadata(); - assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); - metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1", 1)), List.of()).getMetadata(); - assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); - metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1", 1)), List.of()).getMetadata(); - assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); - metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()).getMetadata(); - assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); - metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()).getMetadata(); - assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); - metadata = DataStreamTestHelper.getClusterStateWithDataStreams( - List.of(new Tuple<>("log-app1-prod", 1), new Tuple<>("logs-app2-prod", 1)), - List.of() - ).getMetadata(); - assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); - metadata = DataStreamTestHelper.getClusterStateWithDataStreams( - List.of(new Tuple<>("log-app1", 1), new Tuple<>("logs-app2-prod", 1)), - List.of() - ).getMetadata(); - assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); - } - -}