Conditionally enable logsdb by default (#121049)

Enable logsdb by default if logsdb.prior_logs_usage has not been set to true.

Meaning that if no data streams were created matching with the logs-- pattern in 8.x, then logsdb will be enabled by default for data streams matching with logs-*-* pattern.

Also removes LogsPatternUsageService as with version 9.0 and beyond, this component is no longer necessary.

Followup from #120708
Closes #106489
This commit is contained in:
Martijn van Groningen 2025-01-29 15:03:28 +01:00 committed by GitHub
parent 2b669167e1
commit 952bf229fb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 178 additions and 554 deletions

View file

@ -291,6 +291,7 @@
"JVM option",
"Java API",
"Logging",
"Logs",
"Mapping",
"Packaging",
"Painless",

View file

@ -0,0 +1,19 @@
pr: 121049
summary: Conditionally enable logsdb by default for data streams matching with logs-*-*
pattern.
area: Logs
type: breaking
issues:
- 106489
breaking:
title: Conditionally enable logsdb by default
area: Logs
details: |-
Logsdb will be enabled by default for data streams matching with logs-*-* pattern.
If upgrading from 8.x to 9.x and data streams matching with log-*-* do exist,
then Logsdb will not be enabled by default.
impact: |-
Logsdb reduce storage footprint in Elasticsearch for logs, but there are side effects
to be taken into account that are described in the Logsdb docs:
https://www.elastic.co/guide/en/elasticsearch/reference/current/logs-data-stream.html#upgrade-to-logsdb-notes
notable: true

View file

@ -237,3 +237,9 @@ The `logsdb` index mode uses the following settings:
* **`index.mapping.ignore_above`**: `8191`
* **`index.mapping.total_fields.ignore_dynamic_beyond_limit`**: `true`
[discrete]
[[upgrade-to-logsdb-notes]]
=== Notes about upgrading to Logsdb
TODO: add notes.

View file

@ -177,7 +177,7 @@ Example response:
},
"logsdb": {
"available": true,
"enabled": false
"enabled": true
}
},
"tagline" : "You know, for X"

View file

@ -514,7 +514,7 @@ GET /_xpack/usage
},
"logsdb": {
"available": true,
"enabled": false,
"enabled": true,
"indices_count": 0,
"indices_with_synthetic_source": 0,
"num_docs": 0,

View file

@ -120,6 +120,10 @@ public class LogsIndexModeFullClusterRestartIT extends ParameterizedFullClusterR
}""";
public void testLogsIndexing() throws IOException {
assumeTrue(
"otherwise first backing index of logs-apache-production will be in logsdb mode",
getOldClusterTestVersion().before("9.0.0")
);
if (isRunningAgainstOldCluster()) {
assertOK(client().performRequest(putTemplate(client(), "logs-template", STANDARD_TEMPLATE)));
assertOK(client().performRequest(createDataStream("logs-apache-production")));

View file

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.upgrades;
import com.carrotsearch.randomizedtesting.annotations.Name;
import org.elasticsearch.client.Request;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.upgrades.LogsdbIndexingRollingUpgradeIT.bulkIndex;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
public class LogsUsageRollingUpgradeIT extends AbstractRollingUpgradeTestCase {
public LogsUsageRollingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
super(upgradedNodes);
}
public void testUsage() throws Exception {
assumeTrue("logsdb.prior_logs_usage only gets set in 8.x", getOldClusterTestVersion().before("9.0.0"));
String dataStreamName = "logs-mysql-error";
if (isOldCluster()) {
bulkIndex(dataStreamName, 4, 256, Instant.now());
ensureGreen(dataStreamName);
assertBusy(() -> {
var getClusterSettingsResponse = getClusterSettings();
Map<?, ?> persistentSettings = (Map<?, ?>) getClusterSettingsResponse.get("persistent");
assertThat(persistentSettings, hasEntry("logsdb.prior_logs_usage", "true"));
}, 2, TimeUnit.MINUTES);
} else {
String newIndex = rolloverDataStream(dataStreamName);
bulkIndex(dataStreamName, 4, 256, Instant.now());
Map<?, ?> indexResponse = (Map<?, ?>) getIndexSettings(newIndex, true).get(newIndex);
Map<?, ?> settings = (Map<?, ?>) indexResponse.get("settings");
Map<?, ?> defaults = (Map<?, ?>) indexResponse.get("defaults");
assertThat(settings, not(hasKey("index.mode")));
assertThat(defaults, hasEntry("index.mode", "standard"));
}
}
static Map<String, Object> getClusterSettings() throws IOException {
var request = new Request("GET", "/_cluster/settings");
request.addParameter("flat_settings", "true");
request.addParameter("include_defaults", "true");
var response = client().performRequest(request);
assertOK(response);
return entityAsMap(response);
}
static String rolloverDataStream(String dataStreamName) throws IOException {
var request = new Request("POST", "/" + dataStreamName + "/_rollover");
var response = client().performRequest(request);
assertOK(response);
var responseBody = entityAsMap(response);
return (String) responseBody.get("new_index");
}
}

View file

@ -22,13 +22,18 @@ import org.elasticsearch.xcontent.XContentType;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.upgrades.LogsIndexModeRollingUpgradeIT.enableLogsdbByDefault;
import static org.elasticsearch.upgrades.LogsIndexModeRollingUpgradeIT.getWriteBackingIndex;
import static org.elasticsearch.upgrades.TsdbIT.formatInstant;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
public class LogsdbIndexingRollingUpgradeIT extends AbstractRollingUpgradeTestCase {
@ -122,7 +127,8 @@ public class LogsdbIndexingRollingUpgradeIT extends AbstractRollingUpgradeTestCa
assertOK(client().performRequest(putIndexTemplateRequest));
}
static void bulkIndex(String dataStreamName, int numRequest, int numDocs, Instant startTime) throws Exception {
static String bulkIndex(String dataStreamName, int numRequest, int numDocs, Instant startTime) throws Exception {
String firstIndex = null;
for (int i = 0; i < numRequest; i++) {
var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
StringBuilder requestBody = new StringBuilder();
@ -155,7 +161,11 @@ public class LogsdbIndexingRollingUpgradeIT extends AbstractRollingUpgradeTestCa
assertOK(response);
var responseBody = entityAsMap(response);
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
if (firstIndex == null) {
firstIndex = (String) ((Map<?, ?>) ((Map<?, ?>) ((List<?>) responseBody.get("items")).get(0)).get("create")).get("_index");
}
}
return firstIndex;
}
void search(String dataStreamName) throws Exception {

View file

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.upgrades;
import com.carrotsearch.randomizedtesting.annotations.Name;
import java.time.Instant;
import java.util.Map;
import static org.elasticsearch.upgrades.LogsUsageRollingUpgradeIT.getClusterSettings;
import static org.elasticsearch.upgrades.LogsdbIndexingRollingUpgradeIT.bulkIndex;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
public class NoLogsUsageRollingUpgradeIT extends AbstractRollingUpgradeTestCase {
public NoLogsUsageRollingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
super(upgradedNodes);
}
public void testUsage() throws Exception {
String dataStreamName = "logs-mysql-error";
if (isOldCluster()) {
dataStreamName = dataStreamName.replace("logs-", "log-");
bulkIndex(dataStreamName, 4, 256, Instant.now());
ensureGreen(dataStreamName);
} else if (isUpgradedCluster()) {
String newIndex = bulkIndex(dataStreamName, 4, 256, Instant.now());
ensureGreen(dataStreamName);
Map<?, ?> indexResponse = (Map<?, ?>) getIndexSettings(newIndex, true).get(newIndex);
Map<?, ?> settings = (Map<?, ?>) indexResponse.get("settings");
assertThat(settings, hasEntry("index.mode", "logsdb"));
var getClusterSettingsResponse = getClusterSettings();
Map<?, ?> defaults = (Map<?, ?>) getClusterSettingsResponse.get("defaults");
Map<?, ?> persistentSettings = (Map<?, ?>) getClusterSettingsResponse.get("persistent");
assertThat(persistentSettings, not(hasKey("logsdb.prior_logs_usage")));
assertThat(defaults, hasEntry("cluster.logsdb.enabled", "true"));
}
}
}

View file

@ -63,14 +63,14 @@ public class LogsIndexModeDisabledRestTestIT extends LogsIndexModeRestTestIT {
private RestClient client;
public void testLogsSettingsIndexModeDisabled() throws IOException {
public void testLogsSettingsIndexModeEnabledByDefault() throws IOException {
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexMode, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));
assertThat(indexMode, equalTo(IndexMode.LOGSDB.getName()));
}
public void testTogglingLogsdb() throws IOException {
@ -81,29 +81,21 @@ public class LogsIndexModeDisabledRestTestIT extends LogsIndexModeRestTestIT {
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexModeBefore, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));
assertOK(putClusterSetting(client, "cluster.logsdb.enabled", "true"));
assertThat(indexModeBefore, equalTo(IndexMode.LOGSDB.getName()));
assertOK(putClusterSetting(client, "cluster.logsdb.enabled", "false"));
final String indexModeAfter = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexModeAfter, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));
assertThat(indexModeAfter, equalTo(IndexMode.LOGSDB.getName()));
assertOK(rolloverDataStream(client, "logs-custom-dev"));
final String indexModeLater = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 1),
IndexSettings.MODE.getKey()
);
assertThat(indexModeLater, equalTo(IndexMode.LOGSDB.getName()));
assertOK(putClusterSetting(client, "cluster.logsdb.enabled", "false"));
assertOK(rolloverDataStream(client, "logs-custom-dev"));
final String indexModeFinal = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 2),
IndexSettings.MODE.getKey()
);
assertThat(indexModeFinal, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));
assertThat(indexModeLater, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));
}

View file

@ -11,10 +11,8 @@ import org.elasticsearch.Build;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
@ -42,17 +40,21 @@ import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE;
import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD;
import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING;
public class LogsDBPlugin extends Plugin implements ActionPlugin {
private final Settings settings;
private final SyntheticSourceLicenseService licenseService;
private static final Setting<Boolean> LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting(
"logsdb.prior_logs_usage",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final Setting<Boolean> CLUSTER_LOGSDB_ENABLED = Setting.boolSetting(
"cluster.logsdb.enabled",
false,
settings -> Boolean.toString(LOGSDB_PRIOR_LOGS_USAGE.get(settings) == false),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
@ -81,18 +83,6 @@ public class LogsDBPlugin extends Plugin implements ActionPlugin {
logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled
);
var clusterService = services.clusterService();
Supplier<Metadata> metadataSupplier = () -> clusterService.state().metadata();
var historicLogsUsageService = new LogsPatternUsageService(services.client(), settings, services.threadPool(), metadataSupplier);
clusterService.addLocalNodeMasterListener(historicLogsUsageService);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
historicLogsUsageService.offMaster();
}
});
// Nothing to share here:
return super.createComponents(services);
}
@ -112,7 +102,7 @@ public class LogsDBPlugin extends Plugin implements ActionPlugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_MAX_PERIOD, LOGSDB_PRIOR_LOGS_USAGE);
return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, LOGSDB_PRIOR_LOGS_USAGE);
}
@Override

View file

@ -1,166 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.logsdb;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN;
/**
* A component that checks in the background whether there are data streams that match <code>log-*-*</code> pattern and if so records this
* as persistent setting in cluster state. If <code>logs-*-*</code> data stream usage has been found then this component will no longer
* run in the background.
* <p>
* After {@link #onMaster()} is invoked, the first check is scheduled to run after 1 minute. If no <code>logs-*-*</code> data streams are
* found, then the next check runs after 2 minutes. The schedule time will double if no data streams with <code>logs-*-*</code> pattern
* are found up until the maximum configured period in the {@link #USAGE_CHECK_MAX_PERIOD} setting (defaults to 24 hours).
* <p>
* If during a check one or more <code>logs-*-*</code> data streams are found, then the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting gets set
* as persistent cluster setting and this component will not schedule new checks. The mentioned setting is visible in persistent settings
* of cluster state and a signal that upon upgrading to 9.x logsdb will not be enabled by default for data streams matching the
* <code>logs-*-*</code> pattern. It isn't recommended to manually set the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting.
*/
final class LogsPatternUsageService implements LocalNodeMasterListener {
private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class);
private static final TimeValue USAGE_CHECK_MINIMUM = TimeValue.timeValueSeconds(30);
static final Setting<TimeValue> USAGE_CHECK_MAX_PERIOD = Setting.timeSetting(
"logsdb.usage_check.max_period",
new TimeValue(24, TimeUnit.HOURS),
Setting.Property.NodeScope
);
static final Setting<Boolean> LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting(
"logsdb.prior_logs_usage",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private final Client client;
private final Settings nodeSettings;
private final ThreadPool threadPool;
private final Supplier<Metadata> metadataSupplier;
// Initializing to 30s, so first time will run with a delay of 60s:
volatile TimeValue nextWaitTime = USAGE_CHECK_MINIMUM;
volatile boolean isMaster;
volatile boolean hasPriorLogsUsage;
volatile Scheduler.Cancellable cancellable;
LogsPatternUsageService(Client client, Settings nodeSettings, ThreadPool threadPool, Supplier<Metadata> metadataSupplier) {
this.client = client;
this.nodeSettings = nodeSettings;
this.threadPool = threadPool;
this.metadataSupplier = metadataSupplier;
}
@Override
public void onMaster() {
if (cancellable == null || cancellable.isCancelled()) {
isMaster = true;
nextWaitTime = USAGE_CHECK_MINIMUM;
scheduleNext();
}
}
@Override
public void offMaster() {
isMaster = false;
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
cancellable = null;
}
}
void scheduleNext() {
TimeValue maxWaitTime = USAGE_CHECK_MAX_PERIOD.get(nodeSettings);
nextWaitTime = TimeValue.timeValueMillis(Math.min(nextWaitTime.millis() * 2, maxWaitTime.millis()));
scheduleNext(nextWaitTime);
}
void scheduleNext(TimeValue waitTime) {
if (isMaster && hasPriorLogsUsage == false) {
try {
cancellable = threadPool.schedule(this::check, waitTime, threadPool.generic());
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
LOGGER.debug("Failed to check; Shutting down", e);
} else {
throw e;
}
}
} else {
LOGGER.debug("Skipping check, because [{}]/[{}]", isMaster, hasPriorLogsUsage);
}
}
void check() {
LOGGER.debug("Starting logs-*-* usage check");
if (isMaster) {
var metadata = metadataSupplier.get();
if (LOGSDB_PRIOR_LOGS_USAGE.exists(metadata.persistentSettings())) {
LOGGER.debug("Using persistent logs-*-* usage check");
hasPriorLogsUsage = true;
return;
}
if (hasLogsUsage(metadata)) {
updateSetting();
} else {
LOGGER.debug("No usage found; Skipping check");
scheduleNext();
}
} else {
LOGGER.debug("No longer master; Skipping check");
}
}
static boolean hasLogsUsage(Metadata metadata) {
for (var dataStream : metadata.dataStreams().values()) {
if (Regex.simpleMatch(LOGS_PATTERN, dataStream.getName())) {
return true;
}
}
return false;
}
void updateSetting() {
var settingsToUpdate = Settings.builder().put(LOGSDB_PRIOR_LOGS_USAGE.getKey(), true).build();
var request = new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE);
request.persistentSettings(settingsToUpdate);
client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> {
if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) {
hasPriorLogsUsage = true;
cancellable = null;
} else {
LOGGER.debug(() -> "unexpected response [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]");
scheduleNext(TimeValue.ONE_MINUTE);
}
}, e -> {
LOGGER.debug(() -> "Failed to update [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]", e);
scheduleNext(TimeValue.ONE_MINUTE);
}));
}
}

View file

@ -1,139 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.logsdb;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;
import java.util.Collection;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(LogsDBPlugin.class, DataStreamsPlugin.class);
}
@Override
protected Settings nodeSettings() {
return Settings.builder().put("logsdb.usage_check.max_period", "1s").build();
}
@Override
protected boolean resetNodeAfterTest() {
return true;
}
public void testLogsPatternUsage() throws Exception {
var template = ComposableIndexTemplate.builder()
.indexPatterns(List.of("logs-*-*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
assertAcked(
client().execute(
TransportPutComposableIndexTemplateAction.TYPE,
new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template)
).actionGet()
);
IndexRequest indexRequest = new IndexRequest("my-index").create(true).source("field", "value");
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));
{
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue());
}
indexRequest = new IndexRequest("logs-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00");
indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertBusy(() -> {
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), equalTo("true"));
});
}
public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception {
var template = ComposableIndexTemplate.builder()
.indexPatterns(List.of("log-*-*"))
.template(new Template(Settings.builder().put("index.number_of_replicas", 0).build(), null, null))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
assertAcked(
client().execute(
TransportPutComposableIndexTemplateAction.TYPE,
new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template)
).actionGet()
);
var indexRequest = new IndexRequest("log-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00");
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));
ensureGreen("log-myapp-prod");
// Check that LogsPatternUsageService checked three times by checking generic threadpool stats.
// (the LogsPatternUsageService's check is scheduled via the generic threadpool)
var threadPool = getInstanceFromNode(ThreadPool.class);
var beforeStat = getGenericThreadpoolStat(threadPool);
assertBusy(() -> {
var stat = getGenericThreadpoolStat(threadPool);
assertThat(stat.completed(), greaterThanOrEqualTo(beforeStat.completed() + 3));
});
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue());
}
private static ThreadPoolStats.Stats getGenericThreadpoolStat(ThreadPool threadPool) {
var result = threadPool.stats().stats().stream().filter(stats -> stats.name().equals(ThreadPool.Names.GENERIC)).toList();
assertThat(result.size(), equalTo(1));
return result.get(0);
}
@Override
public void tearDown() throws Exception {
// Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave
// persistent cluster settings around.
var deleteDataStreamsRequest = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*");
deleteDataStreamsRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest));
var settings = Settings.builder().put("logsdb.prior_logs_usage", (String) null).build();
client().admin()
.cluster()
.updateSettings(new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE).persistentSettings(settings))
.actionGet();
super.tearDown();
}
}

View file

@ -1,213 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.logsdb;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.function.Supplier;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
public class LogsPatternUsageServiceTests extends ESTestCase {
public void testOnMaster() throws Exception {
var nodeSettings = Settings.builder().put("logsdb.usage_check.max_period", "1s").build();
var client = mock(Client.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<ClusterUpdateSettingsResponse> listener = (ActionListener<ClusterUpdateSettingsResponse>) invocationOnMock
.getArguments()[2];
var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build();
listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings));
return null;
}).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any());
try (var threadPool = new TestThreadPool(getTestName())) {
var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of());
Supplier<Metadata> metadataSupplier = clusterState::metadata;
var service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier);
// pre-check:
assertFalse(service.isMaster);
assertFalse(service.hasPriorLogsUsage);
assertNull(service.cancellable);
// Trigger service:
service.onMaster();
assertBusy(() -> {
assertTrue(service.isMaster);
assertTrue(service.hasPriorLogsUsage);
assertNull(service.cancellable);
});
}
}
public void testCheckHasUsage() {
var nodeSettings = Settings.EMPTY;
var client = mock(Client.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<ClusterUpdateSettingsResponse> listener = (ActionListener<ClusterUpdateSettingsResponse>) invocationOnMock
.getArguments()[2];
var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build();
listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings));
return null;
}).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any());
var threadPool = mock(ThreadPool.class);
var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class);
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable);
var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of());
Supplier<Metadata> metadataSupplier = clusterState::metadata;
LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier);
service.onMaster();
assertFalse(service.hasPriorLogsUsage);
assertNotNull(service.cancellable);
assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1));
service.check();
assertTrue(service.hasPriorLogsUsage);
assertNull(service.cancellable);
assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1));
verify(threadPool, times(1)).schedule(any(), any(), any());
verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any());
}
public void testCheckHasUsageNoMatch() {
var nodeSettings = Settings.EMPTY;
var client = mock(Client.class);
var threadPool = mock(ThreadPool.class);
var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class);
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable);
var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of());
Supplier<Metadata> metadataSupplier = clusterState::metadata;
LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier);
service.onMaster();
assertFalse(service.hasPriorLogsUsage);
assertNotNull(service.cancellable);
assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1));
service.check();
assertFalse(service.hasPriorLogsUsage);
assertNotNull(service.cancellable);
assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(2));
verify(threadPool, times(2)).schedule(any(), any(), any());
verifyNoInteractions(client);
}
public void testCheckPriorLogsUsageAlreadySet() {
var nodeSettings = Settings.EMPTY;
var client = mock(Client.class);
var threadPool = mock(ThreadPool.class);
var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class);
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable);
var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of());
clusterState = ClusterState.builder(clusterState)
.metadata(
Metadata.builder(clusterState.getMetadata())
.persistentSettings(Settings.builder().put("logsdb.prior_logs_usage", true).build())
.build()
)
.build();
Supplier<Metadata> metadataSupplier = clusterState::metadata;
LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier);
service.isMaster = true;
assertFalse(service.hasPriorLogsUsage);
assertNull(service.cancellable);
service.check();
assertTrue(service.hasPriorLogsUsage);
assertNull(service.cancellable);
verifyNoInteractions(client, threadPool);
}
public void testCheckHasUsageUnexpectedResponse() {
var nodeSettings = Settings.EMPTY;
var client = mock(Client.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<ClusterUpdateSettingsResponse> listener = (ActionListener<ClusterUpdateSettingsResponse>) invocationOnMock
.getArguments()[2];
ClusterUpdateSettingsResponse response;
if (randomBoolean()) {
var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build();
response = new ClusterUpdateSettingsResponse(false, Settings.EMPTY, persistentSettings);
} else {
response = new ClusterUpdateSettingsResponse(true, Settings.EMPTY, Settings.EMPTY);
}
listener.onResponse(response);
return null;
}).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any());
var threadPool = mock(ThreadPool.class);
var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class);
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable);
var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of());
Supplier<Metadata> metadataSupplier = clusterState::metadata;
LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier);
service.isMaster = true;
assertFalse(service.hasPriorLogsUsage);
assertNull(service.cancellable);
service.check();
assertFalse(service.hasPriorLogsUsage);
assertNotNull(service.cancellable);
verify(threadPool, times(1)).schedule(any(), any(), any());
verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any());
}
public void testHasLogsUsage() {
var metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(), List.of()).getMetadata();
assertFalse(LogsPatternUsageService.hasLogsUsage(metadata));
metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1", 1)), List.of()).getMetadata();
assertFalse(LogsPatternUsageService.hasLogsUsage(metadata));
metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1", 1)), List.of()).getMetadata();
assertFalse(LogsPatternUsageService.hasLogsUsage(metadata));
metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()).getMetadata();
assertFalse(LogsPatternUsageService.hasLogsUsage(metadata));
metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()).getMetadata();
assertTrue(LogsPatternUsageService.hasLogsUsage(metadata));
metadata = DataStreamTestHelper.getClusterStateWithDataStreams(
List.of(new Tuple<>("log-app1-prod", 1), new Tuple<>("logs-app2-prod", 1)),
List.of()
).getMetadata();
assertTrue(LogsPatternUsageService.hasLogsUsage(metadata));
metadata = DataStreamTestHelper.getClusterStateWithDataStreams(
List.of(new Tuple<>("log-app1", 1), new Tuple<>("logs-app2-prod", 1)),
List.of()
).getMetadata();
assertTrue(LogsPatternUsageService.hasLogsUsage(metadata));
}
}