diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index 37ed3a7e739f..1990c70a7f08 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.monitor.StatusInfo; @@ -33,7 +34,6 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ReceiveTimeoutTransportException; -import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; @@ -137,7 +137,7 @@ public final class FollowersChecker { ); transportService.addConnectionListener(new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { handleDisconnectedNode(node); } }); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index bbf0884cb4c2..6c3901474707 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -32,7 +32,6 @@ import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.ReceiveTimeoutTransportException; -import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; @@ -124,7 +123,7 @@ public class LeaderChecker { transportService.addConnectionListener(new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { handleDisconnectedNode(node); } }); diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index 5d13b10f5a6a..e70a8abacb02 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -229,11 +229,26 @@ public class ClusterConnectionManager implements ConnectionManager { try { connectionListener.onNodeConnected(node, conn); } finally { - conn.addCloseListener(ActionListener.running(() -> { - connectedNodes.remove(node, conn); - connectionListener.onNodeDisconnected(node, conn); - managerRefs.decRef(); - })); + conn.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + handleClose(null); + } + + @Override + public void onFailure(Exception e) { + handleClose(e); + } + + void handleClose(@Nullable Exception e) { + connectedNodes.remove(node, conn); + try { + connectionListener.onNodeDisconnected(node, e); + } finally { + managerRefs.decRef(); + } + } + }); conn.addCloseListener(ActionListener.running(() -> { if (connectingRefCounter.hasReferences() == false) { diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index daccac3fbe2c..bc11ca0e8699 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -60,9 +60,9 @@ public interface ConnectionManager extends Closeable { private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @Override - public void onNodeDisconnected(DiscoveryNode key, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode key, @Nullable Exception closeException) { for (TransportConnectionListener listener : listeners) { - listener.onNodeDisconnected(key, connection); + listener.onNodeDisconnected(key, closeException); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java index 8bc5771485f6..c27d9cf69a90 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java @@ -61,7 +61,7 @@ public class RemoteConnectionManager implements ConnectionManager { } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { removeConnectedNode(node); } }); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 248ca4313cef..a715797b9797 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; @@ -339,7 +340,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis protected abstract ConnectionStrategy strategyType(); @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { if (shouldOpenMoreConnections()) { // try to reconnect and fill up the slot of the disconnected node connect( diff --git a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java index 92796f826fc3..470014eb7a67 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java @@ -10,6 +10,7 @@ package org.elasticsearch.transport; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.core.Nullable; /** * A listener interface that allows to react on transport events. All methods may be @@ -38,5 +39,5 @@ public interface TransportConnectionListener { /** * Called once a node connection is closed and unregistered. */ - default void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {} + default void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {} } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 7ab9b8611b8c..3c5dc6b39292 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; @@ -831,7 +832,7 @@ public class TransportSearchActionTests extends ESTestCase { CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { if (disconnectedNodes.remove(node)) { disconnectedLatch.countDown(); } @@ -1134,7 +1135,7 @@ public class TransportSearchActionTests extends ESTestCase { CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { if (disconnectedNodes.remove(node)) { disconnectedLatch.countDown(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 8963bb10d657..f0edd4aeba12 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -251,7 +252,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { final AtomicReference> disconnectListenerRef = new AtomicReference<>(); transportService.addConnectionListener(new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { final ActionListener disconnectListener = disconnectListenerRef.getAndSet(null); if (disconnectListener != null) { disconnectListener.onResponse(node); diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index 0a446e999c3c..1e735de2651a 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -101,7 +102,7 @@ public class ClusterConnectionManagerTests extends ESTestCase { } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { nodeDisconnectedCount.incrementAndGet(); } }); @@ -658,7 +659,7 @@ public class ClusterConnectionManagerTests extends ESTestCase { } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { nodeDisconnectedCount.incrementAndGet(); } }); @@ -698,7 +699,7 @@ public class ClusterConnectionManagerTests extends ESTestCase { } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { nodeDisconnectedCount.incrementAndGet(); } }); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index f37a139e4583..b54e9fca8206 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -202,7 +202,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { fail("disconnect should not be called " + node); } }; @@ -924,7 +924,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { latch.countDown(); } }; @@ -2118,7 +2118,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { fail("disconnect should not be called " + node); } };