Allow simple connection strategy to be configured (#49066)

Currently the simple connection strategy only exists in the code. It
cannot be configured. This commit moves in the direction of allowing it
to be configured. It introduces settings for the addresses and socket
count. Additionally it introduces new settings for the sniff strategy
so that the more generic number of connections and seed node settings
can be deprecated.

The simple settings are not yet registered as the registration is
dependent on follow-up work to validate the settings.
This commit is contained in:
Tim Brooks 2019-11-19 09:08:03 -07:00 committed by GitHub
parent 3cf6569e0e
commit 4f4140431b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 411 additions and 314 deletions

View file

@ -83,7 +83,7 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
String transportAddress = (String) nodesResponse.get("transport_address");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.seeds", transportAddress));
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.sniff.seeds", transportAddress));
ClusterUpdateSettingsResponse updateSettingsResponse =
highLevelClient().cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
assertThat(updateSettingsResponse.isAcknowledged(), is(true));

View file

@ -87,7 +87,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
String transportAddress = (String) nodesResponse.get("transport_address");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.seeds", transportAddress));
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.sniff.seeds", transportAddress));
ClusterUpdateSettingsResponse updateSettingsResponse =
client.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
assertThat(updateSettingsResponse.isAcknowledged(), is(true));

View file

@ -144,7 +144,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
try (MockTransportService remoteTransport = startTransport("node0", new CopyOnWriteArrayList<>(), Version.CURRENT, threadPool)) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
updateRemoteClusterSettings(Collections.singletonMap("seeds", remoteNode.getAddress().toString()));
updateRemoteClusterSettings(Collections.singletonMap("sniff.seeds", remoteNode.getAddress().toString()));
for (int i = 0; i < 10; i++) {
restHighLevelClient.index(
@ -229,7 +229,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
assertSearchConnectFailure();
Map<String, Object> map = new HashMap<>();
map.put("seeds", null);
map.put("sniff.seeds", null);
map.put("skip_unavailable", null);
updateRemoteClusterSettings(map);
}
@ -248,32 +248,32 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(),
containsString("missing required setting [cluster.remote.remote1.seeds] " +
containsString("missing required setting [cluster.remote.remote1.sniff.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}
Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put("seeds", remoteNode.getAddress().toString());
settingsMap.put("sniff.seeds", remoteNode.getAddress().toString());
settingsMap.put("skip_unavailable", randomBoolean());
updateRemoteClusterSettings(settingsMap);
{
//check that seeds cannot be reset alone if skip_unavailable is set
Request request = new Request("PUT", "/_cluster/settings");
request.setEntity(buildUpdateSettingsRequestBody(Collections.singletonMap("seeds", null)));
request.setEntity(buildUpdateSettingsRequestBody(Collections.singletonMap("sniff.seeds", null)));
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.seeds] " +
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.sniff.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}
if (randomBoolean()) {
updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", null));
updateRemoteClusterSettings(Collections.singletonMap("seeds", null));
updateRemoteClusterSettings(Collections.singletonMap("sniff.seeds", null));
} else {
Map<String, Object> nullMap = new HashMap<>();
nullMap.put("seeds", null);
nullMap.put("sniff.seeds", null);
nullMap.put("skip_unavailable", null);
updateRemoteClusterSettings(nullMap);
}

View file

@ -101,8 +101,8 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.watcher.ResourceWatcherService;
@ -280,8 +280,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
@ -289,6 +287,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
RemoteClusterService.ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,

View file

@ -19,77 +19,26 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Base class for all services and components that need up-to-date information about the registered remote clusters
*/
public abstract class RemoteClusterAware {
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting(
"cluster.remote.",
"seeds",
key -> Setting.listSetting(
key,
Collections.emptyList(),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.Dynamic,
Setting.Property.NodeScope));
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
/**
* A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in
* the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect
* to the nodes in the remote cluster using this address instead. Use of this setting is not recommended and it is deliberately
* undocumented as it does not work well with all proxies.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"cluster.remote.",
"proxy",
key -> Setting.simpleString(
key,
s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
protected final Settings settings;
private final ClusterNameExpressionResolver clusterNameResolver;
@ -106,57 +55,7 @@ public abstract class RemoteClusterAware {
* Returns remote clusters that are enabled in these settings
*/
protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
final Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings
.map(REMOTE_CLUSTERS_SEEDS::getNamespace)
.filter(clusterAlias -> RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings))
.collect(Collectors.toSet());
}
/**
* Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
*/
protected static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
final Settings settings) {
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> remoteSeeds =
buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS);
return remoteSeeds.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
final Settings settings, final Setting.AffixSetting<List<String>> seedsSetting) {
final Stream<Setting<List<String>>> allConcreteSettings = seedsSetting.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(seedsSetting::getNamespace, concreteSetting -> {
String clusterName = seedsSetting.getNamespace(concreteSetting);
List<String> addresses = concreteSetting.get(settings);
final boolean proxyMode =
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).existsOrFallbackExists(settings);
List<Tuple<String, Supplier<DiscoveryNode>>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(Tuple.tuple(address, () -> buildSeedNode(clusterName, address, proxyMode)));
}
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
}));
}
static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) {
if (proxyMode) {
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}
return RemoteConnectionStrategy.getRemoteClusters(settings);
}
/**
@ -203,52 +102,12 @@ public abstract class RemoteClusterAware {
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
}
static InetSocketAddress parseSeedAddress(String remoteHost) {
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
final String host = hostPort.v1();
assert hostPort.v2() != null : remoteHost;
final int port = hostPort.v2();
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
return new InetSocketAddress(hostAddress, port);
}
public static Tuple<String, Integer> parseHostPort(final String remoteHost) {
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
final int port = parsePort(remoteHost);
return Tuple.tuple(host, port);
}
private static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return port;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("failed to parse port", e);
}
}
private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}
public static String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)
? indexName : clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName;

View file

@ -68,7 +68,7 @@ final class RemoteClusterConnection implements Closeable {
*/
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) {
this(settings, clusterAlias, transportService,
createConnectionManager(buildConnectionProfileFromSettings(settings, clusterAlias), transportService));
createConnectionManager(RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings), transportService));
}
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService,
@ -76,7 +76,7 @@ final class RemoteClusterConnection implements Closeable {
this.transportService = transportService;
this.clusterAlias = clusterAlias;
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
@ -241,18 +241,4 @@ final class RemoteClusterConnection implements Closeable {
public boolean shouldRebuildConnection(Settings newSettings) {
return connectionStrategy.shouldRebuildConnection(newSettings);
}
static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) {
return new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
// we don't want this to be used for anything else but search
.addConnections(0, TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings))
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings))
.build();
}
}

View file

@ -113,19 +113,19 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
"cluster.remote.",
"transport.compress",
key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.Dynamic, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();

View file

@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -33,20 +34,36 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable {
enum ConnectionStrategy {
SNIFF,
SIMPLE
SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings),
SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings);
private final int numberOfChannels;
private final Supplier<Stream<Setting.AffixSetting<?>>> enabledSettings;
ConnectionStrategy(int numberOfChannels, Supplier<Stream<Setting.AffixSetting<?>>> enabledSettings) {
this.numberOfChannels = numberOfChannels;
this.enabledSettings = enabledSettings;
}
}
public static final Setting.AffixSetting<ConnectionStrategy> REMOTE_CONNECTION_MODE = Setting.affixKeySetting(
@ -75,6 +92,96 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
connectionManager.getConnectionManager().addListener(this);
}
static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings))
.addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
// TODO: Evaluate if we actually need PING channels?
.addConnections(mode.numberOfChannels, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING);
return builder.build();
}
static RemoteConnectionStrategy buildStrategy(String clusterAlias, TransportService transportService,
RemoteConnectionManager connectionManager, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
switch (mode) {
case SNIFF:
return new SniffConnectionStrategy(clusterAlias, transportService, connectionManager, settings);
case SIMPLE:
return new SimpleConnectionStrategy(clusterAlias, transportService, connectionManager, settings);
default:
throw new AssertionError("Invalid connection strategy" + mode);
}
}
static Set<String> getRemoteClusters(Settings settings) {
final Stream<Setting.AffixSetting<?>> enablementSettings = Arrays.stream(ConnectionStrategy.values())
.flatMap(strategy -> strategy.enabledSettings.get());
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
}
public static boolean isConnectionEnabled(String clusterAlias, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
if (mode.equals(ConnectionStrategy.SNIFF)) {
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return seeds.isEmpty() == false;
} else {
List<String> addresses = SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias)
.get(settings);
return addresses.isEmpty() == false;
}
}
private static <T> Stream<String> getClusterAlias(Settings settings, Setting.AffixSetting<T> affixSetting) {
Stream<Setting<T>> allConcreteSettings = affixSetting.getAllConcreteSettings(settings);
return allConcreteSettings.map(affixSetting::getNamespace);
}
static InetSocketAddress parseSeedAddress(String remoteHost) {
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
final String host = hostPort.v1();
assert hostPort.v2() != null : remoteHost;
final int port = hostPort.v2();
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
return new InetSocketAddress(hostAddress, port);
}
private static Tuple<String, Integer> parseHostPort(final String remoteHost) {
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
final int port = parsePort(remoteHost);
return Tuple.tuple(host, port);
}
static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return port;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("failed to parse port", e);
}
}
private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}
/**
* Triggers a connect round unless there is one running already. If there is a connect round running, the listener will either
* be queued or rejected and failed.
@ -129,16 +236,6 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
}
}
public static boolean isConnectionEnabled(String clusterAlias, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
if (mode.equals(ConnectionStrategy.SNIFF)) {
List<String> seeds = RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return seeds.isEmpty() == false;
} else {
return false;
}
}
boolean shouldRebuildConnection(Settings newSettings) {
ConnectionStrategy newMode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
if (newMode.equals(strategyType()) == false) {

View file

@ -26,24 +26,51 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.CountDown;
import java.util.Iterator;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.common.settings.Setting.intSetting;
public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
/**
* A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin
* fashion.
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
"cluster.remote.",
"addresses",
key -> Setting.listSetting(key, Collections.emptyList(), s -> {
// validate address
parsePort(s);
return s;
}, Setting.Property.Dynamic, Setting.Property.NodeScope));
/**
* The maximum number of socket connections that will be established to a remote cluster. The default is 18.
*/
public static final Setting.AffixSetting<Integer> REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting(
"cluster.remote.",
"simple.socket_connections",
key -> intSetting(key, 18, 1, Setting.Property.Dynamic, Setting.Property.NodeScope));
static final int CHANNELS_PER_CONNECTION = 1;
private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3;
private static final Logger logger = LogManager.getLogger(SimpleConnectionStrategy.class);
private final int maxNumRemoteConnections;
private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final List<Supplier<TransportAddress>> addresses;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
@ -51,9 +78,26 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
private final ConnectionManager.ConnectionValidator clusterNameValidator;
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumRemoteConnections, List<Supplier<TransportAddress>> addresses) {
Settings settings) {
this(
clusterAlias,
transportService,
connectionManager,
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings));
}
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
configuredAddresses.stream().map(address ->
(Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()));
}
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses) {
super(clusterAlias, transportService, connectionManager);
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.maxNumConnections = maxNumConnections;
assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses";
this.addresses = addresses;
// TODO: Move into the ConnectionManager
@ -77,9 +121,13 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
}));
}
static Stream<Setting.AffixSetting<?>> enablementSettings() {
return Stream.of(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES);
}
@Override
protected boolean shouldOpenMoreConnections() {
return connectionManager.size() < maxNumRemoteConnections;
return connectionManager.size() < maxNumConnections;
}
@Override
@ -94,10 +142,10 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
@Override
protected void connectImpl(ActionListener<Void> listener) {
performSimpleConnectionProcess(addresses.iterator(), listener);
performSimpleConnectionProcess(listener);
}
private void performSimpleConnectionProcess(Iterator<Supplier<TransportAddress>> addressIter, ActionListener<Void> listener) {
private void performSimpleConnectionProcess(ActionListener<Void> listener) {
openConnections(listener, 1);
}
@ -105,7 +153,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN) {
List<TransportAddress> resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList());
int remaining = maxNumRemoteConnections - connectionManager.size();
int remaining = maxNumConnections - connectionManager.size();
ActionListener<Void> compositeListener = new ActionListener<>() {
private final AtomicInteger successfulConnections = new AtomicInteger(0);
@ -158,7 +206,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
+ "]"));
} else {
logger.debug("unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]", clusterAlias,
openConnections, maxNumRemoteConnections);
openConnections, maxNumConnections);
finished.onResponse(null);
}
}
@ -169,4 +217,8 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ;
return resolvedAddresses.get(Math.floorMod(curr, resolvedAddresses.size()));
}
private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseSeedAddress(address));
}
}

View file

@ -33,8 +33,10 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -53,9 +55,75 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.common.settings.Setting.intSetting;
public class SniffConnectionStrategy extends RemoteConnectionStrategy {
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_SEEDS_OLD = Setting.affixKeySetting(
"cluster.remote.",
"seeds",
key -> Setting.listSetting(
key,
Collections.emptyList(),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.Dynamic,
Setting.Property.NodeScope));
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting(
"cluster.remote.",
"sniff.seeds",
key -> Setting.listSetting(key,
"_na_".equals(key) ? REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(key)
: REMOTE_CLUSTER_SEEDS_OLD.getConcreteSetting(key.replaceAll("sniff\\.seeds$", "seeds")),
s -> {
// validate seed address
parsePort(s);
return s;
}, Setting.Property.Dynamic, Setting.Property.NodeScope));
/**
* A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in
* the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect
* to the nodes in the remote cluster using this address instead. Use of this setting is not recommended and it is deliberately
* undocumented as it does not work well with all proxies.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"cluster.remote.",
"proxy",
key -> Setting.simpleString(
key,
s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTER_SEEDS);
/**
* The maximum number of node connections that will be established to a remote cluster. For instance if there is only a single
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
*/
public static final Setting.AffixSetting<Integer> REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting(
"cluster.remote.",
"sniff.node_connections",
key -> intSetting(key, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope));
static final int CHANNELS_PER_CONNECTION = 6;
private static final Logger logger = LogManager.getLogger(SniffConnectionStrategy.class);
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
@ -75,10 +143,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
clusterAlias,
transportService,
connectionManager,
RemoteClusterAware.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings),
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER.get(settings),
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
getNodePredicate(settings),
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings));
REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings));
}
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
@ -100,6 +168,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
this.seedNodes = seedNodes;
}
static Stream<Setting.AffixSetting<?>> enablementSettings() {
return Stream.of(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD);
}
@Override
protected boolean shouldOpenMoreConnections() {
return connectionManager.size() < maxNumRemoteConnections;
@ -107,8 +179,8 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
String proxy = RemoteClusterAware.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
List<String> addresses = RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
List<String> addresses = REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return seedsChanged(configuredSeedNodes, addresses) || proxyChanged(proxyAddress, proxy);
}
@ -148,10 +220,9 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
final DiscoveryNode seedNode = seedNodes.next().get();
logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
proxyAddress);
final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG);
final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
try {
connectionManager.openConnection(seedNode, profile, openConnectionStep);
connectionManager.openConnection(seedNode, null, openConnectionStep);
} catch (Exception e) {
onFailure.accept(e);
}
@ -318,11 +389,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
private static DiscoveryNode resolveSeedNode(String clusterAlias, String address, String proxyAddress) {
if (proxyAddress == null || proxyAddress.isEmpty()) {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
TransportAddress transportAddress = new TransportAddress(parseSeedAddress(address));
return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(), transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(proxyAddress));
TransportAddress transportAddress = new TransportAddress(parseSeedAddress(proxyAddress));
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
@ -353,7 +424,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
return node;
} else {
// resolve proxy address lazy here
InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
InetSocketAddress proxyInetAddress = parseSeedAddress(proxyAddress);
return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
}

View file

@ -355,7 +355,7 @@ public class TransportSearchActionTests extends ESTestCase {
DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode();
knownNodes.add(remoteSeedNode);
nodes[i] = remoteSeedNode;
settingsBuilder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
settingsBuilder.put("cluster.remote.remote" + i + ".sniff.seeds", remoteSeedNode.getAddress().toString());
remoteIndices.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
}
return mockTransportServices;

View file

@ -65,7 +65,7 @@ public class RemoteClusterAwareClientTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
builder.putList("cluster.remote.cluster1.sniff.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
@ -96,7 +96,7 @@ public class RemoteClusterAwareClientTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
builder.putList("cluster.remote.cluster1.sniff.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();

View file

@ -52,7 +52,8 @@ public class RemoteClusterClientTests extends ESTestCase {
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("cluster.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
.put("cluster.remote.test.sniff.seeds",
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
service.start();
// following two log lines added to investigate #41745, can be removed once issue is closed
@ -80,7 +81,8 @@ public class RemoteClusterClientTests extends ESTestCase {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("cluster.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
.put("cluster.remote.test.sniff.seeds",
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
Semaphore semaphore = new Semaphore(1);
service.start();

View file

@ -194,7 +194,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
CountDownLatch listenerCalled = new CountDownLatch(1);
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes(seedNode));
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
ActionListener<Void> listener = ActionListener.wrap(x -> {
listenerCalled.countDown();
@ -219,7 +219,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
private static List<String> seedNodes(final DiscoveryNode... seedNodes) {
private static List<String> addresses(final DiscoveryNode... seedNodes) {
return Arrays.stream(seedNodes).map(s -> s.getAddress().toString()).collect(Collectors.toList());
}
@ -234,14 +234,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(seedTransport1.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<String> seedNodes = seedNodes(seedNode1, seedNode);
List<String> seedNodes = addresses(seedNode1, seedNode);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes);
Settings settings = buildRandomSettings(clusterAlias, seedNodes);
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
int numThreads = randomIntBetween(4, 10);
Thread[] threads = new Thread[numThreads];
@ -321,7 +321,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.add(transport3.getLocalDiscoNode());
knownNodes.add(transport2.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<String> seedNodes = seedNodes(node3, node1, node2);
List<String> seedNodes = addresses(node3, node1, node2);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@ -421,7 +421,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes(seedNode));
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<Function<String, DiscoveryNode>> reference = new AtomicReference<>();
@ -469,7 +470,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes);
Settings settings = buildRandomSettings(clusterAlias, seedNodes);
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
final int numGetThreads = randomIntBetween(4, 10);
final Thread[] getThreads = new Thread[numGetThreads];
@ -570,7 +571,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes(connectedNode));
Settings settings = buildRandomSettings(clusterAlias, addresses(connectedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, connectionManager)) {
PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null)));
for (int i = 0; i < 10; i++) {
@ -595,10 +596,32 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
private Settings buildRandomSettings(String clusterAlias, List<String> addresses) {
if (randomBoolean()) {
return buildSimpleSettings(clusterAlias, addresses);
} else {
return buildSniffSettings(clusterAlias, addresses);
}
}
private static Settings buildSimpleSettings(String clusterAlias, List<String> addresses) {
Settings.Builder builder = Settings.builder();
builder.put(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(addresses));
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "simple");
return builder.build();
}
private static Settings buildSniffSettings(String clusterAlias, List<String> seedNodes) {
Settings.Builder builder = Settings.builder();
builder.put(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "sniff");
if (randomBoolean()) {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
} else {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
}
return builder.build();
}
}

View file

@ -25,11 +25,9 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.test.ESTestCase;
@ -38,8 +36,6 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -51,12 +47,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
public class RemoteClusterServiceTests extends ESTestCase {
@ -83,81 +76,40 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_COMPRESS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS));
}
public void testRemoteClusterSeedSetting() {
// simple validation
Settings settings = Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seed", "[::1]:9090").build();
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.sniff.seed", "[::1]:9090").build();
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
Settings brokenSettings = Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1").build();
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS
.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
Settings brokenPortSettings = Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:123456789123456789").build();
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1:123456789123456789").build();
Exception e = expectThrows(
IllegalArgumentException.class,
() -> RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings)
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getAllConcreteSettings(brokenSettings)
.forEach(setting -> setting.get(brokenPortSettings))
);
assertEquals("failed to parse port", e.getMessage());
}
public void testBuildRemoteClustersDynamicConfig() throws Exception {
Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> map =
RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seeds", "[::1]:9090")
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
.put("cluster.remote.boom.proxy", "foo.bar.com:1234")
.put("cluster.remote.quux.seeds", "quux:9300")
.put("cluster.remote.quux.proxy", "quux-proxy:19300")
.build());
assertThat(map.keySet(), containsInAnyOrder(equalTo("foo"), equalTo("bar"), equalTo("boom"), equalTo("quux")));
assertThat(map.get("foo").v2(), hasSize(1));
assertThat(map.get("bar").v2(), hasSize(1));
assertThat(map.get("boom").v2(), hasSize(1));
assertThat(map.get("quux").v2(), hasSize(1));
DiscoveryNode foo = map.get("foo").v2().get(0).v2().get();
assertEquals("", map.get("foo").v1());
assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080)));
assertEquals(foo.getId(), "foo#192.168.0.1:8080");
assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode bar = map.get("bar").v2().get(0).v2().get();
assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090)));
assertEquals(bar.getId(), "bar#[::1]:9090");
assertEquals("", map.get("bar").v1());
assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode boom = map.get("boom").v2().get(0).v2().get();
assertEquals(boom.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
assertEquals("boom-node1.internal", boom.getHostName());
assertEquals(boom.getId(), "boom#boom-node1.internal:1000");
assertEquals("foo.bar.com:1234", map.get("boom").v1());
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode quux = map.get("quux").v2().get(0).v2().get();
assertEquals(quux.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
assertEquals("quux", quux.getHostName());
assertEquals(quux.getId(), "quux#quux:9300");
assertEquals("quux-proxy:19300", map.get("quux").v1());
assertEquals(quux.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
}
public void testGroupClusterIndices() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
@ -173,8 +125,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_2.sniff.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -399,7 +351,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
service.initializeRemoteClusters();
RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
@ -408,7 +360,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule);
boolean compressionEnabled = true;
settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled);
settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
settingsChange.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build());
assertBusy(remoteClusterConnection::isClosed);
@ -453,9 +405,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList(
"cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
"cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
builder.putList(
"cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
"cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
@ -520,8 +472,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -591,9 +543,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList(
"cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
"cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
builder.putList(
"cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
"cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
@ -723,12 +675,12 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,
new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
{
Settings settings = Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(settings, true));
assertEquals("missing required setting [cluster.remote.foo.seeds] for setting [cluster.remote.foo.skip_unavailable]",
assertEquals("missing required setting [cluster.remote.foo.sniff.seeds] for setting [cluster.remote.foo.skip_unavailable]",
iae.getMessage());
}
{
@ -736,10 +688,10 @@ public class RemoteClusterServiceTests extends ESTestCase {
String seed = remoteSeedTransport.getLocalDiscoNode().getAddress().toString();
service.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean())
.put("cluster.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("cluster.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("cluster.remote.foo.sniff.seeds", seed).build(), true);
AbstractScopedSettings service2 = new ClusterSettings(Settings.builder().put("cluster.remote.foo.seeds", seed).build(),
new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
service2.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build(), false);
}
@ -763,7 +715,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
builder.putList("cluster.remote.cluster_test.sniff.seeds", Collections.singletonList(node0.getAddress().toString()));
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -859,8 +811,13 @@ public class RemoteClusterServiceTests extends ESTestCase {
private static Settings createSettings(String clusterAlias, List<String> seeds) {
Settings.Builder builder = Settings.builder();
builder.put(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
if (randomBoolean()) {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
} else {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
}
return builder.build();
}
}

View file

@ -25,8 +25,8 @@ import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.transport.RemoteClusterAware.REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.RemoteClusterAware.REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD;
import static org.elasticsearch.transport.RemoteClusterService.ENABLE_REMOTE_CLUSTERS;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER;
@ -60,7 +60,7 @@ public class RemoteClusterSettingsTests extends ESTestCase {
public void testSeedsDefault() {
final String alias = randomAlphaOfLength(8);
assertThat(REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyCollectionOf(String.class));
assertThat(REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyCollectionOf(String.class));
}
public void testProxyDefault() {

View file

@ -59,8 +59,6 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
RemoteConnectionStrategy.ConnectionStrategy.SIMPLE);
ConnectionProfile profile = connectionManager.getConnectionProfile();
Settings.Builder newBuilder = Settings.builder();
newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "simple");
if (randomBoolean()) {
@ -72,6 +70,18 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
assertTrue(first.shouldRebuildConnection(newBuilder.build()));
}
public void testCorrectChannelNumber() {
String clusterAlias = "cluster-alias";
String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
Settings simpleSettings = Settings.builder().put(settingKey, "simple").build();
ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings);
assertEquals(1, simpleProfile.getNumConnections());
Settings sniffSettings = Settings.builder().put(settingKey, "sniff").build();
ConnectionProfile sniffProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, sniffSettings);
assertEquals(6, sniffProfile.getNumConnections());
}
private static class FakeConnectionStrategy extends RemoteConnectionStrategy {
private final ConnectionStrategy strategy;

View file

@ -31,7 +31,9 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@ -40,7 +42,9 @@ import java.util.stream.Collectors;
public class SimpleConnectionStrategyTests extends ESTestCase {
private final String clusterAlias = "cluster-alias";
private final ConnectionProfile profile = RemoteClusterConnection.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster");
private final String modeKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
private final Settings settings = Settings.builder().put(modeKey, "simple").build();
private final ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile("cluster", settings);
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
@ -60,7 +64,7 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
.put("node.name", id)
.put(settings)
.build();
MockTransportService newService = MockTransportService.createNewService(settings, version, threadPool);
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool);
try {
newService.start();
newService.acceptIncomingRequests();
@ -262,7 +266,38 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
}
}
private static List<Supplier<TransportAddress>> addresses(final TransportAddress... addresses) {
return Arrays.stream(addresses).map(s -> (Supplier<TransportAddress>) () -> s).collect(Collectors.toList());
public void testSimpleStrategyWillResolveAddressesEachConnect() throws Exception {
try (MockTransportService transport1 = startTransport("seed_node", Version.CURRENT)) {
TransportAddress address = transport1.boundAddress().publishAddress();
CountDownLatch multipleResolveLatch = new CountDownLatch(2);
Supplier<TransportAddress> addressSupplier = () -> {
multipleResolveLatch.countDown();
return address;
};
try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address), Collections.singletonList(addressSupplier))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
remoteConnectionManager.getAnyRemoteConnection().close();
assertTrue(multipleResolveLatch.await(30L, TimeUnit.SECONDS));
}
}
}
}
private static List<String> addresses(final TransportAddress... addresses) {
return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList());
}
}

View file

@ -60,7 +60,9 @@ import static org.hamcrest.Matchers.equalTo;
public class SniffConnectionStrategyTests extends ESTestCase {
private final String clusterAlias = "cluster-alias";
private final ConnectionProfile profile = RemoteClusterConnection.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster");
private final String modeKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
private final Settings settings = Settings.builder().put(modeKey, "sniff").build();
private final ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile("cluster", settings);
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
@ -508,8 +510,8 @@ public class SniffConnectionStrategyTests extends ESTestCase {
assertTrue(connectionManager.nodeConnected(discoverableNode));
assertTrue(strategy.assertNoRunningConnections());
Setting<?> seedSetting = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("cluster-alias");
Setting<?> proxySetting = RemoteClusterService.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias");
Setting<?> seedSetting = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("cluster-alias");
Setting<?> proxySetting = SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias");
Settings noChange = Settings.builder()
.put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray()))

View file

@ -80,6 +80,7 @@ import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
@ -1357,7 +1358,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), true).put(seeds.getKey(), address));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@ -1388,7 +1389,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace("leader_cluster");
settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY))
.put(seeds.getKey(), address));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());