Protect replicated data streams against local rollovers (#64710)

When a data stream is being auto-followed, a rollover in the local cluster can break auto-following:
if the local cluster performs a rollover, it creates a new write index, and if the remote cluster
later rolls over as well, the resulting backing index can't be replicated, because it has the same
name as the write index that was created earlier in the local cluster.

If a data stream is managed by CCR, then the local cluster should not roll over that data stream.
The data stream should be rolled over in the remote cluster, and that change should be replicated
to the local cluster. Performing the rollover in the local cluster is an operation that only the
data stream support in CCR should perform.

To protect against rolling over a replicated data stream, this PR adds a replicated field to the
DataStream class. The rollover API now fails with an error if the targeted data stream is a
replicated data stream. When the put follow API creates a data stream in the local cluster, the
replicated flag is set to true. There should be a way to turn a replicated data stream into a
regular data stream, for example during disaster recovery; the newly added promote data stream API
in this PR does exactly that. After a replicated data stream has been promoted to a regular data
stream, the local data stream can be rolled over, so that the new write index is no longer a
follower index. If the put follow API then attempts to update this data stream (for example to
resume auto-following), that will fail, because the data stream is no longer a replicated data
stream.

Today, with time-based indices behind an alias, the is_write_index property isn't replicated from
the remote cluster to the local cluster, so attempting to roll over the alias in the local cluster
fails because the alias doesn't have a write index. The replicated field added to the DataStream
class and the accompanying validation achieve the same kind of protection, but in a more robust way.
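
The resulting end-to-end behavior, as a hedged sketch against the low-level REST client (the data stream name and `followerClient` are assumptions for the example; the error message matches the validation added in this PR):

```java
import java.io.IOException;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;

// Sketch only, not part of this PR.
static void promoteAndRollOver(RestClient followerClient) throws IOException {
    // Rolling over a replicated data stream in the follower cluster is rejected:
    try {
        followerClient.performRequest(new Request("POST", "/logs-tomcat-prod/_rollover"));
    } catch (ResponseException e) {
        // "data stream [logs-tomcat-prod] cannot be rolled over,
        // because it is a replicated data stream"
    }
    // Promote it to a regular data stream, for example during disaster recovery:
    followerClient.performRequest(new Request("POST", "/_data_stream/_promote/logs-tomcat-prod"));
    // Now a local rollover succeeds, and the new write index is not a follower index:
    followerClient.performRequest(new Request("POST", "/logs-tomcat-prod/_rollover"));
}
```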

A follow-up to #61993.
Martijn van Groningen · 2020-12-08 08:34:24 +01:00 · committed by GitHub
parent c58937b31f · commit 52afaf2060
21 changed files with 762 additions and 76 deletions


@ -13,6 +13,9 @@ automatically followed if the data stream name matches an auto-follow
pattern. If you create a data stream after creating the auto-follow pattern,
all backing indices are followed automatically.
The data streams replicated from a remote cluster by CCR are protected from
local rollovers. The <<promote-data-stream-api,promote data stream API>>
can be used to turn these data streams into regular data streams.
Auto-follow patterns are especially useful with
<<index-lifecycle-management,{ilm-cap}>>, which might continually create


@ -9,6 +9,7 @@ The following APIs are available for managing <<data-streams,data streams>>:
* <<indices-get-data-stream>>
* <<indices-migrate-to-data-stream>>
* <<data-stream-stats-api>>
* <<promote-data-stream-api>>
For concepts and tutorials, see <<data-streams>>.
@ -21,3 +22,5 @@ include::{es-repo-dir}/indices/get-data-stream.asciidoc[]
include::{es-repo-dir}/indices/migrate-to-data-stream.asciidoc[]
include::{es-repo-dir}/indices/data-stream-stats.asciidoc[]
include::{es-repo-dir}/data-streams/promote-data-stream-api.asciidoc[]


@ -0,0 +1,38 @@
[role="xpack"]
[[promote-data-stream-api]]
=== Promote data stream API
++++
<titleabbrev>Promote data stream API</titleabbrev>
++++
The purpose of the promote data stream API is to turn
a data stream that is replicated by CCR into a regular
data stream.
Via CCR auto following, a data stream from a remote cluster
can be replicated to the local cluster. These data streams
can't be rolled over in the local cluster; they roll over
only when the upstream data stream rolls over. In the event
that the remote cluster is no longer available, the data
stream in the local cluster can be promoted to a regular
data stream, which allows it to be rolled over locally.
[source,console]
----
POST /_data_stream/_promote/my-data-stream
----
// TEST[catch:missing]
[[promote-data-stream-api-request]]
==== {api-request-title}
`POST /_data_stream/_promote/<data-stream>`
[[promote-data-stream-api-path-params]]
==== {api-path-parms-title}
`<data-stream>`::
(Required, string)
The name of the data stream to promote.


@ -38,6 +38,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
@ -148,6 +149,7 @@ public class MetadataRolloverService {
final DataStream ds = dataStream.getDataStream();
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
final String newWriteIndexName = DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1);
ds.rollover(new Index(newWriteIndexName, "uuid")); // just for validation
createIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists
if (onlyValidate) {
return new RolloverResult(newWriteIndexName, originalWriteIndex.getIndex().getName(), currentState);
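
The `ds.rollover(...)` call above exists purely for its side effect: the `"uuid"` placeholder index is discarded, and the call only triggers the replicated-stream check added to `DataStream#rollover` below. A test-style sketch of that check (the data stream instance is assumed):

```java
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.index.Index;

// Sketch: `replicatedDataStream` is any DataStream built with replicated=true.
static void assertReplicatedRolloverRejected(DataStream replicatedDataStream) {
    String next = DataStream.getDefaultBackingIndexName(
        replicatedDataStream.getName(), replicatedDataStream.getGeneration() + 1);
    try {
        replicatedDataStream.rollover(new Index(next, "uuid")); // result is ignored
        throw new AssertionError("expected rollover of a replicated data stream to fail");
    } catch (IllegalArgumentException e) {
        // "data stream [...] cannot be rolled over, because it is a replicated data stream"
    }
}
```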


@ -44,6 +44,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final Version HIDDEN_VERSION = Version.V_7_11_0;
public static final Version REPLICATED_VERSION = Version.V_8_0_0;
private final String name;
private final TimestampField timeStampField;
@ -51,19 +52,21 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
private final long generation;
private final Map<String, Object> metadata;
private final boolean hidden;
private final boolean replicated;
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
this(name, timeStampField, indices, generation, metadata, false);
this(name, timeStampField, indices, generation, metadata, false, false);
}
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
boolean hidden) {
boolean hidden, boolean replicated) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = Collections.unmodifiableList(indices);
this.generation = generation;
this.metadata = metadata;
this.hidden = hidden;
this.replicated = replicated;
assert indices.size() > 0;
}
@ -100,6 +103,16 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
return hidden;
}
/**
* Determines whether this data stream is replicated from elsewhere,
* for example a remote cluster.
*
* @return Whether this data stream is replicated.
*/
public boolean isReplicated() {
return replicated;
}
/**
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
@ -110,9 +123,14 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
*/
public DataStream rollover(Index newWriteIndex) {
assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1));
if (replicated) {
throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, " +
"because it is a replicated data stream");
}
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(newWriteIndex);
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated);
}
/**
@ -126,7 +144,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.remove(index);
assert backingIndices.size() == indices.size() - 1;
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
}
/**
@ -151,7 +169,11 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
"it is the write index", existingBackingIndex.getName(), name));
}
backingIndices.set(backingIndexPosition, newBackingIndex);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
}
public DataStream promoteDataStream() {
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false);
}
/**
@ -169,7 +191,8 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
public DataStream(StreamInput in) throws IOException {
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readMap(): null,
in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean());
in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean(),
in.getVersion().onOrAfter(REPLICATED_VERSION) && in.readBoolean());
}
public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@ -188,6 +211,9 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
if (out.getVersion().onOrAfter(HIDDEN_VERSION)) {
out.writeBoolean(hidden);
}
if (out.getVersion().onOrAfter(REPLICATED_VERSION)) {
out.writeBoolean(replicated);
}
}
public static final ParseField NAME_FIELD = new ParseField("name");
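
The serialization above follows the usual wire-compatibility idiom: the flag is written and read only when the other node's version is at least `REPLICATED_VERSION`, and the short-circuiting `&&` defaults `replicated` to `false` when reading from an older node. Isolated as a sketch (the helper names are illustrative, not from the PR):

```java
import java.io.IOException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

static boolean readReplicatedFlag(StreamInput in) throws IOException {
    // Older nodes never wrote the flag, so don't consume a byte from their stream;
    // the short-circuiting && leaves the flag false in that case.
    return in.getVersion().onOrAfter(DataStream.REPLICATED_VERSION) && in.readBoolean();
}

static void writeReplicatedFlag(StreamOutput out, boolean replicated) throws IOException {
    // Mirror the version check so the bytes on the wire match what the reader expects.
    if (out.getVersion().onOrAfter(DataStream.REPLICATED_VERSION)) {
        out.writeBoolean(replicated);
    }
}
```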
@ -196,11 +222,12 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
public static final ParseField GENERATION_FIELD = new ParseField("generation");
public static final ParseField METADATA_FIELD = new ParseField("_meta");
public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
public static final ParseField REPLICATED_FIELD = new ParseField("replicated");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
(Map<String, Object>) args[4], args[5] != null && (boolean) args[5]));
(Map<String, Object>) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
@ -209,6 +236,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD);
}
public static DataStream fromXContent(XContentParser parser) throws IOException {
@ -226,6 +254,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
builder.field(METADATA_FIELD.getPreferredName(), metadata);
}
builder.field(HIDDEN_FIELD.getPreferredName(), hidden);
builder.field(REPLICATED_FIELD.getPreferredName(), replicated);
builder.endObject();
return builder;
}
@ -239,12 +268,14 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
timeStampField.equals(that.timeStampField) &&
indices.equals(that.indices) &&
generation == that.generation &&
Objects.equals(metadata, that.metadata);
Objects.equals(metadata, that.metadata) &&
hidden == that.hidden &&
replicated == that.replicated;
}
@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices, generation, metadata);
return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated);
}
public static final class TimestampField implements Writeable, ToXContentObject {


@ -185,7 +185,7 @@ public class MetadataCreateDataStreamService {
dsBackingIndices.add(writeIndex.getIndex());
boolean hidden = template.getDataStreamTemplate().isHidden();
DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L,
template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden);
template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden, false);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName,
writeIndex.getIndex().getName(),


@ -623,7 +623,7 @@ public class RestoreService implements ClusterStateApplier {
.map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
.collect(Collectors.toList());
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(),
dataStream.getMetadata(), dataStream.isHidden());
dataStream.getMetadata(), dataStream.isHidden(), dataStream.isReplicated());
}
public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {


@ -536,7 +536,9 @@ public class MetadataRolloverServiceTests extends ESTestCase {
}
public void testRolloverClusterStateForDataStream() throws Exception {
final DataStream dataStream = DataStreamTestHelper.randomInstance();
final DataStream dataStream = DataStreamTestHelper.randomInstance()
// ensure not a replicated data stream
.promoteDataStream();
ComposableIndexTemplate template = new ComposableIndexTemplate(List.of(dataStream.getName() + "*"), null, null, null, null, null,
new ComposableIndexTemplate.DataStreamTemplate(), null);
Metadata.Builder builder = Metadata.builder();
@ -632,7 +634,9 @@ public class MetadataRolloverServiceTests extends ESTestCase {
final boolean useDataStream = randomBoolean();
final Metadata.Builder builder = Metadata.builder();
if (useDataStream) {
DataStream dataStream = DataStreamTestHelper.randomInstance();
DataStream dataStream = DataStreamTestHelper.randomInstance()
// ensure not a replicated data stream
.promoteDataStream();
rolloverTarget = dataStream.getName();
sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName();
defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);


@ -52,7 +52,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
}
public void testRollover() {
DataStream ds = DataStreamTestHelper.randomInstance();
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
Index newWriteIndex = new Index(getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random()));
DataStream rolledDs = ds.rollover(newWriteIndex);


@ -2127,7 +2127,7 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
.put(index2, false)
.put(justAnIndex, false)
.put(new DataStream(dataStream1, createTimestampField("@timestamp"),
List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true))).build();
List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true, false))).build();
Index[] result = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.strictExpandHidden(), true, "logs-*");
assertThat(result, arrayContainingInAnyOrder(index1.getIndex(), index2.getIndex(), justAnIndex.getIndex() ));


@ -109,7 +109,8 @@ public final class DataStreamTestHelper {
if (randomBoolean()) {
metadata = Map.of("key", "value");
}
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, randomBoolean());
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata,
randomBoolean(), randomBoolean());
}
/**


@ -11,6 +11,7 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -24,11 +25,14 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class AutoFollowIT extends ESCCRRestTestCase {
@ -185,21 +189,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
// Create auto follow pattern
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("logs-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client().performRequest(request));
createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster");
// Create data stream and ensure that it is auto followed
{
@ -210,12 +200,12 @@ public class AutoFollowIT extends ESCCRRestTestCase {
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001");
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1));
verifyDocuments(leaderClient, dataStreamName, numDocs);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001");
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1));
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs);
});
@ -226,7 +216,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002");
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
@ -236,7 +226,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002");
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs + 1);
});
@ -247,8 +237,8 @@ public class AutoFollowIT extends ESCCRRestTestCase {
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", "" +
".ds-logs-mysql-error-000003");
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2),
backingIndexName(dataStreamName, 3));
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
@ -258,8 +248,8 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002",
".ds-logs-mysql-error-000003");
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2),
backingIndexName(dataStreamName, 3));
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs + 2);
});
@ -288,34 +278,18 @@ public class AutoFollowIT extends ESCCRRestTestCase {
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001");
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1));
verifyDocuments(leaderClient, dataStreamName, initialNumDocs);
}
}
// Create auto follow pattern
{
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("logs-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client().performRequest(request));
}
createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster");
// Rollover and ensure only second backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002");
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
@ -325,17 +299,17 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000002");
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 2));
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, 1);
});
}
// Explicitly follow the first backing index and check that the data stream in the follow cluster is updated correctly:
{
followIndex(".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000001");
followIndex(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 1));
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002");
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, initialNumDocs + 1);
});
@ -347,13 +321,380 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
}
public void testRolloverDataStreamInFollowClusterForbidden() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
final int numDocs = 64;
final var dataStreamName = "logs-tomcat-prod";
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
// Create auto follow pattern
createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster");
// Create data stream and ensure that it is auto followed
{
try (var leaderClient = buildLeaderClient()) {
for (int i = 0; i < numDocs; i++) {
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1));
verifyDocuments(leaderClient, dataStreamName, numDocs);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1));
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs);
});
}
// Rollover in leader cluster and ensure second backing index is replicated:
{
try (var leaderClient = buildLeaderClient()) {
var rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 1);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs + 1);
});
}
// Try rollover in follow cluster
{
var rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover");
var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1));
assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " +
"because it is a replicated data stream"));
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
// Unfollow .ds-logs-tomcat-prod-000001
pauseFollow(backingIndexName(dataStreamName, 1));
closeIndex(backingIndexName(dataStreamName, 1));
unfollow(backingIndexName(dataStreamName, 1));
// Try again
var rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover");
e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest2));
assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " +
"because it is a replicated data stream"));
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
// Promote local data stream
var promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName);
assertOK(client().performRequest(promoteRequest));
// Try again; now the rollover should succeed because the local data stream is a regular data stream:
var rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(client().performRequest(rolloverRequest3));
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2),
backingIndexName(dataStreamName, 3));
// TODO: verify that following a backing index for logs-tomcat-prod data stream in remote cluster fails,
// because local data stream isn't a replicated data stream anymore.
// Unfollow .ds-logs-tomcat-prod-000002,
// which is now possible because the index is no longer the write index and can therefore be closed.
pauseFollow(backingIndexName(dataStreamName, 2));
closeIndex(backingIndexName(dataStreamName, 2));
unfollow(backingIndexName(dataStreamName, 2));
}
// Cleanup:
{
deleteAutoFollowPattern("test_pattern");
deleteDataStream(dataStreamName);
}
}
public void testRolloverAliasInFollowClusterForbidden() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
final int numDocs = 64;
final var aliasName = "log-tomcat-prod";
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
// Create auto follow pattern
createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster");
// Create leader index and write alias:
{
try (var leaderClient = buildLeaderClient()) {
var createFirstIndexRequest = new Request("PUT", "/" + aliasName + "-000001");
createFirstIndexRequest.setJsonEntity("{\"aliases\": {\"" + aliasName + "\":{\"is_write_index\":true}}}");
leaderClient.performRequest(createFirstIndexRequest);
for (int i = 0; i < numDocs; i++) {
var indexRequest = new Request("POST", "/" + aliasName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyAlias(leaderClient, aliasName, true, aliasName + "-000001");
verifyDocuments(leaderClient, aliasName, numDocs);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyAlias(client(), aliasName, false, aliasName + "-000001");
ensureYellow(aliasName);
verifyDocuments(client(), aliasName, numDocs);
});
}
// Rollover in leader cluster and ensure second backing index is replicated:
{
try (var leaderClient = buildLeaderClient()) {
var rolloverRequest = new Request("POST", "/" + aliasName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyAlias(leaderClient, aliasName, true, aliasName + "-000002", aliasName + "-000001");
var indexRequest = new Request("POST", "/" + aliasName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, aliasName, numDocs + 1);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001");
ensureYellow(aliasName);
verifyDocuments(client(), aliasName, numDocs + 1);
});
}
// Try rollover in follow cluster; this should fail because the is_write_index property of an alias isn't
// replicated to the follow cluster.
{
var rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover");
var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1));
assertThat(e.getMessage(), containsString("rollover target [" + aliasName + "] does not point to a write index"));
verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001");
}
// Cleanup:
{
deleteAutoFollowPattern("test_pattern");
}
}
private static void verifyAlias(RestClient client,
String aliasName,
boolean checkWriteIndex,
String... otherIndices) throws IOException {
var getAliasRequest = new Request("GET", "/_alias/" + aliasName);
var responseBody = toMap(client.performRequest(getAliasRequest));
if (checkWriteIndex) {
assertThat(ObjectPath.eval(otherIndices[0] + ".aliases." + aliasName + ".is_write_index", responseBody), is(true));
}
for (String otherIndex : otherIndices) {
assertThat(ObjectPath.eval(otherIndex + ".aliases." + aliasName, responseBody), notNullValue());
}
}
public void testDataStreamsBiDirectionalReplication() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
int initialNumberOfSuccessfulFollowedIndicesInFollowCluster = getNumberOfSuccessfulFollowedIndices();
int initialNumberOfSuccessfulFollowedIndicesInLeaderCluster;
// Create auto follow pattern in follow cluster
createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster");
// Create auto follow pattern in leader cluster:
try (var leaderClient = buildLeaderClient()) {
initialNumberOfSuccessfulFollowedIndicesInLeaderCluster = getNumberOfSuccessfulFollowedIndices(leaderClient);
// First add remote cluster to leader cluster:
var request = new Request("PUT", "/_cluster/settings");
try (var bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startObject("persistent");
{
bodyBuilder.startObject("cluster");
{
bodyBuilder.startObject("remote");
{
bodyBuilder.startObject("follower_cluster");
{
bodyBuilder.startArray("seeds");
var nodesInfoRequest = new Request("GET", "/_nodes/_local");
var nodesInfoResponse = toMap(client().performRequest(nodesInfoRequest));
var node = (Map<?, ?>) ((Map<?, ?>) nodesInfoResponse.get("nodes")).values().iterator().next();
var transportMetrics = (Map<?, ?>) node.get("transport");
var address = (String) transportMetrics.get("publish_address");
bodyBuilder.value(address);
bodyBuilder.endArray();
}
bodyBuilder.endObject();
}
bodyBuilder.endObject();
}
bodyBuilder.endObject();
}
bodyBuilder.endObject();
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(leaderClient.performRequest(request));
// Then create the actual auto follow pattern:
createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster");
}
var numDocs = 128;
var leaderDataStreamName = "logs-http-eu";
// Create data stream in leader cluster and ensure it is followed in follow cluster
{
try (var leaderClient = buildLeaderClient()) {
for (int i = 0; i < numDocs; i++) {
Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, leaderDataStreamName, backingIndexName(leaderDataStreamName, 1));
verifyDocuments(leaderClient, leaderDataStreamName, numDocs);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndicesInFollowCluster + 1));
verifyDataStream(client(), leaderDataStreamName, backingIndexName(leaderDataStreamName, 1));
ensureYellow(leaderDataStreamName);
verifyDocuments(client(), leaderDataStreamName, numDocs);
});
}
var followerDataStreamName = "logs-http-na";
{
for (int i = 0; i < numDocs; i++) {
var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(client().performRequest(indexRequest));
}
verifyDocuments(client(), followerDataStreamName, numDocs);
try (var leaderClient = buildLeaderClient()) {
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(leaderClient),
equalTo(initialNumberOfSuccessfulFollowedIndicesInLeaderCluster + 1));
verifyDataStream(leaderClient, followerDataStreamName, backingIndexName(followerDataStreamName, 1));
ensureYellow(followerDataStreamName);
verifyDocuments(leaderClient, followerDataStreamName, numDocs);
});
}
}
// TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and
// writes via 'logs-http' alias (ensuring write goes to write data stream).
// Currently aliases can't refer to data streams, so we can't fully test the bi-directional replication scenario.
// See: https://github.com/elastic/elasticsearch/pull/64710#discussion_r537210322
// See all eu and na logs in leader and follower cluster:
verifyDocuments(client(), "logs-http*", numDocs * 2);
try (var leaderClient = buildLeaderClient()) {
verifyDocuments(leaderClient, "logs-http*", numDocs * 2);
}
int moreDocs = 48;
// Index more docs into leader cluster
{
try (var leaderClient = buildLeaderClient()) {
for (int i = 0; i < moreDocs; i++) {
var indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDocuments(leaderClient, leaderDataStreamName, numDocs + moreDocs);
}
assertBusy(() -> {
verifyDocuments(client(), leaderDataStreamName, numDocs + moreDocs);
});
}
// Index more docs into follower cluster
{
for (int i = 0; i < moreDocs; i++) {
var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(client().performRequest(indexRequest));
}
verifyDocuments(client(), followerDataStreamName, numDocs + moreDocs);
try (var leaderClient = buildLeaderClient()) {
assertBusy(() -> {
verifyDocuments(leaderClient, followerDataStreamName, numDocs + moreDocs);
});
}
}
// TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and writes via 'logs-http'
// (see previous TODO)
// See all eu and na logs in leader and follower cluster:
verifyDocuments(client(), "logs-http*", (numDocs + moreDocs) * 2);
try (RestClient leaderClient = buildLeaderClient()) {
verifyDocuments(leaderClient, "logs-http*", (numDocs + moreDocs) * 2);
}
// Cleanup:
{
deleteAutoFollowPattern(client(), "id1");
deleteDataStream(client(), followerDataStreamName);
try (RestClient leaderClient = buildLeaderClient()) {
deleteDataStream(leaderClient, leaderDataStreamName);
deleteAutoFollowPattern(leaderClient, "id2");
}
}
}
private int getNumberOfSuccessfulFollowedIndices() throws IOException {
return getNumberOfSuccessfulFollowedIndices(client());
}
private int getNumberOfSuccessfulFollowedIndices(RestClient client) throws IOException {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Map<?, ?> response = toMap(client.performRequest(statsRequest));
response = (Map<?, ?>) response.get("auto_follow_stats");
return (Integer) response.get("number_of_successful_follow_indices");
}
private void createAutoFollowPattern(RestClient client, String name, String pattern, String remoteCluster) throws IOException {
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value(pattern);
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", remoteCluster);
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client.performRequest(request));
}
private static String backingIndexName(String dataStreamName, int generation) {
return String.format(Locale.ROOT, ".ds-%s-%06d", dataStreamName, generation);
}
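
This helper replaces the `.ds-...-00000N` index names that were previously hard-coded throughout this test. For instance:

```java
// Per the ".ds-%s-%06d" format above:
assert backingIndexName("logs-tomcat-prod", 2).equals(".ds-logs-tomcat-prod-000002");
assert backingIndexName("logs-http-eu", 10).equals(".ds-logs-http-eu-000010");
```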
private static void verifyDocuments(final RestClient client,
final String index,
final int expectedNumDocs) throws IOException {
@ -388,4 +729,9 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
}
private void deleteDataStream(RestClient client, String name) throws IOException {
Request deleteDataStreamRequest = new Request("DELETE", "/_data_stream/" + name);
assertOK(client.performRequest(deleteDataStreamRequest));
}
}


@ -122,8 +122,12 @@ public class ESCCRRestTestCase extends ESRestTestCase {
}
protected static void deleteAutoFollowPattern(String patternName) throws IOException {
deleteAutoFollowPattern(client(), patternName);
}
protected static void deleteAutoFollowPattern(RestClient client, String patternName) throws IOException {
Request putPatternRequest = new Request("DELETE", "/_ccr/auto_follow/" + patternName);
assertOK(client().performRequest(putPatternRequest));
assertOK(client.performRequest(putPatternRequest));
}
protected static void unfollow(String followIndex) throws IOException {


@ -268,8 +268,14 @@ public final class TransportPutFollowAction
// The data stream and the backing indices have been created and validated in the remote cluster,
// so just copying the data stream is safe in this case.
return new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(),
List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata());
List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata(),
remoteDataStream.isHidden(), true);
} else {
if (localDataStream.isReplicated() == false) {
throw new IllegalArgumentException("cannot follow backing index [" + backingIndexToFollow.getName() +
"], because local data stream [" + localDataStream.getName() + "] is no longer marked as replicated");
}
List<Index> backingIndices = new ArrayList<>(localDataStream.getIndices());
backingIndices.add(backingIndexToFollow);
@ -280,7 +286,8 @@ public final class TransportPutFollowAction
backingIndices.sort(Comparator.comparing(Index::getName));
return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices,
remoteDataStream.getGeneration(), remoteDataStream.getMetadata());
remoteDataStream.getGeneration(), remoteDataStream.getMetadata(), localDataStream.isHidden(),
localDataStream.isReplicated());
}
}
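
The two branches above encode the lifecycle: a data stream created by the put follow API is marked `replicated=true`, and updating an existing local data stream requires that the flag is still set. A test-style sketch of the failure mode after promotion, using the `generateDataSteam` helper from the tests shown next:

```java
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;

static void promotedStreamCannotResumeFollowing() {
    DataStream remote = generateDataSteam("logs-foobar", 3, false);
    DataStream local = generateDataSteam("logs-foobar", 2, false); // promoted: replicated == false
    Index toFollow = remote.getIndices().get(remote.getIndices().size() - 1);
    try {
        TransportPutFollowAction.updateLocalDataStream(toFollow, local, remote);
        throw new AssertionError("expected update of a promoted data stream to fail");
    } catch (IllegalArgumentException e) {
        // "cannot follow backing index [...], because local data stream
        // [logs-foobar] is no longer marked as replicated"
    }
}
```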


@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
public class TransportPutFollowActionTests extends ESTestCase {
public void testCreateNewLocalDataStream() {
DataStream remoteDataStream = generateDataSteam("logs-foobar", 3);
DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false);
Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1);
DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream);
assertThat(result.getName(), equalTo(remoteDataStream.getName()));
@ -33,8 +33,8 @@ public class TransportPutFollowActionTests extends ESTestCase {
}
public void testUpdateLocalDataStream_followNewBackingIndex() {
DataStream remoteDataStream = generateDataSteam("logs-foobar", 3);
DataStream localDataStream = generateDataSteam("logs-foobar", 2);
DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false);
DataStream localDataStream = generateDataSteam("logs-foobar", 2, true);
Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1);
DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream);
assertThat(result.getName(), equalTo(remoteDataStream.getName()));
@ -48,8 +48,8 @@ public class TransportPutFollowActionTests extends ESTestCase {
public void testUpdateLocalDataStream_followOlderBackingIndex() {
// follow first backing index:
DataStream remoteDataStream = generateDataSteam("logs-foobar", 5);
DataStream localDataStream = generateDataSteam("logs-foobar", 5, DataStream.getDefaultBackingIndexName("logs-foobar", 5));
DataStream remoteDataStream = generateDataSteam("logs-foobar", 5, false);
DataStream localDataStream = generateDataSteam("logs-foobar", 5, true, DataStream.getDefaultBackingIndexName("logs-foobar", 5));
Index backingIndexToFollow = remoteDataStream.getIndices().get(0);
DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream);
assertThat(result.getName(), equalTo(remoteDataStream.getName()));
@ -72,19 +72,19 @@ public class TransportPutFollowActionTests extends ESTestCase {
assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5)));
}
static DataStream generateDataSteam(String name, int numBackingIndices) {
static DataStream generateDataSteam(String name, int numBackingIndices, boolean replicate) {
List<Index> backingIndices = IntStream.range(1, numBackingIndices + 1)
.mapToObj(value -> DataStream.getDefaultBackingIndexName(name, value))
.map(value -> new Index(value, "uuid"))
.collect(Collectors.toList());
return new DataStream(name, new TimestampField("@timestamp"), backingIndices);
return new DataStream(name, new TimestampField("@timestamp"), backingIndices, backingIndices.size(), Map.of(), false, replicate);
}
static DataStream generateDataSteam(String name, int generation, String... backingIndexNames) {
static DataStream generateDataSteam(String name, int generation, boolean replicate, String... backingIndexNames) {
List<Index> backingIndices = Arrays.stream(backingIndexNames)
.map(value -> new Index(value, "uuid"))
.collect(Collectors.toList());
return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of());
return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of(), false, replicate);
}
}


@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class PromoteDataStreamAction extends ActionType<AcknowledgedResponse> {
public static final PromoteDataStreamAction INSTANCE = new PromoteDataStreamAction();
public static final String NAME = "indices:admin/data_stream/promote";
private PromoteDataStreamAction() {
super(NAME, AcknowledgedResponse::readFrom);
}
public static class Request extends MasterNodeRequest<PromoteDataStreamAction.Request> {
private String name;
public Request(String name) {
this.name = Objects.requireNonNull(name);
}
public String getName() {
return name;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (name == null) {
validationException = addValidationError("no data stream specified", validationException);
}
return validationException;
}
public Request(StreamInput in) throws IOException {
super(in);
this.name = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PromoteDataStreamAction.Request request = (PromoteDataStreamAction.Request) o;
return Objects.equals(name, request.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
}
}
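
Transport-layer usage would look roughly like this sketch (the client and listener handling are assumptions; the REST handler below makes the equivalent `client.execute(...)` call):

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;

static void promote(NodeClient client, String dataStreamName) {
    PromoteDataStreamAction.Request request = new PromoteDataStreamAction.Request(dataStreamName);
    client.execute(PromoteDataStreamAction.INSTANCE, request, ActionListener.wrap(
        (AcknowledgedResponse response) -> { /* acknowledged */ },
        e -> { /* e.g. ResourceNotFoundException if the data stream doesn't exist */ }));
}
```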


@ -24,8 +24,10 @@ import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
import org.elasticsearch.xpack.datastreams.action.DataStreamsStatsTransportAction;
import org.elasticsearch.xpack.datastreams.action.MigrateToDataStreamTransportAction;
import org.elasticsearch.xpack.datastreams.action.PromoteDataStreamTransportAction;
import org.elasticsearch.xpack.datastreams.rest.RestCreateDataStreamAction;
import org.elasticsearch.xpack.datastreams.rest.RestDataStreamsStatsAction;
import org.elasticsearch.xpack.datastreams.rest.RestDeleteDataStreamAction;
@ -39,6 +41,7 @@ import org.elasticsearch.xpack.datastreams.action.DeleteDataStreamTransportActio
import org.elasticsearch.xpack.datastreams.action.GetDataStreamsTransportAction;
import org.elasticsearch.xpack.datastreams.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.xpack.datastreams.rest.RestMigrateToDataStreamAction;
import org.elasticsearch.xpack.datastreams.rest.RestPromoteDataStreamAction;
import java.util.List;
import java.util.Map;
@ -60,7 +63,17 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, MapperPlu
var dsUsageAction = new ActionHandler<>(XPackUsageFeatureAction.DATA_STREAMS, DataStreamUsageTransportAction.class);
var dsInfoAction = new ActionHandler<>(XPackInfoFeatureAction.DATA_STREAMS, DataStreamInfoTransportAction.class);
var migrateAction = new ActionHandler<>(MigrateToDataStreamAction.INSTANCE, MigrateToDataStreamTransportAction.class);
return List.of(createDsAction, deleteDsInfoAction, getDsAction, dsStatsAction, dsUsageAction, dsInfoAction, migrateAction);
var promoteAction = new ActionHandler<>(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class);
return List.of(
createDsAction,
deleteDsInfoAction,
getDsAction,
dsStatsAction,
dsUsageAction,
dsInfoAction,
migrateAction,
promoteAction
);
}
@Override
@ -78,6 +91,7 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, MapperPlu
var getDsAction = new RestGetDataStreamsAction();
var dsStatsAction = new RestDataStreamsStatsAction();
var migrateAction = new RestMigrateToDataStreamAction();
return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction);
var promoteAction = new RestPromoteDataStreamAction();
return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction, promoteAction);
}
}


@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.datastreams.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
public class PromoteDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction<PromoteDataStreamAction.Request> {
@Inject
public PromoteDataStreamTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
PromoteDataStreamAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
PromoteDataStreamAction.Request::new,
indexNameExpressionResolver,
ThreadPool.Names.SAME
);
}
@Override
protected void masterOperation(
Task task,
PromoteDataStreamAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
clusterService.submitStateUpdateTask(
"promote-data-stream [" + request.getName() + "]",
new ClusterStateUpdateTask(Priority.HIGH, request.masterNodeTimeout()) {
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public ClusterState execute(ClusterState currentState) {
return promoteDataStream(currentState, request);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(AcknowledgedResponse.TRUE);
}
}
);
}
static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) {
DataStream dataStream = currentState.getMetadata().dataStreams().get(request.getName());
if (dataStream == null) {
throw new ResourceNotFoundException("data stream [" + request.getName() + "] does not exist");
}
DataStream promotedDataStream = dataStream.promoteDataStream();
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
metadata.put(promotedDataStream);
return ClusterState.builder(currentState).metadata(metadata).build();
}
@Override
protected ClusterBlockException checkBlock(PromoteDataStreamAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
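
A test-style sketch of what `promoteDataStream` does to the cluster state (`clusterStateWithDataStream` is a hypothetical helper, and same-package access to the package-private method is assumed):

```java
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
import org.elasticsearch.xpack.datastreams.action.PromoteDataStreamTransportAction;

static void promotionClearsReplicatedFlag() {
    // Hypothetical helper: builds a state containing a replicated data stream.
    ClusterState before = clusterStateWithDataStream("logs-foo", /* replicated= */ true);
    ClusterState after = PromoteDataStreamTransportAction.promoteDataStream(
        before, new PromoteDataStreamAction.Request("logs-foo"));
    // Promotion rebuilds the metadata entry with replicated=false; nothing else changes.
    assert after.metadata().dataStreams().get("logs-foo").isReplicated() == false;
}
```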


@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.datastreams.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.action.PromoteDataStreamAction;
import java.io.IOException;
import java.util.List;
public class RestPromoteDataStreamAction extends BaseRestHandler {
@Override
public String getName() {
return "promote_data_stream_action";
}
@Override
public List<Route> routes() {
return List.of(new Route(RestRequest.Method.POST, "/_data_stream/_promote/{name}"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
PromoteDataStreamAction.Request request = new PromoteDataStreamAction.Request(restRequest.param("name"));
return channel -> client.execute(PromoteDataStreamAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}


@ -324,6 +324,7 @@ public class Constants {
"indices:admin/data_stream/delete",
"indices:admin/data_stream/get",
"indices:admin/data_stream/migrate",
"indices:admin/data_stream/promote",
"indices:admin/delete",
"indices:admin/flush",
"indices:admin/flush[s]",


@ -0,0 +1,27 @@
{
"indices.promote_data_stream":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
"description":"Promotes a data stream from a replicated data stream managed by CCR to a regular data stream"
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_data_stream/_promote/{name}",
"methods":[
"POST"
],
"parts":{
"name":{
"type":"string",
"description":"The name of the data stream"
}
}
}
]
},
"params":{
}
}
}