From 3420be0ca5a2fdeb7e91314f01a62bcd72d5840f Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 1 Aug 2022 09:17:50 -0600 Subject: [PATCH] Fix renaming data streams with CCR replication (#88875) This commit fixes the situation where a user wants to use CCR to replicate indices that are part of a data stream while renaming the data stream. For example, assume a user has an auto-follow request that looks like this: ``` PUT /_ccr/auto_follow/my-auto-follow-pattern { "remote_cluster" : "other-cluster", "leader_index_patterns" : ["logs-*"], "follow_index_pattern" : "{{leader_index}}_copy" } ``` And then the data stream `logs-mysql-error` was created, creating the backing index `.ds-logs-mysql-error-2022-07-29-000001`. Prior to this commit, replicating this data stream means that the backing index would be renamed to `.ds-logs-mysql-error-2022-07-29-000001_copy` and the data stream would *not* be renamed. This caused a check to trip in `TransportPutLifecycleAction` asserting that a backing index was not renamed for a data stream during following. After this commit, there are a couple of changes: First, the data stream will also be renamed. This means that the `logs-mysql-error` becomes `logs-mysql-error_copy` when created on the follower cluster. Because of the way that CCR works, this means we need to support renaming a data stream for a regular "create follower" request, so a new parameter has been added: `data_stream_name`. It works like this: ``` PUT /mynewindex/_ccr/follow { "remote_cluster": "other-cluster", "leader_index": "myotherindex", "data_stream_name": "new_ds" } ``` Second, the backing index for a data stream must be renamed in a way that does not break the parsing of a data stream backing pattern, whereas previously the index `.ds-logs-mysql-error-2022-07-29-000001` would be renamed to `.ds-logs-mysql-error-2022-07-29-000001_copy` (an illegal name since it doesn't end with the rollover digit), after this commit it will be renamed to `.ds-logs-mysql-error_copy-2022-07-29-000001` to match the renamed data stream. This means that for the given `follow_index_pattern` of `{{leader_index}}_copy` the index changes look like: | Leader Cluster | Follower Cluster | |--------------|-----------| | `logs-mysql-error` (data stream) | `logs-mysql-error_copy` (data stream) | | `.ds-logs-mysql-error-2022-07-29-000001` | `.ds-logs-mysql-error_copy-2022-07-29-000001` | Which internally means the auto-follow request turned into the create follower request of: ``` PUT /.ds-logs-mysql-error_copy-2022-07-29-000001/_ccr/follow { "remote_cluster": "other-cluster", "leader_index": ".ds-logs-mysql-error-2022-07-29-000001", "data_stream_name": "logs-mysql-error_copy" } ``` Relates to https://github.com/elastic/elasticsearch/pull/84940 (cherry-picked the commit for a test) Relates to https://github.com/elastic/elasticsearch/pull/61993 (where data stream support was first introduced for CCR) Resolves https://github.com/elastic/elasticsearch/issues/81751 --- docs/changelog/88875.yaml | 6 + .../put-auto-follow-pattern.asciidoc | 13 +- .../ccr/apis/follow/put-follow.asciidoc | 20 ++ .../elasticsearch/xpack/ccr/AutoFollowIT.java | 129 ++++++- .../xpack/ccr/FollowIndexIT.java | 20 -- .../xpack/ccr/FollowIndexSecurityIT.java | 2 +- .../xpack/ccr/ESCCRRestTestCase.java | 11 +- .../ccr/action/AutoFollowCoordinator.java | 128 ++++++- .../ccr/action/TransportPutFollowAction.java | 57 ++- .../action/AutoFollowCoordinatorTests.java | 328 ++++++++++++++++++ .../ccr/action/FollowParametersTests.java | 5 + .../action/PutFollowActionRequestTests.java | 37 ++ .../action/TransportPutFollowActionTests.java | 28 +- .../core/ccr/action/PutFollowAction.java | 37 +- 14 files changed, 753 insertions(+), 68 deletions(-) create mode 100644 docs/changelog/88875.yaml diff --git a/docs/changelog/88875.yaml b/docs/changelog/88875.yaml new file mode 100644 index 000000000000..0643e86a6dfe --- /dev/null +++ b/docs/changelog/88875.yaml @@ -0,0 +1,6 @@ +pr: 88875 +summary: Fix renaming data streams with CCR replication +area: "Data streams" +type: bug +issues: + - 81751 diff --git a/docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc b/docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc index ed377e72fce4..3876cab007d9 100644 --- a/docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc +++ b/docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc @@ -85,11 +85,14 @@ the new patterns. more `leader_index_patterns` and one or more `leader_index_exclusion_patterns` won't be followed. `follow_index_pattern`:: - (Optional, string) The name of follower index. The template `{{leader_index}}` - can be used to derive the name of the follower index from the name of the - leader index. When following a data stream, use `{{leader_index}}`; {ccr-init} - does not support changes to the names of a follower data stream's backing - indices. + (Optional, string) The name of follower index. The template `{{leader_index}}` can be used to + derive the name of the follower index from the name of the leader index. When following a data + stream, the `follow_index_pattern` will be used for renaming not only the leader index, but also + the data stream containing the leader index. For example, a data stream called + `logs-mysql-default` with a backing index of `.ds-logs-mysql-default-2022-01-01-000001` and a + `follow_index_pattern` of `{{leader_index}}_copy` will replicate the data stream as + `logs-mysql-default_copy` and the backing index as + `.ds-logs-mysql-default_copy-2022-01-01-000001`. include::../follow-request-body.asciidoc[] diff --git a/docs/reference/ccr/apis/follow/put-follow.asciidoc b/docs/reference/ccr/apis/follow/put-follow.asciidoc index d09eb5153404..93e8a710751a 100644 --- a/docs/reference/ccr/apis/follow/put-follow.asciidoc +++ b/docs/reference/ccr/apis/follow/put-follow.asciidoc @@ -76,6 +76,26 @@ referenced leader index. When this API returns, the follower index exists, and (Required, string) The <> containing the leader index. +[[ccr-put-follow-request-body-data_stream_name]]`data_stream_name`:: + (Optional, string) If the leader index is part of a <>, the name to + which the local data stream for the followed index should be renamed. For example, A request like: + +[source,console] +-------------------------------------------------- +PUT /.ds-logs-mysql-default_copy-2022-01-01-000001/_ccr/follow +{ + "remote_cluster" : "remote_cluster", + "leader_index" : ".ds-logs-mysql-default-2022-01-01-000001", + "data_stream_name": "logs-mysql-default_copy" +} +-------------------------------------------------- +// TEST[skip:no setup] + +Replicates the leader index `.ds-logs-mysql-default-2022-01-01-000001` into the follower index +`.ds-logs-mysql-default_copy-2022-01-01-000001` and will do so using the data stream +`logs-mysql-default_copy`, as opposed to the original leader data stream name of +`logs-mysql-default`. + include::../follow-request-body.asciidoc[] [[ccr-put-follow-examples]] diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 35fac474d86f..ffdd40a1bd84 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -236,7 +236,7 @@ public class AutoFollowIT extends ESCCRRestTestCase { int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); try { // Create auto follow pattern - createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster"); + createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", null); // Create data stream and ensure that is is auto followed try (RestClient leaderClient = buildLeaderClient()) { @@ -320,6 +320,121 @@ public class AutoFollowIT extends ESCCRRestTestCase { } } + public void testDataStreamsRenameFollowDataStream() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int numDocs = 64; + final String dataStreamName = "logs-mysql-error"; + final String dataStreamNameFollower = "logs-mysql-error_copy"; + final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT); + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + try { + // Create auto follow pattern + createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", "{{leader_index}}_copy"); + + // Create data stream and ensure that is is auto followed + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); + verifyDocuments(leaderClient, dataStreamName, numDocs); + } + logger.info( + "--> checking {} with index {} has been auto followed to {} with backing index {}", + dataStreamName, + backingIndexName(dataStreamName, 1), + dataStreamNameFollower, + backingIndexName(dataStreamNameFollower, 1) + ); + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamNameFollower, backingIndexName(dataStreamNameFollower, 1)); + ensureYellow(dataStreamNameFollower); + verifyDocuments(client(), dataStreamNameFollower, numDocs); + }); + + // First rollover and ensure second backing index is replicated: + logger.info("--> rolling over"); + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyDataStream( + client(), + dataStreamNameFollower, + backingIndexName(dataStreamNameFollower, 1), + backingIndexName(dataStreamNameFollower, 2) + ); + ensureYellow(dataStreamNameFollower); + verifyDocuments(client(), dataStreamNameFollower, numDocs + 1); + }); + + // Second rollover and ensure third backing index is replicated: + logger.info("--> rolling over"); + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream( + leaderClient, + dataStreamName, + backingIndexName(dataStreamName, 1), + backingIndexName(dataStreamName, 2), + backingIndexName(dataStreamName, 3) + ); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 2); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3)); + verifyDataStream( + client(), + dataStreamNameFollower, + backingIndexName(dataStreamNameFollower, 1), + backingIndexName(dataStreamNameFollower, 2), + backingIndexName(dataStreamNameFollower, 3) + ); + ensureYellow(dataStreamNameFollower); + verifyDocuments(client(), dataStreamNameFollower, numDocs + 2); + }); + + } finally { + cleanUpFollower( + List.of( + backingIndexName(dataStreamNameFollower, 1), + backingIndexName(dataStreamNameFollower, 2), + backingIndexName(dataStreamNameFollower, 3) + ), + List.of(dataStreamNameFollower), + List.of(autoFollowPatternName) + ); + cleanUpLeader( + List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)), + List.of(dataStreamName), + List.of() + ); + } + } + public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception { if ("follow".equals(targetCluster) == false) { return; @@ -353,7 +468,7 @@ public class AutoFollowIT extends ESCCRRestTestCase { } // Create auto follow pattern - createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster"); + createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster", null); // Rollover and ensure only second backing index is replicated: try (RestClient leaderClient = buildLeaderClient()) { @@ -410,7 +525,7 @@ public class AutoFollowIT extends ESCCRRestTestCase { List backingIndexNames = null; try { // Create auto follow pattern - createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster"); + createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster", null); // Create data stream and ensure that is is auto followed try (var leaderClient = buildLeaderClient()) { @@ -531,7 +646,7 @@ public class AutoFollowIT extends ESCCRRestTestCase { int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); try { // Create auto follow pattern - createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster"); + createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster", null); // Create leader index and write alias: try (var leaderClient = buildLeaderClient()) { @@ -618,7 +733,7 @@ public class AutoFollowIT extends ESCCRRestTestCase { try { // Create auto follow pattern in follow cluster - createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster"); + createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster", null); // Create auto follow pattern in leader cluster: try (var leaderClient = buildLeaderClient()) { @@ -658,7 +773,7 @@ public class AutoFollowIT extends ESCCRRestTestCase { } assertOK(leaderClient.performRequest(request)); // Then create the actual auto follow pattern: - createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster"); + createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster", null); } var numDocs = 128; @@ -832,7 +947,7 @@ public class AutoFollowIT extends ESCCRRestTestCase { final String mountedIndex = testPrefix + "-mounted"; try { - createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster"); + createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster", null); // Create a regular index on leader try (var leaderClient = buildLeaderClient()) { diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 38132b53ed30..db8562bac62e 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -180,26 +180,6 @@ public class FollowIndexIT extends ESCCRRestTestCase { assertThat(failure.getMessage(), containsString("cannot follow [logs-syslog-prod], because it is a DATA_STREAM")); } - public void testChangeBackingIndexNameFails() throws Exception { - if ("follow".equals(targetCluster) == false) { - return; - } - - final String dataStreamName = "logs-foobar-prod"; - try (RestClient leaderClient = buildLeaderClient()) { - Request request = new Request("PUT", "/_data_stream/" + dataStreamName); - assertOK(leaderClient.performRequest(request)); - verifyDataStream(leaderClient, dataStreamName, DataStream.getDefaultBackingIndexName("logs-foobar-prod", 1)); - } - - ResponseException failure = expectThrows( - ResponseException.class, - () -> followIndex(DataStream.getDefaultBackingIndexName("logs-foobar-prod", 1), ".ds-logs-barbaz-prod-000001") - ); - assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400)); - assertThat(failure.getMessage(), containsString("a backing index name in the local and remote cluster must remain the same")); - } - public void testFollowSearchableSnapshotsFails() throws Exception { final String testPrefix = getTestName().toLowerCase(Locale.ROOT); diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 24eb234716c4..c2210af7e0a1 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -281,7 +281,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase { // Setup { - createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster"); + createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster", null); } // Create data stream and ensure that it is auto followed { diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index f7df63db15f9..b95d9f60c62d 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -335,7 +335,13 @@ public class ESCCRRestTestCase extends ESRestTestCase { return List.copyOf(actualBackingIndices); } - protected static void createAutoFollowPattern(RestClient client, String name, String pattern, String remoteCluster) throws IOException { + protected static void createAutoFollowPattern( + RestClient client, + String name, + String pattern, + String remoteCluster, + String followIndexPattern + ) throws IOException { Request request = new Request("PUT", "/_ccr/auto_follow/" + name); try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { bodyBuilder.startObject(); @@ -345,6 +351,9 @@ public class ESCCRRestTestCase extends ESRestTestCase { bodyBuilder.value(pattern); } bodyBuilder.endArray(); + if (followIndexPattern != null) { + bodyBuilder.field("follow_index_pattern", followIndexPattern); + } bodyBuilder.field("remote_cluster", remoteCluster); } bodyBuilder.endObject(); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index a53ea9dc6903..b11fafd01f6b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -61,6 +62,8 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; @@ -72,9 +75,24 @@ import static org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedClust */ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements ClusterStateListener { + /** + * This is the string that will be replaced by the leader index name for a backing index or data + * stream. It allows auto-following to automatically rename an index or data stream when + * automatically followed. For example, using "{{leader_index}}_copy" for the follow pattern + * means that a data stream called "logs-foo-bar" would be renamed "logs-foo-bar_copy" when + * replicated, and a backing index called ".ds-logs-foo-bar-2022-02-02-000001" would be renamed + * to ".ds-logs-foo-bar_copy-2022-02-02-000001". + * See {@link AutoFollower#getFollowerIndexName} for the entire usage. + */ + public static final String AUTO_FOLLOW_PATTERN_REPLACEMENT = "{{leader_index}}"; + private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); private static final int MAX_AUTO_FOLLOW_ERRORS = 256; + private static final Pattern DS_BACKING_PATTERN = Pattern.compile( + "^(.*?" + DataStream.BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)?$" + ); + private final Client client; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; @@ -563,6 +581,12 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements cleanFollowedRemoteIndices(remoteClusterState, patterns); } + /** + * Go through all the leader indices that need to be followed, ensuring that they are + * auto-followed by only a single pattern, have soft-deletes enabled, are not + * searchable snapshots, and are not already followed. If all of those conditions are met, + * then follow the indices. + */ private void checkAutoFollowPattern( String autoFollowPattenName, String remoteClusterString, @@ -582,8 +606,13 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements leaderIndicesToFollow.size() ); + // Loop through all the as-of-yet-unfollowed indices from the leader for (final Index indexToFollow : leaderIndicesToFollow) { + // Look up the abstraction for the given index, e.g., an index ".ds-foo" could look + // up the Data Stream "foo" IndexAbstraction indexAbstraction = remoteMetadata.getIndicesLookup().get(indexToFollow.getName()); + // Ensure that the remote cluster doesn't have other patterns + // that would follow the index, there can be only one. List otherMatchingPatterns = patternsForTheSameRemoteCluster.stream() .filter(otherPattern -> otherPattern.v2().match(indexAbstraction)) .map(Tuple::v1) @@ -605,6 +634,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements ); } else { final IndexMetadata leaderIndexMetadata = remoteMetadata.getIndexSafe(indexToFollow); + // First ensure that the index on the leader that we want to follow has soft-deletes enabled if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(leaderIndexMetadata.getSettings()) == false) { String message = String.format( Locale.ROOT, @@ -639,10 +669,12 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error)) ); } else { + // Finally, if there are no reasons why we cannot follow the leader index, perform the follow. followLeaderIndex( autoFollowPattenName, remoteClusterString, indexToFollow, + indexAbstraction, autoFollowPattern, headers, error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error)) @@ -669,22 +701,32 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements return false; } - private void followLeaderIndex( - String autoFollowPattenName, - String remoteClusterString, + /** + * Given a remote cluster, index that will be followed (and its abstraction), as well as an + * {@link AutoFollowPattern}, generate the internal follow request for following the index. + */ + static PutFollowAction.Request generateRequest( + String remoteCluster, Index indexToFollow, - AutoFollowPattern pattern, - Map headers, - Consumer onResult + IndexAbstraction indexAbstraction, + AutoFollowPattern pattern ) { final String leaderIndexName = indexToFollow.getName(); final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); PutFollowAction.Request request = new PutFollowAction.Request(); - request.setRemoteCluster(remoteClusterString); + request.setRemoteCluster(remoteCluster); request.setLeaderIndex(indexToFollow.getName()); request.setFollowerIndex(followIndexName); request.setSettings(pattern.getSettings()); + // If there was a pattern specified for renaming the backing index, and this index is + // part of a data stream, then send the new data stream name as part of the request. + if (pattern.getFollowIndexPattern() != null && indexAbstraction.getParentDataStream() != null) { + String dataStreamName = indexAbstraction.getParentDataStream().getDataStream().getName(); + // Send the follow index pattern as the data stream pattern, so that data streams can be + // renamed accordingly (not only the backing indices) + request.setDataStreamName(pattern.getFollowIndexPattern().replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, dataStreamName)); + } request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount()); request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize()); request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests()); @@ -697,9 +739,23 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements request.getParameters().setReadPollTimeout(pattern.getReadPollTimeout()); request.masterNodeTimeout(TimeValue.MAX_VALUE); + return request; + } + + private void followLeaderIndex( + String autoFollowPattenName, + String remoteClusterString, + Index indexToFollow, + IndexAbstraction indexAbstraction, + AutoFollowPattern pattern, + Map headers, + Consumer onResult + ) { + PutFollowAction.Request request = generateRequest(remoteClusterString, indexToFollow, indexAbstraction, pattern); + // Execute if the create and follow api call succeeds: Runnable successHandler = () -> { - LOGGER.info("auto followed leader index [{}] as follow index [{}]", indexToFollow, followIndexName); + LOGGER.info("auto followed leader index [{}] as follow index [{}]", indexToFollow, request.getFollowerIndex()); // This function updates the auto follow metadata in the cluster to record that the leader index has been followed: // (so that we do not try to follow it in subsequent auto follow runs) @@ -731,6 +787,22 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements } } + /** + * Given an auto following pattern for a set of indices and the cluster state from a remote + * cluster, return the list of indices that need to be followed. The list of followed index + * UUIDs contains indices that have already been followed, so the returned list will only + * contain "new" indices from the leader that need to be followed. + * + * When looking up the name of the index to see if it matches one of the patterns, the index + * abstraction ({@link IndexAbstraction}) of the index is used for comparison, this means + * that if an index named ".ds-foo" was part of a data stream "foo", then an auto-follow + * pattern of "f*" would allow the ".ds-foo" index to be returned. + * + * @param autoFollowPattern pattern to check indices that may need to be followed + * @param remoteClusterState state from the remote ES cluster + * @param followedIndexUUIDs a collection of UUIDs of indices already being followed + * @return any new indices on the leader that need to be followed + */ static List getLeaderIndicesToFollow( AutoFollowPattern autoFollowPattern, ClusterState remoteClusterState, @@ -760,9 +832,45 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements return leaderIndicesToFollow; } + /** + * Returns the new name for the follower index. If the auto-follow configuration includes a + * follow index pattern, the text "{@code {{leader_index}}}" is replaced with the original + * index name, so a leader index called "foo" and a pattern of "{{leader_index}}_copy" + * becomes a new follower index called "foo_copy". + */ static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String leaderIndexName) { - if (autoFollowPattern.getFollowIndexPattern() != null) { - return autoFollowPattern.getFollowIndexPattern().replace("{{leader_index}}", leaderIndexName); + final String followPattern = autoFollowPattern.getFollowIndexPattern(); + if (followPattern != null) { + if (leaderIndexName.contains(DataStream.BACKING_INDEX_PREFIX)) { + // The index being replicated is a data stream backing index, so it's something + // like: .ds--20XX-mm-dd-NNNNNN + // + // However, we cannot just replace the name with the proposed follow index + // pattern, or else we'll end up with something like ".ds-logs-foo-bar-2022-02-02-000001_copy" + // for "{{leader_index}}_copy", which will cause problems because it doesn't + // follow a parseable pattern. Instead it would be better to rename it as though + // the data stream name was the leader index name, ending up with + // ".ds-logs-foo-bar_copy-2022-02-02-000001" as the final index name. + Matcher m = DS_BACKING_PATTERN.matcher(leaderIndexName); + if (m.find()) { + return m.group(1) + // Prefix including ".ds-" + followPattern.replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, m.group(2)) + // Data stream name changed + "-" + // Hyphen separator + m.group(3) + // Date math + m.group(4); + } else { + throw new IllegalArgumentException( + "unable to determine follower index name from leader index name [" + + leaderIndexName + + "] and follow index pattern: [" + + followPattern + + "], index appears to follow a regular data stream backing pattern, but could not be parsed" + ); + } + } else { + // If the index does nat contain a `.ds-`, then rename it as usual. + return followPattern.replace("{{leader_index}}", leaderIndexName); + } } else { return leaderIndexName; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 88301c49c210..b95e03eb09f5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -169,17 +170,6 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction afterRestoreStarted(clientWithHeaders, request, delegatedListener, response) ); if (remoteDataStream == null) { + // If the index we're following is not part of a data stream, start the + // restoration of the index normally. restoreService.restoreSnapshot(restoreRequest, delegatelistener); } else { String followerIndexName = request.getFollowerIndex(); + // This method is used to update the metadata in the same cluster state + // update as the snapshot is restored. BiConsumer updater = (currentState, mdBuilder) -> { - DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(remoteDataStream.getName()); - Index followerIndex = mdBuilder.get(followerIndexName).getIndex(); - assert followerIndex != null; + final String localDataStreamName; - DataStream updatedDataStream = updateLocalDataStream(followerIndex, localDataStream, remoteDataStream); + // If we have been given a data stream name, use that name for the local + // data stream. See the javadoc for AUTO_FOLLOW_PATTERN_REPLACEMENT + // for more info. + final String dsName = request.getDataStreamName(); + if (Strings.hasText(dsName)) { + localDataStreamName = dsName; + } else { + // There was no specified name, use the original data stream name. + localDataStreamName = remoteDataStream.getName(); + } + final DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(localDataStreamName); + final Index followerIndex = mdBuilder.get(followerIndexName).getIndex(); + assert followerIndex != null + : "expected followerIndex " + followerIndexName + " to exist in the state, but it did not"; + + final DataStream updatedDataStream = updateLocalDataStream( + followerIndex, + localDataStream, + localDataStreamName, + remoteDataStream + ); mdBuilder.put(updatedDataStream); }; restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater); @@ -303,12 +315,23 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction AutoFollower.generateRequest("remote", index, indexAbstraction, pattern) + ); + assertThat( + e.getMessage(), + containsString( + "unable to determine follower index name from leader index name " + + "[my-.ds-backing-index] and follow index pattern: [{{leader_index}}_copy]" + + ", index appears to follow a regular data stream backing pattern, but could not be parsed" + ) + ); + } } public void testStats() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java index fd92bc3ecff9..93879f2dfb84 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java @@ -38,6 +38,11 @@ public class FollowParametersTests extends AbstractSerializingTestCase request.setFollowerIndex(randomAlphaOfLength(5)); + case 1 -> request.waitForActiveShards(new ActiveShardCount(randomIntBetween(3, 5))); + case 2 -> request.setRemoteCluster(randomAlphaOfLength(5)); + case 3 -> request.setLeaderIndex(randomAlphaOfLength(5)); + case 4 -> request.setSettings( + Settings.builder() + .put( + IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), + randomValueOtherThan( + IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(request.getSettings()), + ESTestCase::randomInt + ) + ) + .build() + ); + case 5 -> request.setParameters(FollowParametersTests.randomInstance()); + case 6 -> request.setDataStreamName(randomAlphaOfLength(5)); + default -> throw new AssertionError("failed branch"); + } + return request; + } + @Override protected boolean supportsUnknownFields() { return false; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java index 955623bdda74..61050b417211 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java @@ -24,7 +24,12 @@ public class TransportPutFollowActionTests extends ESTestCase { public void testCreateNewLocalDataStream() { DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false); Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); - DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream); + DataStream result = TransportPutFollowAction.updateLocalDataStream( + backingIndexToFollow, + null, + remoteDataStream.getName(), + remoteDataStream + ); assertThat(result.getName(), equalTo(remoteDataStream.getName())); assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); @@ -36,7 +41,12 @@ public class TransportPutFollowActionTests extends ESTestCase { DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false); DataStream localDataStream = generateDataSteam("logs-foobar", 2, true); Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); - DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + DataStream result = TransportPutFollowAction.updateLocalDataStream( + backingIndexToFollow, + localDataStream, + remoteDataStream.getName(), + remoteDataStream + ); assertThat(result.getName(), equalTo(remoteDataStream.getName())); assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); @@ -51,7 +61,12 @@ public class TransportPutFollowActionTests extends ESTestCase { DataStream remoteDataStream = generateDataSteam("logs-foobar", 5, false); DataStream localDataStream = generateDataSteam("logs-foobar", 5, true, DataStream.getDefaultBackingIndexName("logs-foobar", 5)); Index backingIndexToFollow = remoteDataStream.getIndices().get(0); - DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + DataStream result = TransportPutFollowAction.updateLocalDataStream( + backingIndexToFollow, + localDataStream, + remoteDataStream.getName(), + remoteDataStream + ); assertThat(result.getName(), equalTo(remoteDataStream.getName())); assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); @@ -62,7 +77,12 @@ public class TransportPutFollowActionTests extends ESTestCase { // follow second last backing index: localDataStream = result; backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 2); - result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + result = TransportPutFollowAction.updateLocalDataStream( + backingIndexToFollow, + localDataStream, + remoteDataStream.getName(), + remoteDataStream + ); assertThat(result.getName(), equalTo(remoteDataStream.getName())); assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java index 910cf956c5da..1b340a27bac2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java @@ -15,9 +15,11 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -43,6 +45,7 @@ public final class PutFollowAction extends ActionType private static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index"); private static final ParseField SETTINGS_FIELD = new ParseField("settings"); + private static final ParseField DATA_STREAM_NAME = new ParseField("data_stream_name"); // Note that Request should be the Value class here for this parser with a 'parameters' field that maps to // PutFollowParameters class. But since two minor version are already released with duplicate follow parameters @@ -52,6 +55,7 @@ public final class PutFollowAction extends ActionType static { PARSER.declareString((putFollowParameters, value) -> putFollowParameters.remoteCluster = value, REMOTE_CLUSTER_FIELD); PARSER.declareString((putFollowParameters, value) -> putFollowParameters.leaderIndex = value, LEADER_INDEX_FIELD); + PARSER.declareString((putFollowParameters, value) -> putFollowParameters.dataStreamName = value, DATA_STREAM_NAME); PARSER.declareObject( (putFollowParameters, value) -> putFollowParameters.settings = value, (p, c) -> Settings.fromXContent(p), @@ -69,6 +73,7 @@ public final class PutFollowAction extends ActionType request.setFollowerIndex(followerIndex); request.setRemoteCluster(parameters.remoteCluster); request.setLeaderIndex(parameters.leaderIndex); + request.setDataStreamName(parameters.dataStreamName); request.setSettings(parameters.settings); request.setParameters(parameters); return request; @@ -76,8 +81,10 @@ public final class PutFollowAction extends ActionType private String remoteCluster; private String leaderIndex; - private Settings settings = Settings.EMPTY; private String followerIndex; + @Nullable + private String dataStreamName; + private Settings settings = Settings.EMPTY; private FollowParameters parameters = new FollowParameters(); private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE; @@ -123,6 +130,15 @@ public final class PutFollowAction extends ActionType this.parameters = parameters; } + @Nullable + public String getDataStreamName() { + return dataStreamName; + } + + public void setDataStreamName(String dataStreamName) { + this.dataStreamName = dataStreamName; + } + public ActiveShardCount waitForActiveShards() { return waitForActiveShards; } @@ -156,6 +172,9 @@ public final class PutFollowAction extends ActionType if (followerIndex == null) { e = addValidationError("follower_index is missing", e); } + if (dataStreamName != null && Strings.hasText(dataStreamName) == false) { + e = addValidationError("data stream name must contain text if present", e); + } return e; } @@ -179,6 +198,9 @@ public final class PutFollowAction extends ActionType } this.parameters = new FollowParameters(in); waitForActiveShards(ActiveShardCount.readFrom(in)); + if (in.getVersion().onOrAfter(Version.V_8_5_0)) { + this.dataStreamName = in.readOptionalString(); + } } @Override @@ -192,6 +214,9 @@ public final class PutFollowAction extends ActionType } parameters.writeTo(out); waitForActiveShards.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_5_0)) { + out.writeOptionalString(this.dataStreamName); + } } @Override @@ -200,6 +225,9 @@ public final class PutFollowAction extends ActionType { builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); + if (dataStreamName != null) { + builder.field(DATA_STREAM_NAME.getPreferredName(), dataStreamName); + } if (settings.isEmpty() == false) { builder.startObject(SETTINGS_FIELD.getPreferredName()); { @@ -222,12 +250,14 @@ public final class PutFollowAction extends ActionType && Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followerIndex, request.followerIndex) && Objects.equals(parameters, request.parameters) - && Objects.equals(waitForActiveShards, request.waitForActiveShards); + && Objects.equals(waitForActiveShards, request.waitForActiveShards) + && Objects.equals(dataStreamName, request.dataStreamName) + && Objects.equals(settings, request.settings); } @Override public int hashCode() { - return Objects.hash(remoteCluster, leaderIndex, followerIndex, parameters, waitForActiveShards); + return Objects.hash(remoteCluster, leaderIndex, followerIndex, parameters, settings, waitForActiveShards, dataStreamName); } // This class only exists for reuse of the FollowParameters class, see comment above the parser field. @@ -235,6 +265,7 @@ public final class PutFollowAction extends ActionType private String remoteCluster; private String leaderIndex; + private String dataStreamName; private Settings settings = Settings.EMPTY; }