diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java index 0843c38e4ab6..f9775d333cf7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java @@ -19,7 +19,6 @@ package org.elasticsearch.gateway; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; @@ -30,11 +29,15 @@ public interface Gateway extends LifecycleComponent { String type(); - void write(MetaData metaData) throws GatewayException; - - MetaData read() throws GatewayException; + void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException; Class suggestIndexGateway(); void reset() throws Exception; + + interface GatewayStateRecoveredListener { + void onSuccess(); + + void onFailure(Throwable t); + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 7d933099a8ef..afb961e94104 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -20,14 +20,12 @@ package org.elasticsearch.gateway; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchInterruptedException; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; -import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -36,17 +34,12 @@ import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; import javax.annotation.Nullable; -import java.io.IOException; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static java.util.concurrent.Executors.*; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.MetaData.*; -import static org.elasticsearch.common.unit.TimeValue.*; -import static org.elasticsearch.common.util.concurrent.EsExecutors.*; /** * @author kimchy (shay.banon) @@ -59,31 +52,23 @@ public class GatewayService extends AbstractLifecycleComponent i private final ThreadPool threadPool; - private volatile ExecutorService executor; - private final ClusterService clusterService; private final DiscoveryService discoveryService; - private final MetaDataCreateIndexService createIndexService; - - private final TimeValue initialStateTimeout; - private final TimeValue recoverAfterTime; private final int recoverAfterNodes; - private final AtomicBoolean readFromGateway = new AtomicBoolean(); + private final AtomicBoolean performedStateRecovery = new AtomicBoolean(); - @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, - ThreadPool threadPool, MetaDataCreateIndexService createIndexService) { + @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, ThreadPool threadPool) { super(settings); this.gateway = gateway; this.clusterService = clusterService; this.discoveryService = discoveryService; this.threadPool = threadPool; - this.createIndexService = createIndexService; this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30)); // allow to control a delay of when indices will get created this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null); @@ -92,7 +77,6 @@ public class GatewayService extends AbstractLifecycleComponent i @Override protected void doStart() throws ElasticSearchException { gateway.start(); - this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway")); // if we received initial state, see if we can recover within the start phase, so we hold the // node from starting until we recovered properly if (discoveryService.initialStateReceived()) { @@ -101,12 +85,12 @@ public class GatewayService extends AbstractLifecycleComponent i if (recoverAfterNodes != -1 && clusterState.nodes().dataNodes().size() < recoverAfterNodes) { updateClusterStateBlockedOnNotRecovered(); logger.debug("not recovering from gateway, data_nodes_size [" + clusterState.nodes().dataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]"); + } else if (recoverAfterTime != null) { + updateClusterStateBlockedOnNotRecovered(); + logger.debug("not recovering from gateway, recover_after_time [{}]", recoverAfterTime); } else { - if (readFromGateway.compareAndSet(false, true)) { - Boolean waited = readFromGateway(initialStateTimeout); - if (waited != null && !waited) { - logger.warn("waited for {} for indices to be created from the gateway, and not all have been created", initialStateTimeout); - } + if (performedStateRecovery.compareAndSet(false, true)) { + performStateRecovery(initialStateTimeout); } } } @@ -118,12 +102,6 @@ public class GatewayService extends AbstractLifecycleComponent i @Override protected void doStop() throws ElasticSearchException { clusterService.remove(this); - executor.shutdown(); - try { - executor.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore - } gateway.stop(); } @@ -141,86 +119,51 @@ public class GatewayService extends AbstractLifecycleComponent i if (recoverAfterNodes != -1 && clusterState.nodes().dataNodes().size() < recoverAfterNodes) { logger.debug("not recovering from gateway, data_nodes_size [" + clusterState.nodes().dataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]"); } else { - if (readFromGateway.compareAndSet(false, true)) { - executor.execute(new Runnable() { + if (performedStateRecovery.compareAndSet(false, true)) { + threadPool.cached().execute(new Runnable() { @Override public void run() { - readFromGateway(null); + performStateRecovery(null); } }); } } - } else { - writeToGateway(event); } } } - private void writeToGateway(final ClusterChangedEvent event) { - if (!event.metaDataChanged()) { - return; - } - executor.execute(new Runnable() { - @Override public void run() { - logger.debug("writing to gateway {} ...", gateway); - StopWatch stopWatch = new StopWatch().start(); - try { - gateway.write(event.state().metaData()); - logger.debug("wrote to gateway {}, took {}", gateway, stopWatch.stop().totalTime()); - // TODO, we need to remember that we failed, maybe add a retry scheduler? - } catch (Exception e) { - logger.error("failed to write to gateway", e); - } + private void performStateRecovery(@Nullable TimeValue timeout) { + final CountDownLatch latch = new CountDownLatch(1); + final Gateway.GatewayStateRecoveredListener recoveryListener = new Gateway.GatewayStateRecoveredListener() { + @Override public void onSuccess() { + markMetaDataAsReadFromGateway("success"); + latch.countDown(); } - }); - } - /** - * Reads from the gateway. If the waitTimeout is set, will wait till all the indices - * have been created from the meta data read from the gateway. Return value only applicable - * when waiting, and indicates that everything was created within teh wait timeout. - */ - private Boolean readFromGateway(@Nullable TimeValue waitTimeout) { - logger.debug("reading state from gateway {} ...", gateway); - StopWatch stopWatch = new StopWatch().start(); - MetaData metaData; - try { - metaData = gateway.read(); - logger.debug("read state from gateway {}, took {}", gateway, stopWatch.stop().totalTime()); - } catch (Exception e) { - logger.error("failed to read from gateway", e); - markMetaDataAsReadFromGateway("failure"); - return false; - } - if (metaData == null) { - logger.debug("no state read from gateway"); - markMetaDataAsReadFromGateway("no state"); - return true; - } - final MetaData fMetaData = metaData; - final CountDownLatch latch = new CountDownLatch(fMetaData.indices().size()); + @Override public void onFailure(Throwable t) { + markMetaDataAsReadFromGateway("failure [" + t.getMessage() + "]"); + latch.countDown(); + } + }; + if (recoverAfterTime != null) { updateClusterStateBlockedOnNotRecovered(); - logger.debug("delaying initial state index creation for [{}]", recoverAfterTime); + logger.debug("delaying initial state recovery for [{}]", recoverAfterTime); threadPool.schedule(new Runnable() { @Override public void run() { - updateClusterStateFromGateway(fMetaData, latch); + gateway.performStateRecovery(recoveryListener); } }, recoverAfterTime); } else { - updateClusterStateFromGateway(fMetaData, latch); + gateway.performStateRecovery(recoveryListener); } - // if we delay indices creation, then waiting for them does not make sense - if (recoverAfterTime != null) { - return null; - } - if (waitTimeout != null) { + + if (timeout != null) { try { - return latch.await(waitTimeout.millis(), TimeUnit.MILLISECONDS); + latch.await(timeout.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - // ignore + throw new ElasticSearchInterruptedException(e.getMessage(), e); } } - return null; } private void markMetaDataAsReadFromGateway(String reason) { @@ -239,44 +182,6 @@ public class GatewayService extends AbstractLifecycleComponent i }); } - private void updateClusterStateFromGateway(final MetaData fMetaData, final CountDownLatch latch) { - clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ProcessedClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder metaDataBuilder = newMetaDataBuilder() - .metaData(currentState.metaData()); - // mark the metadata as read from gateway - metaDataBuilder.markAsRecoveredFromGateway(); - return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build(); - } - - @Override public void clusterStateProcessed(ClusterState clusterState) { - // go over the meta data and create indices, we don't really need to copy over - // the meta data per index, since we create the index and it will be added automatically - for (final IndexMetaData indexMetaData : fMetaData) { - try { - createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappingsCompressed(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() { - @Override public void onResponse(MetaDataCreateIndexService.Response response) { - latch.countDown(); - } - - @Override public void onFailure(Throwable t) { - logger.error("failed to create index [{}]", indexMetaData.index(), t); - } - }); - } catch (IOException e) { - logger.error("failed to create index [{}]", indexMetaData.index(), e); - } - } - clusterService.submitStateUpdateTask("gateway (remove block)", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NOT_RECOVERED_FROM_GATEWAY_BLOCK); - return newClusterStateBuilder().state(currentState).blocks(blocks).build(); - } - }); - } - }); - } - private void updateClusterStateBlockedOnNotRecovered() { clusterService.submitStateUpdateTask("gateway (block: not recovered from gateway)", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java index 869fa1000ca2..13a7694e54ce 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -19,12 +19,12 @@ package org.elasticsearch.gateway.blobstore; -import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.collect.ImmutableMap; -import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.ToXContent; @@ -32,8 +32,8 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder; -import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.gateway.shared.SharedStorageGateway; import javax.annotation.Nullable; import java.io.ByteArrayInputStream; @@ -42,7 +42,7 @@ import java.io.IOException; /** * @author kimchy (shay.banon) */ -public abstract class BlobStoreGateway extends AbstractLifecycleComponent implements Gateway { +public abstract class BlobStoreGateway extends SharedStorageGateway { private BlobStore blobStore; @@ -54,8 +54,8 @@ public abstract class BlobStoreGateway extends AbstractLifecycleComponent implements @Override protected void doClose() throws ElasticSearchException { } - @Override public void write(MetaData metaData) throws GatewayException { - - } - - @Override public MetaData read() throws GatewayException { - return null; + @Override public void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException { + listener.onSuccess(); } @Override public Class suggestIndexGateway() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java new file mode 100644 index 000000000000..fe2f94a9b2b5 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java @@ -0,0 +1,170 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.shared; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.gateway.GatewayException; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.Executors.*; +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.common.unit.TimeValue.*; +import static org.elasticsearch.common.util.concurrent.EsExecutors.*; + +/** + * @author kimchy (shay.banon) + */ +public abstract class SharedStorageGateway extends AbstractLifecycleComponent implements Gateway, ClusterStateListener { + + private final ClusterService clusterService; + + private final MetaDataCreateIndexService createIndexService; + + private volatile boolean performedStateRecovery = false; + + private volatile ExecutorService executor; + + public SharedStorageGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService) { + super(settings); + this.clusterService = clusterService; + this.createIndexService = createIndexService; + } + + @Override protected void doStart() throws ElasticSearchException { + this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway")); + clusterService.add(this); + } + + @Override protected void doStop() throws ElasticSearchException { + clusterService.remove(this); + executor.shutdown(); + try { + executor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // ignore + } + } + + @Override protected void doClose() throws ElasticSearchException { + } + + @Override public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException { + performedStateRecovery = true; + executor.execute(new Runnable() { + @Override public void run() { + logger.debug("reading state from gateway {} ...", this); + StopWatch stopWatch = new StopWatch().start(); + MetaData metaData; + try { + metaData = read(); + logger.debug("read state from gateway {}, took {}", this, stopWatch.stop().totalTime()); + if (metaData == null) { + logger.debug("no state read from gateway"); + listener.onSuccess(); + } else { + updateClusterStateFromGateway(metaData, listener); + } + } catch (Exception e) { + logger.error("failed to read from gateway", e); + listener.onFailure(e); + } + } + }); + } + + @Override public void clusterChanged(final ClusterChangedEvent event) { + if (!lifecycle.started()) { + return; + } + if (!performedStateRecovery) { + return; + } + if (event.localNodeMaster()) { + if (!event.metaDataChanged()) { + return; + } + executor.execute(new Runnable() { + @Override public void run() { + logger.debug("writing to gateway {} ...", this); + StopWatch stopWatch = new StopWatch().start(); + try { + write(event.state().metaData()); + logger.debug("wrote to gateway {}, took {}", this, stopWatch.stop().totalTime()); + // TODO, we need to remember that we failed, maybe add a retry scheduler? + } catch (Exception e) { + logger.error("failed to write to gateway", e); + } + } + }); + } + } + + private void updateClusterStateFromGateway(final MetaData fMetaData, final GatewayStateRecoveredListener listener) { + final AtomicInteger indicesCounter = new AtomicInteger(fMetaData.indices().size()); + clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ProcessedClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + MetaData.Builder metaDataBuilder = newMetaDataBuilder() + .metaData(currentState.metaData()); + // mark the metadata as read from gateway + metaDataBuilder.markAsRecoveredFromGateway(); + return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build(); + } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + // go over the meta data and create indices, we don't really need to copy over + // the meta data per index, since we create the index and it will be added automatically + for (final IndexMetaData indexMetaData : fMetaData) { + try { + createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappingsCompressed(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() { + @Override public void onResponse(MetaDataCreateIndexService.Response response) { + if (indicesCounter.decrementAndGet() == 0) { + listener.onSuccess(); + } + } + + @Override public void onFailure(Throwable t) { + logger.error("failed to create index [{}]", indexMetaData.index(), t); + } + }); + } catch (IOException e) { + logger.error("failed to create index [{}]", indexMetaData.index(), e); + } + } + } + }); + } + + protected abstract MetaData read() throws ElasticSearchException; + + protected abstract void write(MetaData metaData) throws ElasticSearchException; +} diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java index ebe5c1804344..f86f48c6c3f2 100644 --- a/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java @@ -24,6 +24,8 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cloud.aws.AwsS3Service; import org.elasticsearch.cloud.aws.blobstore.S3BlobStore; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; @@ -40,8 +42,9 @@ import java.io.IOException; */ public class S3Gateway extends BlobStoreGateway { - @Inject public S3Gateway(Settings settings, ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException { - super(settings); + @Inject public S3Gateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, + ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException { + super(settings, clusterService, createIndexService); String bucket = componentSettings.get("bucket"); if (bucket == null) { diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java index ec6d37a94e88..6e1535dd529b 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java @@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; @@ -45,8 +47,9 @@ public class HdfsGateway extends BlobStoreGateway { private final FileSystem fileSystem; - @Inject public HdfsGateway(Settings settings, ClusterName clusterName, ThreadPool threadPool) throws IOException { - super(settings); + @Inject public HdfsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, + ClusterName clusterName, ThreadPool threadPool) throws IOException { + super(settings, clusterService, createIndexService); this.closeFileSystem = componentSettings.getAsBoolean("close_fs", true); String uri = componentSettings.get("uri"); diff --git a/plugins/hadoop/src/test/java/org/elasticsearch/hadoop/gateway/HdfsGatewayTests.java b/plugins/hadoop/src/test/java/org/elasticsearch/hadoop/gateway/HdfsGatewayTests.java index ec86dad23de7..5f633489376d 100644 --- a/plugins/hadoop/src/test/java/org/elasticsearch/hadoop/gateway/HdfsGatewayTests.java +++ b/plugins/hadoop/src/test/java/org/elasticsearch/hadoop/gateway/HdfsGatewayTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.hadoop.gateway; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Requests; @@ -78,7 +79,8 @@ public class HdfsGatewayTests { @Test public void testHdfsGateway() { // first, test meta data - node.client().admin().indices().create(createIndexRequest("test")).actionGet(); + CreateIndexResponse createIndexResponse = node.client().admin().indices().create(createIndexRequest("test")).actionGet(); + assertThat(createIndexResponse.acknowledged(), equalTo(true)); node.close(); node = buildNode().start(); try {