From c2561b5cbaed49c7aba0b644bc8b256d2426290c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 May 2025 22:47:30 -0700 Subject: [PATCH] Fix union types in CCS (#128111) Currently, union types in CCS is broken. For example, FROM *:remote-indices | EVAL port = TO_INT(port) returns all nulls if the types of the port field conflict. This happens because converters are a map of the fully qualified cluster:index -name (defined in MultiTypeEsField), but we are looking up the converter using only the index name, which leads to a wrong or missing converter on remote clusters. Our tests didn't catch this because MultiClusterSpecIT generates the same index for both clusters, allowing the local converter to be used for remote indices. --- docs/changelog/128111.yaml | 5 +++ .../xpack/esql/ccq/Clusters.java | 6 ++++ .../xpack/esql/ccq/MultiClusterSpecIT.java | 36 +++++++++++++++---- .../esql/action/CrossClusterQueryIT.java | 24 +++++++++++++ .../planner/EsPhysicalOperationProviders.java | 3 +- 5 files changed, 67 insertions(+), 7 deletions(-) create mode 100644 docs/changelog/128111.yaml diff --git a/docs/changelog/128111.yaml b/docs/changelog/128111.yaml new file mode 100644 index 000000000000..d3b113a682d4 --- /dev/null +++ b/docs/changelog/128111.yaml @@ -0,0 +1,5 @@ +pr: 128111 +summary: Fix union types in CCS +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index 26e6ec81584c..76b52708b4fa 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -67,6 +67,12 @@ public class Clusters { return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT; } + public static org.elasticsearch.Version bwcVersion() { + org.elasticsearch.Version local = localClusterVersion(); + org.elasticsearch.Version remote = remoteClusterVersion(); + return local.before(remote) ? local : remote; + } + private static Version distributionVersion(String key) { final String val = System.getProperty(key); return val != null ? Version.fromString(val) : Version.CURRENT; diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 71fd9da8d03b..5b46efe42457 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -76,6 +76,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { private static TestFeatureService remoteFeaturesService; private static RestClient remoteClusterClient; + private static DataLocation dataLocation = null; @ParametersFactory(argumentFormatting = "%2$s.%3$s") public static List readScriptSpec() throws Exception { @@ -188,8 +189,9 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { */ static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException { RestClient twoClients = mock(RestClient.class); + assertNotNull("data location was set", dataLocation); // write to a single cluster for now due to the precision of some functions such as avg and tests related to updates - final RestClient bulkClient = randomFrom(localClient, remoteClient); + final RestClient bulkClient = dataLocation == DataLocation.REMOTE_ONLY ? remoteClient : randomFrom(localClient, remoteClient); when(twoClients.performRequest(any())).then(invocation -> { Request request = invocation.getArgument(0); String endpoint = request.getEndpoint(); @@ -214,6 +216,11 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { return twoClients; } + enum DataLocation { + REMOTE_ONLY, + ANY_CLUSTER + } + static Request[] cloneRequests(Request orig, int numClones) throws IOException { Request[] clones = new Request[numClones]; for (int i = 0; i < clones.length; i++) { @@ -238,6 +245,9 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { * Convert FROM employees ... => FROM *:employees,employees */ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) { + if (dataLocation == null) { + dataLocation = randomFrom(DataLocation.values()); + } String query = testCase.query; String[] commands = query.split("\\|"); String first = commands[0].trim(); @@ -245,11 +255,15 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { String[] parts = commands[0].split("(?i)metadata"); assert parts.length >= 1 : parts; String fromStatement = parts[0]; - String[] localIndices = fromStatement.substring("FROM ".length()).split(","); - String remoteIndices = Arrays.stream(localIndices) - .map(index -> "*:" + index.trim() + "," + index.trim()) - .collect(Collectors.joining(",")); + final String remoteIndices; + if (canUseRemoteIndicesOnly() && randomBoolean()) { + remoteIndices = Arrays.stream(localIndices).map(index -> "*:" + index.trim()).collect(Collectors.joining(",")); + } else { + remoteIndices = Arrays.stream(localIndices) + .map(index -> "*:" + index.trim() + "," + index.trim()) + .collect(Collectors.joining(",")); + } var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length()); testCase.query = newFrom + query.substring(first.length()); } @@ -257,7 +271,11 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { String[] parts = commands[0].split("\\s+"); assert parts.length >= 2 : commands[0]; String[] indices = parts[1].split(","); - parts[1] = Arrays.stream(indices).map(index -> "*:" + index + "," + index).collect(Collectors.joining(",")); + if (canUseRemoteIndicesOnly() && randomBoolean()) { + parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim()).collect(Collectors.joining(",")); + } else { + parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim() + "," + index.trim()).collect(Collectors.joining(",")); + } String newNewMetrics = String.join(" ", parts); testCase.query = newNewMetrics + query.substring(first.length()); } @@ -274,6 +292,12 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { return testCase; } + static boolean canUseRemoteIndicesOnly() { + // If the data is indexed only into the remote cluster, we can query only the remote indices. + // However, due to the union types bug in CCS, we must include the local indices in versions without the fix. + return dataLocation == DataLocation.REMOTE_ONLY && Clusters.bwcVersion().onOrAfter(Version.V_9_1_0); + } + static boolean hasIndexMetadata(String query) { String[] commands = query.split("\\|"); if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index fe5c36aff07e..0b62f3c4930a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -991,4 +991,28 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase { remoteClient.admin().indices().prepareRefresh(indexName).get(); } + public void testMultiTypes() throws Exception { + Client remoteClient = client(REMOTE_CLUSTER_1); + int totalDocs = 0; + for (String type : List.of("integer", "long")) { + String index = "conflict-index-" + type; + assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type)); + int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get(); + } + remoteClient.admin().indices().prepareRefresh(index).get(); + totalDocs += numDocs; + } + for (String castFunction : List.of("TO_LONG", "TO_INT")) { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)"); + try (EsqlQueryResponse resp = runQuery(request)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(1)); + assertThat(values.get(0), hasSize(1)); + assertThat(values.get(0).get(0), equalTo((long) totalDocs)); + } + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 0f348e268ef6..31a859d0c195 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -157,7 +157,8 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference); MultiTypeEsField unionTypes = findUnionTypes(attr); if (unionTypes != null) { - String indexName = shardContext.ctx.index().getName(); + // Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix + String indexName = shardContext.ctx.getFullyQualifiedIndex().getName(); Expression conversion = unionTypes.getConversionExpressionForIndex(indexName); return conversion == null ? BlockLoader.CONSTANT_NULLS