{
-
- public static final SyncedFlushAction INSTANCE = new SyncedFlushAction();
- public static final String NAME = "indices:admin/synced_flush";
-
- private SyncedFlushAction() {
- super(NAME, SyncedFlushResponse::new);
- }
-}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequest.java
deleted file mode 100644
index cb3333354b8e..000000000000
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.admin.indices.flush;
-
-import org.elasticsearch.action.support.broadcast.BroadcastRequest;
-import org.elasticsearch.common.io.stream.StreamInput;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * A synced flush request to sync flush one or more indices. The synced flush process of an index performs a flush
- * and writes the same sync id to primary and all copies.
- *
- * Best created with {@link org.elasticsearch.client.Requests#syncedFlushRequest(String...)}.
- *
- * @see org.elasticsearch.client.Requests#flushRequest(String...)
- * @see org.elasticsearch.client.IndicesAdminClient#syncedFlush(SyncedFlushRequest)
- * @see SyncedFlushResponse
- */
-public class SyncedFlushRequest extends BroadcastRequest {
-
- /**
- * Constructs a new synced flush request against one or more indices. If nothing is provided, all indices will
- * be sync flushed.
- */
- public SyncedFlushRequest(String... indices) {
- super(indices);
- }
-
- public SyncedFlushRequest(StreamInput in) throws IOException {
- super(in);
- }
-
- @Override
- public String toString() {
- return "SyncedFlushRequest{" +
- "indices=" + Arrays.toString(indices) + "}";
- }
-}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequestBuilder.java
deleted file mode 100644
index aee7c4688bb6..000000000000
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushRequestBuilder.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.admin.indices.flush;
-
-import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.client.ElasticsearchClient;
-
-public class SyncedFlushRequestBuilder extends ActionRequestBuilder {
-
- public SyncedFlushRequestBuilder(ElasticsearchClient client, SyncedFlushAction action) {
- super(client, action, new SyncedFlushRequest());
- }
-
- public SyncedFlushRequestBuilder setIndices(String[] indices) {
- super.request().indices(indices);
- return this;
- }
-
- public SyncedFlushRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
- super.request().indicesOptions(indicesOptions);
- return this;
- }
-}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java
deleted file mode 100644
index 5e286b184fec..000000000000
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.action.admin.indices.flush;
-
-import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.util.iterable.Iterables;
-import org.elasticsearch.common.xcontent.ToXContentFragment;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.indices.flush.ShardsSyncedFlushResult;
-import org.elasticsearch.indices.flush.SyncedFlushService;
-import org.elasticsearch.rest.RestStatus;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Collections.unmodifiableMap;
-
-/**
- * The result of performing a sync flush operation on all shards of multiple indices
- */
-public class SyncedFlushResponse extends ActionResponse implements ToXContentFragment {
-
- private final Map> shardsResultPerIndex;
- private final ShardCounts shardCounts;
-
- public SyncedFlushResponse(Map> shardsResultPerIndex) {
- // shardsResultPerIndex is never modified after it is passed to this
- // constructor so this is safe even though shardsResultPerIndex is a
- // ConcurrentHashMap
- this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex);
- this.shardCounts = calculateShardCounts(Iterables.flatten(shardsResultPerIndex.values()));
- }
-
- public SyncedFlushResponse(StreamInput in) throws IOException {
- super(in);
- shardCounts = new ShardCounts(in);
- Map> tmpShardsResultPerIndex = new HashMap<>();
- int numShardsResults = in.readInt();
- for (int i =0 ; i< numShardsResults; i++) {
- String index = in.readString();
- List shardsSyncedFlushResults = new ArrayList<>();
- int numShards = in.readInt();
- for (int j =0; j< numShards; j++) {
- shardsSyncedFlushResults.add(new ShardsSyncedFlushResult(in));
- }
- tmpShardsResultPerIndex.put(index, shardsSyncedFlushResults);
- }
- shardsResultPerIndex = Collections.unmodifiableMap(tmpShardsResultPerIndex);
- }
-
- /**
- * total number shards, including replicas, both assigned and unassigned
- */
- public int totalShards() {
- return shardCounts.total;
- }
-
- /**
- * total number of shards for which the operation failed
- */
- public int failedShards() {
- return shardCounts.failed;
- }
-
- /**
- * total number of shards which were successfully sync-flushed
- */
- public int successfulShards() {
- return shardCounts.successful;
- }
-
- public RestStatus restStatus() {
- return failedShards() == 0 ? RestStatus.OK : RestStatus.CONFLICT;
- }
-
- public Map> getShardsResultPerIndex() {
- return shardsResultPerIndex;
- }
-
- @Override
- public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- builder.startObject(Fields._SHARDS);
- shardCounts.toXContent(builder, params);
- builder.endObject();
- for (Map.Entry> indexEntry : shardsResultPerIndex.entrySet()) {
- List indexResult = indexEntry.getValue();
- builder.startObject(indexEntry.getKey());
- ShardCounts indexShardCounts = calculateShardCounts(indexResult);
- indexShardCounts.toXContent(builder, params);
- if (indexShardCounts.failed > 0) {
- builder.startArray(Fields.FAILURES);
- for (ShardsSyncedFlushResult shardResults : indexResult) {
- if (shardResults.failed()) {
- builder.startObject();
- builder.field(Fields.SHARD, shardResults.shardId().id());
- builder.field(Fields.REASON, shardResults.failureReason());
- builder.endObject();
- continue;
- }
- Map failedShards = shardResults.failedShards();
- for (Map.Entry shardEntry : failedShards.entrySet()) {
- builder.startObject();
- builder.field(Fields.SHARD, shardResults.shardId().id());
- builder.field(Fields.REASON, shardEntry.getValue().failureReason());
- builder.field(Fields.ROUTING, shardEntry.getKey());
- builder.endObject();
- }
- }
- builder.endArray();
- }
- builder.endObject();
- }
- return builder;
- }
-
- static ShardCounts calculateShardCounts(Iterable results) {
- int total = 0, successful = 0, failed = 0;
- for (ShardsSyncedFlushResult result : results) {
- total += result.totalShards();
- successful += result.successfulShards();
- if (result.failed()) {
- // treat all shard copies as failed
- failed += result.totalShards();
- } else {
- // some shards may have failed during the sync phase
- failed += result.failedShards().size();
- }
- }
- return new ShardCounts(total, successful, failed);
- }
-
- static final class ShardCounts implements ToXContentFragment, Writeable {
-
- public final int total;
- public final int successful;
- public final int failed;
-
- ShardCounts(int total, int successful, int failed) {
- this.total = total;
- this.successful = successful;
- this.failed = failed;
- }
-
- ShardCounts(StreamInput in) throws IOException {
- total = in.readInt();
- successful = in.readInt();
- failed = in.readInt();
- }
-
- @Override
- public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- builder.field(Fields.TOTAL, total);
- builder.field(Fields.SUCCESSFUL, successful);
- builder.field(Fields.FAILED, failed);
- return builder;
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeInt(total);
- out.writeInt(successful);
- out.writeInt(failed);
- }
- }
-
- static final class Fields {
- static final String _SHARDS = "_shards";
- static final String TOTAL = "total";
- static final String SUCCESSFUL = "successful";
- static final String FAILED = "failed";
- static final String FAILURES = "failures";
- static final String SHARD = "shard";
- static final String ROUTING = "routing";
- static final String REASON = "reason";
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- shardCounts.writeTo(out);
- out.writeInt(shardsResultPerIndex.size());
- for (Map.Entry> entry : shardsResultPerIndex.entrySet()) {
- out.writeString(entry.getKey());
- out.writeInt(entry.getValue().size());
- for (ShardsSyncedFlushResult shardsSyncedFlushResult : entry.getValue()) {
- shardsSyncedFlushResult.writeTo(out);
- }
- }
- }
-}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java
index 077657cc62dd..397ce43747d5 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java
@@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.flush;
+import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
@@ -28,10 +29,16 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@@ -48,6 +55,8 @@ public class TransportShardFlushAction
ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
+ transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME,
+ ThreadPool.Names.FLUSH, PreShardSyncedFlushRequest::new, new PreSyncedFlushTransportHandler(indicesService));
}
@Override
@@ -71,4 +80,43 @@ public class TransportShardFlushAction
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
}
+
+ // TODO: Remove this transition in 9.0
+ private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
+
+ private static class PreShardSyncedFlushRequest extends TransportRequest {
+ private final ShardId shardId;
+
+ private PreShardSyncedFlushRequest(StreamInput in) throws IOException {
+ super(in);
+ assert in.getVersion().before(Version.V_8_0_0) : "received pre_sync request from a new node";
+ this.shardId = new ShardId(in);
+ }
+
+ @Override
+ public String toString() {
+ return "PreShardSyncedFlushRequest{" + "shardId=" + shardId + '}';
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ assert false : "must not send pre_sync request from a new node";
+ throw new UnsupportedOperationException("");
+ }
+ }
+
+ private static final class PreSyncedFlushTransportHandler implements TransportRequestHandler {
+ private final IndicesService indicesService;
+
+ PreSyncedFlushTransportHandler(IndicesService indicesService) {
+ this.indicesService = indicesService;
+ }
+
+ @Override
+ public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel, Task task) {
+ IndexShard indexShard = indicesService.indexServiceSafe(request.shardId.getIndex()).getShard(request.shardId.id());
+ indexShard.flush(new FlushRequest().force(false).waitIfOngoing(true));
+ throw new UnsupportedOperationException("Synced flush was removed and a normal flush was performed instead.");
+ }
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportSyncedFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportSyncedFlushAction.java
deleted file mode 100644
index 3eb72e0b0227..000000000000
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportSyncedFlushAction.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.admin.indices.flush;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.HandledTransportAction;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.indices.flush.SyncedFlushService;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.transport.TransportService;
-
-/**
- * Synced flush Action.
- */
-public class TransportSyncedFlushAction extends HandledTransportAction {
-
- SyncedFlushService syncedFlushService;
-
- @Inject
- public TransportSyncedFlushAction(TransportService transportService, ActionFilters actionFilters,
- SyncedFlushService syncedFlushService) {
- super(SyncedFlushAction.NAME, transportService, actionFilters, SyncedFlushRequest::new);
- this.syncedFlushService = syncedFlushService;
- }
-
- @Override
- protected void doExecute(Task task, SyncedFlushRequest request, ActionListener listener) {
- syncedFlushService.attemptSyncedFlush(request.indices(), request.indicesOptions(), listener);
- }
-}
diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java
index 40c4c1046577..36b34a7b24c8 100644
--- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java
+++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java
@@ -42,9 +42,6 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequestBuilder;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequestBuilder;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequestBuilder;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
@@ -336,29 +333,6 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
FlushRequestBuilder prepareFlush(String... indices);
- /**
- * Explicitly sync flush one or more indices (write sync id to shards for faster recovery).
- *
- * @param request The sync flush request
- * @return A result future
- * @see org.elasticsearch.client.Requests#syncedFlushRequest(String...)
- */
- ActionFuture syncedFlush(SyncedFlushRequest request);
-
- /**
- * Explicitly sync flush one or more indices (write sync id to shards for faster recovery).
- *
- * @param request The sync flush request
- * @param listener A listener to be notified with a result
- * @see org.elasticsearch.client.Requests#syncedFlushRequest(String...)
- */
- void syncedFlush(SyncedFlushRequest request, ActionListener listener);
-
- /**
- * Explicitly sync flush one or more indices (write sync id to shards for faster recovery).
- */
- SyncedFlushRequestBuilder prepareSyncedFlush(String... indices);
-
/**
* Explicitly force merge one or more indices into a the number of segments.
*
diff --git a/server/src/main/java/org/elasticsearch/client/Requests.java b/server/src/main/java/org/elasticsearch/client/Requests.java
index 01d04c64ae1b..bec0865bea8a 100644
--- a/server/src/main/java/org/elasticsearch/client/Requests.java
+++ b/server/src/main/java/org/elasticsearch/client/Requests.java
@@ -47,7 +47,6 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@@ -248,17 +247,6 @@ public class Requests {
return new FlushRequest(indices);
}
- /**
- * Creates a synced flush indices request.
- *
- * @param indices The indices to sync flush. Use {@code null} or {@code _all} to execute against all indices
- * @return The synced flush request
- * @see org.elasticsearch.client.IndicesAdminClient#syncedFlush(SyncedFlushRequest)
- */
- public static SyncedFlushRequest syncedFlushRequest(String... indices) {
- return new SyncedFlushRequest(indices);
- }
-
/**
* Creates a force merge request.
*
diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
index 5bb480d8c23c..1ee480fb55ed 100644
--- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
+++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
@@ -163,10 +163,6 @@ import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequestBuilder;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushAction;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequestBuilder;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequestBuilder;
@@ -1351,21 +1347,6 @@ public abstract class AbstractClient implements Client {
return new FlushRequestBuilder(this, FlushAction.INSTANCE).setIndices(indices);
}
- @Override
- public ActionFuture syncedFlush(SyncedFlushRequest request) {
- return execute(SyncedFlushAction.INSTANCE, request);
- }
-
- @Override
- public void syncedFlush(SyncedFlushRequest request, ActionListener listener) {
- execute(SyncedFlushAction.INSTANCE, request, listener);
- }
-
- @Override
- public SyncedFlushRequestBuilder prepareSyncedFlush(String... indices) {
- return new SyncedFlushRequestBuilder(this, SyncedFlushAction.INSTANCE).setIndices(indices);
- }
-
@Override
public void getMappings(GetMappingsRequest request, ActionListener listener) {
execute(GetMappingsAction.INSTANCE, request, listener);
diff --git a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
index 22587cf6aad7..517e2966e41c 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
@@ -88,13 +88,6 @@ public final class CommitStats implements Writeable, ToXContentFragment {
return new Engine.CommitId(Base64.getDecoder().decode(id));
}
- /**
- * The synced-flush id of the commit if existed.
- */
- public String syncId() {
- return userData.get(InternalEngine.SYNC_COMMIT_ID);
- }
-
/**
* Returns the number of documents in the in this commit
*/
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 5415a433d867..549b3b7a21dc 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1035,12 +1035,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
- public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
- verifyNotClosed();
- logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
- return getEngine().syncFlush(syncId, expectedCommitId);
- }
-
/**
* Executes the given flush request against the engine.
*
diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
index 7584fda21c32..1214103dd69c 100644
--- a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -61,7 +61,6 @@ import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
-import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.plugins.MapperPlugin;
@@ -238,7 +237,6 @@ public class IndicesModule extends AbstractModule {
protected void configure() {
bind(IndicesStore.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
- bind(SyncedFlushService.class).asEagerSingleton();
bind(TransportResyncReplicationAction.class).asEagerSingleton();
bind(PrimaryReplicaSyncer.class).asEagerSingleton();
bind(RetentionLeaseSyncAction.class).asEagerSingleton();
diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index e7e6d954aa1c..754635b12124 100644
--- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -68,7 +68,6 @@ import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
@@ -135,7 +134,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
final SearchService searchService,
- final SyncedFlushService syncedFlushService,
final PeerRecoverySourceService peerRecoverySourceService,
final SnapshotShardsService snapshotShardsService,
final PrimaryReplicaSyncer primaryReplicaSyncer,
@@ -151,7 +149,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
nodeMappingRefreshAction,
repositoriesService,
searchService,
- syncedFlushService,
peerRecoverySourceService,
snapshotShardsService,
primaryReplicaSyncer,
@@ -170,7 +167,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
final SearchService searchService,
- final SyncedFlushService syncedFlushService,
final PeerRecoverySourceService peerRecoverySourceService,
final SnapshotShardsService snapshotShardsService,
final PrimaryReplicaSyncer primaryReplicaSyncer,
diff --git a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java
deleted file mode 100644
index 4748c41d4b3e..000000000000
--- a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.indices.flush;
-
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.Collections.emptyMap;
-
-/**
- * Result for all copies of a shard
- */
-public class ShardsSyncedFlushResult implements Writeable {
- private String failureReason;
- private Map shardResponses;
- private String syncId;
- private ShardId shardId;
- // some shards may be unassigned, so we need this as state
- private int totalShards;
-
- public ShardsSyncedFlushResult(StreamInput in) throws IOException {
- failureReason = in.readOptionalString();
- int numResponses = in.readInt();
- shardResponses = new HashMap<>();
- for (int i = 0; i < numResponses; i++) {
- ShardRouting shardRouting = new ShardRouting(in);
- SyncedFlushService.ShardSyncedFlushResponse response = SyncedFlushService.ShardSyncedFlushResponse.readSyncedFlushResponse(in);
- shardResponses.put(shardRouting, response);
- }
- syncId = in.readOptionalString();
- shardId = new ShardId(in);
- totalShards = in.readInt();
- }
-
- public ShardId getShardId() {
- return shardId;
- }
-
- /**
- * failure constructor
- */
- public ShardsSyncedFlushResult(ShardId shardId, int totalShards, String failureReason) {
- this.syncId = null;
- this.failureReason = failureReason;
- this.shardResponses = emptyMap();
- this.shardId = shardId;
- this.totalShards = totalShards;
- }
-
- /**
- * success constructor
- */
- public ShardsSyncedFlushResult(ShardId shardId,
- String syncId,
- int totalShards,
- Map shardResponses) {
- this.failureReason = null;
- this.shardResponses = Map.copyOf(shardResponses);
- this.syncId = syncId;
- this.totalShards = totalShards;
- this.shardId = shardId;
- }
-
- /**
- * @return true if the operation failed before reaching step three of synced flush. {@link #failureReason()} can be used for
- * more details
- */
- public boolean failed() {
- return failureReason != null;
- }
-
- /**
- * @return the reason for the failure if synced flush failed before step three of synced flush
- */
- public String failureReason() {
- return failureReason;
- }
-
- public String syncId() {
- return syncId;
- }
-
- /**
- * @return total number of shards for which a sync attempt was made
- */
- public int totalShards() {
- return totalShards;
- }
-
- /**
- * @return total number of successful shards
- */
- public int successfulShards() {
- int i = 0;
- for (SyncedFlushService.ShardSyncedFlushResponse result : shardResponses.values()) {
- if (result.success()) {
- i++;
- }
- }
- return i;
- }
-
- /**
- * @return an array of shard failures
- */
- public Map failedShards() {
- Map failures = new HashMap<>();
- for (Map.Entry result : shardResponses.entrySet()) {
- if (result.getValue().success() == false) {
- failures.put(result.getKey(), result.getValue());
- }
- }
- return failures;
- }
-
- /**
- * @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush.
- * Empty if synced flush failed before step three.
- */
- public Map shardResponses() {
- return shardResponses;
- }
-
- public ShardId shardId() {
- return shardId;
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeOptionalString(failureReason);
- out.writeInt(shardResponses.size());
- for (Map.Entry entry : shardResponses.entrySet()) {
- entry.getKey().writeTo(out);
- entry.getValue().writeTo(out);
- }
- out.writeOptionalString(syncId);
- shardId.writeTo(out);
- out.writeInt(totalShards);
- }
-}
diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
deleted file mode 100644
index a0fc2e153b90..000000000000
--- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
+++ /dev/null
@@ -1,768 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.indices.flush;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
-import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.logging.DeprecationLogger;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.CountDown;
-import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.engine.CommitStats;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.ShardNotFoundException;
-import org.elasticsearch.indices.IndexClosedException;
-import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportChannel;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportRequest;
-import org.elasticsearch.transport.TransportRequestHandler;
-import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseHandler;
-import org.elasticsearch.transport.TransportService;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-public class SyncedFlushService {
-
- private static final Logger logger = LogManager.getLogger(SyncedFlushService.class);
-
- private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(logger);
-
- public static final String SYNCED_FLUSH_DEPRECATION_MESSAGE =
- "Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead.";
-
- private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
- private static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync";
- private static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight";
-
- private final IndicesService indicesService;
- private final ClusterService clusterService;
- private final TransportService transportService;
- private final IndexNameExpressionResolver indexNameExpressionResolver;
-
- @Inject
- public SyncedFlushService(IndicesService indicesService,
- ClusterService clusterService,
- TransportService transportService,
- IndexNameExpressionResolver indexNameExpressionResolver) {
- this.indicesService = indicesService;
- this.clusterService = clusterService;
- this.transportService = transportService;
- this.indexNameExpressionResolver = indexNameExpressionResolver;
- transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, ThreadPool.Names.FLUSH, PreShardSyncedFlushRequest::new,
- new PreSyncedFlushTransportHandler());
- transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ThreadPool.Names.FLUSH, ShardSyncedFlushRequest::new,
- new SyncedFlushTransportHandler());
- transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, ThreadPool.Names.SAME, InFlightOpsRequest::new,
- new InFlightOpCountTransportHandler());
- }
-
- /**
- * a utility method to perform a synced flush for all shards of multiple indices.
- * see {@link #attemptSyncedFlush(ShardId, ActionListener)}
- * for more details.
- */
- public void attemptSyncedFlush(final String[] aliasesOrIndices,
- IndicesOptions indicesOptions,
- final ActionListener listener) {
- final ClusterState state = clusterService.state();
- DEPRECATION_LOGGER.deprecatedAndMaybeLog("synced_flush", SYNCED_FLUSH_DEPRECATION_MESSAGE);
- final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);
- final Map> results = ConcurrentCollections.newConcurrentMap();
- int numberOfShards = 0;
- for (Index index : concreteIndices) {
- final IndexMetaData indexMetaData = state.metaData().getIndexSafe(index);
- numberOfShards += indexMetaData.getNumberOfShards();
- results.put(index.getName(), Collections.synchronizedList(new ArrayList<>()));
-
- }
- if (numberOfShards == 0) {
- listener.onResponse(new SyncedFlushResponse(results));
- return;
- }
- final CountDown countDown = new CountDown(numberOfShards);
-
- for (final Index concreteIndex : concreteIndices) {
- final String index = concreteIndex.getName();
- final IndexMetaData indexMetaData = state.metaData().getIndexSafe(concreteIndex);
- final int indexNumberOfShards = indexMetaData.getNumberOfShards();
- for (int shard = 0; shard < indexNumberOfShards; shard++) {
- final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard);
- innerAttemptSyncedFlush(shardId, state, new ActionListener() {
- @Override
- public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
- results.get(index).add(syncedFlushResult);
- if (countDown.countDown()) {
- listener.onResponse(new SyncedFlushResponse(results));
- }
- }
-
- @Override
- public void onFailure(Exception e) {
- logger.debug("{} unexpected error while executing synced flush", shardId);
- final int totalShards = indexMetaData.getNumberOfReplicas() + 1;
- results.get(index).add(new ShardsSyncedFlushResult(shardId, totalShards, e.getMessage()));
- if (countDown.countDown()) {
- listener.onResponse(new SyncedFlushResponse(results));
- }
- }
- });
- }
- }
- }
-
- /*
- * Tries to flush all copies of a shard and write a sync id to it.
- * After a synced flush two shard copies may only contain the same sync id if they contain the same documents.
- * To ensure this, synced flush works in three steps:
- * 1. Flush all shard copies and gather the commit ids for each copy after the flush
- * 2. Ensure that there are no ongoing indexing operations on the primary
- * 3. Perform an additional flush on each shard copy that writes the sync id
- *
- * Step 3 is only executed on a shard if
- * a) the shard has no uncommitted changes since the last flush
- * b) the last flush was the one executed in 1 (use the collected commit id to verify this)
- *
- * This alone is not enough to ensure that all copies contain the same documents.
- * Without step 2 a sync id would be written for inconsistent copies in the following scenario:
- *
- * Write operation has completed on a primary and is being sent to replicas. The write request does not reach the
- * replicas until sync flush is finished.
- * Step 1 is executed. After the flush the commit points on primary contains a write operation that the replica does not have.
- * Step 3 will be executed on primary and replica as well because there are no uncommitted changes on primary (the first flush
- * committed them) and there are no uncommitted changes on the replica (the write operation has not reached the replica yet).
- *
- * Step 2 detects this scenario and fails the whole synced flush if a write operation is ongoing on the primary.
- * Together with the conditions for step 3 (same commit id and no uncommitted changes) this guarantees that a snc id will only
- * be written on a primary if no write operation was executed between step 1 and step 3 and sync id will only be written on
- * the replica if it contains the same changes that the primary contains.
- *
- * Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies.
- **/
- public void attemptSyncedFlush(final ShardId shardId, final ActionListener actionListener) {
- innerAttemptSyncedFlush(shardId, clusterService.state(), actionListener);
- }
-
- private void innerAttemptSyncedFlush(final ShardId shardId,
- final ClusterState state,
- final ActionListener actionListener) {
- try {
- final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state);
- final List activeShards = shardRoutingTable.activeShards();
- final int totalShards = shardRoutingTable.getSize();
-
- if (activeShards.size() == 0) {
- actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "no active shards"));
- return;
- }
-
- // 1. send pre-sync flushes to all replicas
- final StepListener