Migrate discovery-ec2 QA tests to javaRestTest (#119384) (#120511)

No need for all this Gradle magic any more; we can just test the discovery behaviour directly using Java REST tests.
David Turner 2025-01-21 11:31:01 +00:00 committed by GitHub
parent ff3f731b5c
commit 2e12bbb933
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 965 additions and 425 deletions
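At a glance, each of the new Java REST tests added below follows the same shape: an AwsEc2HttpFixture serves the DescribeInstances API, a small ElasticsearchCluster is pointed at it, and DiscoveryEc2ClusterFormationTestCase checks that the nodes discovered each other. The condensed sketch below illustrates that pattern; the class name DiscoveryEc2ExampleIT and the literal credential/region values are made up for illustration, while the fixture, cluster builder and settings mirror the files added in this commit.

package org.elasticsearch.discovery.ec2;

import fixture.aws.ec2.AwsEc2HttpFixture;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.util.List;

import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;

public class DiscoveryEc2ExampleIT extends DiscoveryEc2ClusterFormationTestCase {

    // Fixture that answers DescribeInstances with the cluster's own transport addresses,
    // accepting only requests signed with the expected (illustrative) access key.
    private static final AwsEc2HttpFixture ec2ApiFixture = new AwsEc2HttpFixture(
        fixedAccessKey("example-access-key"),
        DiscoveryEc2ExampleIT::getAvailableTransportEndpoints
    );

    // Two-node cluster configured to seed discovery from the emulated EC2 endpoint.
    private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
        .nodes(2)
        .plugin("discovery-ec2")
        .setting("discovery.seed_providers", "ec2")
        .setting("discovery.ec2.endpoint", ec2ApiFixture::getAddress)
        .environment("AWS_REGION", "example-region")
        .environment("AWS_ACCESS_KEY_ID", "example-access-key")
        .environment("AWS_SECRET_ACCESS_KEY", "example-secret-key")
        .build();

    private static List<String> getAvailableTransportEndpoints() {
        return cluster.getAvailableTransportEndpoints();
    }

    // Start the fixture before the cluster so the endpoint address is known at cluster start.
    @ClassRule
    public static TestRule ruleChain = RuleChain.outerRule(ec2ApiFixture).around(cluster);

    @Override
    protected ElasticsearchCluster getCluster() {
        return cluster;
    }
}

The per-scenario classes below differ only in how the credentials reach the AWS SDK: keystore settings, environment variables, system properties, ECS container credentials, or an IMDS instance profile.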

View file

@@ -31,6 +31,8 @@ dependencies {
api "joda-time:joda-time:2.10.10"
javaRestTestImplementation project(':plugins:discovery-ec2')
javaRestTestImplementation project(':test:fixtures:aws-fixture-utils')
javaRestTestImplementation project(':test:fixtures:aws-ec2-fixture')
javaRestTestImplementation project(':test:fixtures:ec2-imds-fixture')
internalClusterTestImplementation project(':test:fixtures:ec2-imds-fixture')
@@ -101,11 +103,6 @@ tasks.withType(Test).configureEach {
}
}
tasks.named("check").configure {
// also execute the QA tests when testing the plugin
dependsOn 'qa:amazon-ec2:check'
}
tasks.named("thirdPartyAudit").configure {
ignoreMissingClasses(
// classes are missing

View file

@@ -1,122 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import org.apache.tools.ant.filters.ReplaceTokens
import org.elasticsearch.gradle.internal.test.AntFixture
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.internal.test.rest.LegacyYamlRestTestPlugin
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.legacy-yaml-rest-test'
dependencies {
yamlRestTestImplementation project(':plugins:discovery-ec2')
}
restResources {
restApi {
include '_common', 'cluster', 'nodes'
}
}
final int ec2NumberOfNodes = 3
Map<String, Object> expansions = [
'expected_nodes': ec2NumberOfNodes
]
tasks.named("processYamlRestTestResources").configure {
inputs.properties(expansions)
filter("tokens" : expansions.collectEntries {k, v -> [k, v.toString()]} /* must be a map of strings */, ReplaceTokens.class)
}
// disable default yamlRestTest task, use specialized ones below
tasks.named("yamlRestTest").configure { enabled = false }
/*
* Test using various credential providers (see also https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/credentials.html):
* - Elasticsearch Keystore (secure settings discovery.ec2.access_key and discovery.ec2.secret_key)
* - Java system properties (aws.accessKeyId and aws.secretAccessKey)
* - Environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY)
* - ECS container credentials (loaded from ECS if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is set)
* - Instance profile credentials (delivered through the EC2 metadata service)
*
* Notably missing is a test for the default credential profiles file, which is located at ~/.aws/credentials and would at least require a
* custom Java security policy to work.
*/
['KeyStore', 'EnvVariables', 'SystemProperties', 'ContainerCredentials', 'InstanceProfile'].forEach { action ->
TaskProvider<AntFixture> fixture = tasks.register("ec2Fixture${action}", AntFixture) {
dependsOn project.sourceSets.yamlRestTest.runtimeClasspath
FileCollection cp = project.sourceSets.yamlRestTest.runtimeClasspath
env 'CLASSPATH', "${-> cp.asPath}"
executable = "${buildParams.runtimeJavaHome.get() }/bin/java"
args 'org.elasticsearch.discovery.ec2.AmazonEC2Fixture', baseDir, "${buildDir}/testclusters/yamlRestTest${action}-1/config/unicast_hosts.txt"
}
def yamlRestTestTask = tasks.register("yamlRestTest${action}", RestIntegTestTask) {
dependsOn fixture
SourceSetContainer sourceSets = project.getExtensions().getByType(SourceSetContainer.class);
SourceSet yamlRestTestSourceSet = sourceSets.getByName(LegacyYamlRestTestPlugin.SOURCE_SET_NAME)
testClassesDirs = yamlRestTestSourceSet.getOutput().getClassesDirs()
classpath = yamlRestTestSourceSet.getRuntimeClasspath()
}
if(action == 'ContainerCredentials') {
def addressAndPortSource = fixture.get().addressAndPortSource
testClusters.matching { it.name == "yamlRestTestContainerCredentials" }.configureEach {
environment 'AWS_CONTAINER_CREDENTIALS_FULL_URI',
() -> addressAndPortSource.map{ addr -> "http://${addr}/ecs_credentials_endpoint" }.get(), IGNORE_VALUE
}
}
tasks.named("check").configure {
dependsOn(yamlRestTestTask)
}
def addressAndPortSource = fixture.get().addressAndPortSource
testClusters.matching { it.name == yamlRestTestTask.name}.configureEach {
numberOfNodes = ec2NumberOfNodes
plugin ':plugins:discovery-ec2'
setting 'discovery.seed_providers', 'ec2'
setting 'network.host', '_ec2_'
setting 'discovery.ec2.endpoint', { "http://${-> addressAndPortSource.get()}" }, IGNORE_VALUE
systemProperty "com.amazonaws.sdk.ec2MetadataServiceEndpointOverride", { "http://${-> addressAndPortSource.get()}" }, IGNORE_VALUE
}
}
// Extra config for KeyStore
testClusters.matching { it.name == "yamlRestTestKeyStore" }.configureEach {
keystore 'discovery.ec2.access_key', 'ec2_integration_test_access_key'
keystore 'discovery.ec2.secret_key', 'ec2_integration_test_secret_key'
}
// Extra config for EnvVariables
testClusters.matching { it.name == "yamlRestTestEnvVariables" }.configureEach {
environment 'AWS_ACCESS_KEY_ID', 'ec2_integration_test_access_key'
environment 'AWS_SECRET_ACCESS_KEY', 'ec2_integration_test_secret_key'
}
// Extra config for SystemProperties
testClusters.matching { it.name == "yamlRestTestSystemProperties" }.configureEach {
systemProperty 'aws.accessKeyId', 'ec2_integration_test_access_key'
systemProperty 'aws.secretKey', 'ec2_integration_test_secret_key'
}
// Extra config for ContainerCredentials
tasks.named("ec2FixtureContainerCredentials").configure {
env 'ACTIVATE_CONTAINER_CREDENTIALS', true
}
// Extra config for InstanceProfile
tasks.named("ec2FixtureInstanceProfile").configure {
env 'ACTIVATE_INSTANCE_PROFILE', true
}

View file

@@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
public class AmazonEC2DiscoveryClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public AmazonEC2DiscoveryClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return createParameters();
}
}

View file

@@ -1,251 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import com.amazonaws.util.DateUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URLEncodedUtils;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.xml.XMLConstants;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* {@link AmazonEC2Fixture} is a fixture that emulates an AWS EC2 service.
*/
public class AmazonEC2Fixture extends AbstractHttpFixture {
private static final String IMDSV_2_TOKEN = "imdsv2-token";
private static final String X_AWS_EC_2_METADATA_TOKEN = "X-aws-ec2-metadata-token";
private final Path nodes;
private final boolean instanceProfile;
private final boolean containerCredentials;
private AmazonEC2Fixture(final String workingDir, final String nodesUriPath, boolean instanceProfile, boolean containerCredentials) {
super(workingDir);
this.nodes = toPath(Objects.requireNonNull(nodesUriPath));
this.instanceProfile = instanceProfile;
this.containerCredentials = containerCredentials;
}
public static void main(String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("AmazonEC2Fixture <working directory> <nodes transport uri file>");
}
boolean instanceProfile = Booleans.parseBoolean(System.getenv("ACTIVATE_INSTANCE_PROFILE"), false);
boolean containerCredentials = Booleans.parseBoolean(System.getenv("ACTIVATE_CONTAINER_CREDENTIALS"), false);
final AmazonEC2Fixture fixture = new AmazonEC2Fixture(args[0], args[1], instanceProfile, containerCredentials);
fixture.listen();
}
@Override
protected Response handle(final Request request) throws IOException {
if ("/".equals(request.getPath()) && (HttpPost.METHOD_NAME.equals(request.getMethod()))) {
final String userAgent = request.getHeader("User-Agent");
if (userAgent != null && userAgent.startsWith("aws-sdk-java")) {
final String auth = request.getHeader("Authorization");
if (auth == null || auth.contains("ec2_integration_test_access_key") == false) {
throw new IllegalArgumentException("wrong access key: " + auth);
}
// Simulate an EC2 DescribeInstancesResponse
byte[] responseBody = EMPTY_BYTE;
for (NameValuePair parse : URLEncodedUtils.parse(new String(request.getBody(), UTF_8), UTF_8)) {
if ("Action".equals(parse.getName())) {
responseBody = generateDescribeInstancesResponse();
break;
}
}
return new Response(RestStatus.OK.getStatus(), contentType("text/xml; charset=UTF-8"), responseBody);
}
}
if ("/latest/meta-data/local-ipv4".equals(request.getPath())
&& (HttpGet.METHOD_NAME.equals(request.getMethod()))
&& request.getHeaders().getOrDefault(X_AWS_EC_2_METADATA_TOKEN, "").equals(IMDSV_2_TOKEN)) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, "127.0.0.1".getBytes(UTF_8));
}
if (instanceProfile
&& "/latest/meta-data/iam/security-credentials/".equals(request.getPath())
&& HttpGet.METHOD_NAME.equals(request.getMethod())
&& request.getHeaders().getOrDefault(X_AWS_EC_2_METADATA_TOKEN, "").equals(IMDSV_2_TOKEN)) {
final Map<String, String> headers = new HashMap<>(contentType("text/plain"));
return new Response(RestStatus.OK.getStatus(), headers, "my_iam_profile".getBytes(UTF_8));
}
if ("/latest/api/token".equals(request.getPath()) && HttpPut.METHOD_NAME.equals(request.getMethod())) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, IMDSV_2_TOKEN.getBytes(StandardCharsets.UTF_8));
}
if ((containerCredentials
&& "/ecs_credentials_endpoint".equals(request.getPath())
&& HttpGet.METHOD_NAME.equals(request.getMethod()))
|| ("/latest/meta-data/iam/security-credentials/my_iam_profile".equals(request.getPath())
&& HttpGet.METHOD_NAME.equals(request.getMethod())
&& request.getHeaders().getOrDefault(X_AWS_EC_2_METADATA_TOKEN, "").equals(IMDSV_2_TOKEN))) {
final Date expiration = new Date(new Date().getTime() + TimeUnit.DAYS.toMillis(1));
final String response = String.format(Locale.ROOT, """
{
"AccessKeyId": "ec2_integration_test_access_key",
"Expiration": "%s",
"RoleArn": "test",
"SecretAccessKey": "ec2_integration_test_secret_key",
"Token": "test"
}""", DateUtils.formatISO8601Date(expiration));
final Map<String, String> headers = new HashMap<>(contentType("application/json"));
return new Response(RestStatus.OK.getStatus(), headers, response.getBytes(UTF_8));
}
return null;
}
/**
* Generates an XML response that describes the EC2 instances
*/
private byte[] generateDescribeInstancesResponse() {
final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
xmlOutputFactory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
final StringWriter out = new StringWriter();
XMLStreamWriter sw;
try {
sw = xmlOutputFactory.createXMLStreamWriter(out);
sw.writeStartDocument();
String namespace = "http://ec2.amazonaws.com/doc/2013-02-01/";
sw.setDefaultNamespace(namespace);
sw.writeStartElement(XMLConstants.DEFAULT_NS_PREFIX, "DescribeInstancesResponse", namespace);
{
sw.writeStartElement("requestId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("reservationSet");
{
if (Files.exists(nodes)) {
for (String address : Files.readAllLines(nodes)) {
sw.writeStartElement("item");
{
sw.writeStartElement("reservationId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("instancesSet");
{
sw.writeStartElement("item");
{
sw.writeStartElement("instanceId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("imageId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("instanceState");
{
sw.writeStartElement("code");
sw.writeCharacters("16");
sw.writeEndElement();
sw.writeStartElement("name");
sw.writeCharacters("running");
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeStartElement("privateDnsName");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("dnsName");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("instanceType");
sw.writeCharacters("m1.medium");
sw.writeEndElement();
sw.writeStartElement("placement");
{
sw.writeStartElement("availabilityZone");
sw.writeCharacters("use-east-1e");
sw.writeEndElement();
sw.writeEmptyElement("groupName");
sw.writeStartElement("tenancy");
sw.writeCharacters("default");
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeStartElement("privateIpAddress");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("ipAddress");
sw.writeCharacters(address);
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
}
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeEndDocument();
sw.flush();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return out.toString().getBytes(UTF_8);
}
@SuppressForbidden(reason = "Paths#get is fine - we don't have environment here")
private static Path toPath(final String dir) {
return Paths.get(dir);
}
}

View file

@@ -1,15 +0,0 @@
# Integration tests for discovery-ec2
setup:
- do:
cluster.health:
wait_for_status: green
wait_for_nodes: @expected_nodes@
---
"All nodes are correctly discovered":
- do:
nodes.info:
metric: [ transport ]
- match: { _nodes.total: @expected_nodes@ }

View file

@@ -1 +0,0 @@
group = "${group}.plugins.discovery-ec2.qa"

View file

@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.LogType;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.regex.Pattern;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
public abstract class DiscoveryEc2ClusterFormationTestCase extends ESRestTestCase {
protected abstract ElasticsearchCluster getCluster();
@Override
protected String getTestRestCluster() {
return getCluster().getHttpAddresses();
}
public void testClusterFormation() throws IOException {
final var cluster = getCluster();
final var expectedAddresses = new HashSet<>(cluster.getAvailableTransportEndpoints());
final var addressesPattern = Pattern.compile(".* using dynamic transport addresses \\[(.*)]");
assertThat(cluster.getNumNodes(), greaterThan(1)); // multiple node cluster means discovery must have worked
for (int nodeIndex = 0; nodeIndex < cluster.getNumNodes(); nodeIndex++) {
try (
var logStream = cluster.getNodeLog(nodeIndex, LogType.SERVER);
var logReader = new InputStreamReader(logStream, StandardCharsets.UTF_8);
var bufReader = new BufferedReader(logReader)
) {
do {
final var line = bufReader.readLine();
if (line == null) {
break;
}
final var matcher = addressesPattern.matcher(line);
if (matcher.matches()) {
for (final var address : matcher.group(1).split(", ")) {
// TODO also add some nodes to the DescribeInstances output which are filtered out, and verify that we do not
// see their addresses here
assertThat(expectedAddresses, hasItem(address));
}
}
} while (true);
}
}
}
protected static String getIdentifierPrefix(String testSuiteName) {
return testSuiteName + "-" + Integer.toString(Murmur3HashFunction.hash(testSuiteName + System.getProperty("tests.seed")), 16) + "-";
}
}

View file

@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import fixture.aws.DynamicAwsCredentials;
import fixture.aws.ec2.AwsEc2HttpFixture;
import fixture.aws.imds.Ec2ImdsHttpFixture;
import fixture.aws.imds.Ec2ImdsServiceBuilder;
import fixture.aws.imds.Ec2ImdsVersion;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.util.List;
import java.util.Set;
public class DiscoveryEc2EcsCredentialsIT extends DiscoveryEc2ClusterFormationTestCase {
private static final DynamicAwsCredentials dynamicCredentials = new DynamicAwsCredentials();
private static final String PREFIX = getIdentifierPrefix("DiscoveryEc2EcsCredentialsIT");
private static final String REGION = PREFIX + "-region";
private static final String CREDENTIALS_ENDPOINT = "/ecs_credentials_endpoint_" + PREFIX;
private static final Ec2ImdsHttpFixture ec2ImdsHttpFixture = new Ec2ImdsHttpFixture(
new Ec2ImdsServiceBuilder(Ec2ImdsVersion.V1).newCredentialsConsumer(dynamicCredentials::addValidCredentials)
.alternativeCredentialsEndpoints(Set.of(CREDENTIALS_ENDPOINT))
);
private static final AwsEc2HttpFixture ec2ApiFixture = new AwsEc2HttpFixture(
dynamicCredentials::isAuthorized,
DiscoveryEc2EcsCredentialsIT::getAvailableTransportEndpoints
);
private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.nodes(2)
.plugin("discovery-ec2")
.setting(DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), Ec2DiscoveryPlugin.EC2_SEED_HOSTS_PROVIDER_NAME)
.setting("logger." + AwsEc2SeedHostsProvider.class.getCanonicalName(), "DEBUG")
.setting(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), ec2ApiFixture::getAddress)
.environment("AWS_CONTAINER_CREDENTIALS_FULL_URI", () -> ec2ImdsHttpFixture.getAddress() + CREDENTIALS_ENDPOINT)
.environment("AWS_REGION", REGION)
.build();
private static List<String> getAvailableTransportEndpoints() {
return cluster.getAvailableTransportEndpoints();
}
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(ec2ImdsHttpFixture).around(ec2ApiFixture).around(cluster);
@Override
protected ElasticsearchCluster getCluster() {
return cluster;
}
}

View file

@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import fixture.aws.ec2.AwsEc2HttpFixture;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.util.List;
import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;
public class DiscoveryEc2EnvironmentVariableCredentialsIT extends DiscoveryEc2ClusterFormationTestCase {
private static final String PREFIX = getIdentifierPrefix("DiscoveryEc2EnvironmentVariableCredentialsIT");
private static final String REGION = PREFIX + "-region";
private static final String ACCESS_KEY = PREFIX + "-access-key";
private static final AwsEc2HttpFixture ec2ApiFixture = new AwsEc2HttpFixture(
fixedAccessKey(ACCESS_KEY),
DiscoveryEc2EnvironmentVariableCredentialsIT::getAvailableTransportEndpoints
);
private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.nodes(2)
.plugin("discovery-ec2")
.setting(DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), Ec2DiscoveryPlugin.EC2_SEED_HOSTS_PROVIDER_NAME)
.setting("logger." + AwsEc2SeedHostsProvider.class.getCanonicalName(), "DEBUG")
.setting(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), ec2ApiFixture::getAddress)
.environment("AWS_REGION", REGION)
.environment("AWS_ACCESS_KEY_ID", ACCESS_KEY)
.environment("AWS_SECRET_ACCESS_KEY", ESTestCase::randomIdentifier)
.build();
private static List<String> getAvailableTransportEndpoints() {
return cluster.getAvailableTransportEndpoints();
}
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(ec2ApiFixture).around(cluster);
@Override
protected ElasticsearchCluster getCluster() {
return cluster;
}
}

View file

@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import fixture.aws.DynamicAwsCredentials;
import fixture.aws.ec2.AwsEc2HttpFixture;
import fixture.aws.imds.Ec2ImdsHttpFixture;
import fixture.aws.imds.Ec2ImdsServiceBuilder;
import fixture.aws.imds.Ec2ImdsVersion;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.util.List;
public class DiscoveryEc2InstanceProfileIT extends DiscoveryEc2ClusterFormationTestCase {
private static final DynamicAwsCredentials dynamicCredentials = new DynamicAwsCredentials();
private static final Ec2ImdsHttpFixture ec2ImdsHttpFixture = new Ec2ImdsHttpFixture(
new Ec2ImdsServiceBuilder(Ec2ImdsVersion.V2).instanceIdentityDocument(
(builder, params) -> builder.field("region", randomIdentifier())
).newCredentialsConsumer(dynamicCredentials::addValidCredentials)
);
private static final AwsEc2HttpFixture ec2ApiFixture = new AwsEc2HttpFixture(
dynamicCredentials::isAuthorized,
DiscoveryEc2InstanceProfileIT::getAvailableTransportEndpoints
);
private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.nodes(2)
.plugin("discovery-ec2")
.setting(DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), Ec2DiscoveryPlugin.EC2_SEED_HOSTS_PROVIDER_NAME)
.setting("logger." + AwsEc2SeedHostsProvider.class.getCanonicalName(), "DEBUG")
.setting(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), ec2ApiFixture::getAddress)
.systemProperty(Ec2ImdsHttpFixture.ENDPOINT_OVERRIDE_SYSPROP_NAME, ec2ImdsHttpFixture::getAddress)
.build();
private static List<String> getAvailableTransportEndpoints() {
return cluster.getAvailableTransportEndpoints();
}
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(ec2ImdsHttpFixture).around(ec2ApiFixture).around(cluster);
@Override
protected ElasticsearchCluster getCluster() {
return cluster;
}
}

View file

@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import fixture.aws.ec2.AwsEc2HttpFixture;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.util.List;
import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;
public class DiscoveryEc2KeystoreCredentialsIT extends DiscoveryEc2ClusterFormationTestCase {
private static final String PREFIX = getIdentifierPrefix("DiscoveryEc2KeystoreCredentialsIT");
private static final String REGION = PREFIX + "-region";
private static final String ACCESS_KEY = PREFIX + "-access-key";
private static final AwsEc2HttpFixture ec2ApiFixture = new AwsEc2HttpFixture(
fixedAccessKey(ACCESS_KEY),
DiscoveryEc2KeystoreCredentialsIT::getAvailableTransportEndpoints
);
private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.nodes(2)
.plugin("discovery-ec2")
.setting(DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), Ec2DiscoveryPlugin.EC2_SEED_HOSTS_PROVIDER_NAME)
.setting("logger." + AwsEc2SeedHostsProvider.class.getCanonicalName(), "DEBUG")
.setting(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), ec2ApiFixture::getAddress)
.environment("AWS_REGION", REGION)
.keystore("discovery.ec2.access_key", ACCESS_KEY)
.keystore("discovery.ec2.secret_key", ESTestCase::randomIdentifier)
.build();
private static List<String> getAvailableTransportEndpoints() {
return cluster.getAvailableTransportEndpoints();
}
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(ec2ApiFixture).around(cluster);
@Override
protected ElasticsearchCluster getCluster() {
return cluster;
}
}

View file

@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import fixture.aws.ec2.AwsEc2HttpFixture;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.util.List;
import static fixture.aws.AwsCredentialsUtils.fixedAccessKeyAndToken;
public class DiscoveryEc2KeystoreSessionCredentialsIT extends DiscoveryEc2ClusterFormationTestCase {
private static final String PREFIX = getIdentifierPrefix("DiscoveryEc2KeystoreSessionCredentialsIT");
private static final String REGION = PREFIX + "-region";
private static final String ACCESS_KEY = PREFIX + "-access-key";
private static final String SESSION_TOKEN = PREFIX + "-session-token";
private static final AwsEc2HttpFixture ec2ApiFixture = new AwsEc2HttpFixture(
fixedAccessKeyAndToken(ACCESS_KEY, SESSION_TOKEN),
DiscoveryEc2KeystoreSessionCredentialsIT::getAvailableTransportEndpoints
);
private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.nodes(2)
.plugin("discovery-ec2")
.setting(DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), Ec2DiscoveryPlugin.EC2_SEED_HOSTS_PROVIDER_NAME)
.setting("logger." + AwsEc2SeedHostsProvider.class.getCanonicalName(), "DEBUG")
.setting(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), ec2ApiFixture::getAddress)
.environment("AWS_REGION", REGION)
.keystore("discovery.ec2.access_key", ACCESS_KEY)
.keystore("discovery.ec2.secret_key", ESTestCase::randomIdentifier)
.keystore("discovery.ec2.session_token", SESSION_TOKEN)
.build();
private static List<String> getAvailableTransportEndpoints() {
return cluster.getAvailableTransportEndpoints();
}
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(ec2ApiFixture).around(cluster);
@Override
protected ElasticsearchCluster getCluster() {
return cluster;
}
}

View file

@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.discovery.ec2;
import fixture.aws.ec2.AwsEc2HttpFixture;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.util.List;
import static fixture.aws.AwsCredentialsUtils.fixedAccessKey;
public class DiscoveryEc2SystemPropertyCredentialsIT extends DiscoveryEc2ClusterFormationTestCase {
private static final String PREFIX = getIdentifierPrefix("DiscoveryEc2SystemPropertyCredentialsIT");
private static final String REGION = PREFIX + "-region";
private static final String ACCESS_KEY = PREFIX + "-access-key";
private static final AwsEc2HttpFixture ec2ApiFixture = new AwsEc2HttpFixture(
fixedAccessKey(ACCESS_KEY),
DiscoveryEc2SystemPropertyCredentialsIT::getAvailableTransportEndpoints
);
private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
.nodes(2)
.plugin("discovery-ec2")
.setting(DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), Ec2DiscoveryPlugin.EC2_SEED_HOSTS_PROVIDER_NAME)
.setting("logger." + AwsEc2SeedHostsProvider.class.getCanonicalName(), "DEBUG")
.setting(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), ec2ApiFixture::getAddress)
.environment("AWS_REGION", REGION)
.systemProperty("aws.accessKeyId", ACCESS_KEY)
.systemProperty("aws.secretKey", ESTestCase::randomIdentifier)
.build();
private static List<String> getAvailableTransportEndpoints() {
return cluster.getAvailableTransportEndpoints();
}
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(ec2ApiFixture).around(cluster);
@Override
protected ElasticsearchCluster getCluster() {
return cluster;
}
}

View file

@@ -37,7 +37,6 @@ import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -47,7 +46,7 @@ import static org.elasticsearch.discovery.ec2.AwsEc2Utils.X_AWS_EC_2_METADATA_TO
public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, ReloadablePlugin {
private static final Logger logger = LogManager.getLogger(Ec2DiscoveryPlugin.class);
public static final String EC2 = "ec2";
public static final String EC2_SEED_HOSTS_PROVIDER_NAME = "ec2";
static {
SpecialPermission.check();
@@ -91,7 +90,7 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Reloa
@Override
public Map<String, Supplier<SeedHostsProvider>> getSeedHostProviders(TransportService transportService, NetworkService networkService) {
return Collections.singletonMap(EC2, () -> new AwsEc2SeedHostsProvider(settings, transportService, ec2Service));
return Map.of(EC2_SEED_HOSTS_PROVIDER_NAME, () -> new AwsEc2SeedHostsProvider(settings, transportService, ec2Service));
}
@Override @Override

View file

@@ -91,6 +91,7 @@ List projects = [
'distribution:tools:ansi-console',
'server',
'test:framework',
'test:fixtures:aws-ec2-fixture',
'test:fixtures:aws-fixture-utils',
'test:fixtures:aws-sts-fixture',
'test:fixtures:azure-fixture',

View file

@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
apply plugin: 'elasticsearch.java'
description = 'Fixture for emulating the EC2 DescribeInstances API running in AWS'
dependencies {
api project(':server')
api("junit:junit:${versions.junit}") {
transitive = false
}
api project(':test:framework')
api project(':test:fixtures:aws-fixture-utils')
}

View file

@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package fixture.aws.ec2;
import com.sun.net.httpserver.HttpServer;
import org.junit.rules.ExternalResource;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
public class AwsEc2HttpFixture extends ExternalResource {
private HttpServer server;
private final Supplier<List<String>> transportAddressesSupplier;
private final BiPredicate<String, String> authorizationPredicate;
public AwsEc2HttpFixture(BiPredicate<String, String> authorizationPredicate, Supplier<List<String>> transportAddressesSupplier) {
this.authorizationPredicate = Objects.requireNonNull(authorizationPredicate);
this.transportAddressesSupplier = Objects.requireNonNull(transportAddressesSupplier);
}
public String getAddress() {
return "http://" + server.getAddress().getHostString() + ":" + server.getAddress().getPort();
}
public void stop(int delay) {
server.stop(delay);
}
protected void before() throws Throwable {
server = HttpServer.create(resolveAddress(), 0);
server.createContext("/", new AwsEc2HttpHandler(authorizationPredicate, transportAddressesSupplier));
server.start();
}
@Override
protected void after() {
stop(0);
}
private static InetSocketAddress resolveAddress() {
try {
return new InetSocketAddress(InetAddress.getByName("localhost"), 0);
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
}

View file

@@ -0,0 +1,189 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package fixture.aws.ec2;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.http.client.utils.URLEncodedUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import javax.xml.XMLConstants;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;
import static fixture.aws.AwsCredentialsUtils.checkAuthorization;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.test.ESTestCase.randomIdentifier;
import static org.junit.Assert.assertNull;
/**
* Minimal HTTP handler that emulates the AWS EC2 endpoint (at least, just the DescribeInstances action therein)
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate the AWS EC2 endpoint")
public class AwsEc2HttpHandler implements HttpHandler {
private final BiPredicate<String, String> authorizationPredicate;
private final Supplier<List<String>> transportAddressesSupplier;
public AwsEc2HttpHandler(BiPredicate<String, String> authorizationPredicate, Supplier<List<String>> transportAddressesSupplier) {
this.authorizationPredicate = Objects.requireNonNull(authorizationPredicate);
this.transportAddressesSupplier = Objects.requireNonNull(transportAddressesSupplier);
}
@Override
public void handle(final HttpExchange exchange) throws IOException {
try (exchange) {
if ("POST".equals(exchange.getRequestMethod()) && "/".equals(exchange.getRequestURI().getPath())) {
if (checkAuthorization(authorizationPredicate, exchange) == false) {
return;
}
final var parsedRequest = new HashMap<String, String>();
for (final var nameValuePair : URLEncodedUtils.parse(new String(exchange.getRequestBody().readAllBytes(), UTF_8), UTF_8)) {
assertNull(nameValuePair.getName(), parsedRequest.put(nameValuePair.getName(), nameValuePair.getValue()));
}
if ("DescribeInstances".equals(parsedRequest.get("Action")) == false) {
throw new UnsupportedOperationException(parsedRequest.toString());
}
final var responseBody = generateDescribeInstancesResponse();
exchange.getResponseHeaders().add("Content-Type", "text/xml; charset=UTF-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBody.length);
exchange.getResponseBody().write(responseBody);
return;
}
throw new UnsupportedOperationException("can only handle DescribeInstances requests");
} catch (Exception e) {
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(e));
}
}
private static final String XML_NAMESPACE = "http://ec2.amazonaws.com/doc/2013-02-01/";
private byte[] generateDescribeInstancesResponse() {
final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
xmlOutputFactory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
final StringWriter out = new StringWriter();
XMLStreamWriter sw;
try {
sw = xmlOutputFactory.createXMLStreamWriter(out);
sw.writeStartDocument();
sw.setDefaultNamespace(XML_NAMESPACE);
sw.writeStartElement(XMLConstants.DEFAULT_NS_PREFIX, "DescribeInstancesResponse", XML_NAMESPACE);
{
sw.writeStartElement("requestId");
sw.writeCharacters(randomIdentifier());
sw.writeEndElement();
sw.writeStartElement("reservationSet");
{
for (final var address : transportAddressesSupplier.get()) {
sw.writeStartElement("item");
{
sw.writeStartElement("reservationId");
sw.writeCharacters(randomIdentifier());
sw.writeEndElement();
sw.writeStartElement("instancesSet");
{
sw.writeStartElement("item");
{
sw.writeStartElement("instanceId");
sw.writeCharacters(randomIdentifier());
sw.writeEndElement();
sw.writeStartElement("imageId");
sw.writeCharacters(randomIdentifier());
sw.writeEndElement();
sw.writeStartElement("instanceState");
{
sw.writeStartElement("code");
sw.writeCharacters("16");
sw.writeEndElement();
sw.writeStartElement("name");
sw.writeCharacters("running");
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeStartElement("privateDnsName");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("dnsName");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("instanceType");
sw.writeCharacters("m1.medium"); // TODO randomize
sw.writeEndElement();
sw.writeStartElement("placement");
{
sw.writeStartElement("availabilityZone");
sw.writeCharacters(randomIdentifier());
sw.writeEndElement();
sw.writeEmptyElement("groupName");
sw.writeStartElement("tenancy");
sw.writeCharacters("default");
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeStartElement("privateIpAddress");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("ipAddress");
sw.writeCharacters(address);
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeEndDocument();
sw.flush();
}
} catch (Exception e) {
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(e));
throw new RuntimeException(e);
}
return out.toString().getBytes(UTF_8);
}
}

View file

@@ -0,0 +1,211 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package fixture.aws.ec2;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpPrincipal;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
public class AwsEc2HttpHandlerTests extends ESTestCase {
public void testDescribeInstances() throws IOException, XMLStreamException {
final List<String> addresses = randomList(
1,
10,
() -> "10.0." + between(1, 254) + "." + between(1, 254) + ":" + between(1025, 65535)
);
final var handler = new AwsEc2HttpHandler((ignored1, ignored2) -> true, () -> addresses);
final var response = handleRequest(handler);
assertEquals(RestStatus.OK, response.status());
final var unseenAddressesInTags = Stream.of("privateDnsName", "dnsName", "privateIpAddress", "ipAddress")
.collect(
Collectors.toMap(
localName -> new QName("http://ec2.amazonaws.com/doc/2013-02-01/", localName),
localName -> new HashSet<>(addresses)
)
);
final var xmlStreamReader = XMLInputFactory.newDefaultFactory().createXMLStreamReader(response.body().streamInput());
try {
for (; xmlStreamReader.getEventType() != XMLStreamConstants.END_DOCUMENT; xmlStreamReader.next()) {
if (xmlStreamReader.getEventType() == XMLStreamConstants.START_ELEMENT) {
final var unseenAddresses = unseenAddressesInTags.get(xmlStreamReader.getName());
if (unseenAddresses != null) {
xmlStreamReader.next();
assertEquals(XMLStreamConstants.CHARACTERS, xmlStreamReader.getEventType());
final var currentAddress = xmlStreamReader.getText();
assertTrue(currentAddress, unseenAddresses.remove(currentAddress));
}
}
}
} finally {
xmlStreamReader.close();
}
assertTrue(unseenAddressesInTags.toString(), unseenAddressesInTags.values().stream().allMatch(HashSet::isEmpty));
}
private record TestHttpResponse(RestStatus status, BytesReference body) {}
private static TestHttpResponse handleRequest(AwsEc2HttpHandler handler) {
final var httpExchange = new TestHttpExchange(
"POST",
"/",
new BytesArray("Action=DescribeInstances"),
TestHttpExchange.EMPTY_HEADERS
);
try {
handler.handle(httpExchange);
} catch (IOException e) {
fail(e);
}
assertNotEquals(0, httpExchange.getResponseCode());
return new TestHttpResponse(RestStatus.fromCode(httpExchange.getResponseCode()), httpExchange.getResponseBodyContents());
}
private static class TestHttpExchange extends HttpExchange {
private static final Headers EMPTY_HEADERS = new Headers();
private final String method;
private final URI uri;
private final BytesReference requestBody;
private final Headers requestHeaders;
private final Headers responseHeaders = new Headers();
private final BytesStreamOutput responseBody = new BytesStreamOutput();
private int responseCode;
TestHttpExchange(String method, String uri, BytesReference requestBody, Headers requestHeaders) {
this.method = method;
this.uri = URI.create(uri);
this.requestBody = requestBody;
this.requestHeaders = requestHeaders;
}
@Override
public Headers getRequestHeaders() {
return requestHeaders;
}
@Override
public Headers getResponseHeaders() {
return responseHeaders;
}
@Override
public URI getRequestURI() {
return uri;
}
@Override
public String getRequestMethod() {
return method;
}
@Override
public HttpContext getHttpContext() {
return null;
}
@Override
public void close() {}
@Override
public InputStream getRequestBody() {
try {
return requestBody.streamInput();
} catch (IOException e) {
throw new AssertionError(e);
}
}
@Override
public OutputStream getResponseBody() {
return responseBody;
}
@Override
public void sendResponseHeaders(int rCode, long responseLength) {
this.responseCode = rCode;
}
@Override
public InetSocketAddress getRemoteAddress() {
return null;
}
@Override
public int getResponseCode() {
return responseCode;
}
public BytesReference getResponseBodyContents() {
return responseBody.bytes();
}
@Override
public InetSocketAddress getLocalAddress() {
return null;
}
@Override
public String getProtocol() {
return "HTTP/1.1";
}
@Override
public Object getAttribute(String name) {
return null;
}
@Override
public void setAttribute(String name, Object value) {
fail("setAttribute not implemented");
}
@Override
public void setStreams(InputStream i, OutputStream o) {
fail("setStreams not implemented");
}
@Override
public HttpPrincipal getPrincipal() {
fail("getPrincipal not implemented");
throw new UnsupportedOperationException("getPrincipal not implemented");
}
}
}

View file

@@ -218,6 +218,20 @@ public abstract class AbstractLocalClusterFactory<S extends LocalClusterSpec, H
return readPortsFile(portsFile).get(0);
}
/**
* @return the available transport endpoints of this node; if the node has no available transport endpoints yet then returns an
* empty list.
*/
public List<String> getAvailableTransportEndpoints() {
Path portsFile = workingDir.resolve("logs").resolve("transport.ports");
if (Files.notExists(portsFile)) {
// Ok if missing, we're only returning the _available_ transport endpoints and the node might not yet be started up.
// If we're using this for discovery then we'll retry until we see enough running nodes to form the cluster.
return List.of();
}
return readPortsFile(portsFile);
}
public String getRemoteClusterServerEndpoint() {
if (spec.isRemoteClusterServerEnabled()) {
Path portsFile = workingDir.resolve("logs").resolve("remote_cluster.ports");

View file

@@ -24,6 +24,7 @@ import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -132,6 +133,19 @@ public class DefaultLocalClusterHandle implements LocalClusterHandle {
return execute(() -> nodes.parallelStream().map(Node::getTransportEndpoint).collect(Collectors.joining(",")));
}
@Override
public List<String> getAvailableTransportEndpoints() {
final var results = new ArrayList<String>(nodes.size() * 2); // *2 because each node has both IPv4 and IPv6 addresses
for (final var node : nodes) {
try {
results.addAll(node.getAvailableTransportEndpoints());
} catch (Exception e) {
LOGGER.warn("failure reading available transport endpoints from [{}]", node.getName(), e);
}
}
return results;
}
@Override
public String getTransportEndpoint(int index) {
return getTransportEndpoints().split(",")[index];
@@ -252,6 +266,13 @@ public class DefaultLocalClusterHandle implements LocalClusterHandle {
String transportUris = execute(() -> nodes.parallelStream().map(Node::getTransportEndpoint).collect(Collectors.joining("\n")));
execute(() -> nodes.parallelStream().forEach(node -> {
try {
if (node.getSpec().getPlugins().containsKey("discovery-ec2")) {
// If we're using (i.e. testing) a discovery plugin then suppress the file-based discovery mechanism, to make sure the
// test does not pass spuriously by using file-based discovery.
// TODO find a way to do this without just hard-coding the plugin name here.
LOGGER.info("Skipping writing unicast hosts file for node {}", node.getName());
return;
}
Path hostsFile = node.getWorkingDir().resolve("config").resolve("unicast_hosts.txt");
LOGGER.info("Writing unicast hosts file {} for node {}", hostsFile, node.getName());
Files.writeString(hostsFile, transportUris);

View file

@@ -19,6 +19,7 @@ import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
public class DefaultLocalElasticsearchCluster<S extends LocalClusterSpec, H extends LocalClusterHandle> implements ElasticsearchCluster {
@@ -126,6 +127,12 @@ public class DefaultLocalElasticsearchCluster<S extends LocalClusterSpec, H exte
return handle.getTransportEndpoints();
}
@Override
public List<String> getAvailableTransportEndpoints() {
checkHandle();
return handle.getAvailableTransportEndpoints();
}
@Override
public String getTransportEndpoint(int index) {
checkHandle();

View file

@@ -15,6 +15,7 @@ import org.elasticsearch.test.cluster.MutableSettingsProvider;
import org.elasticsearch.test.cluster.util.Version;
import java.io.InputStream;
import java.util.List;
public interface LocalClusterHandle extends ClusterHandle {
@@ -54,6 +55,11 @@
*/
String getTransportEndpoints();
/**
* @return a list of all available TCP transport endpoints, which may be empty if none of the nodes in this cluster are started.
*/
List<String> getAvailableTransportEndpoints();
/**
* Returns the TCP transport endpoint for the node at the given index. If this method is called on an unstarted cluster, the cluster
* will be started. This method is thread-safe and subsequent calls will wait for cluster start and availability.