Fix data stream retrieval in DataStreamLifecycleServiceIT (#125195)

These tests could fail when two consecutive GET data streams requests
hit two different nodes, where one node had already applied the cluster
state containing the new backing index and the other node had not yet.

Caused by #122852

Fixes #124846
Fixes #124950
Fixes #124999
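
For illustration, a minimal sketch of the pattern the change moves these tests to, assuming the surrounding ESIntegTestCase infrastructure. The test class, method, and data stream name below are hypothetical; waitForDataStreamBackingIndices and backingIndexEqualTo are the helpers that appear in the diff.

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;

import java.util.List;

public class ExampleRolloverIT extends ESIntegTestCase {
    public void testRolloverCreatesSecondBackingIndex() throws Exception {
        String dataStreamName = "metrics-example"; // hypothetical data stream
        // ... create the data stream, index a document, and let the lifecycle roll it over ...

        // Old pattern (flaky): poll GET _data_stream inside assertBusy; a request could
        // land on a node whose cluster state did not yet contain the new backing index.

        // New pattern: wait until the master node's cluster state lists two backing
        // indices (by which point the other nodes have acked that state), then assert.
        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
        assertThat(backingIndices.get(0), backingIndexEqualTo(dataStreamName, 1));
        assertThat(backingIndices.get(1), backingIndexEqualTo(dataStreamName, 2));
    }
}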
Niels Bauman 2025-03-24 16:43:09 +01:00 committed by GitHub
parent 669d400e1a
commit 542a3b65a9
3 changed files with 118 additions and 264 deletions

@@ -89,7 +89,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
@@ -160,22 +159,12 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
indexDocs(dataStreamName, 1);
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0);
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
}
public void testRolloverAndRetention() throws Exception {
@@ -194,19 +183,11 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
indexDocs(dataStreamName, 1);
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
assertThat(backingIndices.size(), equalTo(1));
// we expect the data stream to have only one backing index, the write one, with generation 2
// as generation 1 would've been deleted by the data stream lifecycle given the configuration
String writeIndex = backingIndices.get(0).getName();
String writeIndex = backingIndices.get(0);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
}
@@ -232,23 +213,10 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet();
indexDocs(SYSTEM_DATA_STREAM_NAME, 1);
now.addAndGet(TimeValue.timeValueSeconds(30).millis());
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { SYSTEM_DATA_STREAM_NAME }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(
GetDataStreamAction.INSTANCE,
getDataStreamRequest
).actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(SYSTEM_DATA_STREAM_NAME));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2)); // global retention is ignored
// we expect the data stream to have two backing indices since the effective retention is 100 days
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
});
List<String> backingIndices = waitForDataStreamBackingIndices(SYSTEM_DATA_STREAM_NAME, 2);
// we expect the data stream to have two backing indices since the effective retention is 100 days
String writeIndex = backingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
// Now we advance the time to well beyond the configured retention. We expect that the older index will have been deleted.
now.addAndGet(TimeValue.timeValueDays(3 * TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_RETENTION_DAYS).millis());
@@ -263,12 +231,12 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
).actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(SYSTEM_DATA_STREAM_NAME));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(1)); // global retention is ignored
List<Index> currentBackingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(currentBackingIndices.size(), equalTo(1)); // global retention is ignored
// we expect the data stream to have only one backing index, the write one, with generation 2
// as generation 1 would've been deleted by the data stream lifecycle given the configuration
String writeIndex = backingIndices.get(0).getName();
assertThat(writeIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
String currentWriteIndex = currentBackingIndices.get(0).getName();
assertThat(currentWriteIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
builder.humanReadable(true);
ToXContent.Params withEffectiveRetention = new ToXContent.MapParams(
@@ -378,18 +346,9 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
).get();
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
Set<String> indexNames = backingIndices.stream().map(Index::getName).collect(Collectors.toSet());
assertTrue(indexNames.contains("index_new"));
assertFalse(indexNames.contains("index_old"));
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
assertTrue(backingIndices.contains("index_new"));
assertFalse(backingIndices.contains("index_old"));
});
}
@@ -412,21 +371,8 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
// Update the lifecycle of the data stream
updateLifecycle(dataStreamName, TimeValue.timeValueMillis(1));
// Verify that the retention has changed for all backing indices
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
assertThat(dataStream.getName(), equalTo(dataStreamName));
List<Index> backingIndices = dataStream.getIndices();
assertThat(backingIndices.size(), equalTo(1));
String writeIndex = dataStream.getWriteIndex().getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, finalGeneration));
});
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 1);
assertThat(backingIndices.getFirst(), backingIndexEqualTo(dataStreamName, finalGeneration));
}
public void testAutomaticForceMerge() throws Exception {
@@ -476,7 +422,8 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
int finalGeneration = randomIntBetween(3, 4);
for (int currentGeneration = 1; currentGeneration < finalGeneration; currentGeneration++) {
// This is currently the write index, but it will be rolled over as soon as data stream lifecycle runs:
final String toBeRolledOverIndex = getBackingIndices(dataStreamName).get(currentGeneration - 1);
final var backingIndexNames = waitForDataStreamBackingIndices(dataStreamName, currentGeneration);
final String toBeRolledOverIndex = backingIndexNames.get(currentGeneration - 1);
for (int i = 0; i < randomIntBetween(10, 50); i++) {
indexDocs(dataStreamName, randomIntBetween(1, 300));
// Make sure the segments get written:
@@ -488,7 +435,7 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
if (currentGeneration == 1) {
toBeForceMergedIndex = null; // Not going to be used
} else {
toBeForceMergedIndex = getBackingIndices(dataStreamName).get(currentGeneration - 2);
toBeForceMergedIndex = backingIndexNames.get(currentGeneration - 2);
}
int currentBackingIndexCount = currentGeneration;
DataStreamLifecycleService dataStreamLifecycleService = internalCluster().getInstance(
@@ -499,19 +446,9 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
// run data stream lifecycle once
dataStreamLifecycleService.run(clusterService.state());
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
assertThat(dataStream.getName(), equalTo(dataStreamName));
List<Index> backingIndices = dataStream.getIndices();
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
assertThat(backingIndices.size(), equalTo(currentBackingIndexCount + 1));
String writeIndex = dataStream.getWriteIndex().getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, currentBackingIndexCount + 1));
assertThat(backingIndices.getLast(), backingIndexEqualTo(dataStreamName, currentBackingIndexCount + 1));
/*
* We only expect forcemerge to happen on the 2nd data stream lifecycle run and later, since on the first there's only the
* single write index to be rolled over.
@@ -567,40 +504,18 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
indexDocs(dataStreamName, 1);
// let's allow one rollover to go through
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
var backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
// prevent new indices from being created (ie. future rollovers)
updateClusterSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 1));
indexDocs(dataStreamName, 1);
String writeIndexName = getBackingIndices(dataStreamName).get(1);
String writeIndexName = backingIndices.get(1);
assertBusy(() -> {
ErrorEntry writeIndexRolloverError = null;
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
writeIndexRolloverError = lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, writeIndexName);
if (writeIndexRolloverError != null) {
break;
}
}
DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(DataStreamLifecycleService.class);
ErrorEntry writeIndexRolloverError = lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, writeIndexName);
assertThat(writeIndexRolloverError, is(notNullValue()));
assertThat(writeIndexRolloverError.error(), containsString("maximum normal shards open"));
@@ -620,6 +535,9 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
assertTrue(found);
}, 30, TimeUnit.SECONDS);
// Ensure data stream did not roll over yet.
assertEquals(2, getDataStreamBackingIndexNames(dataStreamName).size());
// DSL should signal to the health node that there's an error in the store that's been retried at least 3 times
assertBusy(() -> {
FetchHealthInfoCacheAction.Response healthNodeResponse = client().execute(
@@ -660,20 +578,17 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
updateClusterSettings(Settings.builder().putNull("*"));
assertBusy(() -> {
List<String> backingIndices = getBackingIndices(dataStreamName);
assertThat(backingIndices.size(), equalTo(3));
String writeIndex = backingIndices.get(2);
List<String> currentBackingIndices = getDataStreamBackingIndexNames(dataStreamName);
assertThat(currentBackingIndices.size(), equalTo(3));
String writeIndex = currentBackingIndices.get(2);
// rollover was successful and we got to generation 3
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 3));
// we recorded the error against the previous write index (generation 2)
// let's check there's no error recorded against it anymore
String previousWriteInddex = backingIndices.get(1);
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, previousWriteInddex), nullValue());
}
String previousWriteInddex = currentBackingIndices.get(1);
DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(DataStreamLifecycleService.class);
assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, previousWriteInddex), nullValue());
});
// the error has been fixed so the health information shouldn't be reported anymore
@@ -725,24 +640,7 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
indexDocs(dataStreamName, 1);
// let's allow one rollover to go through
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
List<String> dsBackingIndices = getBackingIndices(dataStreamName);
List<String> dsBackingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String firstGenerationIndex = dsBackingIndices.get(0);
String secondGenerationIndex = dsBackingIndices.get(1);
@@ -752,31 +650,18 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
updateLifecycle(dataStreamName, TimeValue.timeValueSeconds(1));
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
assertThat(backingIndices.size(), equalTo(2));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
assertThat(backingIndices.getLast(), backingIndexEqualTo(dataStreamName, 2));
ErrorEntry recordedRetentionExecutionError = null;
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
recordedRetentionExecutionError = lifecycleService.getErrorStore()
.getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex);
if (recordedRetentionExecutionError != null && recordedRetentionExecutionError.retryCount() > 3) {
break;
}
}
DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(
DataStreamLifecycleService.class
);
ErrorEntry recordedRetentionExecutionError = lifecycleService.getErrorStore()
.getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex);
assertThat(recordedRetentionExecutionError, is(notNullValue()));
assertThat(recordedRetentionExecutionError.retryCount(), greaterThanOrEqualTo(3));
assertThat(recordedRetentionExecutionError.error(), containsString("blocked by: [FORBIDDEN/5/index read-only (api)"));
});
@@ -819,23 +704,15 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), false), firstGenerationIndex);
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
// data stream only has one index now
assertThat(backingIndices.size(), equalTo(1));
// error stores don't contain anything for the first generation index anymore
Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex), nullValue());
}
// error store doesn't contain anything for the first generation index anymore
DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(
DataStreamLifecycleService.class
);
assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex), nullValue());
});
// health info for DSL should be EMPTY as everything's healthy
@@ -895,24 +772,12 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
indexDocs(dataStreamName, 1);
// let's allow one rollover to go through
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String firstGenerationIndex = backingIndices.get(0);
assertThat(firstGenerationIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
String firstGenerationIndex = getBackingIndices(dataStreamName).get(0);
ClusterGetSettingsAction.Response response = client().execute(
ClusterGetSettingsAction.INSTANCE,
new ClusterGetSettingsAction.Request(TEST_REQUEST_TIMEOUT)
@@ -950,12 +815,9 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
indexDocs(dataStreamName, 1);
// let's allow one rollover to go through
assertBusy(() -> {
List<String> backingIndices = getBackingIndices(dataStreamName);
assertThat(backingIndices.size(), equalTo(3));
});
backingIndices = waitForDataStreamBackingIndices(dataStreamName, 3);
String secondGenerationIndex = backingIndices.get(1);
String secondGenerationIndex = getBackingIndices(dataStreamName).get(1);
// check the 2nd generation index picked up the new setting values
assertBusy(() -> {
GetSettingsRequest getSettingsRequest = new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(secondGenerationIndex)
@@ -986,7 +848,7 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
indexDocs(dataStreamName, 10);
List<String> backingIndices = getBackingIndices(dataStreamName);
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
{
// backing index should not be managed
String writeIndex = backingIndices.get(0);
@@ -1021,14 +883,11 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
)
);
assertBusy(() -> {
List<String> currentBackingIndices = getBackingIndices(dataStreamName);
assertThat(currentBackingIndices.size(), equalTo(2));
String backingIndex = currentBackingIndices.get(0);
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = currentBackingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
List<String> currentBackingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String backingIndex = currentBackingIndices.get(0);
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = currentBackingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
}
public void testLifecycleAppliedToFailureStore() throws Exception {
@@ -1069,24 +928,9 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
indexInvalidFlagDocs(dataStreamName, 1);
// Let's verify the rollover
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(1));
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
assertThat(failureIndices.size(), equalTo(2));
});
List<String> indices = getFailureIndices(dataStreamName);
String firstGenerationIndex = indices.get(0);
String secondGenerationIndex = indices.get(1);
List<String> failureIndices = waitForDataStreamIndices(dataStreamName, 2, true);
String firstGenerationIndex = failureIndices.get(0);
String secondGenerationIndex = failureIndices.get(1);
// Let's verify the merge settings
ClusterGetSettingsAction.Response response = client().execute(
@@ -1126,36 +970,12 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(1));
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
assertThat(failureIndices.size(), equalTo(1));
assertThat(failureIndices.get(0).getName(), equalTo(secondGenerationIndex));
List<Index> retrievedFailureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
assertThat(retrievedFailureIndices.size(), equalTo(1));
assertThat(retrievedFailureIndices.get(0).getName(), equalTo(secondGenerationIndex));
});
}
private static List<String> getBackingIndices(String dataStreamName) {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
return getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
}
private static List<String> getFailureIndices(String dataStreamName) {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
return getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices().stream().map(Index::getName).toList();
}
static void indexDocs(String dataStream, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {

@@ -330,9 +330,6 @@ tests:
- class: org.elasticsearch.xpack.ilm.DataStreamAndIndexLifecycleMixingTests
method: testUpdateIndexTemplateToDataStreamLifecyclePreference
issue: https://github.com/elastic/elasticsearch/issues/124837
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
method: testAutomaticForceMerge
issue: https://github.com/elastic/elasticsearch/issues/124846
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=search.vectors/41_knn_search_bbq_hnsw/Test knn search}
issue: https://github.com/elastic/elasticsearch/issues/124848
@@ -354,9 +351,6 @@ tests:
- class: org.elasticsearch.packaging.test.BootstrapCheckTests
method: test20RunWithBootstrapChecks
issue: https://github.com/elastic/elasticsearch/issues/124940
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
method: testErrorRecordingOnRetention
issue: https://github.com/elastic/elasticsearch/issues/124950
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
method: testStopQueryLocalNoRemotes
issue: https://github.com/elastic/elasticsearch/issues/124959
@@ -378,9 +372,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test011SecurityEnabledStatus
issue: https://github.com/elastic/elasticsearch/issues/124990
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
method: testLifecycleAppliedToFailureStore
issue: https://github.com/elastic/elasticsearch/issues/124999
- class: org.elasticsearch.xpack.ilm.DataStreamAndIndexLifecycleMixingTests
method: testGetDataStreamResponse
issue: https://github.com/elastic/elasticsearch/issues/125083

@@ -854,10 +854,53 @@ public abstract class ESIntegTestCase extends ESTestCase {
return builder;
}
/**
* Waits for the specified data stream to have the expected number of backing indices.
*/
public static List<String> waitForDataStreamBackingIndices(String dataStreamName, int expectedSize) {
return waitForDataStreamIndices(dataStreamName, expectedSize, false);
}
/**
* Waits for the specified data stream to have the expected number of backing or failure indices.
*/
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
// We listen to the cluster state on the master node to ensure all other nodes have already acked the new cluster state.
// This avoids inconsistencies in subsequent API calls which might hit a non-master node.
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
if (dataStream == null) {
return false;
}
return dataStream.getDataStreamIndices(failureStore).getIndices().size() == expectedSize;
});
safeAwait(listener);
final var backingIndexNames = getDataStreamBackingIndexNames(dataStreamName, failureStore);
assertEquals(
Strings.format(
"Retrieved number of data stream indices doesn't match expectation for data stream [%s]. Expected %d but got %s",
dataStreamName,
expectedSize,
backingIndexNames
),
expectedSize,
backingIndexNames.size()
);
return backingIndexNames;
}
/**
* Returns a list of the data stream's backing index names.
*/
public List<String> getDataStreamBackingIndexNames(String dataStreamName) {
public static List<String> getDataStreamBackingIndexNames(String dataStreamName) {
return getDataStreamBackingIndexNames(dataStreamName, false);
}
/**
* Returns a list of the data stream's backing or failure index names.
*/
public static List<String> getDataStreamBackingIndexNames(String dataStreamName, boolean failureStore) {
GetDataStreamAction.Response response = safeGet(
client().execute(
GetDataStreamAction.INSTANCE,
@@ -867,7 +910,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
assertThat(response.getDataStreams().size(), equalTo(1));
DataStream dataStream = response.getDataStreams().getFirst().getDataStream();
assertThat(dataStream.getName(), equalTo(dataStreamName));
return dataStream.getIndices().stream().map(Index::getName).toList();
return dataStream.getDataStreamIndices(failureStore).getIndices().stream().map(Index::getName).toList();
}
/**