Add data stream support to CCR (#61993)

This commit adds support data stream support to CCR's auto following by making the following changes:
* When the auto follow coordinator iterates over the candidate indices to follow,
  the auto follow coordinator also checks whether the index is part of a data stream and
  if the name of data stream also matches with the auto follow pattern then the index
  will be auto followed.
* When following an index, the put follow api also checks whether that index is part
  of a data stream and if so then also replicates the data stream definition to the
  local cluster.
* In order for the follow index api to determine whether an index is part of a data
  stream, the cluster state api was modified to also fetch the data stream definition
  of the cluster state if only the state is queried for specific indices.

When a data stream is auto followed, only new backing indices are auto followed.
This is in line with how time based indices patterns are replicated today. This
means that the data stream isn't copied 1 to 1 into the local cluster. The local
cluster's data stream definition contains the same name, timestamp field and
generation, but the list of backing indices may be different (depending on when
a data stream was auto followed).

Closes #56259
This commit is contained in:
Martijn van Groningen 2020-11-03 14:01:14 +01:00 committed by GitHub
parent 08c0a8703f
commit c4c3c8b422
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 621 additions and 26 deletions

View file

@ -7,6 +7,13 @@ each new index in the series is replicated automatically. Whenever the name of
a new index on the remote cluster matches the auto-follow pattern, a a new index on the remote cluster matches the auto-follow pattern, a
corresponding follower index is added to the local cluster. corresponding follower index is added to the local cluster.
You can also create auto-follow patterns for data streams. When a new backing
index is generated on a remote cluster, that index and its data stream are
automatically followed if the data stream name matches an auto-follow
pattern. If you create a data stream after creating the auto-follow pattern,
all backing indices are followed automatically.
Auto-follow patterns are especially useful with Auto-follow patterns are especially useful with
<<index-lifecycle-management,{ilm-cap}>>, which might continually create <<index-lifecycle-management,{ilm-cap}>>, which might continually create
new indices on the cluster containing the leader index. new indices on the cluster containing the leader index.

View file

@ -29,6 +29,8 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
@ -37,6 +39,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -149,9 +152,21 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
mdBuilder.version(currentState.metadata().version()); mdBuilder.version(currentState.metadata().version());
String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request); String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request);
for (String filteredIndex : indices) { for (String filteredIndex : indices) {
IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex); // If the requested index is part of a data stream then that data stream should also be included:
if (indexMetadata != null) { IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(filteredIndex);
mdBuilder.put(indexMetadata, false); if (indexAbstraction.getParentDataStream() != null) {
DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
mdBuilder.put(dataStream);
// Also the IMD of other backing indices need to be included, otherwise the cluster state api
// can't create a valid cluster state instance:
for (Index backingIndex : dataStream.getIndices()) {
mdBuilder.put(currentState.metadata().index(backingIndex), false);
}
} else {
IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex);
if (indexMetadata != null) {
mdBuilder.put(indexMetadata, false);
}
} }
} }
} else { } else {

View file

@ -86,6 +86,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -184,6 +185,20 @@ public class RestoreService implements ClusterStateApplier {
* @param listener restore listener * @param listener restore listener
*/ */
public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionListener<RestoreCompletionResponse> listener) { public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionListener<RestoreCompletionResponse> listener) {
restoreSnapshot(request, listener, (clusterState, builder) -> {});
}
/**
* Restores snapshot specified in the restore request.
*
* @param request restore request
* @param listener restore listener
* @param updater handler that allows callers to make modifications to {@link Metadata}
* in the same cluster state update as the restore operation
*/
public void restoreSnapshot(final RestoreSnapshotRequest request,
final ActionListener<RestoreCompletionResponse> listener,
final BiConsumer<ClusterState, Metadata.Builder> updater) {
try { try {
// Read snapshot info and metadata from the repository // Read snapshot info and metadata from the repository
final String repositoryName = request.repository(); final String repositoryName = request.repository();
@ -455,6 +470,7 @@ public class RestoreService implements ClusterStateApplier {
} }
RoutingTable rt = rtBuilder.build(); RoutingTable rt = rtBuilder.build();
updater.accept(currentState, mdBuilder);
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build(); ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]"); return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
} }

View file

@ -13,18 +13,27 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import java.io.IOException; import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
public class AutoFollowIT extends ESCCRRestTestCase { public class AutoFollowIT extends ESCCRRestTestCase {
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT);
public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception { public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
if ("follow".equals(targetCluster) == false) { if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" ); logger.info("skipping test, waiting for target cluster [follow]" );
@ -64,6 +73,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
verifyDocuments("logs-20190101", 5, "filtered_field:true"); verifyDocuments("logs-20190101", 5, "filtered_field:true");
verifyDocuments("logs-20200101", 5, "filtered_field:true"); verifyDocuments("logs-20200101", 5, "filtered_field:true");
}); });
deleteAutoFollowPattern("leader_cluster_pattern");
} }
public void testAutoFollowPatterns() throws Exception { public void testAutoFollowPatterns() throws Exception {
@ -122,6 +132,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
verifyCcrMonitoring("metrics-20210101", "metrics-20210101"); verifyCcrMonitoring("metrics-20210101", "metrics-20210101");
verifyAutoFollowMonitoring(); verifyAutoFollowMonitoring();
}, 30, TimeUnit.SECONDS); }, 30, TimeUnit.SECONDS);
deleteAutoFollowPattern("test_pattern");
} }
public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException { public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
@ -163,6 +174,179 @@ public class AutoFollowIT extends ESCCRRestTestCase {
); );
} }
public void testDataStreams() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
final int numDocs = 64;
final String dataStreamName = "logs-mysql-error";
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
// Create auto follow pattern
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("logs-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client().performRequest(request));
// Create data stream and ensure that is is auto followed
{
try (RestClient leaderClient = buildLeaderClient()) {
for (int i = 0; i < numDocs; i++) {
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001");
verifyDocuments(leaderClient, dataStreamName, numDocs);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs);
});
}
// First rollover and ensure second backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002");
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 1);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs + 1);
});
}
// Second rollover and ensure third backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", "" +
".ds-logs-mysql-error-000003");
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 2);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002",
".ds-logs-mysql-error-000003");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs + 2);
});
}
// Cleanup:
{
deleteAutoFollowPattern("test_pattern");
deleteDataStream(dataStreamName);
}
}
public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
final int initialNumDocs = 16;
final String dataStreamName = "logs-syslog-prod";
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
// Initialize data stream prior to auto following
{
try (RestClient leaderClient = buildLeaderClient()) {
for (int i = 0; i < initialNumDocs; i++) {
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001");
verifyDocuments(leaderClient, dataStreamName, initialNumDocs);
}
}
// Create auto follow pattern
{
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("logs-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client().performRequest(request));
}
// Rollover and ensure only second backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002");
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, initialNumDocs + 1);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000002");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, 1);
});
}
// Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly:
{
followIndex(".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000001");
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, initialNumDocs + 1);
});
}
// Cleanup:
{
deleteAutoFollowPattern("test_pattern");
deleteDataStream(dataStreamName);
}
}
private int getNumberOfSuccessfulFollowedIndices() throws IOException { private int getNumberOfSuccessfulFollowedIndices() throws IOException {
Request statsRequest = new Request("GET", "/_ccr/stats"); Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest)); Map<?, ?> response = toMap(client().performRequest(statsRequest));
@ -170,5 +354,38 @@ public class AutoFollowIT extends ESCCRRestTestCase {
return (Integer) response.get("number_of_successful_follow_indices"); return (Integer) response.get("number_of_successful_follow_indices");
} }
private static void verifyDocuments(final RestClient client,
final String index,
final int expectedNumDocs) throws IOException {
final Request request = new Request("GET", "/" + index + "/_search");
request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
Map<String, ?> response = toMap(client.performRequest(request));
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(index, numDocs, equalTo(expectedNumDocs));
}
static void verifyDataStream(final RestClient client,
final String name,
final String... expectedBackingIndices) throws IOException {
Request request = new Request("GET", "/_data_stream/" + name);
Map<String, ?> response = toMap(client.performRequest(request));
List<?> retrievedDataStreams = (List<?>) response.get("data_streams");
assertThat(retrievedDataStreams, hasSize(1));
List<?> actualBackingIndices = (List<?>) ((Map<?, ?>) retrievedDataStreams.get(0)).get("indices");
assertThat(actualBackingIndices, hasSize(expectedBackingIndices.length));
for (int i = 0; i < expectedBackingIndices.length; i++) {
Map<?, ?> actualBackingIndex = (Map<?, ?>) actualBackingIndices.get(i);
String expectedBackingIndex = expectedBackingIndices[i];
assertThat(actualBackingIndex.get("index_name"), equalTo(expectedBackingIndex));
}
}
private void deleteDataStream(String name) throws IOException {
try (RestClient leaderClient = buildLeaderClient()) {
Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name);
assertOK(leaderClient.performRequest(deleteTemplateRequest));
}
}
} }

View file

@ -15,6 +15,7 @@ import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.ccr.AutoFollowIT.verifyDataStream;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasEntry;
@ -123,4 +124,39 @@ public class FollowIndexIT extends ESCCRRestTestCase {
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
} }
public void testFollowDataStreamFails() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
final String dataStreamName = "logs-syslog-prod";
try (RestClient leaderClient = buildLeaderClient()) {
Request request = new Request("PUT", "/_data_stream/" + dataStreamName);
assertOK(leaderClient.performRequest(request));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001");
}
ResponseException failure = expectThrows(ResponseException.class, () -> followIndex(dataStreamName, dataStreamName));
assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(failure.getMessage(), containsString("cannot follow [logs-syslog-prod], because it is a DATA_STREAM"));
}
public void testChangeBackingIndexNameFails() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
final String dataStreamName = "logs-foobar-prod";
try (RestClient leaderClient = buildLeaderClient()) {
Request request = new Request("PUT", "/_data_stream/" + dataStreamName);
assertOK(leaderClient.performRequest(request));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-foobar-prod-000001");
}
ResponseException failure = expectThrows(ResponseException.class,
() -> followIndex(".ds-logs-foobar-prod-000001", ".ds-logs-barbaz-prod-000001"));
assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(failure.getMessage(), containsString("a backing index name in the local and remote cluster must remain the same"));
}
} }

View file

@ -23,9 +23,12 @@ import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient; import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
@ -109,11 +112,11 @@ public class CcrLicenseChecker {
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
*/ */
public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
final Client client, final Client client,
final String clusterAlias, final String clusterAlias,
final String leaderIndex, final String leaderIndex,
final Consumer<Exception> onFailure, final Consumer<Exception> onFailure,
final BiConsumer<String[], IndexMetadata> consumer) { final BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer) {
final ClusterStateRequest request = new ClusterStateRequest(); final ClusterStateRequest request = new ClusterStateRequest();
request.clear(); request.clear();
@ -127,20 +130,35 @@ public class CcrLicenseChecker {
onFailure, onFailure,
remoteClusterStateResponse -> { remoteClusterStateResponse -> {
ClusterState remoteClusterState = remoteClusterStateResponse.getState(); ClusterState remoteClusterState = remoteClusterStateResponse.getState();
IndexMetadata leaderIndexMetadata = remoteClusterState.getMetadata().index(leaderIndex); final IndexMetadata leaderIndexMetadata = remoteClusterState.getMetadata().index(leaderIndex);
if (leaderIndexMetadata == null) { if (leaderIndexMetadata == null) {
onFailure.accept(new IndexNotFoundException(leaderIndex)); final IndexAbstraction indexAbstraction = remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndex);
final Exception failure;
if (indexAbstraction == null) {
failure = new IndexNotFoundException(leaderIndex);
} else {
// provided name may be an alias or data stream and in that case throw a specific error:
String message = String.format(Locale.ROOT,
"cannot follow [%s], because it is a %s",
leaderIndex, indexAbstraction.getType()
);
failure = new IllegalArgumentException(message);
}
onFailure.accept(failure);
return; return;
} }
if (leaderIndexMetadata.getState() == IndexMetadata.State.CLOSE) { if (leaderIndexMetadata.getState() == IndexMetadata.State.CLOSE) {
onFailure.accept(new IndexClosedException(leaderIndexMetadata.getIndex())); onFailure.accept(new IndexClosedException(leaderIndexMetadata.getIndex()));
return; return;
} }
IndexAbstraction indexAbstraction = remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndex);
final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null ?
indexAbstraction.getParentDataStream().getDataStream() : null;
final Client remoteClient = client.getRemoteClusterClient(clusterAlias); final Client remoteClient = client.getRemoteClusterClient(clusterAlias);
hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> { hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> {
if (e == null) { if (e == null) {
fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetadata, onFailure, historyUUIDs -> fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetadata, onFailure, historyUUIDs ->
consumer.accept(historyUUIDs, leaderIndexMetadata)); consumer.accept(historyUUIDs, Tuple.tuple(leaderIndexMetadata, remoteDataStream)));
} else { } else {
onFailure.accept(e); onFailure.accept(e);
} }

View file

@ -20,6 +20,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -105,7 +106,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
this.relativeMillisTimeProvider = relativeMillisTimeProvider; this.relativeMillisTimeProvider = relativeMillisTimeProvider;
this.absoluteMillisTimeProvider = absoluteMillisTimeProvider; this.absoluteMillisTimeProvider = absoluteMillisTimeProvider;
this.executor = Objects.requireNonNull(executor); this.executor = Objects.requireNonNull(executor);
this.recentAutoFollowErrors = new LinkedHashMap<String, Tuple<Long, ElasticsearchException>>() { this.recentAutoFollowErrors = new LinkedHashMap<>() {
@Override @Override
protected boolean removeEldestEntry(final Map.Entry<String, Tuple<Long, ElasticsearchException>> eldest) { protected boolean removeEldestEntry(final Map.Entry<String, Tuple<Long, ElasticsearchException>> eldest) {
return size() > MAX_AUTO_FOLLOW_ERRORS; return size() > MAX_AUTO_FOLLOW_ERRORS;
@ -496,8 +497,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
leaderIndicesToFollow.size()); leaderIndicesToFollow.size());
for (final Index indexToFollow : leaderIndicesToFollow) { for (final Index indexToFollow : leaderIndicesToFollow) {
IndexAbstraction indexAbstraction = remoteMetadata.getIndicesLookup().get(indexToFollow.getName());
List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream() List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()
.filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName())) .filter(otherPattern -> otherPattern.v2().match(indexAbstraction))
.map(Tuple::v1) .map(Tuple::v1)
.collect(Collectors.toList()); .collect(Collectors.toList());
if (otherMatchingPatterns.size() != 0) { if (otherMatchingPatterns.size() != 0) {
@ -615,7 +617,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
if (leaderIndexMetadata.getState() != IndexMetadata.State.OPEN) { if (leaderIndexMetadata.getState() != IndexMetadata.State.OPEN) {
continue; continue;
} }
if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetadata.getIndex().getName())) { IndexAbstraction indexAbstraction =
remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndexMetadata.getIndex().getName());
if (autoFollowPattern.isActive() && autoFollowPattern.match(indexAbstraction)) {
IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetadata.getIndex()); IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetadata.getIndex());
if (indexRoutingTable != null && if (indexRoutingTable != null &&
// Leader indices can be in the cluster state, but not all primary shards may be ready yet. // Leader indices can be in the cluster state, but not all primary shards may be ready yet.
@ -624,7 +628,6 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
// this index will be auto followed. // this index will be auto followed.
indexRoutingTable.allPrimaryShardsActive() && indexRoutingTable.allPrimaryShardsActive() &&
followedIndexUUIDs.contains(leaderIndexMetadata.getIndex().getUUID()) == false) { followedIndexUUIDs.contains(leaderIndexMetadata.getIndex().getUUID()) == false) {
leaderIndicesToFollow.add(leaderIndexMetadata.getIndex()); leaderIndicesToFollow.add(leaderIndexMetadata.getIndex());
} }
} }

View file

@ -16,6 +16,7 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
@ -200,7 +201,8 @@ public class TransportPutAutoFollowPatternAction extends AcknowledgedTransportMa
List<String> followedIndexUUIDS) { List<String> followedIndexUUIDS) {
for (final IndexMetadata indexMetadata : leaderMetadata) { for (final IndexMetadata indexMetadata : leaderMetadata) {
if (AutoFollowPattern.match(patterns, indexMetadata.getIndex().getName())) { IndexAbstraction indexAbstraction = leaderMetadata.getIndicesLookup().get(indexMetadata.getIndex().getName());
if (AutoFollowPattern.match(patterns, indexAbstraction)) {
followedIndexUUIDS.add(indexMetadata.getIndexUUID()); followedIndexUUIDS.add(indexMetadata.getIndexUUID());
} }
} }

View file

@ -20,12 +20,15 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreInfo;
@ -40,8 +43,12 @@ import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Objects; import java.util.Objects;
import java.util.function.BiConsumer;
public final class TransportPutFollowAction public final class TransportPutFollowAction
extends TransportMasterNodeAction<PutFollowAction.Request, PutFollowAction.Response> { extends TransportMasterNodeAction<PutFollowAction.Request, PutFollowAction.Response> {
@ -98,11 +105,12 @@ public final class TransportPutFollowAction
remoteCluster, remoteCluster,
leaderIndex, leaderIndex,
listener::onFailure, listener::onFailure,
(historyUUID, leaderIndexMetadata) -> createFollowerIndex(leaderIndexMetadata, request, listener)); (historyUUID, tuple) -> createFollowerIndex(tuple.v1(), tuple.v2(), request, listener));
} }
private void createFollowerIndex( private void createFollowerIndex(
final IndexMetadata leaderIndexMetadata, final IndexMetadata leaderIndexMetadata,
final DataStream remoteDataStream,
final PutFollowAction.Request request, final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) { final ActionListener<PutFollowAction.Response> listener) {
if (leaderIndexMetadata == null) { if (leaderIndexMetadata == null) {
@ -126,6 +134,16 @@ public final class TransportPutFollowAction
return; return;
} }
if (remoteDataStream != null) {
// when following a backing index then the names of the backing index must be remain the same in the local
// and remote cluster.
if (request.getLeaderIndex().equals(request.getFollowerIndex()) == false) {
listener.onFailure(
new IllegalArgumentException("a backing index name in the local and remote cluster must remain the same"));
return;
}
}
final Settings overrideSettings = Settings.builder() final Settings overrideSettings = Settings.builder()
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex()) .put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex())
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
@ -148,9 +166,24 @@ public final class TransportPutFollowAction
@Override @Override
protected void doRun() { protected void doRun() {
restoreService.restoreSnapshot(restoreRequest, ActionListener<RestoreService.RestoreCompletionResponse> delegatelistener = ActionListener.delegateFailure(
ActionListener.delegateFailure(listener, listener,
(delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response))); (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response)
);
if (remoteDataStream == null) {
restoreService.restoreSnapshot(restoreRequest, delegatelistener);
} else {
String followerIndexName = request.getFollowerIndex();
BiConsumer<ClusterState, Metadata.Builder> updater = (currentState, mdBuilder) -> {
DataStream localDataStream = currentState.getMetadata().dataStreams().get(remoteDataStream.getName());
Index followerIndex = mdBuilder.get(followerIndexName).getIndex();
assert followerIndex != null;
DataStream updatedDataStream = updateLocalDataStream(followerIndex, localDataStream, remoteDataStream);
mdBuilder.put(updatedDataStream);
};
restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater);
}
} }
}); });
} }
@ -161,7 +194,7 @@ public final class TransportPutFollowAction
final ActionListener<PutFollowAction.Response> listener; final ActionListener<PutFollowAction.Response> listener;
if (ActiveShardCount.NONE.equals(request.waitForActiveShards())) { if (ActiveShardCount.NONE.equals(request.waitForActiveShards())) {
originalListener.onResponse(new PutFollowAction.Response(true, false, false)); originalListener.onResponse(new PutFollowAction.Response(true, false, false));
listener = new ActionListener<PutFollowAction.Response>() { listener = new ActionListener<>() {
@Override @Override
public void onResponse(PutFollowAction.Response response) { public void onResponse(PutFollowAction.Response response) {
@ -211,6 +244,29 @@ public final class TransportPutFollowAction
)); ));
} }
static DataStream updateLocalDataStream(Index backingIndexToFollow,
DataStream localDataStream,
DataStream remoteDataStream) {
if (localDataStream == null) {
// The data stream and the backing indices have been created and validated in the remote cluster,
// just copying the data stream is in this case safe.
return new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(),
List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata());
} else {
List<Index> backingIndices = new ArrayList<>(localDataStream.getIndices());
backingIndices.add(backingIndexToFollow);
// When following an older backing index it should be positioned before the newer backing indices.
// Currently the assumption is that the newest index (highest generation) is the write index.
// (just appending an older backing index to the list of backing indices would break that assumption)
// (string sorting works because of the naming backing index naming scheme)
backingIndices.sort(Comparator.comparing(Index::getName));
return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices,
remoteDataStream.getGeneration(), remoteDataStream.getMetadata());
}
}
@Override @Override
protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) { protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowerIndex()); return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowerIndex());

View file

@ -132,7 +132,7 @@ public class TransportResumeFollowAction extends AcknowledgedTransportMasterNode
listener::onFailure, listener::onFailure,
(leaderHistoryUUID, leaderIndexMetadata) -> { (leaderHistoryUUID, leaderIndexMetadata) -> {
try { try {
start(request, leaderCluster, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); start(request, leaderCluster, leaderIndexMetadata.v1(), followerIndexMetadata, leaderHistoryUUID, listener);
} catch (final IOException e) { } catch (final IOException e) {
listener.onFailure(e); listener.onFailure(e);
} }

View file

@ -12,6 +12,7 @@ import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -171,6 +172,93 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(invoked[0], is(true)); assertThat(invoked[0], is(true));
} }
public void testAutoFollower_dataStream() {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar");
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
autoFollowHeaders.put("remote", Map.of("key", "val"));
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
ClusterState currentState = ClusterState.builder(new ClusterName("name"))
.metadata(Metadata.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build();
boolean[] invoked = new boolean[]{false};
Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = results -> {
invoked[0] = true;
assertThat(results.size(), equalTo(1));
assertThat(results.get(0).clusterStateFetchException, nullValue());
List<Map.Entry<Index, Exception>> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet());
assertThat(entries.size(), equalTo(1));
assertThat(entries.get(0).getKey().getName(), equalTo(".ds-logs-foobar-000001"));
assertThat(entries.get(0).getValue(), nullValue());
};
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L, Runnable::run) {
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler) {
assertThat(remoteCluster, equalTo("remote"));
handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, false), null);
}
@Override
void createAndFollow(Map<String, String> headers,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
assertThat(followRequest.getRemoteCluster(), equalTo("remote"));
assertThat(followRequest.getLeaderIndex(), equalTo(".ds-logs-foobar-000001"));
assertThat(followRequest.getFollowerIndex(), equalTo(".ds-logs-foobar-000001"));
assertThat(followRequest.masterNodeTimeout(), equalTo(TimeValue.MAX_VALUE));
successHandler.run();
}
@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
ClusterState resultCs = updateFunction.apply(currentState);
AutoFollowMetadata result = resultCs.metadata().custom(AutoFollowMetadata.TYPE);
assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1));
handler.accept(null);
}
@Override
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
// Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
}
};
autoFollower.start();
assertThat(invoked[0], is(true));
}
public void testAutoFollowerClusterStateApiFailure() { public void testAutoFollowerClusterStateApiFailure() {
Client client = mock(Client.class); Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client); when(client.getRemoteClusterClient(anyString())).thenReturn(client);
@ -2009,4 +2097,30 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
return clusterService; return clusterService;
} }
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
Settings.Builder indexSettings = settings(Version.CURRENT);
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
indexSettings.put("index.hidden", true);
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
.settings(indexSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
DataStream dataStream = new DataStream(dataStreamName, new DataStream.TimestampField("@timestamp"),
List.of(indexMetadata.getIndex()));
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder()
.put(indexMetadata, true)
.put(dataStream)
.version(0L));
ShardRouting shardRouting =
TestShardRouting.newShardRouting(dataStreamName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
return csBuilder.build();
}
} }

View file

@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStream.TimestampField;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.equalTo;
public class TransportPutFollowActionTests extends ESTestCase {
public void testCreateNewLocalDataStream() {
DataStream remoteDataStream = generateDataSteam("logs-foobar", 3);
Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1);
DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream);
assertThat(result.getName(), equalTo(remoteDataStream.getName()));
assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField()));
assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration()));
assertThat(result.getIndices().size(), equalTo(1));
assertThat(result.getIndices().get(0), equalTo(backingIndexToFollow));
}
public void testUpdateLocalDataStream_followNewBackingIndex() {
DataStream remoteDataStream = generateDataSteam("logs-foobar", 3);
DataStream localDataStream = generateDataSteam("logs-foobar", 2);
Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1);
DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream);
assertThat(result.getName(), equalTo(remoteDataStream.getName()));
assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField()));
assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration()));
assertThat(result.getIndices().size(), equalTo(3));
assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 2)));
assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 3)));
}
public void testUpdateLocalDataStream_followOlderBackingIndex() {
// follow first backing index:
DataStream remoteDataStream = generateDataSteam("logs-foobar", 5);
DataStream localDataStream = generateDataSteam("logs-foobar", 5, DataStream.getDefaultBackingIndexName("logs-foobar", 5));
Index backingIndexToFollow = remoteDataStream.getIndices().get(0);
DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream);
assertThat(result.getName(), equalTo(remoteDataStream.getName()));
assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField()));
assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration()));
assertThat(result.getIndices().size(), equalTo(2));
assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5)));
// follow second last backing index:
localDataStream = result;
backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 2);
result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream);
assertThat(result.getName(), equalTo(remoteDataStream.getName()));
assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField()));
assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration()));
assertThat(result.getIndices().size(), equalTo(3));
assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 4)));
assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5)));
}
static DataStream generateDataSteam(String name, int numBackingIndices) {
List<Index> backingIndices = IntStream.range(1, numBackingIndices + 1)
.mapToObj(value -> DataStream.getDefaultBackingIndexName(name, value))
.map(value -> new Index(value, "uuid"))
.collect(Collectors.toList());
return new DataStream(name, new TimestampField("@timestamp"), backingIndices);
}
static DataStream generateDataSteam(String name, int generation, String... backingIndexNames) {
List<Index> backingIndices = Arrays.stream(backingIndexNames)
.map(value -> new Index(value, "uuid"))
.collect(Collectors.toList());
return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of());
}
}

View file

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.ccr;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -275,12 +276,18 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
} }
} }
public boolean match(String indexName) { public boolean match(IndexAbstraction indexAbstraction) {
return match(leaderIndexPatterns, indexName); return match(leaderIndexPatterns, indexAbstraction);
} }
public static boolean match(List<String> leaderIndexPatterns, String indexName) { public static boolean match(List<String> leaderIndexPatterns, IndexAbstraction indexAbstraction) {
return Regex.simpleMatch(leaderIndexPatterns, indexName); boolean matches = Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getName());
if (matches) {
return true;
} else {
return indexAbstraction.getParentDataStream() != null &&
Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
}
} }
public String getRemoteCluster() { public String getRemoteCluster() {

View file

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
@ -35,6 +36,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStream;
@ -1137,6 +1139,18 @@ public class DataStreamIT extends ESIntegTestCase {
assertThat(dataStream.getMetadata(), equalTo(Map.of("managed_by", "core-features"))); assertThat(dataStream.getMetadata(), equalTo(Map.of("managed_by", "core-features")));
} }
public void testClusterStateIncludeDataStream() throws Exception {
putComposableIndexTemplate("id1", List.of("metrics-foo*"));
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo");
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
// when querying a backing index then the data stream should be included as well.
ClusterStateRequest request = new ClusterStateRequest().indices(".ds-metrics-foo-000001");
ClusterState state = client().admin().cluster().state(request).get().getState();
assertThat(state.metadata().dataStreams().size(), equalTo(1));
assertThat(state.metadata().dataStreams().get("metrics-foo").getName(), equalTo("metrics-foo"));
}
private static void verifyResolvability(String dataStream, ActionRequestBuilder<?, ?> requestBuilder, boolean fail) { private static void verifyResolvability(String dataStream, ActionRequestBuilder<?, ?> requestBuilder, boolean fail) {
verifyResolvability(dataStream, requestBuilder, fail, 0); verifyResolvability(dataStream, requestBuilder, fail, 0);
} }