diff --git a/docs/changelog/128111.yaml b/docs/changelog/128111.yaml new file mode 100644 index 000000000000..d3b113a682d4 --- /dev/null +++ b/docs/changelog/128111.yaml @@ -0,0 +1,5 @@ +pr: 128111 +summary: Fix union types in CCS +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index 26e6ec81584c..76b52708b4fa 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -67,6 +67,12 @@ public class Clusters { return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT; } + public static org.elasticsearch.Version bwcVersion() { + org.elasticsearch.Version local = localClusterVersion(); + org.elasticsearch.Version remote = remoteClusterVersion(); + return local.before(remote) ? local : remote; + } + private static Version distributionVersion(String key) { final String val = System.getProperty(key); return val != null ? Version.fromString(val) : Version.CURRENT; diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 71fd9da8d03b..5b46efe42457 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -76,6 +76,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { private static TestFeatureService remoteFeaturesService; private static RestClient remoteClusterClient; + private static DataLocation dataLocation = null; @ParametersFactory(argumentFormatting = "%2$s.%3$s") public static List readScriptSpec() throws Exception { @@ -188,8 +189,9 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { */ static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException { RestClient twoClients = mock(RestClient.class); + assertNotNull("data location was set", dataLocation); // write to a single cluster for now due to the precision of some functions such as avg and tests related to updates - final RestClient bulkClient = randomFrom(localClient, remoteClient); + final RestClient bulkClient = dataLocation == DataLocation.REMOTE_ONLY ? remoteClient : randomFrom(localClient, remoteClient); when(twoClients.performRequest(any())).then(invocation -> { Request request = invocation.getArgument(0); String endpoint = request.getEndpoint(); @@ -214,6 +216,11 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { return twoClients; } + enum DataLocation { + REMOTE_ONLY, + ANY_CLUSTER + } + static Request[] cloneRequests(Request orig, int numClones) throws IOException { Request[] clones = new Request[numClones]; for (int i = 0; i < clones.length; i++) { @@ -238,6 +245,9 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { * Convert FROM employees ... => FROM *:employees,employees */ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) { + if (dataLocation == null) { + dataLocation = randomFrom(DataLocation.values()); + } String query = testCase.query; String[] commands = query.split("\\|"); String first = commands[0].trim(); @@ -245,11 +255,15 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { String[] parts = commands[0].split("(?i)metadata"); assert parts.length >= 1 : parts; String fromStatement = parts[0]; - String[] localIndices = fromStatement.substring("FROM ".length()).split(","); - String remoteIndices = Arrays.stream(localIndices) - .map(index -> "*:" + index.trim() + "," + index.trim()) - .collect(Collectors.joining(",")); + final String remoteIndices; + if (canUseRemoteIndicesOnly() && randomBoolean()) { + remoteIndices = Arrays.stream(localIndices).map(index -> "*:" + index.trim()).collect(Collectors.joining(",")); + } else { + remoteIndices = Arrays.stream(localIndices) + .map(index -> "*:" + index.trim() + "," + index.trim()) + .collect(Collectors.joining(",")); + } var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length()); testCase.query = newFrom + query.substring(first.length()); } @@ -257,7 +271,11 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { String[] parts = commands[0].split("\\s+"); assert parts.length >= 2 : commands[0]; String[] indices = parts[1].split(","); - parts[1] = Arrays.stream(indices).map(index -> "*:" + index + "," + index).collect(Collectors.joining(",")); + if (canUseRemoteIndicesOnly() && randomBoolean()) { + parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim()).collect(Collectors.joining(",")); + } else { + parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim() + "," + index.trim()).collect(Collectors.joining(",")); + } String newNewMetrics = String.join(" ", parts); testCase.query = newNewMetrics + query.substring(first.length()); } @@ -274,6 +292,12 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { return testCase; } + static boolean canUseRemoteIndicesOnly() { + // If the data is indexed only into the remote cluster, we can query only the remote indices. + // However, due to the union types bug in CCS, we must include the local indices in versions without the fix. + return dataLocation == DataLocation.REMOTE_ONLY && Clusters.bwcVersion().onOrAfter(Version.V_9_1_0); + } + static boolean hasIndexMetadata(String query) { String[] commands = query.split("\\|"); if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index fe5c36aff07e..0b62f3c4930a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -991,4 +991,28 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase { remoteClient.admin().indices().prepareRefresh(indexName).get(); } + public void testMultiTypes() throws Exception { + Client remoteClient = client(REMOTE_CLUSTER_1); + int totalDocs = 0; + for (String type : List.of("integer", "long")) { + String index = "conflict-index-" + type; + assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type)); + int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get(); + } + remoteClient.admin().indices().prepareRefresh(index).get(); + totalDocs += numDocs; + } + for (String castFunction : List.of("TO_LONG", "TO_INT")) { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)"); + try (EsqlQueryResponse resp = runQuery(request)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(1)); + assertThat(values.get(0), hasSize(1)); + assertThat(values.get(0).get(0), equalTo((long) totalDocs)); + } + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 0f348e268ef6..31a859d0c195 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -157,7 +157,8 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference); MultiTypeEsField unionTypes = findUnionTypes(attr); if (unionTypes != null) { - String indexName = shardContext.ctx.index().getName(); + // Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix + String indexName = shardContext.ctx.getFullyQualifiedIndex().getName(); Expression conversion = unionTypes.getConversionExpressionForIndex(indexName); return conversion == null ? BlockLoader.CONSTANT_NULLS