Convert CCR module tests to new testing framework (#125894)

This commit is contained in:
Mark Vieira 2025-03-31 14:16:19 -07:00 committed by GitHub
parent 70de5a82b4
commit 66ba3c2a53
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 1134 additions and 634 deletions

View file

@ -58,12 +58,6 @@ public abstract class RestrictedBuildApiService implements BuildService<Restrict
map.put(LegacyRestTestBasePlugin.class, ":x-pack:qa:third-party:slack");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:async-search:qa:rest");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:autoscaling:qa:rest");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:ccr:qa:downgrade-to-basic-license");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:ccr:qa:multi-cluster");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:ccr:qa:non-compliant-license");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:ccr:qa:rest");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:ccr:qa:restart");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:ccr:qa:security");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:deprecation:qa:early-deprecation-rest");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:deprecation:qa:rest");
map.put(LegacyRestTestBasePlugin.class, ":x-pack:plugin:downsample:qa:with-security");

View file

@ -1,40 +1,24 @@
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask
apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.internal-test-artifact'
esplugin {
name = 'x-pack-ccr'
description = 'Elasticsearch Expanded Pack Plugin - CCR'
classname ='org.elasticsearch.xpack.ccr.Ccr'
hasNativeController =false
requiresKeystore =true
classname = 'org.elasticsearch.xpack.ccr.Ccr'
hasNativeController = false
requiresKeystore = true
extendedPlugins = ['x-pack-core']
}
base {
archivesName = 'x-pack-ccr'
}
// Integration Test classes that cannot run with the security manager
String[] noSecurityManagerITClasses = ["**/CloseFollowerIndexIT.class"]
tasks.register('internalClusterTestNoSecurityManager', Test) {
testClassesDirs = sourceSets.internalClusterTest.output.classesDirs
classpath = sourceSets.internalClusterTest.runtimeClasspath
include noSecurityManagerITClasses
systemProperty 'tests.security.manager', 'false'
}
tasks.named("check").configure { dependsOn 'internalClusterTestNoSecurityManager' }
tasks.named('internalClusterTest').configure {
exclude noSecurityManagerITClasses
}
tasks.named('internalClusterTestTestingConventions').configure {
baseClass 'org.elasticsearch.xpack.CcrIntegTestCase'
baseClass 'org.elasticsearch.xpack.CcrSingleNodeTestCase'
baseClass 'org.elasticsearch.test.ESIntegTestCase'
}
addQaCheckDependencies(project)
dependencies {
compileOnly project(":server")
@ -43,4 +27,42 @@ dependencies {
testImplementation(testArtifact(project(xpackModule('monitoring'))))
testImplementation(project(":modules:analysis-common"))
testImplementation(project(":modules:data-streams"))
javaRestTestImplementation(testImplementation(testArtifact(project(xpackModule('core')))))
clusterModules project(":modules:analysis-common")
clusterModules project(":modules:mapper-extras")
clusterModules project(":modules:data-streams")
clusterModules project(":modules:ingest-common")
clusterModules project(xpackModule("monitoring"))
clusterModules project(xpackModule("ilm"))
clusterModules project(xpackModule("wildcard"))
clusterModules project(xpackModule("stack"))
clusterModules project(xpackModule("mapper-constant-keyword"))
clusterModules project(xpackModule("searchable-snapshots"))
}
restResources {
restApi {
include '_common', 'cluster', 'nodes', 'indices', 'index', 'info', 'ccr'
}
}
tasks.named('internalClusterTest') {
systemProperty 'tests.security.manager', 'false'
}
tasks.named('internalClusterTestTestingConventions') {
baseClass 'org.elasticsearch.xpack.CcrIntegTestCase'
baseClass 'org.elasticsearch.xpack.CcrSingleNodeTestCase'
baseClass 'org.elasticsearch.test.ESIntegTestCase'
}
tasks.named("yamlRestTest") {
usesDefaultDistribution("uses _xpack info api")
}
tasks.withType(StandaloneRestIntegTestTask).configureEach {
// These fail in CI but only when run as part of checkPart2 and not individually.
// Tracked in : https://github.com/elastic/elasticsearch/issues/66661
buildParams.withFipsEnabledOnly(it)
}

View file

@ -1,20 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
apply plugin: 'elasticsearch.java'
dependencies {
api project(':test:framework')
}
subprojects {
tasks.withType(Test).configureEach {
// These fail in CI but only when run as part of checkPart2 and not individually.
// Tracked in : https://github.com/elastic/elasticsearch/issues/66661
buildParams.withFipsEnabledOnly(it)
}
}

View file

@ -1,85 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.testclusters.TestClusterValueSource
import org.elasticsearch.gradle.testclusters.TestClustersPlugin
import org.elasticsearch.gradle.testclusters.TestClustersRegistry
import org.elasticsearch.gradle.util.GradleUtils
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
dependencies {
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(xpackModule('ccr'))
testImplementation project(':x-pack:plugin:ccr:qa')
}
def clusterPath = getPath()
def leaderCluster = testClusters.register("leader-cluster") {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
}
def followCluster = testClusters.register("follow-cluster") {
testDistribution = 'DEFAULT'
setting 'xpack.monitoring.collection.enabled', 'true'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
Provider<TestClustersRegistry> serviceProvider = GradleUtils.getBuildService(
project.gradle.sharedServices,
TestClustersPlugin.REGISTRY_SERVICE_NAME
)
def leaderInfo = project.getProviders().of(TestClusterValueSource.class) {
it.parameters.path.set(clusterPath)
it.parameters.clusterName.set("leader-cluster")
it.parameters.service = serviceProvider
}
def leaderUris = leaderInfo.map { it.getAllTransportPortURI() }
setting 'cluster.remote.leader_cluster.seeds',
{ "\"${leaderUris.get().join(",")}\"" }, IGNORE_VALUE
}
tasks.register("leader-cluster", RestIntegTestTask) {
mustRunAfter("precommit")
systemProperty 'tests.target_cluster', 'leader'
}
File policyFile = file("${buildDir}/tmp/java.policy")
tasks.register("writeJavaPolicy") {
doLast {
if (policyFile.parentFile.exists() == false && policyFile.parentFile.mkdirs() == false) {
throw new GradleException("failed to create temporary directory [${tmp}]")
}
policyFile.write(
[
"grant {",
" permission java.io.FilePermission \"${-> followCluster.map { it.getFirstNode().getServerLog() }.get()}\", \"read\";",
"};"
].join("\n")
)
}
}
tasks.register("follow-cluster", RestIntegTestTask) {
dependsOn 'writeJavaPolicy', "leader-cluster"
useCluster leaderCluster
systemProperty 'tests.target_cluster', 'follow'
nonInputProperties.systemProperty 'java.security.policy', "file://${policyFile}"
nonInputProperties.systemProperty 'tests.leader_host', getClusterInfo('leader-cluster').map { it.getAllHttpSocketURI().get(0) }
nonInputProperties.systemProperty 'log', followCluster.map(c -> c.getFirstNode().getServerLog())
}
tasks.named("check").configure { dependsOn "follow-cluster" }

View file

@ -4,115 +4,17 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.testclusters.TestClusterValueSource
import org.elasticsearch.gradle.testclusters.TestClustersPlugin
import org.elasticsearch.gradle.testclusters.TestClustersRegistry
import org.elasticsearch.gradle.util.GradleUtils
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'
dependencies {
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(xpackModule('ccr'))
testImplementation project(':x-pack:plugin:ccr:qa')
javaRestTestImplementation(testArtifact(project(xpackModule('core'))))
javaRestTestImplementation(testArtifact(project(xpackModule('ccr')), 'javaRestTest'))
javaRestTestImplementation project(xpackModule('ccr'))
}
def clusterPath = getPath()
def leaderCluster = testClusters.register('leader-cluster') {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
setting 'path.repo', "${layout.buildDirectory.asFile.get()}/cluster/shared/repo/leader-cluster"
tasks.named("javaRestTest") {
usesDefaultDistribution("uses _xpack usage api")
// These fail in CI but only when run as part of checkPart2 and not individually.
// Tracked in : https://github.com/elastic/elasticsearch/issues/66661
buildParams.withFipsEnabledOnly(it)
}
def middleCluster = testClusters.register('middle-cluster') {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
Provider<TestClustersRegistry> serviceProvider = GradleUtils.getBuildService(
project.gradle.sharedServices,
TestClustersPlugin.REGISTRY_SERVICE_NAME
)
def leaderInfo = project.getProviders().of(TestClusterValueSource.class) {
it.parameters.path.set(clusterPath)
it.parameters.clusterName.set("leader-cluster")
it.parameters.service = serviceProvider
}
def leaderUris = leaderInfo.map { it.getAllTransportPortURI() }
setting 'cluster.remote.leader_cluster.seeds',
{ "\"${leaderUris.get().join(",")}\"" }, IGNORE_VALUE
}
tasks.register("leader-cluster", RestIntegTestTask) {
mustRunAfter("precommit")
systemProperty 'tests.target_cluster', 'leader'
systemProperty 'tests.leader_cluster_repository_path', "${layout.buildDirectory.asFile.get()}/cluster/shared/repo/leader-cluster"
}
tasks.register("middle-cluster", RestIntegTestTask) {
dependsOn "leader-cluster"
useCluster testClusters.named("leader-cluster")
systemProperty 'tests.target_cluster', 'middle'
systemProperty 'tests.leader_cluster_repository_path', "${layout.buildDirectory.asFile.get()}/cluster/shared/repo/leader-cluster"
def leaderUri = getClusterInfo('leader-cluster').map { it.allHttpSocketURI.get(0) }
nonInputProperties.systemProperty 'tests.leader_host', leaderUri
}
tasks.register('follow-cluster', RestIntegTestTask) {
dependsOn "leader-cluster", "middle-cluster"
useCluster leaderCluster
useCluster middleCluster
systemProperty 'tests.target_cluster', 'follow'
systemProperty 'tests.leader_cluster_repository_path', "${layout.buildDirectory.asFile.get()}/cluster/shared/repo/leader-cluster"
def leaderUri = getClusterInfo('leader-cluster').map { it.allHttpSocketURI.get(0) }
def middleUri = getClusterInfo('middle-cluster').map { it.allHttpSocketURI.get(0) }
nonInputProperties.systemProperty 'tests.leader_host', leaderUri
nonInputProperties.systemProperty 'tests.middle_host', middleUri
}
testClusters.matching { it.name == "follow-cluster" }.configureEach {
testDistribution = 'DEFAULT'
setting 'xpack.monitoring.collection.enabled', 'true'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
Provider<TestClustersRegistry> serviceProvider = GradleUtils.getBuildService(
project.gradle.sharedServices,
TestClustersPlugin.REGISTRY_SERVICE_NAME
)
def leaderUris = project.getProviders().of(TestClusterValueSource.class) {
it.parameters.path.set(clusterPath)
it.parameters.clusterName.set("leader-cluster")
it.parameters.service = serviceProvider
}.map { it.getAllTransportPortURI() }
def middleUris = project.getProviders().of(TestClusterValueSource.class) {
it.parameters.path.set(clusterPath)
it.parameters.clusterName.set("middle-cluster")
it.parameters.service = serviceProvider
}.map { it.getAllTransportPortURI() }
setting 'cluster.remote.leader_cluster.seeds',
{ "\"${leaderUris.get().join(",")}\"" }, IGNORE_VALUE
setting 'cluster.remote.middle_cluster.seeds',
{ "\"${middleUris.get().join(",")}\"" }, IGNORE_VALUE
}
testClusters.configureEach {
requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
}
tasks.named("check").configure { dependsOn "follow-cluster" }

View file

@ -6,12 +6,20 @@
*/
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.xcontent.ObjectPath;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import java.io.IOException;
import java.util.Map;
@ -20,10 +28,45 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
public class XPackUsageIT extends ESCCRRestTestCase {
public class XPackUsageIT extends AbstractCCRRestTestCase {
public static LocalClusterConfigProvider commonConfig = c -> c.distribution(DistributionType.DEFAULT)
.setting("xpack.security.enabled", "true")
.setting("xpack.license.self_generated.type", "trial")
.user("admin", "admin-password", "superuser", false);
public static ElasticsearchCluster leaderCluster = ElasticsearchCluster.local().name("leader-cluster").apply(commonConfig).build();
public static ElasticsearchCluster followerCluster = ElasticsearchCluster.local()
.name("follow-cluster")
.apply(commonConfig)
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(leaderCluster).around(followerCluster);
public XPackUsageIT(@Name("targetCluster") TargetCluster targetCluster) {
super(targetCluster);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return leaderFollower();
}
@Override
protected ElasticsearchCluster getLeaderCluster() {
return leaderCluster;
}
@Override
protected ElasticsearchCluster getFollowerCluster() {
return followerCluster;
}
public void testXPackCcrUsage() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
logger.info("skipping test, waiting for target cluster [follow]");
return;
}

View file

@ -1,75 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
public class ChainIT extends ESCCRRestTestCase {
public void testFollowIndex() throws Exception {
final int numDocs = 128;
final String leaderIndexName = "leader";
final String middleIndexName = "middle";
if ("leader".equals(targetCluster)) {
logger.info("Running against leader cluster");
String mapping = "";
if (randomBoolean()) { // randomly do source filtering on indexing
mapping = """
"_source": { "includes": ["field"], "excludes": ["filtered_field"]}""";
}
createIndex(adminClient(), leaderIndexName, Settings.EMPTY, mapping, null);
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(client(), leaderIndexName, Integer.toString(i), "field", i, "filtered_field", "true");
}
refresh(adminClient(), leaderIndexName);
verifyDocuments(leaderIndexName, numDocs, "filtered_field:true");
} else if ("middle".equals(targetCluster)) {
logger.info("Running against middle cluster");
followIndex("leader_cluster", leaderIndexName, middleIndexName);
assertBusy(() -> verifyDocuments(middleIndexName, numDocs, "filtered_field:true"));
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1, "filtered_field", "true");
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
}
assertBusy(() -> verifyDocuments(middleIndexName, numDocs + 3, "filtered_field:true"));
} else if ("follow".equals(targetCluster)) {
logger.info("Running against follow cluster");
final String followIndexName = "follow";
followIndex("middle_cluster", middleIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3, "filtered_field:true"));
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs + 3;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1, "filtered_field", "true");
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
}
try (RestClient middleClient = buildMiddleClient()) {
assertBusy(() -> verifyDocuments(middleIndexName, numDocs + 6, "filtered_field:true", middleClient));
}
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 6, "filtered_field:true"));
} else {
fail("unexpected target cluster [" + targetCluster + "]");
}
}
@Override
protected Settings restClientSettings() {
String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray()));
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
}
}

View file

@ -1,60 +0,0 @@
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
import org.elasticsearch.gradle.testclusters.TestClusterValueSource
import org.elasticsearch.gradle.testclusters.TestClustersPlugin
import org.elasticsearch.gradle.testclusters.TestClustersRegistry
import org.elasticsearch.gradle.util.GradleUtils
apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
dependencies {
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(xpackModule('ccr'))
testImplementation project(':x-pack:plugin:ccr:qa:')
}
def clusterPath = getPath()
def leaderCluster = testClusters.register('leader-cluster') {
testDistribution = 'DEFAULT'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
}
def followerCluster = testClusters.register('follow-cluster') {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
Provider<TestClustersRegistry> serviceProvider = GradleUtils.getBuildService(
project.gradle.sharedServices,
TestClustersPlugin.REGISTRY_SERVICE_NAME
)
def leaderInfo = project.getProviders().of(TestClusterValueSource.class) {
it.parameters.path.set(clusterPath)
it.parameters.clusterName.set("leader-cluster")
it.parameters.service = serviceProvider
}
def leaderUris = leaderInfo.map { it.getAllTransportPortURI() }
setting 'cluster.remote.leader_cluster.seeds',
{ "\"${leaderUris.get().join(",")}\"" }, IGNORE_VALUE
}
tasks.register('leader-cluster', RestIntegTestTask) {
mustRunAfter("precommit")
systemProperty 'tests.target_cluster', 'leader'
}
tasks.register('follow-cluster', RestIntegTestTask) {
dependsOn 'leader-cluster'
useCluster leaderCluster
systemProperty 'tests.target_cluster', 'follow'
def followUri = getClusterInfo('follow-cluster').map { it.allHttpSocketURI.get(0) }
nonInputProperties.systemProperty 'tests.leader_host', followUri
}
tasks.named("check").configure { dependsOn "follow-cluster" }

View file

@ -1,25 +0,0 @@
apply plugin: 'elasticsearch.legacy-yaml-rest-test'
apply plugin: 'elasticsearch.legacy-yaml-rest-compat-test'
restResources {
restApi {
include '_common', 'cluster', 'nodes', 'indices', 'index', 'info', 'ccr'
}
}
dependencies {
yamlRestTestImplementation(testArtifact(project(xpackModule('core'))))
}
testClusters.configureEach {
testDistribution = 'DEFAULT'
// Disable assertions in FollowingEngineAssertions, otherwise an AssertionError is thrown before
// indexing a document directly in a follower index. In a rest test we like to test the exception
// that is thrown in production when indexing a document directly in a follower index.
jvmArgs '-da:org.elasticsearch.xpack.ccr.index.engine.FollowingEngineAssertions'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.security.enabled', 'true'
setting 'xpack.license.self_generated.type', 'trial'
// TODO: reduce the need for superuser here
user username: 'ccr-user', password: 'ccr-user-password', role: 'superuser'
}

View file

@ -1,79 +0,0 @@
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
import org.elasticsearch.gradle.testclusters.TestClusterValueSource
import org.elasticsearch.gradle.testclusters.TestClustersPlugin
import org.elasticsearch.gradle.testclusters.TestClustersRegistry
import org.elasticsearch.gradle.util.GradleUtils
apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
dependencies {
testImplementation project(':x-pack:plugin:ccr:qa')
}
def clusterPath = getPath()
def leaderCluster = testClusters.register('leader-cluster') {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
}
def followCluster = testClusters.register('follow-cluster') {
testDistribution = 'DEFAULT'
setting 'xpack.monitoring.collection.enabled', 'true'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
Provider<TestClustersRegistry> serviceProvider = GradleUtils.getBuildService(
project.gradle.sharedServices,
TestClustersPlugin.REGISTRY_SERVICE_NAME
)
def leaderInfo = project.getProviders().of(TestClusterValueSource.class) {
it.parameters.path.set(clusterPath)
it.parameters.clusterName.set("leader-cluster")
it.parameters.service = serviceProvider
}
def leaderUri = leaderInfo.map { it.getAllTransportPortURI().get(0) }
setting 'cluster.remote.leader_cluster.seeds',
{ "\"${leaderUri.get()}\"" }, IGNORE_VALUE
nameCustomization = { 'follow' }
}
tasks.register('leader-cluster', RestIntegTestTask) {
mustRunAfter("precommit")
systemProperty 'tests.target_cluster', 'leader'
}
tasks.register('follow-cluster', RestIntegTestTask) {
dependsOn 'leader-cluster'
useCluster leaderCluster
systemProperty 'tests.target_cluster', 'follow'
def leaderUri = getClusterInfo("leader-cluster").map { it.allHttpSocketURI.get(0) }
nonInputProperties.systemProperty 'tests.leader_host', leaderUri
}
tasks.register("followClusterRestartTest", StandaloneRestIntegTestTask) {
dependsOn 'follow-cluster'
useCluster leaderCluster
useCluster followCluster
systemProperty 'tests.rest.load_packaged', 'false'
systemProperty 'tests.target_cluster', 'follow-restart'
def leaderUri = getClusterInfo('leader-cluster').map { it.allHttpSocketURI.get(0) }
def followUris = getClusterInfo('follow-cluster').map { it.allHttpSocketURI.join(",") }
nonInputProperties.systemProperty 'tests.leader_host', leaderUri
nonInputProperties.systemProperty 'tests.rest.cluster', followUris
doFirst {
getRegistry().get().restart(clusterPath, "follow-cluster")
}
}
tasks.named("check").configure { dependsOn "followClusterRestartTest" }

View file

@ -1,65 +0,0 @@
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.testclusters.TestClusterValueSource
import org.elasticsearch.gradle.testclusters.TestClustersPlugin
import org.elasticsearch.gradle.testclusters.TestClustersRegistry
import org.elasticsearch.gradle.util.GradleUtils
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
dependencies {
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(path: xpackModule('ccr'))
testImplementation project(':x-pack:plugin:ccr:qa')
}
def clusterPath = getPath()
def leadCluster = testClusters.register('leader-cluster') {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
extraConfigFile 'roles.yml', file('leader-roles.yml')
user username: "test_admin", role: "superuser"
user username: "test_ccr", role: "ccruser"
}
testClusters.register('follow-cluster') {
testDistribution = 'DEFAULT'
Provider<TestClustersRegistry> serviceProvider = GradleUtils.getBuildService(
project.gradle.sharedServices,
TestClustersPlugin.REGISTRY_SERVICE_NAME
)
def leaderUris = project.getProviders().of(TestClusterValueSource.class) {
it.parameters.path.set(clusterPath)
it.parameters.clusterName.set("leader-cluster")
it.parameters.service = serviceProvider
}.map { it.AllTransportPortURI }
setting 'cluster.remote.leader_cluster.seeds', {
"\"${leaderUris.get().join(",")}\""
}, IGNORE_VALUE
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.collection.enabled', 'false' // will be enabled by tests
extraConfigFile 'roles.yml', file('follower-roles.yml')
user username: "test_admin", role: "superuser"
user username: "test_ccr", role: "ccruser"
}
tasks.register('leader-cluster', RestIntegTestTask) {
mustRunAfter("precommit")
systemProperty 'tests.target_cluster', 'leader'
}
def followerClusterTestTask = tasks.register('follow-cluster', RestIntegTestTask) {
dependsOn 'leader-cluster'
useCluster leadCluster
systemProperty 'tests.target_cluster', 'follow'
def leaderUri = getClusterInfo('leader-cluster').map { it.allHttpSocketURI.get(0) }
nonInputProperties.systemProperty 'tests.leader_host', leaderUri
}
tasks.named("check").configure { dependsOn(followerClusterTestTask) }

View file

@ -0,0 +1,467 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.randomizedtesting.TestMethodAndParams;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.TestCaseOrdering;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.LazyInitializable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@TestCaseOrdering(AbstractCCRRestTestCase.TargetClusterTestOrdering.class)
public abstract class AbstractCCRRestTestCase extends ESRestTestCase {
protected final TargetCluster targetCluster;
private static TargetCluster clientTargetCluster;
public AbstractCCRRestTestCase(@Name("targetCluster") TargetCluster targetCluster) {
this.targetCluster = targetCluster;
}
protected static List<Object[]> leaderMiddleFollower() {
return Arrays.stream(TargetCluster.values()).map(v -> new Object[] { v }).toList();
}
protected static List<Object[]> leaderFollower() {
return Arrays.stream(TargetCluster.values()).filter(c -> c != TargetCluster.MIDDLE).map(v -> new Object[] { v }).toList();
}
@Override
protected boolean preserveClusterUponCompletion() {
return true;
}
protected abstract ElasticsearchCluster getLeaderCluster();
protected abstract ElasticsearchCluster getFollowerCluster();
protected ElasticsearchCluster getMiddleCluster() {
throw new UnsupportedOperationException("cannot get middle cluster");
};
@Override
protected String getTestRestCluster() {
clientTargetCluster = targetCluster;
return switch (targetCluster) {
case LEADER -> getLeaderCluster().getHttpAddresses();
case MIDDLE -> getMiddleCluster().getHttpAddresses();
case FOLLOWER -> getFollowerCluster().getHttpAddresses();
};
}
@Before
public void maybeReInitClient() throws Exception {
if (clientTargetCluster != targetCluster) {
closeClients();
initClient();
}
}
protected static void index(String index, String id, Object... fields) throws IOException {
index(adminClient(), index, id, fields);
}
protected static void index(RestClient client, String index, String id, Object... fields) throws IOException {
XContentBuilder document = jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
document.field((String) fields[i], fields[i + 1]);
}
document.endObject();
final Request request = new Request("POST", "/" + index + "/_doc" + (id == null ? "" : "/" + id));
request.setJsonEntity(Strings.toString(document));
assertOK(client.performRequest(request));
}
protected static void resumeFollow(String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"read_poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}
protected static void followIndex(String leaderIndex, String followIndex) throws IOException {
followIndex("leader_cluster", leaderIndex, followIndex);
}
protected static void followIndex(String leaderCluster, String leaderIndex, String followIndex) throws IOException {
followIndex(client(), leaderCluster, leaderIndex, followIndex);
}
protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
followIndex(client, leaderCluster, leaderIndex, followIndex, null);
}
protected static void followIndex(
final RestClient client,
final String leaderCluster,
final String leaderIndex,
final String followIndex,
final Settings settings
) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow?wait_for_active_shards=1");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.field("remote_cluster", leaderCluster);
bodyBuilder.field("leader_index", leaderIndex);
bodyBuilder.field("read_poll_timeout", "10ms");
if (settings != null) {
bodyBuilder.startObject("settings");
{
settings.toXContent(bodyBuilder, ToXContent.EMPTY_PARAMS);
}
bodyBuilder.endObject();
}
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client.performRequest(request));
}
protected static void pauseFollow(String followIndex) throws IOException {
pauseFollow(client(), followIndex);
}
protected static void pauseFollow(RestClient client, String followIndex) throws IOException {
assertOK(client.performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}
protected static void putAutoFollowPattern(String patternName, String remoteCluster, String indexPattern) throws IOException {
Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/" + patternName);
putPatternRequest.setJsonEntity(String.format(Locale.ROOT, """
{"leader_index_patterns": ["%s"], "remote_cluster": "%s"}
""", indexPattern, remoteCluster));
assertOK(client().performRequest(putPatternRequest));
}
protected static void deleteAutoFollowPattern(String patternName) throws IOException {
deleteAutoFollowPattern(client(), patternName);
}
protected static void deleteAutoFollowPattern(RestClient client, String patternName) throws IOException {
Request putPatternRequest = new Request("DELETE", "/_ccr/auto_follow/" + patternName);
assertOK(client.performRequest(putPatternRequest));
}
protected static void unfollow(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
}
protected static void verifyDocuments(final String index, final int expectedNumDocs, final String query) throws IOException {
verifyDocuments(index, expectedNumDocs, query, adminClient());
}
protected static void verifyDocuments(final String index, final int expectedNumDocs, final String query, final RestClient client)
throws IOException {
final Request request = new Request("GET", "/" + index + "/_search");
request.addParameter("size", Integer.toString(expectedNumDocs));
request.addParameter("sort", "field:asc");
request.addParameter("q", query);
request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
Map<String, ?> response = toMap(client.performRequest(request));
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(index, numDocs, equalTo(expectedNumDocs));
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), equalTo(expectedNumDocs));
for (int i = 0; i < expectedNumDocs; i++) {
int value = (int) XContentMapValues.extractValue("_source.field", (Map<?, ?>) hits.get(i));
assertThat(index, i, equalTo(value));
}
}
protected static void verifyDocuments(final RestClient client, final String index, final int expectedNumDocs) throws IOException {
final Request request = new Request("GET", "/" + index + "/_search");
request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
Map<String, ?> response = toMap(client.performRequest(request));
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(index, numDocs, equalTo(expectedNumDocs));
}
protected static void verifyCcrMonitoring(final String expectedLeaderIndex, final String expectedFollowerIndex) throws IOException {
Request request = new Request("GET", "/.monitoring-*/_search");
request.setJsonEntity(String.format(Locale.ROOT, """
{"query": {"term": {"ccr_stats.leader_index": "%s"}}}
""", expectedLeaderIndex));
Map<String, ?> response;
try {
response = toMap(adminClient().performRequest(request));
} catch (ResponseException e) {
throw new AssertionError("error while searching", e);
}
int followerMaxSeqNo = 0;
int followerMappingVersion = 0;
int followerSettingsVersion = 0;
int followerAliasesVersion = 0;
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), greaterThanOrEqualTo(1));
for (int i = 0; i < hits.size(); i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
assertThat(leaderIndex, endsWith(expectedLeaderIndex));
final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit);
assertThat(followerIndex, equalTo(expectedFollowerIndex));
int foundFollowerMaxSeqNo = (int) XContentMapValues.extractValue("_source.ccr_stats.follower_max_seq_no", hit);
followerMaxSeqNo = Math.max(followerMaxSeqNo, foundFollowerMaxSeqNo);
int foundFollowerMappingVersion = (int) XContentMapValues.extractValue("_source.ccr_stats.follower_mapping_version", hit);
followerMappingVersion = Math.max(followerMappingVersion, foundFollowerMappingVersion);
int foundFollowerSettingsVersion = (int) XContentMapValues.extractValue("_source.ccr_stats.follower_settings_version", hit);
followerSettingsVersion = Math.max(followerSettingsVersion, foundFollowerSettingsVersion);
int foundFollowerAliasesVersion = (int) XContentMapValues.extractValue("_source.ccr_stats.follower_aliases_version", hit);
followerAliasesVersion = Math.max(followerAliasesVersion, foundFollowerAliasesVersion);
}
assertThat(followerMaxSeqNo, greaterThan(0));
assertThat(followerMappingVersion, greaterThan(0));
assertThat(followerSettingsVersion, greaterThan(0));
assertThat(followerAliasesVersion, greaterThan(0));
}
protected static void verifyAutoFollowMonitoring() throws IOException {
Request request = new Request("GET", "/.monitoring-*/_count");
request.setJsonEntity("""
{
"query": {
"bool" : {
"filter": {
"term" : { "type" : "ccr_auto_follow_stats" }
},
"must" : {
"range" : {
"ccr_auto_follow_stats.number_of_successful_follow_indices" : { "gt" : 0 }
}
}
}
}
}
""");
String responseEntity;
Map<String, ?> response;
try {
responseEntity = EntityUtils.toString(adminClient().performRequest(request).getEntity());
response = toMap(responseEntity);
} catch (ResponseException e) {
throw new AssertionError("error while searching", e);
}
assertNotNull(responseEntity);
final Number count = (Number) XContentMapValues.extractValue("count", response);
assertThat(
"Expected at least 1 successfully followed index but found none, count returned [" + responseEntity + ']',
count.longValue(),
greaterThanOrEqualTo(1L)
);
}
protected static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
protected static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}
protected static void ensureYellow(final String index) throws IOException {
ensureYellow(index, adminClient());
}
protected static void ensureYellow(final String index, final RestClient client) throws IOException {
ensureHealth(client, index, request -> {
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_active_shards", "1");
request.addParameter("wait_for_no_relocating_shards", "true");
// follower index can be yellow even when its primary shards are still initializing as we bootstrap them using snapshot/restore.
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "5s");
request.addParameter("level", "shards");
});
}
protected Set<CcrNodeTask> getCcrNodeTasks() throws IOException {
final Request request = new Request("GET", "/_tasks");
request.addParameter("detailed", "true");
Map<String, Object> rsp1 = toMap(adminClient().performRequest(request));
Map<?, ?> nodes = (Map<?, ?>) rsp1.get("nodes");
assertThat(nodes.size(), equalTo(1));
Map<?, ?> node = (Map<?, ?>) nodes.values().iterator().next();
Map<?, ?> nodeTasks = (Map<?, ?>) node.get("tasks");
var ccrNodeTasks = new HashSet<CcrNodeTask>();
for (Map.Entry<?, ?> entry : nodeTasks.entrySet()) {
Map<?, ?> nodeTask = (Map<?, ?>) entry.getValue();
String action = (String) nodeTask.get("action");
if (action.startsWith("xpack/ccr/shard_follow_task")) {
var status = (Map<?, ?>) nodeTask.get("status");
ccrNodeTasks.add(
new CcrNodeTask(
(String) status.get("remote_cluster"),
(String) status.get("leader_index"),
(String) status.get("follower_index"),
(Integer) status.get("shard_id")
)
);
}
}
return ccrNodeTasks;
}
protected record CcrNodeTask(String remoteCluster, String leaderIndex, String followerIndex, int shardId) {}
protected static boolean indexExists(String index) throws IOException {
Response response = adminClient().performRequest(new Request("HEAD", "/" + index));
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}
protected static List<String> verifyDataStream(final RestClient client, final String name, final String... expectedBackingIndices)
throws IOException {
Request request = new Request("GET", "/_data_stream/" + name);
Map<String, ?> response = toMap(client.performRequest(request));
List<?> retrievedDataStreams = (List<?>) response.get("data_streams");
assertThat(retrievedDataStreams, hasSize(1));
List<?> actualBackingIndexItems = (List<?>) ((Map<?, ?>) retrievedDataStreams.get(0)).get("indices");
assertThat(actualBackingIndexItems, hasSize(expectedBackingIndices.length));
final List<String> actualBackingIndices = new ArrayList<>();
for (int i = 0; i < expectedBackingIndices.length; i++) {
Map<?, ?> actualBackingIndexItem = (Map<?, ?>) actualBackingIndexItems.get(i);
String actualBackingIndex = (String) actualBackingIndexItem.get("index_name");
String expectedBackingIndex = expectedBackingIndices[i];
String actualDataStreamName = actualBackingIndex.substring(5, actualBackingIndex.indexOf('-', 5));
String expectedDataStreamName = expectedBackingIndex.substring(5, expectedBackingIndex.indexOf('-', 5));
assertThat(actualDataStreamName, equalTo(expectedDataStreamName));
int actualGeneration = Integer.parseInt(actualBackingIndex.substring(actualBackingIndex.lastIndexOf('-')));
int expectedGeneration = Integer.parseInt(expectedBackingIndex.substring(expectedBackingIndex.lastIndexOf('-')));
assertThat(actualGeneration, equalTo(expectedGeneration));
actualBackingIndices.add(actualBackingIndex);
}
return List.copyOf(actualBackingIndices);
}
protected static void createAutoFollowPattern(
RestClient client,
String name,
String pattern,
String remoteCluster,
String followIndexPattern
) throws IOException {
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.array("leader_index_patterns", pattern);
if (followIndexPattern != null) {
bodyBuilder.field("follow_index_pattern", followIndexPattern);
}
bodyBuilder.field("remote_cluster", remoteCluster);
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client.performRequest(request));
}
/**
* Fix point in time when data stream backing index is first time queried.
* This is required to avoid failures when running test at midnight.
* (index is created for day0, but assertions are executed for day1 assuming different time based index name that does not exist)
*/
private final LazyInitializable<Long, RuntimeException> time = new LazyInitializable<>(System::currentTimeMillis);
protected String backingIndexName(String dataStreamName, int generation) {
return DataStream.getDefaultBackingIndexName(dataStreamName, generation, time.getOrCompute());
}
protected RestClient buildLeaderClient() throws IOException {
assert targetCluster != TargetCluster.LEADER;
return buildClient(getLeaderCluster().getHttpAddresses());
}
protected RestClient buildLeaderClient(final Settings settings) throws IOException {
assert targetCluster != TargetCluster.LEADER;
return buildClient(getLeaderCluster().getHttpAddresses(), settings);
}
protected RestClient buildMiddleClient() throws IOException {
assert targetCluster != TargetCluster.MIDDLE;
return buildClient(getMiddleCluster().getHttpAddresses());
}
private RestClient buildClient(final String url) throws IOException {
return buildClient(url, restAdminSettings());
}
private RestClient buildClient(final String url, final Settings settings) throws IOException {
int portSeparator = url.lastIndexOf(':');
HttpHost httpHost = new HttpHost(
url.substring(0, portSeparator),
Integer.parseInt(url.substring(portSeparator + 1)),
getProtocol()
);
return buildClient(settings, new HttpHost[] { httpHost });
}
public enum TargetCluster {
LEADER,
MIDDLE,
FOLLOWER;
}
public static class TargetClusterTestOrdering implements Comparator<TestMethodAndParams> {
@Override
public int compare(TestMethodAndParams o1, TestMethodAndParams o2) {
return Integer.compare(getOrdinal(o1), getOrdinal(o2));
}
private int getOrdinal(TestMethodAndParams t) {
return ((TargetCluster) t.getInstanceArguments().get(0)).ordinal();
}
}
}

View file

@ -7,6 +7,10 @@
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.SuppressForbidden;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
@ -21,9 +25,15 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.xcontent.ObjectPath;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.text.SimpleDateFormat;
@ -37,7 +47,6 @@ import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xcontent.ObjectPath.eval;
import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
@ -48,12 +57,78 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
public class AutoFollowIT extends ESCCRRestTestCase {
@SuppressForbidden("temp folder uses file api")
public class AutoFollowIT extends AbstractCCRRestTestCase {
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT);
public static TemporaryFolder leaderRepoDir = new TemporaryFolder();
public static LocalClusterConfigProvider commonConfig = c -> c.module("x-pack-ccr")
.module("analysis-common")
.module("searchable-snapshots")
.module("data-streams")
.module("ingest-common")
.module("mapper-extras")
.module("x-pack-stack")
.module("x-pack-ilm")
.module("x-pack-monitoring")
.module("constant-keyword")
.module("wildcard")
.setting("xpack.security.enabled", "true")
.setting("xpack.license.self_generated.type", "trial")
.setting("path.repo", () -> leaderRepoDir.getRoot().getAbsolutePath())
.feature(FeatureFlag.TIME_SERIES_MODE)
.user("admin", "admin-password", "superuser", false);
public static ElasticsearchCluster leaderCluster = ElasticsearchCluster.local().name("leader-cluster").apply(commonConfig).build();
public static ElasticsearchCluster middleCluster = ElasticsearchCluster.local()
.name("middle-cluster")
.apply(commonConfig)
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
public static ElasticsearchCluster followerCluster = ElasticsearchCluster.local()
.name("follow-cluster")
.apply(commonConfig)
.setting("xpack.monitoring.collection.enabled", "true")
.setting("cluster.remote.middle_cluster.seeds", () -> "\"" + middleCluster.getTransportEndpoints() + "\"")
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(leaderRepoDir)
.around(leaderCluster)
.around(middleCluster)
.around(followerCluster);
public AutoFollowIT(@Name("targetCluster") AbstractCCRRestTestCase.TargetCluster targetCluster) {
super(targetCluster);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return leaderMiddleFollower();
}
@Override
protected ElasticsearchCluster getLeaderCluster() {
return leaderCluster;
}
@Override
protected ElasticsearchCluster getFollowerCluster() {
return followerCluster;
}
@Override
protected ElasticsearchCluster getMiddleCluster() {
return middleCluster;
}
public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
logger.info("skipping test, waiting for target cluster [follow]");
return;
}
@ -109,7 +184,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testAutoFollowPatterns() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
logger.info("skipping test, waiting for target cluster [follow]");
return;
}
@ -179,7 +254,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
});
assertLongBusy(() -> verifyCcrMonitoring("metrics-20210101", "metrics-20210101"));
assertLongBusy(ESCCRRestTestCase::verifyAutoFollowMonitoring);
assertLongBusy(AbstractCCRRestTestCase::verifyAutoFollowMonitoring);
} finally {
cleanUpFollower(List.of("metrics-20210101"), List.of(), List.of(autoFollowPatternName));
@ -188,7 +263,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
logger.info("skipping test, waiting for target cluster [follow]");
return;
}
@ -228,7 +303,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testDataStreams() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
@ -324,7 +399,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testDataStreamsRenameFollowDataStream() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
@ -439,7 +514,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
@ -517,7 +592,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
@SuppressWarnings("unchecked")
public void testDataStreamsBackingIndicesOrdering() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
@ -643,7 +718,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testRolloverDataStreamInFollowClusterForbidden() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
@ -766,7 +841,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testRolloverAliasInFollowClusterForbidden() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
@ -850,7 +925,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testDataStreamsBiDirectionalReplication() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
@ -1035,7 +1110,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testAutoFollowSearchableSnapshotsFails() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
@ -1047,13 +1122,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
final String indexName = testPrefix + "-index";
try {
try (var leaderClient = buildLeaderClient()) {
final String systemPropertyRepoPath = System.getProperty("tests.leader_cluster_repository_path");
assertThat(
"Missing system property [tests.leader_cluster_repository_path]",
systemPropertyRepoPath,
not(emptyOrNullString())
);
final String repositoryPath = systemPropertyRepoPath + '/' + testPrefix;
final String repositoryPath = leaderRepoDir.newFolder(testPrefix).getAbsolutePath();
registerRepository(leaderClient, repository, "fs", true, Settings.builder().put("location", repositoryPath).build());
@ -1122,14 +1191,14 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
public void testNoWarningOnPromoteDatastreamWhenTemplateExistsOnFollower() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
testDataStreamPromotionWarnings(true);
}
public void testWarningOnPromoteDatastreamWhenTemplateDoesNotExistsOnFollower() {
if ("follow".equals(targetCluster) == false) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
WarningFailureException exception = assertThrows(WarningFailureException.class, () -> testDataStreamPromotionWarnings(false));
@ -1329,4 +1398,9 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}
}
}
@Override
public String getTestName() {
return super.getTestName().replaceAll("[ ={}]", "_");
}
}

View file

@ -7,21 +7,64 @@
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import java.util.Locale;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasToString;
public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase {
public class CcrMultiClusterLicenseIT extends AbstractCCRRestTestCase {
public static LocalClusterConfigProvider commonConfig = c -> c.module("x-pack-ccr")
.module("analysis-common")
.setting("xpack.security.enabled", "true")
.user("admin", "admin-password", "superuser", false);
public static ElasticsearchCluster leaderCluster = ElasticsearchCluster.local().name("leader-cluster").apply(commonConfig).build();
public static ElasticsearchCluster followerCluster = ElasticsearchCluster.local()
.name("follow-cluster")
.apply(commonConfig)
.setting("xpack.license.self_generated.type", "trial")
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(leaderCluster).around(followerCluster);
public CcrMultiClusterLicenseIT(@Name("targetCluster") TargetCluster targetCluster) {
super(targetCluster);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return leaderFollower();
}
@Override
protected ElasticsearchCluster getLeaderCluster() {
return leaderCluster;
}
@Override
protected ElasticsearchCluster getFollowerCluster() {
return followerCluster;
}
public void testFollow() {
if ("follow".equals(targetCluster)) {
if (targetCluster == TargetCluster.FOLLOWER) {
final Request request = new Request("PUT", "/follower/_ccr/follow");
request.setJsonEntity("""
{"remote_cluster": "leader_cluster", "leader_index": "leader"}
@ -31,7 +74,7 @@ public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase {
}
public void testAutoFollow() {
if ("follow".equals(targetCluster)) {
if (targetCluster == TargetCluster.FOLLOWER) {
final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
request.setJsonEntity("""
{"leader_index_patterns":["*"], "remote_cluster": "leader_cluster"}

View file

@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
public class ChainIT extends AbstractCCRRestTestCase {
public static LocalClusterConfigProvider commonConfig = c -> c.module("x-pack-ccr")
.module("analysis-common")
.setting("xpack.security.enabled", "true")
.setting("xpack.license.self_generated.type", "trial")
.feature(FeatureFlag.TIME_SERIES_MODE)
.user("admin", "admin-password", "superuser", false);
public static ElasticsearchCluster leaderCluster = ElasticsearchCluster.local().name("leader-cluster").apply(commonConfig).build();
public static ElasticsearchCluster middleCluster = ElasticsearchCluster.local()
.name("middle-cluster")
.apply(commonConfig)
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
public static ElasticsearchCluster followerCluster = ElasticsearchCluster.local()
.name("follow-cluster")
.apply(commonConfig)
.setting("cluster.remote.middle_cluster.seeds", () -> "\"" + middleCluster.getTransportEndpoints() + "\"")
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(leaderCluster).around(middleCluster).around(followerCluster);
public ChainIT(@Name("targetCluster") AbstractCCRRestTestCase.TargetCluster targetCluster) {
super(targetCluster);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return leaderMiddleFollower();
}
@Override
protected ElasticsearchCluster getLeaderCluster() {
return leaderCluster;
}
@Override
protected ElasticsearchCluster getFollowerCluster() {
return followerCluster;
}
@Override
protected ElasticsearchCluster getMiddleCluster() {
return middleCluster;
}
public void testFollowIndex() throws Exception {
final int numDocs = 128;
final String leaderIndexName = "leader";
final String middleIndexName = "middle";
switch (targetCluster) {
case LEADER:
logger.info("Running against leader cluster");
String mapping = "";
if (randomBoolean()) { // randomly do source filtering on indexing
mapping = """
"_source": { "includes": ["field"], "excludes": ["filtered_field"]}""";
}
createIndex(adminClient(), leaderIndexName, Settings.EMPTY, mapping, null);
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(client(), leaderIndexName, Integer.toString(i), "field", i, "filtered_field", "true");
}
refresh(adminClient(), leaderIndexName);
verifyDocuments(leaderIndexName, numDocs, "filtered_field:true");
break;
case MIDDLE:
logger.info("Running against middle cluster");
followIndex("leader_cluster", leaderIndexName, middleIndexName);
assertBusy(() -> verifyDocuments(middleIndexName, numDocs, "filtered_field:true"));
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1, "filtered_field", "true");
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
}
assertBusy(() -> verifyDocuments(middleIndexName, numDocs + 3, "filtered_field:true"));
break;
case FOLLOWER:
logger.info("Running against follow cluster");
final String followIndexName = "follow";
followIndex("middle_cluster", middleIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3, "filtered_field:true"));
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs + 3;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1, "filtered_field", "true");
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
}
try (RestClient middleClient = buildMiddleClient()) {
assertBusy(() -> verifyDocuments(middleIndexName, numDocs + 6, "filtered_field:true", middleClient));
}
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 6, "filtered_field:true"));
break;
}
}
@Override
protected Settings restClientSettings() {
String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray()));
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
}
}

View file

@ -6,7 +6,9 @@
*/
package org.elasticsearch.xpack.ccr;
import org.apache.lucene.util.Constants;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
@ -15,13 +17,20 @@ import org.elasticsearch.common.logging.JsonLogsStream;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.LogType;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Path;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@ -32,12 +41,46 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.core.Is.is;
public class FollowIndexIT extends ESCCRRestTestCase {
public class DowngradeLicenseFollowIndexIT extends AbstractCCRRestTestCase {
public static LocalClusterConfigProvider commonConfig = c -> c.module("x-pack-ccr")
.module("analysis-common")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "true")
.user("admin", "admin-password", "superuser", false);
public static ElasticsearchCluster leaderCluster = ElasticsearchCluster.local().name("leader-cluster").apply(commonConfig).build();
public static ElasticsearchCluster followerCluster = ElasticsearchCluster.local()
.name("follow-cluster")
.apply(commonConfig)
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(leaderCluster).around(followerCluster);
public DowngradeLicenseFollowIndexIT(@Name("targetCluster") TargetCluster targetCluster) {
super(targetCluster);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return leaderFollower();
}
@Override
protected ElasticsearchCluster getLeaderCluster() {
return leaderCluster;
}
@Override
protected ElasticsearchCluster getFollowerCluster() {
return followerCluster;
}
public void testDowngradeRemoteClusterToBasic() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
assumeTrue("Test should only run with target_cluster=follow", targetCluster == TargetCluster.FOLLOWER);
{
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
@ -87,15 +130,15 @@ public class FollowIndexIT extends ESCCRRestTestCase {
assertThat(indexExists(index2), is(false));
// parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster
// (does not work on windows...)
if (Constants.WINDOWS == false) {
assertBusy(() -> {
Path path = PathUtils.get(System.getProperty("log"));
try (Stream<JsonLogLine> stream = JsonLogsStream.from(path)) {
assertTrue(stream.anyMatch(autoFollowCoordinatorWarn()::matches));
}
});
}
assertBusy(() -> {
try (
InputStream in = followerCluster.getNodeLog(0, LogType.SERVER_JSON);
BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
Stream<JsonLogLine> stream = JsonLogsStream.from(reader)
) {
assertTrue(stream.anyMatch(autoFollowCoordinatorWarn()::matches));
}
});
}, 60, TimeUnit.SECONDS);
// Manually following index2 also does not work after the downgrade:

View file

@ -6,6 +6,10 @@
*/
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.SuppressForbidden;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
@ -21,28 +25,82 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
public class FollowIndexIT extends ESCCRRestTestCase {
@SuppressForbidden("temp folder uses file api")
public class FollowIndexIT extends AbstractCCRRestTestCase {
public static TemporaryFolder leaderRepoDir = new TemporaryFolder();
public static LocalClusterConfigProvider commonConfig = c -> c.module("x-pack-ccr")
.module("analysis-common")
.module("searchable-snapshots")
.module("data-streams")
.module("ingest-common")
.module("mapper-extras")
.module("x-pack-stack")
.module("x-pack-ilm")
.module("x-pack-monitoring")
.module("constant-keyword")
.module("wildcard")
.setting("xpack.security.enabled", "true")
.setting("xpack.license.self_generated.type", "trial")
.setting("path.repo", () -> leaderRepoDir.getRoot().getAbsolutePath())
.feature(FeatureFlag.TIME_SERIES_MODE)
.user("admin", "admin-password", "superuser", false);
public static ElasticsearchCluster leaderCluster = ElasticsearchCluster.local().name("leader-cluster").apply(commonConfig).build();
public static ElasticsearchCluster followerCluster = ElasticsearchCluster.local()
.name("follow-cluster")
.apply(commonConfig)
.setting("xpack.monitoring.collection.enabled", "true")
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(leaderRepoDir).around(leaderCluster).around(followerCluster);
public FollowIndexIT(@Name("targetCluster") TargetCluster targetCluster) {
super(targetCluster);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return leaderFollower();
}
@Override
protected ElasticsearchCluster getLeaderCluster() {
return leaderCluster;
}
@Override
protected ElasticsearchCluster getFollowerCluster() {
return followerCluster;
}
public void testFollowIndex() throws Exception {
final int numDocs = 128;
final String leaderIndexName = "test_index1";
if ("leader".equals(targetCluster)) {
if (targetCluster == TargetCluster.LEADER) {
logger.info("Running against leader cluster");
String mapping = "";
if (randomBoolean()) { // randomly do source filtering on indexing
@ -56,7 +114,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
refresh(adminClient(), leaderIndexName);
verifyDocuments(leaderIndexName, numDocs, "filtered_field:true");
} else if ("follow".equals(targetCluster)) {
} else if (targetCluster == TargetCluster.FOLLOWER) {
logger.info("Running against follow cluster");
final String followIndexName = "test_index2";
final boolean overrideNumberOfReplicas = randomBoolean();
@ -100,7 +158,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
public void testFollowThatOverridesRequiredLeaderSetting() throws IOException {
if ("leader".equals(targetCluster)) {
if (targetCluster == TargetCluster.LEADER) {
createIndex(adminClient(), "override_leader_index", Settings.EMPTY);
} else {
final Settings settings = Settings.builder().put("index.number_of_shards", 5).build();
@ -124,7 +182,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
public void testFollowThatOverridesNonExistentSetting() throws IOException {
if ("leader".equals(targetCluster)) {
if (targetCluster == TargetCluster.LEADER) {
createIndex(adminClient(), "override_leader_index_non_existent_setting", Settings.EMPTY);
} else {
final Settings settings = Settings.builder().put("index.non_existent_setting", randomAlphaOfLength(3)).build();
@ -151,7 +209,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
public void testFollowNonExistingLeaderIndex() {
if ("follow".equals(targetCluster) == false) {
if (targetCluster == TargetCluster.FOLLOWER == false) {
logger.info("skipping test, waiting for target cluster [follow]");
return;
}
@ -165,7 +223,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
public void testFollowDataStreamFails() throws Exception {
if ("follow".equals(targetCluster) == false) {
if (targetCluster == TargetCluster.FOLLOWER == false) {
return;
}
@ -182,13 +240,11 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
public void testFollowSearchableSnapshotsFails() throws Exception {
final String testPrefix = getTestName().toLowerCase(Locale.ROOT);
final String testPrefix = "test_follow_searchable_snapshots_fails";
final String mountedIndex = "mounted-" + testPrefix;
if ("leader".equals(targetCluster)) {
final String systemPropertyRepoPath = System.getProperty("tests.leader_cluster_repository_path");
assertThat("Missing system property [tests.leader_cluster_repository_path]", systemPropertyRepoPath, not(emptyOrNullString()));
final String repositoryPath = systemPropertyRepoPath + '/' + testPrefix;
if (targetCluster == TargetCluster.LEADER) {
final String repositoryPath = leaderRepoDir.newFolder(testPrefix).getAbsolutePath();
final String repository = "repository-" + testPrefix;
registerRepository(repository, FsRepository.TYPE, true, Settings.builder().put("location", repositoryPath).build());
@ -227,7 +283,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
final int numDocs = 128;
final String leaderIndexName = "tsdb_leader";
long basetime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-04-28T18:35:24.467Z");
if ("leader".equals(targetCluster)) {
if (targetCluster == TargetCluster.LEADER) {
logger.info("Running against leader cluster");
createIndex(
adminClient(),
@ -248,7 +304,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
refresh(adminClient(), leaderIndexName);
verifyDocuments(client(), leaderIndexName, numDocs);
} else if ("follow".equals(targetCluster)) {
} else if (targetCluster == TargetCluster.FOLLOWER) {
logger.info("Running against follow cluster");
final String followIndexName = "tsdb_follower";
final boolean overrideNumberOfReplicas = randomBoolean();
@ -321,7 +377,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
public void testFollowTsdbIndexCanNotOverrideMode() throws Exception {
if (false == "follow".equals(targetCluster)) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
logger.info("Running against follow cluster");
@ -342,7 +398,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
public void testFollowStandardIndexCanNotOverrideMode() throws Exception {
if (false == "follow".equals(targetCluster)) {
if (targetCluster != TargetCluster.FOLLOWER) {
return;
}
logger.info("Running against follow cluster");
@ -365,7 +421,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
public void testSyntheticSource() throws Exception {
final int numDocs = 128;
final String leaderIndexName = "synthetic_leader";
if ("leader".equals(targetCluster)) {
if (targetCluster == TargetCluster.LEADER) {
logger.info("Running against leader cluster");
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), SourceFieldMapper.Mode.SYNTHETIC)
@ -378,7 +434,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
refresh(adminClient(), leaderIndexName);
verifyDocuments(client(), leaderIndexName, numDocs);
} else if ("follow".equals(targetCluster)) {
} else if (targetCluster == TargetCluster.FOLLOWER) {
logger.info("Running against follow cluster");
final String followIndexName = "synthetic_follower";
final boolean overrideNumberOfReplicas = randomBoolean();

View file

@ -6,6 +6,9 @@
*/
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
@ -21,7 +24,12 @@ import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import java.io.IOException;
import java.text.SimpleDateFormat;
@ -39,7 +47,57 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
public class FollowIndexSecurityIT extends ESCCRRestTestCase {
public class FollowIndexSecurityIT extends AbstractCCRRestTestCase {
public static LocalClusterConfigProvider commonConfig = c -> c.module("x-pack-ccr")
.module("analysis-common")
.module("mapper-extras")
.module("data-streams")
.module("ingest-common")
.module("x-pack-monitoring")
.module("x-pack-ilm")
.module("wildcard")
.module("x-pack-stack")
.module("constant-keyword")
.setting("xpack.security.enabled", "true")
.setting("xpack.license.self_generated.type", "trial")
.user("test_admin", "x-pack-test-password", "superuser", false)
.user("test_ccr", "x-pack-test-password", "ccruser", false);
public static ElasticsearchCluster leaderCluster = ElasticsearchCluster.local()
.name("leader-cluster")
.apply(commonConfig)
.rolesFile(Resource.fromClasspath("leader-roles.yml"))
.build();
public static ElasticsearchCluster followerCluster = ElasticsearchCluster.local()
.name("follow-cluster")
.apply(commonConfig)
.setting("xpack.monitoring.collection.enabled", "false")
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.rolesFile(Resource.fromClasspath("follower-roles.yml"))
.build();
@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(leaderCluster).around(followerCluster);
public FollowIndexSecurityIT(@Name("targetCluster") TargetCluster targetCluster) {
super(targetCluster);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return leaderFollower();
}
@Override
protected ElasticsearchCluster getLeaderCluster() {
return leaderCluster;
}
@Override
protected ElasticsearchCluster getFollowerCluster() {
return followerCluster;
}
@Override
protected Settings restClientSettings() {
@ -57,7 +115,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
final int numDocs = 16;
final String allowedIndex = "allowed-index";
final String unallowedIndex = "unallowed-index";
if ("leader".equals(targetCluster)) {
if (targetCluster == TargetCluster.LEADER) {
logger.info("Running against leader cluster");
createIndex(adminClient(), allowedIndex, Settings.EMPTY);
createIndex(adminClient(), unallowedIndex, Settings.EMPTY);
@ -147,9 +205,8 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
}
public void testAutoFollowPatterns() throws Exception {
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));
final String prefix = getTestName().toLowerCase(Locale.ROOT);
assumeTrue("Test should only run with target_cluster=follow", targetCluster == TargetCluster.FOLLOWER);
final String prefix = "testautofollowpatterns";
String allowedIndex = prefix + "-eu_20190101";
String disallowedIndex = prefix + "-us_20190101";
@ -188,7 +245,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
assertThat(indexExists(disallowedIndex), is(false));
withMonitoring(logger, () -> {
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 120L, TimeUnit.SECONDS);
assertBusy(ESCCRRestTestCase::verifyAutoFollowMonitoring, 120L, TimeUnit.SECONDS);
assertBusy(AbstractCCRRestTestCase::verifyAutoFollowMonitoring, 120L, TimeUnit.SECONDS);
});
} finally {
// Cleanup by deleting auto follow pattern and pause following:
@ -204,7 +261,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
public void testForgetFollower() throws IOException {
final String forgetLeader = "forget-leader";
final String forgetFollower = "forget-follower";
if ("leader".equals(targetCluster)) {
if (targetCluster == TargetCluster.LEADER) {
logger.info("running against leader cluster");
createIndex(adminClient(), forgetLeader, indexSettings(1, 0).build());
} else {
@ -253,7 +310,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
public void testCleanShardFollowTaskAfterDeleteFollower() throws Exception {
final String cleanLeader = "clean-leader";
final String cleanFollower = "clean-follower";
if ("leader".equals(targetCluster)) {
if (targetCluster == TargetCluster.LEADER) {
logger.info("running against leader cluster");
final Settings indexSettings = indexSettings(1, 0).put("index.soft_deletes.enabled", true).build();
createIndex(adminClient(), cleanLeader, indexSettings);
@ -270,9 +327,11 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
}
public void testUnPromoteAndFollowDataStream() throws Exception {
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));
assumeTrue("Test should only run with target_cluster=follow", targetCluster == TargetCluster.FOLLOWER);
var numDocs = 64;
// TODO: We're implicitly relying on index templates from the stack module here. This requires us to install this module
// and several others. We should think about just explicitly creating a data stream index template instead.
var dataStreamName = "logs-eu-monitor1";
var dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT);

View file

@ -7,26 +7,68 @@
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import java.io.IOException;
public class RestartIT extends ESCCRRestTestCase {
public class RestartIT extends AbstractCCRRestTestCase {
public static LocalClusterConfigProvider commonConfig = c -> c.module("x-pack-ccr")
.module("analysis-common")
.setting("xpack.security.enabled", "true")
.setting("xpack.license.self_generated.type", "trial")
.user("admin", "admin-password", "superuser", false);
public static ElasticsearchCluster leaderCluster = ElasticsearchCluster.local().name("leader-cluster").apply(commonConfig).build();
public static ElasticsearchCluster followerCluster = ElasticsearchCluster.local()
.name("follow-cluster")
.apply(commonConfig)
.setting("cluster.remote.leader_cluster.seeds", () -> "\"" + leaderCluster.getTransportEndpoints() + "\"")
.build();
@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(leaderCluster).around(followerCluster);
public RestartIT(@Name("targetCluster") TargetCluster targetCluster) {
super(targetCluster);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return leaderFollower();
}
@Override
protected ElasticsearchCluster getLeaderCluster() {
return leaderCluster;
}
@Override
protected ElasticsearchCluster getFollowerCluster() {
return followerCluster;
}
public void testRestart() throws Exception {
final int numberOfDocuments = 128;
final String testsTargetCluster = System.getProperty("tests.target_cluster");
switch (testsTargetCluster) {
case "leader" -> {
switch (targetCluster) {
case LEADER -> {
// create a single index "leader" on the leader
createIndexAndIndexDocuments("leader", numberOfDocuments, client());
}
case "follow" -> {
case FOLLOWER -> {
// follow "leader" with "follow-leader" on the follower
followIndex("leader", "follow-leader");
verifyFollower("follow-leader", numberOfDocuments, client());
@ -48,8 +90,11 @@ public class RestartIT extends ESCCRRestTestCase {
// the follower should catch up
verifyFollower("follow-leader-1", numberOfDocuments, client());
}
}
case "follow-restart" -> {
followerCluster.restart(false);
closeClients();
initClient();
try (RestClient leaderClient = buildLeaderClient()) {
// create "leader-2" on the leader, and index some additional documents into existing indices
createIndexAndIndexDocuments("leader-2", numberOfDocuments, leaderClient);
@ -68,7 +113,7 @@ public class RestartIT extends ESCCRRestTestCase {
}
}
default -> {
throw new IllegalArgumentException("unexpected value [" + testsTargetCluster + "] for tests.target_cluster");
throw new IllegalArgumentException("unexpected value [" + targetCluster + "] for targetCluster");
}
}
}

View file

@ -12,13 +12,30 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
public class CcrRestIT extends ESClientYamlSuiteTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
// TODO: Switch to ingeg-test when we fix the xpack info api
.distribution(DistributionType.DEFAULT)
.module("x-pack-ccr")
.setting("xpack.security.enabled", "true")
.setting("xpack.license.self_generated.type", "trial")
.user("ccr-user", "ccr-user-password", "superuser", false)
// Disable assertions in FollowingEngineAssertions, otherwise an AssertionError is thrown before
// indexing a document directly in a follower index. In a rest test we like to test the exception
// that is thrown in production when indexing a document directly in a follower index.
.jvmArg("-da:org.elasticsearch.xpack.ccr.index.engine.FollowingEngineAssertions")
.build();
public CcrRestIT(final ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ -44,4 +61,8 @@ public class CcrRestIT extends ESClientYamlSuiteTestCase {
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("indices:data/read/xpack/ccr/shard_changes"));
}
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
}

View file

@ -12,7 +12,6 @@ apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
dependencies {
testImplementation project(':x-pack:plugin:ccr:qa')
testImplementation project(':x-pack:plugin:core')
testImplementation project(':x-pack:plugin:ilm')
}

View file

@ -24,7 +24,6 @@ import org.elasticsearch.xcontent.ObjectPath;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.ccr.ESCCRRestTestCase;
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.Phase;

View file

@ -4,7 +4,15 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.ccr;
package org.elasticsearch.xpack.ilm;
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
@ -40,7 +48,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
public class ESCCRRestTestCase extends ESRestTestCase {
public abstract class ESCCRRestTestCase extends ESRestTestCase {
protected final String targetCluster = System.getProperty("tests.target_cluster");