diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java
index 5aa64a5c1375..b08b045d287c 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java
@@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -269,6 +270,28 @@ public final class IndicesClient {
listener, emptySet(), headers);
}
+ /** Initiate a synced flush manually using the synced flush API
+ *
+ * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html">
+ * Synced flush API on elastic.co</a>
+ */
+ public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException {
+ return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, RequestConverters::flushSynced,
+ SyncedFlushResponse::fromXContent, emptySet(), headers);
+ }
+
+ /**
+ * Asynchronously initiate a synced flush manually using the synced flush API
+ *
+ * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html">
+ * Synced flush API on elastic.co</a>
+ */
+ public void flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, ActionListener<SyncedFlushResponse> listener, Header... headers) {
+ restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, RequestConverters::flushSynced,
+ SyncedFlushResponse::fromXContent, listener, emptySet(), headers);
+ }
+
+
/**
* Retrieve the settings of one or more indices
*
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
index 6126d59b16a7..1f542736d7dc 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
@@ -41,6 +41,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@@ -211,6 +212,14 @@ final class RequestConverters {
return request;
}
+ static Request flushSynced(SyncedFlushRequest syncedFlushRequest) {
+ String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices();
+ Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_flush/synced"));
+ Params parameters = new Params(request);
+ parameters.withIndicesOptions(syncedFlushRequest.indicesOptions());
+ return request;
+ }
+
static Request forceMerge(ForceMergeRequest forceMergeRequest) {
String[] indices = forceMergeRequest.indices() == null ? Strings.EMPTY_ARRAY : forceMergeRequest.indices();
Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_forcemerge"));
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java
new file mode 100644
index 000000000000..53f3f3358ba2
--- /dev/null
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.ParsingException;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentLocation;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser.Token;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+
+public class SyncedFlushResponse extends ActionResponse implements ToXContentFragment {
+
+ public static final String SHARDS_FIELD = "_shards";
+
+ private ShardCounts totalCounts;
+ private Map<String, IndexResult> indexResults;
+
+ SyncedFlushResponse(ShardCounts totalCounts, Map<String, IndexResult> indexResults) {
+ this.totalCounts = new ShardCounts(totalCounts.total, totalCounts.successful, totalCounts.failed);
+ this.indexResults = Collections.unmodifiableMap(indexResults);
+ }
+
+ /**
+ * @return The total number of shard copies that were processed across all indexes
+ */
+ public int totalShards() {
+ return totalCounts.total;
+ }
+
+ /**
+ * @return The number of successful shard copies that were processed across all indexes
+ */
+ public int successfulShards() {
+ return totalCounts.successful;
+ }
+
+ /**
+ * @return The number of failed shard copies that were processed across all indexes
+ */
+ public int failedShards() {
+ return totalCounts.failed;
+ }
+
+ /**
+ * @return A map of results for each index where the keys of the map are the index names
+ * and the values are the results encapsulated in {@link IndexResult}.
+ */
+ public Map<String, IndexResult> getIndexResults() {
+ return indexResults;
+ }
+
+ ShardCounts getShardCounts() {
+ return totalCounts;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(SHARDS_FIELD);
+ totalCounts.toXContent(builder, params);
+ builder.endObject();
+ for (Map.Entry<String, IndexResult> entry: indexResults.entrySet()) {
+ String indexName = entry.getKey();
+ IndexResult indexResult = entry.getValue();
+ builder.startObject(indexName);
+ indexResult.toXContent(builder, params);
+ builder.endObject();
+ }
+ return builder;
+ }
+
+ public static SyncedFlushResponse fromXContent(XContentParser parser) throws IOException {
+ ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
+ ShardCounts totalCounts = null;
+ Map<String, IndexResult> indexResults = new HashMap<>();
+ XContentLocation startLoc = parser.getTokenLocation();
+ while (parser.nextToken().equals(Token.FIELD_NAME)) {
+ if (parser.currentName().equals(SHARDS_FIELD)) {
+ ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
+ totalCounts = ShardCounts.fromXContent(parser);
+ } else {
+ String indexName = parser.currentName();
+ IndexResult indexResult = IndexResult.fromXContent(parser);
+ indexResults.put(indexName, indexResult);
+ }
+ }
+ if (totalCounts != null) {
+ return new SyncedFlushResponse(totalCounts, indexResults);
+ } else {
+ throw new ParsingException(
+ startLoc,
+ "Unable to reconstruct object. Total counts for shards couldn't be parsed."
+ );
+ }
+ }
+
+ /**
+ * Encapsulates the number of total successful and failed shard copies
+ */
+ public static final class ShardCounts implements ToXContentFragment {
+
+ public static final String TOTAL_FIELD = "total";
+ public static final String SUCCESSFUL_FIELD = "successful";
+ public static final String FAILED_FIELD = "failed";
+
+ private static final ConstructingObjectParser<ShardCounts, Void> PARSER =
+ new ConstructingObjectParser<>(
+ "shardcounts",
+ a -> new ShardCounts((Integer) a[0], (Integer) a[1], (Integer) a[2])
+ );
+ static {
+ PARSER.declareInt(constructorArg(), new ParseField(TOTAL_FIELD));
+ PARSER.declareInt(constructorArg(), new ParseField(SUCCESSFUL_FIELD));
+ PARSER.declareInt(constructorArg(), new ParseField(FAILED_FIELD));
+ }
+
+ private int total;
+ private int successful;
+ private int failed;
+
+
+ ShardCounts(int total, int successful, int failed) {
+ this.total = total;
+ this.successful = successful;
+ this.failed = failed;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.field(TOTAL_FIELD, total);
+ builder.field(SUCCESSFUL_FIELD, successful);
+ builder.field(FAILED_FIELD, failed);
+ return builder;
+ }
+
+ public static ShardCounts fromXContent(XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+
+ public boolean equals(ShardCounts other) {
+ if (other != null) {
+ return
+ other.total == this.total &&
+ other.successful == this.successful &&
+ other.failed == this.failed;
+ } else {
+ return false;
+ }
+ }
+
+ }
+
+ /**
+ * Description for the flush/synced results for a particular index.
+ * This includes total, successful and failed copies along with failure description for each failed copy.
+ */
+ public static final class IndexResult implements ToXContentFragment {
+
+ public static final String TOTAL_FIELD = "total";
+ public static final String SUCCESSFUL_FIELD = "successful";
+ public static final String FAILED_FIELD = "failed";
+ public static final String FAILURES_FIELD = "failures";
+
+ @SuppressWarnings("unchecked")
+ private static final ConstructingObjectParser<IndexResult, Void> PARSER =
+ new ConstructingObjectParser<>(
+ "indexresult",
+ a -> new IndexResult((Integer) a[0], (Integer) a[1], (Integer) a[2], (List<ShardFailure>)a[3])
+ );
+ static {
+ PARSER.declareInt(constructorArg(), new ParseField(TOTAL_FIELD));
+ PARSER.declareInt(constructorArg(), new ParseField(SUCCESSFUL_FIELD));
+ PARSER.declareInt(constructorArg(), new ParseField(FAILED_FIELD));
+ PARSER.declareObjectArray(optionalConstructorArg(), ShardFailure.PARSER, new ParseField(FAILURES_FIELD));
+ }
+
+ private ShardCounts counts;
+ private List<ShardFailure> failures;
+
+ IndexResult(int total, int successful, int failed, List<ShardFailure> failures) {
+ counts = new ShardCounts(total, successful, failed);
+ if (failures != null) {
+ this.failures = Collections.unmodifiableList(failures);
+ } else {
+ this.failures = Collections.unmodifiableList(new ArrayList<>());
+ }
+ }
+
+ /**
+ * @return The total number of shard copies that were processed for this index.
+ */
+ public int totalShards() {
+ return counts.total;
+ }
+
+ /**
+ * @return The number of successful shard copies that were processed for this index.
+ */
+ public int successfulShards() {
+ return counts.successful;
+ }
+
+ /**
+ * @return The number of failed shard copies that were processed for this index.
+ */
+ public int failedShards() {
+ return counts.failed;
+ }
+
+ /**
+ * @return A list of {@link ShardFailure} objects that describe each of the failed shard copies for this index.
+ */
+ public List<ShardFailure> failures() {
+ return failures;
+ }
+
+ ShardCounts getShardCounts() {
+ return counts;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ counts.toXContent(builder, params);
+ if (failures.size() > 0) {
+ builder.startArray(FAILURES_FIELD);
+ for (ShardFailure failure : failures) {
+ failure.toXContent(builder, params);
+ }
+ builder.endArray();
+ }
+ return builder;
+ }
+
+ public static IndexResult fromXContent(XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+ }
+
+ /**
+ * Description of a failed shard copy for an index.
+ */
+ public static final class ShardFailure implements ToXContentFragment {
+
+ public static String SHARD_ID_FIELD = "shard";
+ public static String FAILURE_REASON_FIELD = "reason";
+ public static String ROUTING_FIELD = "routing";
+
+ private int shardId;
+ private String failureReason;
+ private Map<String, Object> routing;
+
+ @SuppressWarnings("unchecked")
+ static ConstructingObjectParser<ShardFailure, Void> PARSER = new ConstructingObjectParser<>(
+ "shardfailure",
+ a -> new ShardFailure((Integer)a[0], (String)a[1], (Map<String, Object>)a[2])
+ );
+ static {
+ PARSER.declareInt(constructorArg(), new ParseField(SHARD_ID_FIELD));
+ PARSER.declareString(constructorArg(), new ParseField(FAILURE_REASON_FIELD));
+ PARSER.declareObject(
+ optionalConstructorArg(),
+ (parser, c) -> parser.map(),
+ new ParseField(ROUTING_FIELD)
+ );
+ }
+
+ ShardFailure(int shardId, String failureReason, Map<String, Object> routing) {
+ this.shardId = shardId;
+ this.failureReason = failureReason;
+ if (routing != null) {
+ this.routing = Collections.unmodifiableMap(routing);
+ } else {
+ this.routing = Collections.unmodifiableMap(new HashMap<>());
+ }
+ }
+
+ /**
+ * @return Id of the shard whose copy failed
+ */
+ public int getShardId() {
+ return shardId;
+ }
+
+ /**
+ * @return Reason for failure of the shard copy
+ */
+ public String getFailureReason() {
+ return failureReason;
+ }
+
+ /**
+ * @return Additional information about the failure.
+ */
+ public Map<String, Object> getRouting() {
+ return routing;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field(SHARD_ID_FIELD, shardId);
+ builder.field(FAILURE_REASON_FIELD, failureReason);
+ if (routing.size() > 0) {
+ builder.field(ROUTING_FIELD, routing);
+ }
+ builder.endObject();
+ return builder;
+ }
+
+ public static ShardFailure fromXContent(XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+ }
+}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java
index 88e4a2568158..448ff0138d3a 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java
@@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -563,6 +564,39 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
}
}
+ public void testSyncedFlush() throws IOException {
+ {
+ String index = "index";
+ Settings settings = Settings.builder()
+ .put("number_of_shards", 1)
+ .put("number_of_replicas", 0)
+ .build();
+ createIndex(index, settings);
+ SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index);
+ SyncedFlushResponse flushResponse =
+ execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync);
+ assertThat(flushResponse.totalShards(), equalTo(1));
+ assertThat(flushResponse.successfulShards(), equalTo(1));
+ assertThat(flushResponse.failedShards(), equalTo(0));
+ }
+ {
+ String nonExistentIndex = "non_existent_index";
+ assertFalse(indexExists(nonExistentIndex));
+ SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex);
+ ElasticsearchException exception = expectThrows(
+ ElasticsearchException.class,
+ () ->
+ execute(
+ syncedFlushRequest,
+ highLevelClient().indices()::flushSynced,
+ highLevelClient().indices()::flushSyncedAsync
+ )
+ );
+ assertEquals(RestStatus.NOT_FOUND, exception.status());
+ }
+ }
+
+
public void testClearCache() throws IOException {
{
String index = "index";
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
index 1573071da337..0f0c1f275881 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
@@ -43,6 +43,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@@ -645,6 +646,29 @@ public class RequestConvertersTests extends ESTestCase {
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
}
+ public void testSyncedFlush() {
+ String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
+ SyncedFlushRequest syncedFlushRequest;
+ if (randomBoolean()) {
+ syncedFlushRequest = new SyncedFlushRequest(indices);
+ } else {
+ syncedFlushRequest = new SyncedFlushRequest();
+ syncedFlushRequest.indices(indices);
+ }
+ Map<String, String> expectedParams = new HashMap<>();
+ setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams);
+ Request request = RequestConverters.flushSynced(syncedFlushRequest);
+ StringJoiner endpoint = new StringJoiner("/", "/", "");
+ if (indices != null && indices.length > 0) {
+ endpoint.add(String.join(",", indices));
+ }
+ endpoint.add("_flush/synced");
+ assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
+ assertThat(request.getParameters(), equalTo(expectedParams));
+ assertThat(request.getEntity(), nullValue());
+ assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
+ }
+
public void testForceMerge() {
String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
ForceMergeRequest forceMergeRequest;
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java
new file mode 100644
index 000000000000..bc8fc90dd75e
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+
+import com.carrotsearch.hppc.ObjectIntHashMap;
+import com.carrotsearch.hppc.ObjectIntMap;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.flush.ShardsSyncedFlushResult;
+import org.elasticsearch.indices.flush.SyncedFlushService;
+import org.elasticsearch.test.ESTestCase;
+
+public class SyncedFlushResponseTests extends ESTestCase {
+
+ public void testXContentSerialization() throws IOException {
+ final XContentType xContentType = randomFrom(XContentType.values());
+ TestPlan plan = createTestPlan();
+
+ XContentBuilder serverResponsebuilder = XContentBuilder.builder(xContentType.xContent());
+ assertNotNull(plan.result);
+ serverResponsebuilder.startObject();
+ plan.result.toXContent(serverResponsebuilder, ToXContent.EMPTY_PARAMS);
+ serverResponsebuilder.endObject();
+ XContentBuilder clientResponsebuilder = XContentBuilder.builder(xContentType.xContent());
+ assertNotNull(plan.result);
+ clientResponsebuilder.startObject();
+ plan.clientResult.toXContent(clientResponsebuilder, ToXContent.EMPTY_PARAMS);
+ clientResponsebuilder.endObject();
+ Map<String, Object> serverContentMap = convertFailureListToSet(
+ serverResponsebuilder
+ .generator()
+ .contentType()
+ .xContent()
+ .createParser(
+ xContentRegistry(),
+ LoggingDeprecationHandler.INSTANCE,
+ BytesReference.bytes(serverResponsebuilder).streamInput()
+ ).map()
+ );
+ Map<String, Object> clientContentMap = convertFailureListToSet(
+ clientResponsebuilder
+ .generator()
+ .contentType()
+ .xContent()
+ .createParser(
+ xContentRegistry(),
+ LoggingDeprecationHandler.INSTANCE,
+ BytesReference.bytes(clientResponsebuilder).streamInput()
+ )
+ .map()
+ );
+ assertEquals(serverContentMap, clientContentMap);
+ }
+
+ public void testXContentDeserialization() throws IOException {
+ final XContentType xContentType = randomFrom(XContentType.values());
+ TestPlan plan = createTestPlan();
+ XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
+ builder.startObject();
+ plan.result.toXContent(builder, ToXContent.EMPTY_PARAMS);
+ builder.endObject();
+ XContentParser parser = builder
+ .generator()
+ .contentType()
+ .xContent()
+ .createParser(
+ xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput()
+ );
+ SyncedFlushResponse originalResponse = plan.clientResult;
+ SyncedFlushResponse parsedResponse = SyncedFlushResponse.fromXContent(parser);
+ assertNotNull(parsedResponse);
+ assertShardCounts(originalResponse.getShardCounts(), parsedResponse.getShardCounts());
+ for (Map.Entry<String, SyncedFlushResponse.IndexResult> entry: originalResponse.getIndexResults().entrySet()) {
+ String index = entry.getKey();
+ SyncedFlushResponse.IndexResult responseResult = entry.getValue();
+ SyncedFlushResponse.IndexResult parsedResult = parsedResponse.getIndexResults().get(index);
+ assertNotNull(responseResult);
+ assertNotNull(parsedResult);
+ assertShardCounts(responseResult.getShardCounts(), parsedResult.getShardCounts());
+ assertEquals(responseResult.failures().size(), parsedResult.failures().size());
+ for (SyncedFlushResponse.ShardFailure responseShardFailure: responseResult.failures()) {
+ assertTrue(containsFailure(parsedResult.failures(), responseShardFailure));
+ }
+ }
+ }
+
+ static class TestPlan {
+ SyncedFlushResponse.ShardCounts totalCounts;
+ Map<String, SyncedFlushResponse.ShardCounts> countsPerIndex = new HashMap<>();
+ ObjectIntMap<String> expectedFailuresPerIndex = new ObjectIntHashMap<>();
+ org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse result;
+ SyncedFlushResponse clientResult;
+ }
+
+ TestPlan createTestPlan() throws IOException {
+ final TestPlan testPlan = new TestPlan();
+ final Map<String, List<ShardsSyncedFlushResult>> indicesResults = new HashMap<>();
+ Map<String, SyncedFlushResponse.IndexResult> indexResults = new HashMap<>();
+ final XContentType xContentType = randomFrom(XContentType.values());
+ final int indexCount = randomIntBetween(1, 10);
+ int totalShards = 0;
+ int totalSuccessful = 0;
+ int totalFailed = 0;
+ for (int i = 0; i < indexCount; i++) {
+ final String index = "index_" + i;
+ int shards = randomIntBetween(1, 4);
+ int replicas = randomIntBetween(0, 2);
+ int successful = 0;
+ int failed = 0;
+ int failures = 0;
+ List<ShardsSyncedFlushResult> shardsResults = new ArrayList<>();
+ List<SyncedFlushResponse.ShardFailure> shardFailures = new ArrayList<>();
+ for (int shard = 0; shard < shards; shard++) {
+ final ShardId shardId = new ShardId(index, "_na_", shard);
+ if (randomInt(5) < 2) {
+ // total shard failure
+ failed += replicas + 1;
+ failures++;
+ shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure"));
+ shardFailures.add(
+ new SyncedFlushResponse.ShardFailure(
+ shardId.id(),
+ "simulated total failure",
+ new HashMap<>()
+ )
+ );
+ } else {
+ Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardResponses = new HashMap<>();
+ for (int copy = 0; copy < replicas + 1; copy++) {
+ final ShardRouting shardRouting =
+ TestShardRouting.newShardRouting(
+ index, shard, "node_" + shardId + "_" + copy, null,
+ copy == 0, ShardRoutingState.STARTED
+ );
+ if (randomInt(5) < 2) {
+ // shard copy failure
+ failed++;
+ failures++;
+ shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId));
+ // Building the shardRouting map here.
+ XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
+ Map<String, Object> routing =
+ shardRouting.toXContent(builder, ToXContent.EMPTY_PARAMS)
+ .generator()
+ .contentType()
+ .xContent()
+ .createParser(
+ xContentRegistry(), LoggingDeprecationHandler.INSTANCE,
+ BytesReference.bytes(builder).streamInput()
+ )
+ .map();
+ shardFailures.add(
+ new SyncedFlushResponse.ShardFailure(
+ shardId.id(),
+ "copy failure " + shardId,
+ routing
+ )
+ );
+ } else {
+ successful++;
+ shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse());
+ }
+ }
+ shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses));
+ }
+ }
+ indicesResults.put(index, shardsResults);
+ indexResults.put(
+ index,
+ new SyncedFlushResponse.IndexResult(
+ shards * (replicas + 1),
+ successful,
+ failed,
+ shardFailures
+ )
+ );
+ testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed));
+ testPlan.expectedFailuresPerIndex.put(index, failures);
+ totalFailed += failed;
+ totalShards += shards * (replicas + 1);
+ totalSuccessful += successful;
+ }
+ testPlan.result = new org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse(indicesResults);
+ testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccessful, totalFailed);
+ testPlan.clientResult = new SyncedFlushResponse(
+ new SyncedFlushResponse.ShardCounts(totalShards, totalSuccessful, totalFailed),
+ indexResults
+ );
+ return testPlan;
+ }
+
+ public boolean containsFailure(List<SyncedFlushResponse.ShardFailure> failures, SyncedFlushResponse.ShardFailure origFailure) {
+ for (SyncedFlushResponse.ShardFailure failure: failures) {
+ if (failure.getShardId() == origFailure.getShardId() &&
+ failure.getFailureReason().equals(origFailure.getFailureReason()) &&
+ failure.getRouting().equals(origFailure.getRouting())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ public void assertShardCounts(SyncedFlushResponse.ShardCounts first, SyncedFlushResponse.ShardCounts second) {
+ if (first == null) {
+ assertNull(second);
+ } else {
+ assertTrue(first.equals(second));
+ }
+ }
+
+ public Map<String, Object> convertFailureListToSet(Map<String, Object> input) {
+ Map<String, Object> retMap = new HashMap<>();
+ for (Map.Entry<String, Object> entry: input.entrySet()) {
+ if (entry.getKey().equals(SyncedFlushResponse.SHARDS_FIELD)) {
+ retMap.put(entry.getKey(), entry.getValue());
+ } else {
+ // This was an index entry.
+ @SuppressWarnings("unchecked")
+ Map<String, Object> indexResult = (Map<String, Object>)entry.getValue();
+ Map<String, Object> retResult = new HashMap<>();
+ for (Map.Entry<String, Object> entry2: indexResult.entrySet()) {
+ if (entry2.getKey().equals(SyncedFlushResponse.IndexResult.FAILURES_FIELD)) {
+ @SuppressWarnings("unchecked")
+ List