Fix union types in CCS (#128111)

Currently, union types in CCS are broken. For example, FROM
*:remote-indices | EVAL port = TO_INT(port) returns all nulls if the
types of the port field conflict. This happens because the converters are
stored in a map keyed by the fully qualified cluster:index-name (defined in
MultiTypeEsField), but we look up the converter using only the
index name, which leads to a wrong or missing converter on remote
clusters. Our tests didn't catch this because MultiClusterSpecIT
generates the same index for both clusters, allowing the local converter
to be used for remote indices.
This commit is contained in:
Nhat Nguyen 2025-05-19 22:47:30 -07:00 committed by GitHub
parent 3594ade2a9
commit c2561b5cba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 67 additions and 7 deletions

View file

@ -0,0 +1,5 @@
pr: 128111
summary: Fix union types in CCS
area: ES|QL
type: bug
issues: []

View file

@ -67,6 +67,12 @@ public class Clusters {
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT; return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
} }
/**
 * Picks the version to use for backwards-compatibility checks across the two clusters.
 *
 * @return the local cluster version when it is before the remote cluster version;
 *         otherwise the remote cluster version (including when both are equal)
 */
public static org.elasticsearch.Version bwcVersion() {
    final org.elasticsearch.Version localVersion = localClusterVersion();
    final org.elasticsearch.Version remoteVersion = remoteClusterVersion();
    if (localVersion.before(remoteVersion)) {
        return localVersion;
    }
    return remoteVersion;
}
private static Version distributionVersion(String key) { private static Version distributionVersion(String key) {
final String val = System.getProperty(key); final String val = System.getProperty(key);
return val != null ? Version.fromString(val) : Version.CURRENT; return val != null ? Version.fromString(val) : Version.CURRENT;

View file

@ -76,6 +76,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
private static TestFeatureService remoteFeaturesService; private static TestFeatureService remoteFeaturesService;
private static RestClient remoteClusterClient; private static RestClient remoteClusterClient;
private static DataLocation dataLocation = null;
@ParametersFactory(argumentFormatting = "%2$s.%3$s") @ParametersFactory(argumentFormatting = "%2$s.%3$s")
public static List<Object[]> readScriptSpec() throws Exception { public static List<Object[]> readScriptSpec() throws Exception {
@ -188,8 +189,9 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
*/ */
static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException { static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException {
RestClient twoClients = mock(RestClient.class); RestClient twoClients = mock(RestClient.class);
assertNotNull("data location was set", dataLocation);
// write to a single cluster for now due to the precision of some functions such as avg and tests related to updates // write to a single cluster for now due to the precision of some functions such as avg and tests related to updates
final RestClient bulkClient = randomFrom(localClient, remoteClient); final RestClient bulkClient = dataLocation == DataLocation.REMOTE_ONLY ? remoteClient : randomFrom(localClient, remoteClient);
when(twoClients.performRequest(any())).then(invocation -> { when(twoClients.performRequest(any())).then(invocation -> {
Request request = invocation.getArgument(0); Request request = invocation.getArgument(0);
String endpoint = request.getEndpoint(); String endpoint = request.getEndpoint();
@ -214,6 +216,11 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
return twoClients; return twoClients;
} }
/**
 * Describes which cluster(s) the test data may be bulk-indexed into.
 * The constant order is significant for callers that pick a random value via {@code values()}.
 */
enum DataLocation {
// Bulk requests are always sent to the remote cluster.
REMOTE_ONLY,
// Bulk requests are sent to one cluster chosen at random between the local and the remote one.
ANY_CLUSTER
}
static Request[] cloneRequests(Request orig, int numClones) throws IOException { static Request[] cloneRequests(Request orig, int numClones) throws IOException {
Request[] clones = new Request[numClones]; Request[] clones = new Request[numClones];
for (int i = 0; i < clones.length; i++) { for (int i = 0; i < clones.length; i++) {
@ -238,6 +245,9 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
* Convert FROM employees ... => FROM *:employees,employees * Convert FROM employees ... => FROM *:employees,employees
*/ */
static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) { static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) {
if (dataLocation == null) {
dataLocation = randomFrom(DataLocation.values());
}
String query = testCase.query; String query = testCase.query;
String[] commands = query.split("\\|"); String[] commands = query.split("\\|");
String first = commands[0].trim(); String first = commands[0].trim();
@ -245,11 +255,15 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
String[] parts = commands[0].split("(?i)metadata"); String[] parts = commands[0].split("(?i)metadata");
assert parts.length >= 1 : parts; assert parts.length >= 1 : parts;
String fromStatement = parts[0]; String fromStatement = parts[0];
String[] localIndices = fromStatement.substring("FROM ".length()).split(","); String[] localIndices = fromStatement.substring("FROM ".length()).split(",");
String remoteIndices = Arrays.stream(localIndices) final String remoteIndices;
if (canUseRemoteIndicesOnly() && randomBoolean()) {
remoteIndices = Arrays.stream(localIndices).map(index -> "*:" + index.trim()).collect(Collectors.joining(","));
} else {
remoteIndices = Arrays.stream(localIndices)
.map(index -> "*:" + index.trim() + "," + index.trim()) .map(index -> "*:" + index.trim() + "," + index.trim())
.collect(Collectors.joining(",")); .collect(Collectors.joining(","));
}
var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length()); var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length());
testCase.query = newFrom + query.substring(first.length()); testCase.query = newFrom + query.substring(first.length());
} }
@ -257,7 +271,11 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
String[] parts = commands[0].split("\\s+"); String[] parts = commands[0].split("\\s+");
assert parts.length >= 2 : commands[0]; assert parts.length >= 2 : commands[0];
String[] indices = parts[1].split(","); String[] indices = parts[1].split(",");
parts[1] = Arrays.stream(indices).map(index -> "*:" + index + "," + index).collect(Collectors.joining(",")); if (canUseRemoteIndicesOnly() && randomBoolean()) {
parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim()).collect(Collectors.joining(","));
} else {
parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim() + "," + index.trim()).collect(Collectors.joining(","));
}
String newNewMetrics = String.join(" ", parts); String newNewMetrics = String.join(" ", parts);
testCase.query = newNewMetrics + query.substring(first.length()); testCase.query = newNewMetrics + query.substring(first.length());
} }
@ -274,6 +292,12 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
return testCase; return testCase;
} }
/**
 * Tells whether a test query may target the remote indices alone, without also listing the local ones.
 *
 * @return {@code true} only when the data location is {@code REMOTE_ONLY} and the
 *         backwards-compatibility version is on or after 9.1.0
 */
static boolean canUseRemoteIndicesOnly() {
    // Unless the data lives solely on the remote cluster, the local indices must stay in the query.
    if (dataLocation != DataLocation.REMOTE_ONLY) {
        return false;
    }
    // Versions without the CCS union types fix still need the local indices to be included.
    return Clusters.bwcVersion().onOrAfter(Version.V_9_1_0);
}
static boolean hasIndexMetadata(String query) { static boolean hasIndexMetadata(String query) {
String[] commands = query.split("\\|"); String[] commands = query.split("\\|");
if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) { if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) {

View file

@ -991,4 +991,28 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
remoteClient.admin().indices().prepareRefresh(indexName).get(); remoteClient.admin().indices().prepareRefresh(indexName).get();
} }
/**
 * Creates two remote indices whose {@code port} field has conflicting types (integer and long),
 * then checks that casting the field with TO_LONG and TO_INT yields a non-null value for every
 * indexed document when querying only the remote indices.
 */
public void testMultiTypes() throws Exception {
    final Client remote = client(REMOTE_CLUSTER_1);
    int expectedDocs = 0;
    for (String fieldType : List.of("integer", "long")) {
        final String conflictIndex = "conflict-index-" + fieldType;
        assertAcked(remote.admin().indices().prepareCreate(conflictIndex).setMapping("port", "type=" + fieldType));
        final int docCount = between(1, 10);
        for (int doc = 0; doc < docCount; doc++) {
            remote.prepareIndex(conflictIndex).setId(Integer.toString(doc)).setSource("port", doc).get();
        }
        remote.admin().indices().prepareRefresh(conflictIndex).get();
        expectedDocs += docCount;
    }
    for (String castFunction : List.of("TO_LONG", "TO_INT")) {
        final String query = "FROM *:conflict-index-* | EVAL port="
            + castFunction
            + "(port) | WHERE port is NOT NULL | STATS COUNT(port)";
        final EsqlQueryRequest request = new EsqlQueryRequest();
        request.query(query);
        try (EsqlQueryResponse resp = runQuery(request)) {
            final List<List<Object>> rows = getValuesList(resp);
            // A single row with a single column is expected: the count of non-null casted values.
            assertThat(rows, hasSize(1));
            final List<Object> onlyRow = rows.get(0);
            assertThat(onlyRow, hasSize(1));
            assertThat(onlyRow.get(0), equalTo((long) expectedDocs));
        }
    }
}
} }

View file

@ -157,7 +157,8 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference); BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference);
MultiTypeEsField unionTypes = findUnionTypes(attr); MultiTypeEsField unionTypes = findUnionTypes(attr);
if (unionTypes != null) { if (unionTypes != null) {
String indexName = shardContext.ctx.index().getName(); // Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix
String indexName = shardContext.ctx.getFullyQualifiedIndex().getName();
Expression conversion = unionTypes.getConversionExpressionForIndex(indexName); Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
return conversion == null return conversion == null
? BlockLoader.CONSTANT_NULLS ? BlockLoader.CONSTANT_NULLS