Skip to content

Commit

Permalink
Consider connection exceptions and disregard random nodes
Browse files Browse the repository at this point in the history
* consider connection exceptions and disregard random nodes

* reset redirection
  • Loading branch information
sazzad16 authored Feb 11, 2021
1 parent 9bce8eb commit 67a062a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 97 deletions.
150 changes: 57 additions & 93 deletions src/main/java/redis/clients/jedis/JedisClusterCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
Expand Down Expand Up @@ -100,131 +100,95 @@ public T runWithAnyNode() {
}
}

private boolean shouldBackOff(int attemptsLeft) {
int attemptsDone = maxAttempts - attemptsLeft;
return attemptsDone >= maxAttempts / 3;
}

private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) {
if (attemptsLeft <= 0) {
return 0;
}

long millisLeft = Duration.between(Instant.now(), deadline).toMillis();
if (millisLeft < 0) {
throw new JedisClusterMaxAttemptsException("Deadline exceeded");
}

return millisLeft / (attemptsLeft * attemptsLeft);
}

private T runWithRetries(final int slot) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);
Supplier<Jedis> connectionSupplier = () -> connectionHandler.getConnectionFromSlot(slot);

// If we got one redirection, stick with that and don't try anything else
Supplier<Jedis> redirectionSupplier = null;

for (int currentAttempt = 0; currentAttempt < this.maxAttempts; currentAttempt++) {
JedisRedirectionException redirect = null;
int consecutiveConnectionFailures = 0;
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
Jedis connection = null;
try {
if (redirectionSupplier != null) {
connection = redirectionSupplier.get();
if (redirect != null) {
connection = connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
connection = connectionSupplier.get();
connection = connectionHandler.getConnectionFromSlot(slot);
}

if (shouldBackOff(maxAttempts - currentAttempt)) {
// Don't just stick to this any more, start asking around
redirectionSupplier = null;
}
return execute(connection);

final T result = execute(connection);
if (currentAttempt > 0) {
LOG.info("Success after {} attempts", currentAttempt + 1);
}
return result;
} catch (JedisNoReachableClusterNodeException e) {
throw e;
} catch (JedisConnectionException e) {
LOG.warn("Failed connecting to Redis: {}", connection, e);
} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
++consecutiveConnectionFailures;
LOG.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the currentAttempt counter hasn't increased yet
int attemptsLeft = maxAttempts - currentAttempt - 1;
connectionSupplier = handleConnectionProblem(connection, slot, attemptsLeft, deadline);
} catch (JedisRedirectionException e) {
redirectionSupplier = handleRedirection(connection, e);
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
if (reset) {
consecutiveConnectionFailures = 0;
redirect = null;
}
} catch (JedisRedirectionException jre) {
LOG.debug("Redirected by server to {}", jre.getTargetNode());
consecutiveConnectionFailures = 0;
redirect = jre;
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
} finally {
releaseConnection(connection);
}

LOG.info("{} retries left...", maxAttempts - currentAttempt - 1);
}

throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
}

protected void sleep(long sleepMillis) {
try {
LOG.info("Backing off, sleeping {}ms before trying again...", sleepMillis);
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) {
if (this.maxAttempts < 3) {
// keep the old implementations as is?
if (attemptsLeft <= 1) {
this.connectionHandler.renewSlotCache();
}
return false;
}
}

private Supplier<Jedis> handleConnectionProblem(Jedis failedConnection, final int slot, int attemptsLeft,
Instant doneDeadline) {
if (!shouldBackOff(attemptsLeft)) {
return () -> {
Jedis connection = connectionHandler.getConnectionFromSlot(slot);
LOG.info("Retrying with {}", connection);
return connection;
};
if (consecutiveConnectionFailures < 2) {
return false;
}

// Must release current connection before renewing the slot cache below. If we fail to do this,
// then JedisClusterTest.testReturnConnectionOnJedisClusterConnection will start failing
// intermittently.
releaseConnection(failedConnection);

sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline));
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
this.connectionHandler.renewSlotCache();

return () -> {
sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline));
// Get a random connection, it will redirect us if it's not the right one
LOG.info("Retrying with a random node...");
Jedis connection = connectionHandler.getConnection();
LOG.info("Retrying with random pick: {}", connection);
return connection;
};
return true;
}

private Supplier<Jedis> handleRedirection(Jedis connection, final JedisRedirectionException jre) {
LOG.debug("Redirected by server to {}", jre.getTargetNode());

// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) {
if (attemptsLeft <= 0) {
return 0;
}

// release current connection before iteration
releaseConnection(connection);
long millisLeft = Duration.between(Instant.now(), deadline).toMillis();
if (millisLeft < 0) {
throw new JedisClusterMaxAttemptsException("Deadline exceeded");
}

return () -> {
Jedis redirectedConnection = connectionHandler.getConnectionFromNode(jre.getTargetNode());
LOG.info("Retrying with redirection target {}", connection);
if (jre instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
redirectedConnection.asking();
}
return millisLeft / (attemptsLeft * attemptsLeft);
}

return redirectedConnection;
};
protected void sleep(long sleepMillis) {
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private void releaseConnection(Jedis connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void connectToNodesFailsWithSSLParametersAndNoHostMapping() {
null, sslParameters, null, portMap)){
jc.get("foo");
Assert.fail("The code did not throw the expected JedisClusterMaxAttemptsException.");
} catch (JedisNoReachableClusterNodeException e) {
} catch (JedisClusterMaxAttemptsException e) {
// initial connection to localhost works, but subsequent connections to nodes use 127.0.0.1
// and fail hostname verification
}
Expand Down Expand Up @@ -159,7 +159,7 @@ public void connectWithCustomHostNameVerifier() {
null, null, hostnameVerifier, portMap)){
jc.get("foo");
Assert.fail("The code did not throw the expected JedisClusterMaxAttemptsException.");
} catch (JedisNoReachableClusterNodeException e) {
} catch (JedisClusterMaxAttemptsException e) {
// initial connection made with 'localhost' but subsequent connections to nodes use 127.0.0.1
// which causes custom hostname verification to fail
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void connectToNodesFailsWithSSLParametersAndNoHostMapping() {
null, sslParameters, null, portMap)){
jc.get("foo");
Assert.fail("The code did not throw the expected JedisClusterMaxAttemptsException.");
} catch (JedisNoReachableClusterNodeException e) {
} catch (JedisClusterMaxAttemptsException e) {
// initial connection to localhost works, but subsequent connections to nodes use 127.0.0.1
// and fail hostname verification
}
Expand Down Expand Up @@ -148,7 +148,7 @@ public void connectWithCustomHostNameVerifier() {
null, null, hostnameVerifier, portMap)){
jc.get("foo");
Assert.fail("The code did not throw the expected JedisClusterMaxAttemptsException.");
} catch (JedisNoReachableClusterNodeException e) {
} catch (JedisClusterMaxAttemptsException e) {
// initial connection made with 'localhost' but subsequent connections to nodes use 127.0.0.1
// which causes custom hostname verification to fail
}
Expand Down

0 comments on commit 67a062a

Please sign in to comment.