mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
ESQL: Honor skip_unavailable setting for nonmatching indices errors at planning time (#116348)
Adds support to ES|QL planning time (EsqlSession code) for dealing with non-matching indices and how that relates to the remote cluster skip_unavailable setting and also how to deal with missing indices on the local cluster (if included in the query). For clusters included in an ES|QL query: • `skip_unavailable=true` means: if no data is returned from that cluster (due to cluster-not-connected, no matching indices, a missing concrete index or shard failures during searches), it is not a "fatal" error that causes the entire query to fail. Instead it is just a failure on that particular cluster and partial data should be returned from other clusters. • `skip_unavailable=false` means: if no data is returned from that cluster (for the same reasons enumerated above), then the whole query should fail. This allows users to ensure that data is returned from a "required" cluster. • For the local cluster, ES|QL assumes `allow_no_indices=true` and the skip_unavailable setting does not apply (in part because there is no way for a user to set skip_unavailable for the local cluster) Based on discussions with ES|QL team members, we defined the following rules to be enforced with respect to non-matching index expressions: **Rules enforced at planning time** P1. fail the query if there are no matching indices on any cluster (VerificationException) P2. fail the query if a skip_unavailable:false cluster has no matching indices (the local cluster already has this rule enforced at planning time) P3. fail query if the local cluster has no matching indices and a concrete index was specified **Rules enforced at execution time** For missing concrete (no wildcards present) index expressions: E1. fail the query when it was specified for the local cluster or a skip_unavailable=false remote cluster E2: on skip_unavailable=true clusters: an error fails the query on that cluster, but not the entire query (data from other clusters still returned) **Notes on the rules** P1: this already happens, no new code needed in this PR P2: The reason we need to enforce rule 2 at planning time is that when there are no matching indices from field caps the EsIndex that is created (and passed into IndexResolution.valid) leaves that cluster out of the list, so at execution time it will not attempt to query that cluster at all, so execution time will not catch missing concrete indices. And even if it did get queried at execution time it wouldn't fail on wildcard only indices where none of them matched. P3: Right now `FROM remote:existent,nomatch` does NOT throw a failure (for same reason described in rule 2 above) so that needs to be enforced in this PR. This PR deals with enforcing and testing the planning time rules: P1, P2 and P3. A follow-on PR will address changes needed for handling the execution time rules. **Notes on PR scope** This PR covers nonsecured clusters (`xpack.security.enabled: false`) and security using certs ("RCS1). In my testing I've founding that api-key based security ("RCS2") is not behaving the same, so that work has been deferred to a follow-on PR. Partially addresses https://github.com/elastic/elasticsearch/issues/114531
This commit is contained in:
parent
790f37c5ad
commit
cca7c153de
10 changed files with 1670 additions and 305 deletions
5
docs/changelog/116348.yaml
Normal file
5
docs/changelog/116348.yaml
Normal file
|
@ -0,0 +1,5 @@
|
|||
pr: 116348
|
||||
summary: "ESQL: Honor skip_unavailable setting for nonmatching indices errors at planning time"
|
||||
area: ES|QL
|
||||
type: enhancement
|
||||
issues: [ 114531 ]
|
File diff suppressed because it is too large
Load diff
|
@ -12,22 +12,37 @@ import org.elasticsearch.core.Nullable;
|
|||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public final class IndexResolution {
|
||||
|
||||
public static IndexResolution valid(EsIndex index, Map<String, FieldCapabilitiesFailure> unavailableClusters) {
|
||||
/**
|
||||
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
|
||||
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
|
||||
* @param unavailableClusters Remote clusters that could not be contacted during planning
|
||||
* @return valid IndexResolution
|
||||
*/
|
||||
public static IndexResolution valid(
|
||||
EsIndex index,
|
||||
Set<String> resolvedIndices,
|
||||
Map<String, FieldCapabilitiesFailure> unavailableClusters
|
||||
) {
|
||||
Objects.requireNonNull(index, "index must not be null if it was found");
|
||||
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
|
||||
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
|
||||
return new IndexResolution(index, null, unavailableClusters);
|
||||
return new IndexResolution(index, null, resolvedIndices, unavailableClusters);
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
|
||||
*/
|
||||
public static IndexResolution valid(EsIndex index) {
|
||||
return valid(index, Collections.emptyMap());
|
||||
return valid(index, index.concreteIndices(), Collections.emptyMap());
|
||||
}
|
||||
|
||||
public static IndexResolution invalid(String invalid) {
|
||||
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
|
||||
return new IndexResolution(null, invalid, Collections.emptyMap());
|
||||
return new IndexResolution(null, invalid, Collections.emptySet(), Collections.emptyMap());
|
||||
}
|
||||
|
||||
public static IndexResolution notFound(String name) {
|
||||
|
@ -39,12 +54,20 @@ public final class IndexResolution {
|
|||
@Nullable
|
||||
private final String invalid;
|
||||
|
||||
// all indices found by field-caps
|
||||
private final Set<String> resolvedIndices;
|
||||
// remote clusters included in the user's index expression that could not be connected to
|
||||
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;
|
||||
|
||||
private IndexResolution(EsIndex index, @Nullable String invalid, Map<String, FieldCapabilitiesFailure> unavailableClusters) {
|
||||
private IndexResolution(
|
||||
EsIndex index,
|
||||
@Nullable String invalid,
|
||||
Set<String> resolvedIndices,
|
||||
Map<String, FieldCapabilitiesFailure> unavailableClusters
|
||||
) {
|
||||
this.index = index;
|
||||
this.invalid = invalid;
|
||||
this.resolvedIndices = resolvedIndices;
|
||||
this.unavailableClusters = unavailableClusters;
|
||||
}
|
||||
|
||||
|
@ -64,8 +87,8 @@ public final class IndexResolution {
|
|||
}
|
||||
|
||||
/**
|
||||
* Is the index valid for use with ql? Returns {@code false} if the
|
||||
* index wasn't found.
|
||||
* Is the index valid for use with ql?
|
||||
* @return {@code false} if the index wasn't found.
|
||||
*/
|
||||
public boolean isValid() {
|
||||
return invalid == null;
|
||||
|
@ -75,10 +98,17 @@ public final class IndexResolution {
|
|||
* @return Map of unavailable clusters (could not be connected to during field-caps query). Key of map is cluster alias,
|
||||
* value is the {@link FieldCapabilitiesFailure} describing the issue.
|
||||
*/
|
||||
public Map<String, FieldCapabilitiesFailure> getUnavailableClusters() {
|
||||
public Map<String, FieldCapabilitiesFailure> unavailableClusters() {
|
||||
return unavailableClusters;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return all indices found by field-caps (regardless of whether they had any mappings)
|
||||
*/
|
||||
public Set<String> resolvedIndices() {
|
||||
return resolvedIndices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null || obj.getClass() != getClass()) {
|
||||
|
@ -87,16 +117,29 @@ public final class IndexResolution {
|
|||
IndexResolution other = (IndexResolution) obj;
|
||||
return Objects.equals(index, other.index)
|
||||
&& Objects.equals(invalid, other.invalid)
|
||||
&& Objects.equals(resolvedIndices, other.resolvedIndices)
|
||||
&& Objects.equals(unavailableClusters, other.unavailableClusters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(index, invalid, unavailableClusters);
|
||||
return Objects.hash(index, invalid, resolvedIndices, unavailableClusters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return invalid != null ? invalid : index.name();
|
||||
return invalid != null
|
||||
? invalid
|
||||
: "IndexResolution{"
|
||||
+ "index="
|
||||
+ index
|
||||
+ ", invalid='"
|
||||
+ invalid
|
||||
+ '\''
|
||||
+ ", resolvedIndices="
|
||||
+ resolvedIndices
|
||||
+ ", unavailableClusters="
|
||||
+ unavailableClusters
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -251,19 +251,17 @@ public class ComputeService {
|
|||
if (execInfo.isCrossClusterSearch()) {
|
||||
assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
|
||||
for (String clusterAlias : execInfo.clusterAliases()) {
|
||||
// took time and shard counts for SKIPPED clusters were added at end of planning, so only update other cases here
|
||||
if (execInfo.getCluster(clusterAlias).getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
|
||||
execInfo.swapCluster(
|
||||
clusterAlias,
|
||||
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
|
||||
.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
|
||||
.setTotalShards(0)
|
||||
.setSuccessfulShards(0)
|
||||
.setSkippedShards(0)
|
||||
.setFailedShards(0)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
execInfo.swapCluster(clusterAlias, (k, v) -> {
|
||||
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
|
||||
.setTotalShards(0)
|
||||
.setSuccessfulShards(0)
|
||||
.setSkippedShards(0)
|
||||
.setFailedShards(0);
|
||||
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
|
||||
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
|
||||
}
|
||||
return builder.build();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -309,7 +309,7 @@ public class EsqlSession {
|
|||
// resolution to updateExecutionInfo
|
||||
if (indexResolution.isValid()) {
|
||||
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
|
||||
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
|
||||
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
|
||||
if (executionInfo.isCrossClusterSearch()
|
||||
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
|
||||
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.core.TimeValue;
|
|||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.RemoteClusterAware;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.xpack.esql.VerificationException;
|
||||
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
|
||||
import org.elasticsearch.xpack.esql.analysis.Analyzer;
|
||||
import org.elasticsearch.xpack.esql.index.IndexResolution;
|
||||
|
@ -33,7 +34,6 @@ class EsqlSessionCCSUtils {
|
|||
|
||||
private EsqlSessionCCSUtils() {}
|
||||
|
||||
// visible for testing
|
||||
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
|
||||
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
|
||||
for (FieldCapabilitiesFailure failure : failures) {
|
||||
|
@ -75,10 +75,10 @@ class EsqlSessionCCSUtils {
|
|||
|
||||
/**
|
||||
* Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
|
||||
*
|
||||
* <p>
|
||||
* For cases where field-caps had no indices to search and the remotes were unavailable, we
|
||||
* return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
|
||||
*
|
||||
* <p>
|
||||
* Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
|
||||
* on any of the requested clusters.
|
||||
*/
|
||||
|
@ -132,7 +132,6 @@ class EsqlSessionCCSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (String clusterAlias : executionInfo.clusterAliases()) {
|
||||
|
@ -181,39 +180,91 @@ class EsqlSessionCCSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
|
||||
Set<String> clustersWithResolvedIndices = new HashSet<>();
|
||||
// determine missing clusters
|
||||
for (String indexName : indexResolution.get().indexNameWithModes().keySet()) {
|
||||
for (String indexName : indexResolution.resolvedIndices()) {
|
||||
clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName));
|
||||
}
|
||||
Set<String> clustersRequested = executionInfo.clusterAliases();
|
||||
Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
|
||||
clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet());
|
||||
clustersWithNoMatchingIndices.removeAll(indexResolution.unavailableClusters().keySet());
|
||||
|
||||
/**
|
||||
* Rules enforced at planning time around non-matching indices
|
||||
* P1. fail query if no matching indices on any cluster (VerificationException) - that is handled elsewhere (TODO: document where)
|
||||
* P2. fail query if a skip_unavailable:false cluster has no matching indices (the local cluster already has this rule
|
||||
* enforced at planning time)
|
||||
* P3. fail query if the local cluster has no matching indices and a concrete index was specified
|
||||
*/
|
||||
String fatalErrorMessage = null;
|
||||
/*
|
||||
* These are clusters in the original request that are not present in the field-caps response. They were
|
||||
* specified with an index or indices that do not exist, so the search on that cluster is done.
|
||||
* specified with an index expression matched no indices, so the search on that cluster is done.
|
||||
* Mark it as SKIPPED with 0 shards searched and took=0.
|
||||
*/
|
||||
for (String c : clustersWithNoMatchingIndices) {
|
||||
// TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if
|
||||
// they were requested with one or more concrete indices
|
||||
// for now we never mark the local cluster as SKIPPED
|
||||
final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c)
|
||||
? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL
|
||||
: EsqlExecutionInfo.Cluster.Status.SKIPPED;
|
||||
executionInfo.swapCluster(
|
||||
c,
|
||||
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status)
|
||||
.setTook(new TimeValue(0))
|
||||
.setTotalShards(0)
|
||||
.setSuccessfulShards(0)
|
||||
.setSkippedShards(0)
|
||||
.setFailedShards(0)
|
||||
.build()
|
||||
);
|
||||
final String indexExpression = executionInfo.getCluster(c).getIndexExpression();
|
||||
if (missingIndicesIsFatal(c, executionInfo)) {
|
||||
String error = Strings.format(
|
||||
"Unknown index [%s]",
|
||||
(c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression)
|
||||
);
|
||||
if (fatalErrorMessage == null) {
|
||||
fatalErrorMessage = error;
|
||||
} else {
|
||||
fatalErrorMessage += "; " + error;
|
||||
}
|
||||
} else {
|
||||
// handles local cluster (when no concrete indices requested) and skip_unavailable=true clusters
|
||||
EsqlExecutionInfo.Cluster.Status status;
|
||||
ShardSearchFailure failure;
|
||||
if (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
|
||||
status = EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
|
||||
failure = null;
|
||||
} else {
|
||||
status = EsqlExecutionInfo.Cluster.Status.SKIPPED;
|
||||
failure = new ShardSearchFailure(new VerificationException("Unknown index [" + indexExpression + "]"));
|
||||
}
|
||||
executionInfo.swapCluster(c, (k, v) -> {
|
||||
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status)
|
||||
.setTook(new TimeValue(0))
|
||||
.setTotalShards(0)
|
||||
.setSuccessfulShards(0)
|
||||
.setSkippedShards(0)
|
||||
.setFailedShards(0);
|
||||
if (failure != null) {
|
||||
builder.setFailures(List.of(failure));
|
||||
}
|
||||
return builder.build();
|
||||
});
|
||||
}
|
||||
}
|
||||
if (fatalErrorMessage != null) {
|
||||
throw new VerificationException(fatalErrorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
static boolean missingIndicesIsFatal(String clusterAlias, EsqlExecutionInfo executionInfo) {
|
||||
// missing indices on local cluster is fatal only if a concrete index requested
|
||||
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
|
||||
return concreteIndexRequested(executionInfo.getCluster(clusterAlias).getIndexExpression());
|
||||
}
|
||||
return executionInfo.getCluster(clusterAlias).isSkipUnavailable() == false;
|
||||
}
|
||||
|
||||
private static boolean concreteIndexRequested(String indexExpression) {
|
||||
for (String expr : indexExpression.split(",")) {
|
||||
if (expr.charAt(0) == '<' || expr.startsWith("-<")) {
|
||||
// skip date math expressions
|
||||
continue;
|
||||
}
|
||||
if (expr.indexOf('*') < 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
package org.elasticsearch.xpack.esql.session;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
|
||||
|
@ -143,21 +144,24 @@ public class IndexResolver {
|
|||
fields.put(name, field);
|
||||
}
|
||||
|
||||
Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(
|
||||
fieldCapsResponse.getFailures()
|
||||
);
|
||||
|
||||
Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
|
||||
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
|
||||
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
|
||||
}
|
||||
|
||||
boolean allEmpty = true;
|
||||
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
|
||||
allEmpty &= ir.get().isEmpty();
|
||||
}
|
||||
if (allEmpty) {
|
||||
// If all the mappings are empty we return an empty set of resolved indices to line up with QL
|
||||
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, Map.of()));
|
||||
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, Map.of()), concreteIndices.keySet(), unavailableRemotes);
|
||||
}
|
||||
|
||||
Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
|
||||
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
|
||||
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
|
||||
}
|
||||
EsIndex esIndex = new EsIndex(indexPattern, rootFields, concreteIndices);
|
||||
return IndexResolution.valid(esIndex, EsqlSessionCCSUtils.determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()));
|
||||
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), concreteIndices.keySet(), unavailableRemotes);
|
||||
}
|
||||
|
||||
private boolean allNested(List<IndexFieldCapabilities> caps) {
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.transport.NoSeedNodeLeftException;
|
|||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.transport.RemoteClusterAware;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.xpack.esql.VerificationException;
|
||||
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
|
||||
import org.elasticsearch.xpack.esql.core.type.EsField;
|
||||
import org.elasticsearch.xpack.esql.index.EsIndex;
|
||||
|
@ -228,7 +229,8 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
|
|||
IndexMode.STANDARD
|
||||
)
|
||||
);
|
||||
IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of());
|
||||
|
||||
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of());
|
||||
|
||||
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
|
||||
|
||||
|
@ -266,7 +268,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
|
|||
IndexMode.STANDARD
|
||||
)
|
||||
);
|
||||
IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of());
|
||||
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of());
|
||||
|
||||
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
|
||||
|
||||
|
@ -293,7 +295,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
|
|||
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
|
||||
executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
|
||||
executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
|
||||
executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
|
||||
executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true));
|
||||
|
||||
EsIndex esIndex = new EsIndex(
|
||||
"logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*",
|
||||
|
@ -302,7 +304,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
|
|||
);
|
||||
// remote1 is unavailable
|
||||
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
|
||||
IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure));
|
||||
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of(remote1Alias, failure));
|
||||
|
||||
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
|
||||
|
||||
|
@ -341,8 +343,12 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
|
|||
);
|
||||
|
||||
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
|
||||
IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure));
|
||||
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
|
||||
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of(remote1Alias, failure));
|
||||
VerificationException ve = expectThrows(
|
||||
VerificationException.class,
|
||||
() -> EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution)
|
||||
);
|
||||
assertThat(ve.getDetailedMessage(), containsString("Unknown index [remote2:mylogs1,mylogs2,logs*]"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -579,4 +585,46 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
|
|||
assertThat(remoteFailures.get(0).reason(), containsString("unable to connect to remote cluster"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testMissingIndicesIsFatal() {
|
||||
String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
|
||||
String remote1Alias = "remote1";
|
||||
String remote2Alias = "remote2";
|
||||
String remote3Alias = "remote3";
|
||||
|
||||
// scenario 1: cluster is skip_unavailable=true - not fatal
|
||||
{
|
||||
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
|
||||
executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
|
||||
executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "mylogs1,mylogs2,logs*", true));
|
||||
assertThat(EsqlSessionCCSUtils.missingIndicesIsFatal(remote1Alias, executionInfo), equalTo(false));
|
||||
}
|
||||
|
||||
// scenario 2: cluster is local cluster and had no concrete indices - not fatal
|
||||
{
|
||||
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
|
||||
executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
|
||||
executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "mylogs1,mylogs2,logs*", true));
|
||||
assertThat(EsqlSessionCCSUtils.missingIndicesIsFatal(localClusterAlias, executionInfo), equalTo(false));
|
||||
}
|
||||
|
||||
// scenario 3: cluster is local cluster and user specified a concrete index - fatal
|
||||
{
|
||||
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
|
||||
String localIndexExpr = randomFrom("foo*,logs", "logs", "logs,metrics", "bar*,x*,logs", "logs-1,*x*");
|
||||
executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, localIndexExpr, false));
|
||||
executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "mylogs1,mylogs2,logs*", true));
|
||||
assertThat(EsqlSessionCCSUtils.missingIndicesIsFatal(localClusterAlias, executionInfo), equalTo(true));
|
||||
}
|
||||
|
||||
// scenario 4: cluster is skip_unavailable=false - always fatal
|
||||
{
|
||||
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
|
||||
executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "*", false));
|
||||
String indexExpr = randomFrom("foo*,logs", "logs", "bar*,x*,logs", "logs-1,*x*", "*");
|
||||
executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, indexExpr, false));
|
||||
assertThat(EsqlSessionCCSUtils.missingIndicesIsFatal(remote1Alias, executionInfo), equalTo(true));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,574 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.remotecluster;
|
||||
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.test.cluster.ElasticsearchCluster;
|
||||
import org.elasticsearch.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xcontent.json.JsonXContent;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.rules.RuleChain;
|
||||
import org.junit.rules.TestRule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
||||
/**
|
||||
* Tests cross-cluster ES|QL queries under RCS1.0 security model for cases where index expressions do not match
|
||||
* to ensure handling of those matches the expected rules defined in EsqlSessionCrossClusterUtils.
|
||||
*/
|
||||
public class CrossClusterEsqlRCS1MissingIndicesIT extends AbstractRemoteClusterSecurityTestCase {
|
||||
|
||||
private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean();
|
||||
|
||||
static {
|
||||
// remote cluster
|
||||
fulfillingCluster = ElasticsearchCluster.local()
|
||||
.name("fulfilling-cluster")
|
||||
.nodes(1)
|
||||
.module("x-pack-esql")
|
||||
.module("x-pack-enrich")
|
||||
.apply(commonClusterConfig)
|
||||
.setting("remote_cluster.port", "0")
|
||||
.setting("xpack.ml.enabled", "false")
|
||||
.setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
|
||||
.setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
|
||||
.setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
|
||||
.setting("xpack.security.authc.token.enabled", "true")
|
||||
.keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
|
||||
.node(0, spec -> spec.setting("remote_cluster_server.enabled", "true"))
|
||||
.build();
|
||||
|
||||
// "local" cluster
|
||||
queryCluster = ElasticsearchCluster.local()
|
||||
.name("query-cluster")
|
||||
.module("x-pack-esql")
|
||||
.module("x-pack-enrich")
|
||||
.apply(commonClusterConfig)
|
||||
.setting("xpack.ml.enabled", "false")
|
||||
.setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
|
||||
.build();
|
||||
}
|
||||
|
||||
@ClassRule
|
||||
public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
|
||||
|
||||
private static final String INDEX1 = "points"; // on local cluster only
|
||||
private static final String INDEX2 = "squares"; // on local and remote clusters
|
||||
|
||||
record ExpectedCluster(String clusterAlias, String indexExpression, String status, Integer totalShards) {}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void assertExpectedClustersForMissingIndicesTests(Map<String, Object> responseMap, List<ExpectedCluster> expected) {
|
||||
Map<String, ?> clusters = (Map<String, ?>) responseMap.get("_clusters");
|
||||
assertThat((int) responseMap.get("took"), greaterThan(0));
|
||||
|
||||
Map<String, ?> detailsMap = (Map<String, ?>) clusters.get("details");
|
||||
assertThat(detailsMap.size(), is(expected.size()));
|
||||
|
||||
assertThat((int) clusters.get("total"), is(expected.size()));
|
||||
assertThat((int) clusters.get("successful"), is((int) expected.stream().filter(ec -> ec.status().equals("successful")).count()));
|
||||
assertThat((int) clusters.get("skipped"), is((int) expected.stream().filter(ec -> ec.status().equals("skipped")).count()));
|
||||
assertThat((int) clusters.get("failed"), is((int) expected.stream().filter(ec -> ec.status().equals("failed")).count()));
|
||||
|
||||
for (ExpectedCluster expectedCluster : expected) {
|
||||
Map<String, ?> clusterDetails = (Map<String, ?>) detailsMap.get(expectedCluster.clusterAlias());
|
||||
String msg = expectedCluster.clusterAlias();
|
||||
|
||||
assertThat(msg, (int) clusterDetails.get("took"), greaterThan(0));
|
||||
assertThat(msg, clusterDetails.get("status"), is(expectedCluster.status()));
|
||||
Map<String, ?> shards = (Map<String, ?>) clusterDetails.get("_shards");
|
||||
if (expectedCluster.totalShards() == null) {
|
||||
assertThat(msg, (int) shards.get("total"), greaterThan(0));
|
||||
} else {
|
||||
assertThat(msg, (int) shards.get("total"), is(expectedCluster.totalShards()));
|
||||
}
|
||||
|
||||
if (expectedCluster.status().equals("successful")) {
|
||||
assertThat((int) shards.get("successful"), is((int) shards.get("total")));
|
||||
assertThat((int) shards.get("skipped"), is(0));
|
||||
|
||||
} else if (expectedCluster.status().equals("skipped")) {
|
||||
assertThat((int) shards.get("successful"), is(0));
|
||||
assertThat((int) shards.get("skipped"), is((int) shards.get("total")));
|
||||
ArrayList<?> failures = (ArrayList<?>) clusterDetails.get("failures");
|
||||
assertThat(failures.size(), is(1));
|
||||
Map<String, ?> failure1 = (Map<String, ?>) failures.get(0);
|
||||
Map<String, ?> innerReason = (Map<String, ?>) failure1.get("reason");
|
||||
String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]";
|
||||
assertThat(innerReason.get("reason").toString(), containsString(expectedMsg));
|
||||
assertThat(innerReason.get("type").toString(), containsString("verification_exception"));
|
||||
|
||||
} else {
|
||||
fail(msg + "; Unexpected status: " + expectedCluster.status());
|
||||
}
|
||||
// currently failed shards is always zero - change this once we start allowing partial data for individual shard failures
|
||||
assertThat((int) shards.get("failed"), is(0));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() throws Exception {
|
||||
setupRolesAndPrivileges();
|
||||
setupIndex();
|
||||
|
||||
configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, true, randomBoolean(), true);
|
||||
|
||||
// missing concrete local index is an error
|
||||
{
|
||||
String q = Strings.format("FROM nomatch,%s:%s | STATS count(*)", REMOTE_CLUSTER_ALIAS, INDEX2);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString("Unknown index [nomatch]"));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), Matchers.containsString("Unknown index [nomatch]"));
|
||||
}
|
||||
|
||||
// missing concrete remote index is not fatal when skip_unavailable=true (as long as an index matches on another cluster)
|
||||
{
|
||||
String q = Strings.format("FROM %s,%s:nomatch | STATS count(*)", INDEX1, REMOTE_CLUSTER_ALIAS);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
Response response = client().performRequest(esqlRequest(limit1));
|
||||
assertOK(response);
|
||||
|
||||
Map<String, Object> map = responseAsMap(response);
|
||||
assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
assertThat(((ArrayList<?>) map.get("values")).size(), greaterThanOrEqualTo(1));
|
||||
|
||||
assertExpectedClustersForMissingIndicesTests(
|
||||
map,
|
||||
List.of(
|
||||
new ExpectedCluster("(local)", INDEX1, "successful", null),
|
||||
new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch", "skipped", 0)
|
||||
)
|
||||
);
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
response = client().performRequest(esqlRequest(limit0));
|
||||
assertOK(response);
|
||||
|
||||
map = responseAsMap(response);
|
||||
assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
assertThat(((ArrayList<?>) map.get("values")).size(), is(0));
|
||||
|
||||
assertExpectedClustersForMissingIndicesTests(
|
||||
map,
|
||||
List.of(
|
||||
new ExpectedCluster("(local)", INDEX1, "successful", 0),
|
||||
new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch", "skipped", 0)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// since there is at least one matching index in the query, the missing wildcarded local index is not an error
|
||||
{
|
||||
String q = Strings.format("FROM nomatch*,%s:%s", REMOTE_CLUSTER_ALIAS, INDEX2);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
Response response = client().performRequest(esqlRequest(limit1));
|
||||
assertOK(response);
|
||||
|
||||
Map<String, Object> map = responseAsMap(response);
|
||||
assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
assertThat(((ArrayList<?>) map.get("values")).size(), greaterThanOrEqualTo(1));
|
||||
|
||||
assertExpectedClustersForMissingIndicesTests(
|
||||
map,
|
||||
List.of(
|
||||
// local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched
|
||||
new ExpectedCluster("(local)", "nomatch*", "successful", 0),
|
||||
new ExpectedCluster(REMOTE_CLUSTER_ALIAS, INDEX2, "successful", null)
|
||||
)
|
||||
);
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
response = client().performRequest(esqlRequest(limit0));
|
||||
assertOK(response);
|
||||
|
||||
map = responseAsMap(response);
|
||||
assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
assertThat(((ArrayList<?>) map.get("values")).size(), is(0));
|
||||
|
||||
assertExpectedClustersForMissingIndicesTests(
|
||||
map,
|
||||
List.of(
|
||||
// local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched
|
||||
new ExpectedCluster("(local)", "nomatch*", "successful", 0),
|
||||
new ExpectedCluster(REMOTE_CLUSTER_ALIAS, INDEX2, "successful", 0)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// since at least one index of the query matches on some cluster, a wildcarded index on skip_un=true is not an error
|
||||
{
|
||||
String q = Strings.format("FROM %s,%s:nomatch*", INDEX1, REMOTE_CLUSTER_ALIAS);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
Response response = client().performRequest(esqlRequest(limit1));
|
||||
assertOK(response);
|
||||
|
||||
Map<String, Object> map = responseAsMap(response);
|
||||
assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
assertThat(((ArrayList<?>) map.get("values")).size(), greaterThanOrEqualTo(1));
|
||||
|
||||
assertExpectedClustersForMissingIndicesTests(
|
||||
map,
|
||||
List.of(
|
||||
new ExpectedCluster("(local)", INDEX1, "successful", null),
|
||||
new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch*", "skipped", 0)
|
||||
)
|
||||
);
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
response = client().performRequest(esqlRequest(limit0));
|
||||
assertOK(response);
|
||||
|
||||
map = responseAsMap(response);
|
||||
assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
assertThat(((ArrayList<?>) map.get("values")).size(), is(0));
|
||||
|
||||
assertExpectedClustersForMissingIndicesTests(
|
||||
map,
|
||||
List.of(
|
||||
new ExpectedCluster("(local)", INDEX1, "successful", 0),
|
||||
new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch*", "skipped", 0)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// an error is thrown if there are no matching indices at all, even when the cluster is skip_unavailable=true
|
||||
{
|
||||
// with non-matching concrete index
|
||||
String q = Strings.format("FROM %s:nomatch", REMOTE_CLUSTER_ALIAS);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch]", REMOTE_CLUSTER_ALIAS)));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch]", REMOTE_CLUSTER_ALIAS)));
|
||||
}
|
||||
|
||||
// an error is thrown if there are no matching indices at all, even when the cluster is skip_unavailable=true and the
|
||||
// index was wildcarded
|
||||
{
|
||||
String q = Strings.format("FROM %s:nomatch*", REMOTE_CLUSTER_ALIAS);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch*]", REMOTE_CLUSTER_ALIAS)));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch*]", REMOTE_CLUSTER_ALIAS)));
|
||||
}
|
||||
|
||||
// an error is thrown if there are no matching indices at all
|
||||
{
|
||||
String localExpr = randomFrom("nomatch", "nomatch*");
|
||||
String remoteExpr = randomFrom("nomatch", "nomatch*");
|
||||
String q = Strings.format("FROM %s,%s:%s", localExpr, REMOTE_CLUSTER_ALIAS, remoteExpr);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString("Unknown index"));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("%s:%s", REMOTE_CLUSTER_ALIAS, remoteExpr)));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), containsString("Unknown index"));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("%s:%s", REMOTE_CLUSTER_ALIAS, remoteExpr)));
|
||||
}
|
||||
|
||||
// TODO uncomment and test in follow-on PR which does skip_unavailable handling at execution time
|
||||
// {
|
||||
// String q = Strings.format("FROM %s,%s:nomatch,%s:%s*", INDEX1, REMOTE_CLUSTER_ALIAS, REMOTE_CLUSTER_ALIAS, INDEX2);
|
||||
//
|
||||
// String limit1 = q + " | LIMIT 1";
|
||||
// Response response = client().performRequest(esqlRequest(limit1));
|
||||
// assertOK(response);
|
||||
//
|
||||
// Map<String, Object> map = responseAsMap(response);
|
||||
// assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
// assertThat(((ArrayList<?>) map.get("values")).size(), greaterThanOrEqualTo(1));
|
||||
//
|
||||
// assertExpectedClustersForMissingIndicesTests(map,
|
||||
// List.of(
|
||||
// new ExpectedCluster("(local)", INDEX1, "successful", null),
|
||||
// new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "skipped", 0)
|
||||
// )
|
||||
// );
|
||||
//
|
||||
// String limit0 = q + " | LIMIT 0";
|
||||
// response = client().performRequest(esqlRequest(limit0));
|
||||
// assertOK(response);
|
||||
//
|
||||
// map = responseAsMap(response);
|
||||
// assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
// assertThat(((ArrayList<?>) map.get("values")).size(), is(0));
|
||||
//
|
||||
// assertExpectedClustersForMissingIndicesTests(map,
|
||||
// List.of(
|
||||
// new ExpectedCluster("(local)", INDEX1, "successful", 0),
|
||||
// new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "skipped", 0)
|
||||
// )
|
||||
// );
|
||||
// }
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableFalse() throws Exception {
|
||||
// Remote cluster is closed and skip_unavailable is set to false.
|
||||
// Although the other cluster is open, we expect an Exception.
|
||||
|
||||
setupRolesAndPrivileges();
|
||||
setupIndex();
|
||||
|
||||
configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, true, randomBoolean(), false);
|
||||
|
||||
// missing concrete local index is an error
|
||||
{
|
||||
String q = Strings.format("FROM nomatch,%s:%s | STATS count(*)", REMOTE_CLUSTER_ALIAS, INDEX2);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString("Unknown index [nomatch]"));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), Matchers.containsString("Unknown index [nomatch]"));
|
||||
}
|
||||
|
||||
// missing concrete remote index is not fatal when skip_unavailable=true (as long as an index matches on another cluster)
|
||||
{
|
||||
String q = Strings.format("FROM %s,%s:nomatch | STATS count(*)", INDEX1, REMOTE_CLUSTER_ALIAS);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch]", REMOTE_CLUSTER_ALIAS)));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), Matchers.containsString(Strings.format("Unknown index [%s:nomatch]", REMOTE_CLUSTER_ALIAS)));
|
||||
}
|
||||
|
||||
// since there is at least one matching index in the query, the missing wildcarded local index is not an error
|
||||
{
|
||||
String q = Strings.format("FROM nomatch*,%s:%s", REMOTE_CLUSTER_ALIAS, INDEX2);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
Response response = client().performRequest(esqlRequest(limit1));
|
||||
assertOK(response);
|
||||
|
||||
Map<String, Object> map = responseAsMap(response);
|
||||
assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
assertThat(((ArrayList<?>) map.get("values")).size(), greaterThanOrEqualTo(1));
|
||||
|
||||
assertExpectedClustersForMissingIndicesTests(
|
||||
map,
|
||||
List.of(
|
||||
// local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched
|
||||
new ExpectedCluster("(local)", "nomatch*", "successful", 0),
|
||||
new ExpectedCluster(REMOTE_CLUSTER_ALIAS, INDEX2, "successful", null)
|
||||
)
|
||||
);
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
response = client().performRequest(esqlRequest(limit0));
|
||||
assertOK(response);
|
||||
|
||||
map = responseAsMap(response);
|
||||
assertThat(((ArrayList<?>) map.get("columns")).size(), greaterThanOrEqualTo(1));
|
||||
assertThat(((ArrayList<?>) map.get("values")).size(), is(0));
|
||||
|
||||
assertExpectedClustersForMissingIndicesTests(
|
||||
map,
|
||||
List.of(
|
||||
// local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched
|
||||
new ExpectedCluster("(local)", "nomatch*", "successful", 0),
|
||||
new ExpectedCluster(REMOTE_CLUSTER_ALIAS, INDEX2, "successful", 0)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// query is fatal since the remote cluster has skip_unavailable=false and has no matching indices
|
||||
{
|
||||
String q = Strings.format("FROM %s,%s:nomatch*", INDEX1, REMOTE_CLUSTER_ALIAS);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch*]", REMOTE_CLUSTER_ALIAS)));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), Matchers.containsString(Strings.format("Unknown index [%s:nomatch*]", REMOTE_CLUSTER_ALIAS)));
|
||||
}
|
||||
|
||||
// an error is thrown if there are no matching indices at all
|
||||
{
|
||||
// with non-matching concrete index
|
||||
String q = Strings.format("FROM %s:nomatch", REMOTE_CLUSTER_ALIAS);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch]", REMOTE_CLUSTER_ALIAS)));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch]", REMOTE_CLUSTER_ALIAS)));
|
||||
}
|
||||
|
||||
// an error is thrown if there are no matching indices at all
|
||||
{
|
||||
String localExpr = randomFrom("nomatch", "nomatch*");
|
||||
String remoteExpr = randomFrom("nomatch", "nomatch*");
|
||||
String q = Strings.format("FROM %s,%s:%s", localExpr, REMOTE_CLUSTER_ALIAS, remoteExpr);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString("Unknown index"));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("%s:%s", REMOTE_CLUSTER_ALIAS, remoteExpr)));
|
||||
|
||||
String limit0 = q + " | LIMIT 0";
|
||||
e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
assertThat(e.getMessage(), containsString("Unknown index"));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("%s:%s", REMOTE_CLUSTER_ALIAS, remoteExpr)));
|
||||
}
|
||||
|
||||
// error since the remote cluster with skip_unavailable=false specified a concrete index that is not found
|
||||
{
|
||||
String q = Strings.format("FROM %s,%s:nomatch,%s:%s*", INDEX1, REMOTE_CLUSTER_ALIAS, REMOTE_CLUSTER_ALIAS, INDEX2);
|
||||
|
||||
String limit1 = q + " | LIMIT 1";
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit1)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("no such index [nomatch]", REMOTE_CLUSTER_ALIAS)));
|
||||
assertThat(e.getMessage(), containsString(Strings.format("index_not_found_exception", REMOTE_CLUSTER_ALIAS)));
|
||||
|
||||
// TODO: in follow on PR, add support for throwing a VerificationException from this scenario
|
||||
// String limit0 = q + " | LIMIT 0";
|
||||
// e = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(limit0)));
|
||||
// assertThat(e.getMessage(), containsString(Strings.format("Unknown index [%s:nomatch]", REMOTE_CLUSTER_ALIAS)));
|
||||
}
|
||||
}
|
||||
|
||||
private void setupRolesAndPrivileges() throws IOException {
|
||||
var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
|
||||
putUserRequest.setJsonEntity("""
|
||||
{
|
||||
"password": "x-pack-test-password",
|
||||
"roles" : ["remote_search"]
|
||||
}""");
|
||||
assertOK(adminClient().performRequest(putUserRequest));
|
||||
|
||||
var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
|
||||
putRoleOnRemoteClusterRequest.setJsonEntity("""
|
||||
{
|
||||
"indices": [
|
||||
{
|
||||
"names": ["points", "squares"],
|
||||
"privileges": ["read", "read_cross_cluster", "create_index", "monitor"]
|
||||
}
|
||||
],
|
||||
"remote_indices": [
|
||||
{
|
||||
"names": ["points", "squares"],
|
||||
"privileges": ["read", "read_cross_cluster", "create_index", "monitor"],
|
||||
"clusters": ["my_remote_cluster"]
|
||||
}
|
||||
]
|
||||
}""");
|
||||
assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest));
|
||||
}
|
||||
|
||||
private void setupIndex() throws IOException {
|
||||
Request createIndex = new Request("PUT", INDEX1);
|
||||
createIndex.setJsonEntity("""
|
||||
{
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"id": { "type": "integer" },
|
||||
"score": { "type": "integer" }
|
||||
}
|
||||
}
|
||||
}
|
||||
""");
|
||||
assertOK(client().performRequest(createIndex));
|
||||
|
||||
Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
|
||||
bulkRequest.setJsonEntity("""
|
||||
{ "index": { "_index": "points" } }
|
||||
{ "id": 1, "score": 75}
|
||||
{ "index": { "_index": "points" } }
|
||||
{ "id": 2, "score": 125}
|
||||
{ "index": { "_index": "points" } }
|
||||
{ "id": 3, "score": 100}
|
||||
{ "index": { "_index": "points" } }
|
||||
{ "id": 4, "score": 50}
|
||||
{ "index": { "_index": "points" } }
|
||||
{ "id": 5, "score": 150}
|
||||
""");
|
||||
assertOK(client().performRequest(bulkRequest));
|
||||
|
||||
createIndex = new Request("PUT", INDEX2);
|
||||
createIndex.setJsonEntity("""
|
||||
{
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"num": { "type": "integer" },
|
||||
"square": { "type": "integer" }
|
||||
}
|
||||
}
|
||||
}
|
||||
""");
|
||||
assertOK(client().performRequest(createIndex));
|
||||
|
||||
bulkRequest = new Request("POST", "/_bulk?refresh=true");
|
||||
bulkRequest.setJsonEntity("""
|
||||
{ "index": {"_index": "squares"}}
|
||||
{ "num": 1, "square": 1 }
|
||||
{ "index": {"_index": "squares"}}
|
||||
{ "num": 4, "square": 4 }
|
||||
{ "index": {"_index": "squares"}}
|
||||
{ "num": 3, "square": 9 }
|
||||
{ "index": {"_index": "squares"}}
|
||||
{ "num": 4, "square": 16 }
|
||||
""");
|
||||
assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
|
||||
}
|
||||
|
||||
private Request esqlRequest(String query) throws IOException {
|
||||
XContentBuilder body = JsonXContent.contentBuilder();
|
||||
|
||||
body.startObject();
|
||||
body.field("query", query);
|
||||
body.field("include_ccs_metadata", true);
|
||||
body.endObject();
|
||||
|
||||
Request request = new Request("POST", "_query");
|
||||
request.setJsonEntity(org.elasticsearch.common.Strings.toString(body));
|
||||
|
||||
return request;
|
||||
}
|
||||
}
|
|
@ -495,7 +495,7 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
|
|||
}
|
||||
|
||||
/**
|
||||
* Note: invalid_remote is "invalid" because it has a bogus API key and the cluster does not exist (cannot be connected to)
|
||||
* Note: invalid_remote is "invalid" because it has a bogus API key
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCrossClusterQueryAgainstInvalidRemote() throws Exception {
|
||||
|
@ -521,13 +521,19 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
|
|||
// invalid remote with local index should return local results
|
||||
{
|
||||
var q = "FROM invalid_remote:employees,employees | SORT emp_id DESC | LIMIT 10";
|
||||
Response response = performRequestWithRemoteSearchUser(esqlRequest(q));
|
||||
// TODO: when skip_unavailable=false for invalid_remote, a fatal exception should be thrown
|
||||
// this does not yet happen because field-caps returns nothing for this cluster, rather
|
||||
// than an error, so the current code cannot detect that error. Follow on PR will handle this.
|
||||
assertLocalOnlyResults(response);
|
||||
if (skipUnavailable) {
|
||||
Response response = performRequestWithRemoteSearchUser(esqlRequest(q));
|
||||
// this does not yet happen because field-caps returns nothing for this cluster, rather
|
||||
// than an error, so the current code cannot detect that error. Follow on PR will handle this.
|
||||
assertLocalOnlyResultsAndSkippedRemote(response);
|
||||
} else {
|
||||
// errors from invalid remote should throw an exception if the cluster is marked with skip_unavailable=false
|
||||
ResponseException error = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(esqlRequest(q)));
|
||||
assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(400));
|
||||
// TODO: in follow on PR, figure out why this is returning the wrong error - should be "cannot connect to invalid_remote"
|
||||
assertThat(error.getMessage(), containsString("Unknown index [invalid_remote:employees]"));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
var q = "FROM invalid_remote:employees | SORT emp_id DESC | LIMIT 10";
|
||||
// errors from invalid remote should be ignored if the cluster is marked with skip_unavailable=true
|
||||
|
@ -560,10 +566,9 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
|
|||
|
||||
} else {
|
||||
// errors from invalid remote should throw an exception if the cluster is marked with skip_unavailable=false
|
||||
ResponseException error = expectThrows(ResponseException.class, () -> {
|
||||
final Response response1 = performRequestWithRemoteSearchUser(esqlRequest(q));
|
||||
});
|
||||
ResponseException error = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(esqlRequest(q)));
|
||||
assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(401));
|
||||
// TODO: in follow on PR, figure out why this is returning the wrong error - should be "cannot connect to invalid_remote"
|
||||
assertThat(error.getMessage(), containsString("unable to find apikey"));
|
||||
}
|
||||
}
|
||||
|
@ -1049,7 +1054,7 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void assertLocalOnlyResults(Response response) throws IOException {
|
||||
private void assertLocalOnlyResultsAndSkippedRemote(Response response) throws IOException {
|
||||
assertOK(response);
|
||||
Map<String, Object> responseAsMap = entityAsMap(response);
|
||||
List<?> columns = (List<?>) responseAsMap.get("columns");
|
||||
|
@ -1061,6 +1066,34 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
|
|||
.collect(Collectors.toList());
|
||||
// local results
|
||||
assertThat(flatList, containsInAnyOrder("2", "4", "6", "8", "support", "management", "engineering", "marketing"));
|
||||
Map<String, ?> clusters = (Map<String, ?>) responseAsMap.get("_clusters");
|
||||
|
||||
/*
|
||||
clusters map:
|
||||
{running=0, total=2, details={
|
||||
invalid_remote={_shards={total=0, failed=0, successful=0, skipped=0}, took=176, indices=employees,
|
||||
failures=[{reason={reason=Unable to connect to [invalid_remote], type=connect_transport_exception},
|
||||
index=null, shard=-1}], status=skipped},
|
||||
(local)={_shards={total=1, failed=0, successful=1, skipped=0}, took=298, indices=employees, status=successful}},
|
||||
failed=0, partial=0, successful=1, skipped=1}
|
||||
*/
|
||||
|
||||
assertThat((int) clusters.get("total"), equalTo(2));
|
||||
assertThat((int) clusters.get("successful"), equalTo(1));
|
||||
assertThat((int) clusters.get("skipped"), equalTo(1));
|
||||
|
||||
Map<String, ?> details = (Map<String, ?>) clusters.get("details");
|
||||
Map<String, ?> invalidRemoteMap = (Map<String, ?>) details.get("invalid_remote");
|
||||
assertThat(invalidRemoteMap.get("status").toString(), equalTo("skipped"));
|
||||
List<?> failures = (List<?>) invalidRemoteMap.get("failures");
|
||||
assertThat(failures.size(), equalTo(1));
|
||||
Map<String, ?> failureMap = (Map<String, ?>) failures.get(0);
|
||||
Map<String, ?> reasonMap = (Map<String, ?>) failureMap.get("reason");
|
||||
assertThat(reasonMap.get("reason").toString(), containsString("Unable to connect to [invalid_remote]"));
|
||||
assertThat(reasonMap.get("type").toString(), containsString("connect_transport_exception"));
|
||||
|
||||
Map<String, ?> localCluster = (Map<String, ?>) details.get("(local)");
|
||||
assertThat(localCluster.get("status").toString(), equalTo("successful"));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue