diff --git a/src/main/java/redis/clients/jedis/JedisClusterCommand.java b/src/main/java/redis/clients/jedis/JedisClusterCommand.java index 0aa1055df4..78afe28ed6 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterCommand.java +++ b/src/main/java/redis/clients/jedis/JedisClusterCommand.java @@ -1,5 +1,8 @@ package redis.clients.jedis; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException; import redis.clients.jedis.exceptions.JedisClusterOperationException; @@ -11,6 +14,8 @@ public abstract class JedisClusterCommand { + private static final Logger LOG = LoggerFactory.getLogger(JedisClusterCommand.class); + private final JedisClusterConnectionHandler connectionHandler; private final int maxAttempts; @@ -22,7 +27,7 @@ public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int public abstract T execute(Jedis connection); public T run(String key) { - return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null); + return runWithRetries(JedisClusterCRC16.getSlot(key)); } public T run(int keyCount, String... keys) { @@ -42,11 +47,11 @@ public T run(int keyCount, String... keys) { } } - return runWithRetries(slot, this.maxAttempts, false, null); + return runWithRetries(slot); } public T runBinary(byte[] key) { - return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null); + return runWithRetries(JedisClusterCRC16.getSlot(key)); } public T runBinary(int keyCount, byte[]... keys) { @@ -66,7 +71,7 @@ public T runBinary(int keyCount, byte[]... keys) { } } - return runWithRetries(slot, this.maxAttempts, false, null); + return runWithRetries(slot); } public T runWithAnyNode() { @@ -79,61 +84,62 @@ public T runWithAnyNode() { } } - private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) { - if (attempts <= 0) { - throw new JedisClusterMaxAttemptsException("No more cluster attempts left."); - } - - Jedis connection = null; - try { - - if (redirect != null) { - connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode()); - if (redirect instanceof JedisAskDataException) { - // TODO: Pipeline asking with the original command to make it faster.... - connection.asking(); - } - } else { - if (tryRandomNode) { - connection = connectionHandler.getConnection(); + private T runWithRetries(final int slot) { + JedisRedirectionException redirect = null; + Exception lastException = null; + for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) { + Jedis connection = null; + try { + if (redirect != null) { + connection = connectionHandler.getConnectionFromNode(redirect.getTargetNode()); + if (redirect instanceof JedisAskDataException) { + // TODO: Pipeline asking with the original command to make it faster.... + connection.asking(); + } } else { connection = connectionHandler.getConnectionFromSlot(slot); } - } - - return execute(connection); - } catch (JedisNoReachableClusterNodeException jnrcne) { - throw jnrcne; - } catch (JedisConnectionException jce) { - // release current connection before recursion - releaseConnection(connection); - connection = null; - - if (attempts <= 1) { - //We need this because if node is not reachable anymore - we need to finally initiate slots - //renewing, or we can stuck with cluster state without one node in opposite case. - //But now if maxAttempts = [1 or 2] we will do it too often. - //TODO make tracking of successful/unsuccessful operations for node - do renewing only - //if there were no successful responses from this node last few seconds - this.connectionHandler.renewSlotCache(); - } - - return runWithRetries(slot, attempts - 1, tryRandomNode, redirect); - } catch (JedisRedirectionException jre) { - // if MOVED redirection occurred, - if (jre instanceof JedisMovedDataException) { - // it rebuilds cluster's slot cache recommended by Redis cluster specification - this.connectionHandler.renewSlotCache(connection); + return execute(connection); + + } catch (JedisNoReachableClusterNodeException jnrcne) { + throw jnrcne; + } catch (JedisConnectionException jce) { + lastException = jce; + LOG.debug("Failed connecting to Redis: {}", connection, jce); + // "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet + handleConnectionProblem(attemptsLeft - 1); + } catch (JedisRedirectionException jre) { + // avoid updating lastException if it is a connection exception + if (lastException == null || lastException instanceof JedisRedirectionException) { + lastException = jre; + } + LOG.debug("Redirected by server to {}", jre.getTargetNode()); + redirect = jre; + // if MOVED redirection occurred, + if (jre instanceof JedisMovedDataException) { + // it rebuilds cluster's slot cache recommended by Redis cluster specification + this.connectionHandler.renewSlotCache(connection); + } + } finally { + releaseConnection(connection); } + } - // release current connection before recursion - releaseConnection(connection); - connection = null; + JedisClusterMaxAttemptsException maxAttemptsException + = new JedisClusterMaxAttemptsException("No more cluster attempts left."); + maxAttemptsException.addSuppressed(lastException); + throw maxAttemptsException; + } - return runWithRetries(slot, attempts - 1, false, jre); - } finally { - releaseConnection(connection); + private void handleConnectionProblem(int attemptsLeft) { + if (attemptsLeft <= 1) { + //We need this because if node is not reachable anymore - we need to finally initiate slots + //renewing, or we can stuck with cluster state without one node in opposite case. + //But now if maxAttempts = [1 or 2] we will do it too often. + //TODO make tracking of successful/unsuccessful operations for node - do renewing only + //if there were no successful responses from this node last few seconds + this.connectionHandler.renewSlotCache(); } }