Propagate index errors in field_caps (#70245)

Currently we don't report any exceptions occuring during field_caps requests back to the user.
This PR adds a new failure section to the response which contains exceptions per index. 
In addition the response contains another field, `failed_indices`, with the number of indices that threw
an exception. If all of the requested indices fail, the whole request fails, otherwise the request succeeds 
and it is up to the caller to check for potential errors in the response body.

Closes #68994
This commit is contained in:
Christoph Büscher 2021-04-06 12:02:24 +02:00 committed by GitHub
parent a07d876a93
commit a413ae67e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 631 additions and 79 deletions

View file

@ -85,3 +85,27 @@ setup:
- match: {fields.day_of_week.keyword.type: keyword}
- match: {fields.day_of_week.keyword.searchable: true}
- match: {fields.day_of_week.keyword.aggregatable: true}
---
"Field caps with errors in runtime mappings section throws":
- skip:
version: " - 7.11.99"
reason: Runtime mappings support was added in 7.12
- do:
catch: bad_request
field_caps:
index: test-*
fields: "*"
body:
runtime_mappings:
day_of_week:
type: keyword
script:
source: "bad syntax"
- match: { error.type: "script_exception" }
- match: { error.reason: "compile error" }
- match: { error.script : "bad syntax" }
- match: { error.lang : "painless" }

View file

@ -61,10 +61,13 @@
- is_false: fields.geo.keyword.on_aggregatable_indices
- do:
catch: missing
field_caps:
index: 'my_remote_cluster:some_index_that_doesnt_exist'
fields: [number]
- match: { 'fields': {} } # empty response - this index doesn't exists
- match: { error.type: "index_not_found_exception" }
- match: { error.reason: "no such index [some_index_that_doesnt_exist]" }
- do:
field_caps:
@ -86,6 +89,23 @@
- match: {fields.number.keyword.aggregatable: true}
- match: {fields.number.keyword.type: keyword}
- do:
catch: bad_request
field_caps:
index: 'my_remote_cluster:field_caps_index_1'
fields: [number]
body:
runtime_mappings:
day_of_week:
type: keyword
script:
source: "bad syntax"
- match: { error.type: "script_exception" }
- match: { error.reason: "compile error" }
- match: { error.script : "bad syntax" }
- match: { error.lang : "painless" }
---
"Get field caps from remote cluster with index filter":
- skip:

View file

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.search.fieldcaps;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.fieldcaps.FieldCapabilitiesIT.ExceptionOnRewriteQueryBuilder;
import org.elasticsearch.search.fieldcaps.FieldCapabilitiesIT.ExceptionOnRewriteQueryPlugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
public class CCSFieldCapabilitiesIT extends AbstractMultiClustersTestCase {
@Override
protected Collection<String> remoteClusterAlias() {
return List.of("remote_cluster");
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(ExceptionOnRewriteQueryPlugin.class);
return plugins;
}
public void testFailuresFromRemote() {
final Client localClient = client(LOCAL_CLUSTER);
final Client remoteClient = client("remote_cluster");
String localIndex = "local_test";
assertAcked(localClient.admin().indices().prepareCreate(localIndex));
localClient.prepareIndex(localIndex).setId("1").setSource("foo", "bar").get();
localClient.admin().indices().prepareRefresh(localIndex).get();
String remoteErrorIndex = "remote_test_error";
assertAcked(remoteClient.admin().indices().prepareCreate(remoteErrorIndex));
remoteClient.prepareIndex(remoteErrorIndex).setId("2").setSource("foo", "bar").get();
remoteClient.admin().indices().prepareRefresh(remoteErrorIndex).get();
// regular field_caps across clusters
FieldCapabilitiesResponse response = client().prepareFieldCaps("*", "remote_cluster:*").setFields("*").get();
assertThat(Arrays.asList(response.getIndices()), containsInAnyOrder(localIndex, "remote_cluster:" + remoteErrorIndex));
// adding an index filter so remote call should fail
response = client().prepareFieldCaps("*", "remote_cluster:*")
.setFields("*")
.setIndexFilter(new ExceptionOnRewriteQueryBuilder())
.get();
assertThat(response.getIndices()[0], equalTo(localIndex));
assertThat(response.getFailedIndices()[0], equalTo("remote_cluster:*"));
FieldCapabilitiesFailure failure = response.getFailures()
.stream()
.filter(f -> Arrays.asList(f.getIndices()).contains("remote_cluster:*"))
.findFirst().get();
Exception ex = failure.getException();
assertEquals(RemoteTransportException.class, ex.getClass());
Throwable cause = ExceptionsHelper.unwrapCause(ex);
assertEquals(IllegalArgumentException.class, cause.getClass());
assertEquals("I throw because I choose to.", cause.getMessage());
// if we only query the remote we should get back an exception only
ex = expectThrows(
IllegalArgumentException.class,
() -> client().prepareFieldCaps("remote_cluster:*")
.setFields("*")
.setIndexFilter(new ExceptionOnRewriteQueryBuilder())
.get());
assertEquals("I throw because I choose to.", ex.getMessage());
// add an index that doesn't fail to the remote
assertAcked(remoteClient.admin().indices().prepareCreate("okay_remote_index"));
remoteClient.prepareIndex("okay_remote_index").setId("2").setSource("foo", "bar").get();
remoteClient.admin().indices().prepareRefresh("okay_remote_index").get();
response = client().prepareFieldCaps("*", "remote_cluster:*")
.setFields("*")
.setIndexFilter(new ExceptionOnRewriteQueryBuilder())
.get();
assertThat(Arrays.asList(response.getIndices()), containsInAnyOrder(localIndex, "remote_cluster:okay_remote_index"));
assertThat(response.getFailedIndices()[0], equalTo("remote_cluster:" + remoteErrorIndex));
failure = response.getFailures()
.stream()
.filter(f -> Arrays.asList(f.getIndices()).contains("remote_cluster:" + remoteErrorIndex))
.findFirst().get();
ex = failure.getException();
assertEquals(RemoteTransportException.class, ex.getClass());
cause = ExceptionsHelper.unwrapCause(ex);
assertEquals(IllegalArgumentException.class, cause.getClass());
assertEquals("I throw because I choose to.", cause.getMessage());
}
}

View file

@ -8,18 +8,28 @@
package org.elasticsearch.search.fieldcaps;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.junit.Before;
import java.io.IOException;
@ -27,15 +37,20 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import static java.util.Collections.singletonList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.containsInAnyOrder;
public class FieldCapabilitiesIT extends ESIntegTestCase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
@ -92,7 +107,7 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(TestMapperPlugin.class);
return List.of(TestMapperPlugin.class, ExceptionOnRewriteQueryPlugin.class);
}
public void testFieldAlias() {
@ -258,6 +273,48 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
}
}
public void testWithRunntimeMappings() throws InterruptedException {
Map<String, Object> runtimeFields = new HashMap<>();
runtimeFields.put("day_of_week", Collections.singletonMap("type", "keyword"));
FieldCapabilitiesResponse response = client().prepareFieldCaps().setFields("*").setRuntimeFields(runtimeFields).get();
Map<String, FieldCapabilities> runtimeField = response.getField("day_of_week");
assertNotNull(runtimeField);
assertEquals("day_of_week", runtimeField.get("keyword").getName());
assertEquals("keyword", runtimeField.get("keyword").getType());
assertTrue(runtimeField.get("keyword").isSearchable());
assertTrue(runtimeField.get("keyword").isAggregatable());
}
public void testFailures() throws InterruptedException {
// in addition to the existing "old_index" and "new_index", create two where the test query throws an error on rewrite
assertAcked(prepareCreate("index1-error"));
assertAcked(prepareCreate("index2-error"));
ensureGreen("index1-error", "index2-error");
FieldCapabilitiesResponse response = client().prepareFieldCaps()
.setFields("*")
.setIndexFilter(new ExceptionOnRewriteQueryBuilder())
.get();
assertEquals(1, response.getFailures().size());
assertEquals(2, response.getFailedIndices().length);
assertThat(response.getFailures().get(0).getIndices(), arrayContainingInAnyOrder("index1-error", "index2-error"));
Exception failure = response.getFailures().get(0).getException();
assertEquals(RemoteTransportException.class, failure.getClass());
assertEquals(IllegalArgumentException.class, failure.getCause().getClass());
assertEquals("I throw because I choose to.", failure.getCause().getMessage());
// the "indices" section should not include failed ones
assertThat(Arrays.asList(response.getIndices()), containsInAnyOrder("old_index", "new_index"));
// if all requested indices failed, we fail the request by throwing the exception
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> client().prepareFieldCaps("index1-error", "index2-error")
.setFields("*")
.setIndexFilter(new ExceptionOnRewriteQueryBuilder())
.get());
assertEquals("I throw because I choose to.", ex.getMessage());
}
private void assertIndices(FieldCapabilitiesResponse response, String... indices) {
assertNotNull(response.getIndices());
Arrays.sort(indices);
@ -265,6 +322,72 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
assertArrayEquals(indices, response.getIndices());
}
/**
* Adds an "exception" query that throws on rewrite if the index name contains the string "error"
*/
public static class ExceptionOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
public ExceptionOnRewriteQueryPlugin() {}
@Override
public List<QuerySpec<?>> getQueries() {
return singletonList(
new QuerySpec<>("exception", ExceptionOnRewriteQueryBuilder::new, p -> new ExceptionOnRewriteQueryBuilder())
);
}
}
static class ExceptionOnRewriteQueryBuilder extends AbstractQueryBuilder<ExceptionOnRewriteQueryBuilder> {
public static final String NAME = "exception";
ExceptionOnRewriteQueryBuilder() {}
ExceptionOnRewriteQueryBuilder(StreamInput in) throws IOException {
super(in);
}
@Override
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
SearchExecutionContext searchExecutionContext = queryRewriteContext.convertToSearchExecutionContext();
if (searchExecutionContext != null) {
if (searchExecutionContext.indexMatches("*error*")) {
throw new IllegalArgumentException("I throw because I choose to.");
};
}
return this;
}
@Override
protected void doWriteTo(StreamOutput out) {}
@Override
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(NAME);
builder.endObject();
}
@Override
protected Query doToQuery(SearchExecutionContext context) {
return new MatchAllDocsQuery();
}
@Override
protected boolean doEquals(ExceptionOnRewriteQueryBuilder other) {
return false;
}
@Override
protected int doHashCode() {
return 0;
}
@Override
public String getWriteableName() {
return NAME;
}
}
public static final class TestMapperPlugin extends Plugin implements MapperPlugin {
@Override
public Map<String, MetadataFieldMapper.TypeParser> getMetadataMappers() {

View file

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
public class FieldCapabilitiesFailure implements Writeable, ToXContentObject {
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField FAILURE_FIELD = new ParseField("failure");
private final List<String> indices;
private final Exception exception;
public FieldCapabilitiesFailure(String[] indices, Exception exception) {
this.indices = new ArrayList<>(Arrays.asList(Objects.requireNonNull(indices)));
this.exception = Objects.requireNonNull(exception);
}
public FieldCapabilitiesFailure(StreamInput in) throws IOException {
this.indices = in.readStringList();
this.exception = in.readException();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(INDICES_FIELD.getPreferredName(), indices);
builder.startObject(FAILURE_FIELD.getPreferredName());
{
ElasticsearchException.generateFailureXContent(builder, params, exception, true);
}
builder.endObject();
}
builder.endObject();
return builder;
}
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<FieldCapabilitiesFailure, Void> PARSER =
new ConstructingObjectParser<>("field_capabilities_failure", true, a -> {
return new FieldCapabilitiesFailure(((List<String>) a[0]).toArray(String[]::new), (Exception) a[1]);
});
static {
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, p.currentToken(), p);
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, p.nextToken(), p);
Exception e = ElasticsearchException.failureFromXContent(p);
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, p.nextToken(), p);
return e;
},
FAILURE_FIELD
);
}
public static FieldCapabilitiesFailure fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(indices);
out.writeException(exception);
}
public String[] getIndices() {
return indices.toArray(String[]::new);
}
public Exception getException() {
return exception;
}
FieldCapabilitiesFailure addIndex(String index) {
this.indices.add(index);
return this;
}
FieldCapabilitiesFailure addIndices(List<String> indices) {
this.indices.addAll(indices);
return this;
}
}

View file

@ -12,6 +12,8 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.index.query.QueryBuilder;
import java.util.Map;
public class FieldCapabilitiesRequestBuilder extends ActionRequestBuilder<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public FieldCapabilitiesRequestBuilder(ElasticsearchClient client,
FieldCapabilitiesAction action,
@ -36,4 +38,9 @@ public class FieldCapabilitiesRequestBuilder extends ActionRequestBuilder<FieldC
request().indexFilter(indexFilter);
return this;
}
public FieldCapabilitiesRequestBuilder setRuntimeFields(Map<String, Object> runtimeFieldSection) {
request().runtimeFields(runtimeFieldSection);
return this;
}
}

View file

@ -36,24 +36,36 @@ import java.util.stream.Collectors;
public class FieldCapabilitiesResponse extends ActionResponse implements ToXContentObject {
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField FIELDS_FIELD = new ParseField("fields");
private static final ParseField FAILED_INDICES_FIELD = new ParseField("failed_indices");
private static final ParseField FAILURES_FIELD = new ParseField("failures");
private final String[] indices;
private final Map<String, Map<String, FieldCapabilities>> responseMap;
private final List<FieldCapabilitiesFailure> failures;
private final List<FieldCapabilitiesIndexResponse> indexResponses;
public FieldCapabilitiesResponse(String[] indices, Map<String, Map<String, FieldCapabilities>> responseMap) {
this(indices, responseMap, Collections.emptyList());
public FieldCapabilitiesResponse(
String[] indices,
Map<String, Map<String, FieldCapabilities>> responseMap,
List<FieldCapabilitiesFailure> failures
) {
this(indices, responseMap, Collections.emptyList(), failures);
}
FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses) {
this(Strings.EMPTY_ARRAY, Collections.emptyMap(), indexResponses);
public FieldCapabilitiesResponse(String[] indices, Map<String, Map<String, FieldCapabilities>> responseMap) {
this(indices, responseMap, Collections.emptyList(), Collections.emptyList());
}
FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
this(Strings.EMPTY_ARRAY, Collections.emptyMap(), indexResponses, failures);
}
private FieldCapabilitiesResponse(String[] indices, Map<String, Map<String, FieldCapabilities>> responseMap,
List<FieldCapabilitiesIndexResponse> indexResponses) {
List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
this.responseMap = Objects.requireNonNull(responseMap);
this.indexResponses = Objects.requireNonNull(indexResponses);
this.indices = indices;
this.failures = failures;
}
public FieldCapabilitiesResponse(StreamInput in) throws IOException {
@ -65,22 +77,26 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
}
this.responseMap = in.readMap(StreamInput::readString, FieldCapabilitiesResponse::readField);
indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.failures = in.readList(FieldCapabilitiesFailure::new);
} else {
this.failures = Collections.emptyList();
}
}
/**
* Used for serialization
*/
FieldCapabilitiesResponse() {
this(Strings.EMPTY_ARRAY, Collections.emptyMap(), Collections.emptyList());
}
/**
* Get the concrete list of indices that were requested.
* Get the concrete list of indices that were requested and returned a response.
*/
public String[] getIndices() {
return indices;
}
/**
* Get the concrete list of indices that failed
*/
public String[] getFailedIndices() {
return this.failures.stream().map(FieldCapabilitiesFailure::getIndices).flatMap(s -> Arrays.stream(s)).toArray(String[]::new);
}
/**
* Get the field capabilities map.
@ -89,6 +105,12 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
return responseMap;
}
/**
* Get possible request failures keyed by index name
*/
public List<FieldCapabilitiesFailure> getFailures() {
return failures;
}
/**
* Returns the actual per-index field caps responses
@ -127,6 +149,9 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
}
out.writeMap(responseMap, StreamOutput::writeString, FieldCapabilitiesResponse::writeField);
out.writeList(indexResponses);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeList(failures);
}
}
private static void writeField(StreamOutput out, Map<String, FieldCapabilities> map) throws IOException {
@ -141,6 +166,10 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
builder.startObject();
builder.field(INDICES_FIELD.getPreferredName(), indices);
builder.field(FIELDS_FIELD.getPreferredName(), responseMap);
if (this.failures.size() > 0) {
builder.field(FAILED_INDICES_FIELD.getPreferredName(), getFailedIndices().length);
builder.field(FAILURES_FIELD.getPreferredName(), failures);
}
builder.endObject();
return builder;
}
@ -151,19 +180,27 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<FieldCapabilitiesResponse, Void> PARSER =
new ConstructingObjectParser<>("field_capabilities_response", true,
a -> {
List<String> indices = a[0] == null ? Collections.emptyList() : (List<String>) a[0];
return new FieldCapabilitiesResponse(indices.stream().toArray(String[]::new),
((List<Tuple<String, Map<String, FieldCapabilities>>>) a[1]).stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2)));
});
new ConstructingObjectParser<>("field_capabilities_response", true, a -> {
Map<String, Map<String, FieldCapabilities>> responseMap = ((List<Tuple<String, Map<String, FieldCapabilities>>>) a[0]).stream()
.collect(Collectors.toMap(Tuple::v1, Tuple::v2));
List<String> indices = a[1] == null ? Collections.emptyList() : (List<String>) a[1];
List<FieldCapabilitiesFailure> failures = a[2] == null
? Collections.emptyList()
: (List<FieldCapabilitiesFailure>) a[2];
return new FieldCapabilitiesResponse(indices.stream().toArray(String[]::new), responseMap, failures);
});
static {
PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), INDICES_FIELD);
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> {
Map<String, FieldCapabilities> typeToCapabilities = parseTypeToCapabilities(p, n);
return new Tuple<>(n, typeToCapabilities);
}, FIELDS_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), INDICES_FIELD);
PARSER.declareObjectArray(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> FieldCapabilitiesFailure.fromXContent(p),
FAILURES_FIELD
);
}
private static Map<String, FieldCapabilities> parseTypeToCapabilities(XContentParser parser, String name) throws IOException {
@ -187,12 +224,13 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
FieldCapabilitiesResponse that = (FieldCapabilitiesResponse) o;
return Arrays.equals(indices, that.indices) &&
Objects.equals(responseMap, that.responseMap) &&
Objects.equals(indexResponses, that.indexResponses);
Objects.equals(indexResponses, that.indexResponses) &&
Objects.equals(failures, that.failures);
}
@Override
public int hashCode() {
int result = Objects.hash(responseMap, indexResponses);
int result = Objects.hash(responseMap, indexResponses, failures);
result = 31 * result + Arrays.hashCode(indices);
return result;
}

View file

@ -8,6 +8,7 @@
package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
@ -19,6 +20,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.indices.IndicesService;
@ -87,47 +89,55 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
final CountDown completionCounter = new CountDown(totalNumRequest);
final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
final ActionListener<List<FieldCapabilitiesIndexResponse>> countDownListener = new ActionListener<>() {
@Override
public void onResponse(List<FieldCapabilitiesIndexResponse> results) {
for (FieldCapabilitiesIndexResponse res : results) {
if (res.canMatch()) {
indexResponses.add(res);
}
}
countDown();
}
final FailureCollector indexFailures = new FailureCollector();
@Override
public void onFailure(Exception e) {
// TODO we should somehow inform the user that we failed
countDown();
}
private void countDown() {
if (completionCounter.countDown()) {
final Runnable countDown = () -> {
if (completionCounter.countDown()) {
List<FieldCapabilitiesFailure> failures = indexFailures.values();
if (indexResponses.size() > 0) {
if (request.isMergeResults()) {
listener.onResponse(merge(indexResponses, request.includeUnmapped()));
listener.onResponse(merge(indexResponses, request.includeUnmapped(), new ArrayList<>(failures)));
} else {
listener.onResponse(new FieldCapabilitiesResponse(indexResponses));
listener.onResponse(new FieldCapabilitiesResponse(indexResponses, new ArrayList<>(failures)));
}
} else {
// we have no responses at all, maybe because of errors
if (indexFailures.size() > 0) {
// throw back the first exception
listener.onFailure(failures.iterator().next().getException());
} else {
listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
}
}
}
};
for (String index : concreteIndices) {
client.executeLocally(TransportFieldCapabilitiesIndexAction.TYPE,
client.executeLocally(
TransportFieldCapabilitiesIndexAction.TYPE,
new FieldCapabilitiesIndexRequest(
request.fields(),
index,
localIndices,
request.indexFilter(),
nowInMillis, request.runtimeFields()
nowInMillis,
request.runtimeFields()
),
ActionListener.wrap(
response -> countDownListener.onResponse(Collections.singletonList(response)),
countDownListener::onFailure
)
new ActionListener<FieldCapabilitiesIndexResponse>() {
@Override
public void onResponse(FieldCapabilitiesIndexResponse result) {
if (result.canMatch()) {
indexResponses.add(result);
}
countDown.run();
}
@Override
public void onFailure(Exception e) {
indexFailures.collect(e, index);
countDown.run();
}
}
);
}
@ -145,25 +155,36 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
remoteRequest.runtimeFields(request.runtimeFields());
remoteRequest.indexFilter(request.indexFilter());
remoteRequest.nowInMillis(nowInMillis);
remoteClusterClient.fieldCaps(remoteRequest,
ActionListener.wrap(response -> {
List<FieldCapabilitiesIndexResponse> remotes = new ArrayList<>();
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
remotes.add(new FieldCapabilitiesIndexResponse(
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
indexResponses.add(
new FieldCapabilitiesIndexResponse(
RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()),
resp.get(), resp.canMatch()));
}
countDownListener.onResponse(remotes);
}, countDownListener::onFailure));
resp.get(),
resp.canMatch()
)
);
}
for (FieldCapabilitiesFailure failure : response.getFailures()) {
Exception ex = failure.getException();
indexFailures.collectRemoteException(ex, clusterAlias, failure.getIndices());
}
countDown.run();
}, ex -> {
indexFailures.collectRemoteException(ex, clusterAlias, originalIndices.indices());
countDown.run();
}
));
}
}
private FieldCapabilitiesResponse merge(List<FieldCapabilitiesIndexResponse> indexResponses, boolean includeUnmapped) {
String[] indices = indexResponses.stream()
.map(FieldCapabilitiesIndexResponse::getIndexName)
.sorted()
.toArray(String[]::new);
final Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<> ();
private FieldCapabilitiesResponse merge(
List<FieldCapabilitiesIndexResponse> indexResponses,
boolean includeUnmapped,
List<FieldCapabilitiesFailure> failures
) {
String[] indices = indexResponses.stream().map(FieldCapabilitiesIndexResponse::getIndexName).sorted().toArray(String[]::new);
final Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<>();
for (FieldCapabilitiesIndexResponse response : indexResponses) {
innerMerge(responseMapBuilder, response);
}
@ -180,8 +201,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
}
responseMap.put(entry.getKey(), Collections.unmodifiableMap(typeMap));
}
return new FieldCapabilitiesResponse(indices, Collections.unmodifiableMap(responseMap));
// de-dup failures
return new FieldCapabilitiesResponse(indices, Collections.unmodifiableMap(responseMap), failures);
}
private void addUnmappedFields(String[] indices, String field, Map<String, FieldCapabilities.Builder> typeMap) {
@ -211,4 +232,35 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
builder.add(response.getIndexName(), isMetadataField, fieldCap.isSearchable(), fieldCap.isAggregatable(), fieldCap.meta());
}
}
private class FailureCollector {
final Map<Tuple<String, String>, FieldCapabilitiesFailure> indexFailures = Collections.synchronizedMap(
new HashMap<>()
);
List<FieldCapabilitiesFailure> values() {
return new ArrayList<>(indexFailures.values());
}
void collect(Exception e, String index) {
// we deduplicate exceptions on the underlying causes message and classname
// we unwrap the cause to e.g. group RemoteTransportexceptions coming from different nodes if the cause is the same
Throwable cause = ExceptionsHelper.unwrapCause(e);
Tuple<String, String> groupingKey = new Tuple<String, String>(cause.getMessage(), cause.getClass().getName());
indexFailures.compute(
groupingKey,
(k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] {index}, e) : v.addIndex(index)
);
}
void collectRemoteException(Exception ex, String clusterAlias, String[] remoteIndices) {
for (String failedIndex : remoteIndices) {
collect(ex, RemoteClusterAware.buildRemoteIndexName(clusterAlias, failedIndex));
}
}
int size() {
return this.indexFailures.size();
}
}
}

View file

@ -240,7 +240,12 @@ public class TransportFieldCapabilitiesIndexAction
ShardRouting shardRouting = nextRoutingOrNull();
if (shardRouting == null) {
if (canMatchShard == false) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
if (lastFailure == null) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
} else {
logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, request), lastFailure);
listener.onFailure(lastFailure);
}
} else {
if (lastFailure == null || isShardNotAvailableException(lastFailure)) {
listener.onFailure(new NoShardAvailableActionException(null,

View file

@ -8,9 +8,16 @@
package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.ElasticsearchExceptionTests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -23,13 +30,15 @@ public class FieldCapabilitiesResponseTests extends AbstractWireSerializingTestC
@Override
protected FieldCapabilitiesResponse createTestInstance() {
FieldCapabilitiesResponse randomResponse;
List<FieldCapabilitiesIndexResponse> responses = new ArrayList<>();
int numResponse = randomIntBetween(0, 10);
for (int i = 0; i < numResponse; i++) {
responses.add(createRandomIndexResponse());
}
return new FieldCapabilitiesResponse(responses);
randomResponse = new FieldCapabilitiesResponse(responses, Collections.emptyList());
return randomResponse;
}
@Override
@ -91,6 +100,57 @@ public class FieldCapabilitiesResponseTests extends AbstractWireSerializingTestC
FieldCapabilitiesTests.randomFieldCaps(toReplace)));
break;
}
return new FieldCapabilitiesResponse(null, mutatedResponses);
return new FieldCapabilitiesResponse(null, mutatedResponses, Collections.emptyList());
}
public void testFailureSerialization() throws IOException {
FieldCapabilitiesResponse randomResponse = createResponseWithFailures();
FieldCapabilitiesResponse deserialized = copyInstance(randomResponse);
assertThat(deserialized.getIndices(), Matchers.equalTo(randomResponse.getIndices()));
// only match size of failure list and indices, most exceptions don't support 'equals'
List<FieldCapabilitiesFailure> deserializedFailures = deserialized.getFailures();
assertEquals(deserializedFailures.size(), randomResponse.getFailures().size());
int i = 0;
for (FieldCapabilitiesFailure originalFailure : randomResponse.getFailures()) {
FieldCapabilitiesFailure deserializedFaliure = deserializedFailures.get(i);
assertThat(deserializedFaliure.getIndices(), Matchers.equalTo(originalFailure.getIndices()));
i++;
}
}
public void testFailureParsing() throws IOException {
FieldCapabilitiesResponse randomResponse = createResponseWithFailures();
boolean humanReadable = randomBoolean();
XContentType xContentType = randomFrom(XContentType.values());
BytesReference originalBytes = toShuffledXContent(randomResponse, xContentType, ToXContent.EMPTY_PARAMS, humanReadable);
FieldCapabilitiesResponse parsedResponse;
try (XContentParser parser = createParser(xContentType.xContent(), originalBytes)) {
parsedResponse = FieldCapabilitiesResponse.fromXContent(parser);
assertNull(parser.nextToken());
}
assertNotSame(parsedResponse, randomResponse);
assertThat(parsedResponse.getIndices(), Matchers.equalTo(randomResponse.getIndices()));
// only match size of failure list and indices, most exceptions don't support 'equals'
List<FieldCapabilitiesFailure> deserializedFailures = parsedResponse.getFailures();
assertEquals(deserializedFailures.size(), randomResponse.getFailures().size());
int i = 0;
for (FieldCapabilitiesFailure originalFailure : randomResponse.getFailures()) {
FieldCapabilitiesFailure deserializedFaliure = deserializedFailures.get(i);
assertThat(deserializedFaliure.getIndices(), Matchers.equalTo(originalFailure.getIndices()));
i++;
}
}
private FieldCapabilitiesResponse createResponseWithFailures() {
String[] indices = randomArray(randomIntBetween(1, 5), String[]::new, () -> randomAlphaOfLength(5));
List<FieldCapabilitiesFailure> failures = new ArrayList<>();
for (String index : indices) {
if (randomBoolean() || failures.size() == 0) {
failures.add(new FieldCapabilitiesFailure(new String[] {index}, ElasticsearchExceptionTests.randomExceptions().v2()));
} else {
failures.get(failures.size() - 1).addIndex(index);
}
}
return new FieldCapabilitiesResponse(indices, Collections.emptyMap(), failures);
}
}

View file

@ -20,6 +20,7 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
@ -85,7 +86,8 @@ public class MergedFieldCapabilitiesResponseTests extends AbstractSerializingTes
FieldCapabilitiesTests.randomFieldCaps(toReplace)));
break;
}
return new FieldCapabilitiesResponse(null, mutatedResponses);
// TODO pass real list
return new FieldCapabilitiesResponse(null, mutatedResponses, Collections.emptyList());
}
@Override
@ -105,7 +107,7 @@ public class MergedFieldCapabilitiesResponseTests extends AbstractSerializingTes
String generatedResponse = BytesReference.bytes(builder).utf8ToString();
assertEquals((
"{" +
" \"indices\": null," +
" \"indices\": [\"index1\",\"index2\",\"index3\",\"index4\"]," +
" \"fields\": {" +
" \"rating\": { " +
" \"keyword\": {" +
@ -133,15 +135,16 @@ public class MergedFieldCapabilitiesResponseTests extends AbstractSerializingTes
" \"aggregatable\": false" +
" }" +
" }" +
" }" +
" }," +
" \"failed_indices\":2," +
" \"failures\":[" +
" { \"indices\": [\"errorindex\", \"errorindex2\"]," +
" \"failure\" : {\"error\":{\"root_cause\":[{\"type\":\"illegal_argument_exception\"," +
" \"reason\":\"test\"}],\"type\":\"illegal_argument_exception\",\"reason\":\"test\"}}}" +
" ]" +
"}").replaceAll("\\s+", ""), generatedResponse);
}
public void testEmptyResponse() throws IOException {
FieldCapabilitiesResponse testInstance = new FieldCapabilitiesResponse();
assertSerialization(testInstance);
}
private static FieldCapabilitiesResponse createSimpleResponse() {
Map<String, FieldCapabilities> titleCapabilities = new HashMap<>();
titleCapabilities.put("text", new FieldCapabilities("title", "text", false, true, false,
@ -162,6 +165,10 @@ public class MergedFieldCapabilitiesResponseTests extends AbstractSerializingTes
Map<String, Map<String, FieldCapabilities>> responses = new HashMap<>();
responses.put("title", titleCapabilities);
responses.put("rating", ratingCapabilities);
return new FieldCapabilitiesResponse(null, responses);
List<FieldCapabilitiesFailure> failureMap = List.of(
new FieldCapabilitiesFailure(new String[] { "errorindex", "errorindex2" }, new IllegalArgumentException("test"))
);
return new FieldCapabilitiesResponse(new String[] {"index1", "index2", "index3", "index4"}, responses, failureMap);
}
}