-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Retry with backoff on cluster connection failures #2358
Changes from all commits
6e2f312
5a4fdbd
fc3728a
cdf56b2
d99ef7b
8978ca5
85fa21c
f8d09c2
9c7ef1d
9bce8eb
67a062a
7e4abac
eabf10b
3223404
fd17343
bf56639
c7ae6b5
569afe7
1638603
68e8fdc
c665dc1
d9f2596
a23d602
6dd86db
fed69b0
791180c
4bf345b
62a619e
f1c307d
8a9e0a8
25c63a4
9b6242f
73d74d3
d14174d
af5d1f7
ddd4038
7aa0b74
0ef36d3
25303b7
9e3fbcc
882dd49
7430b9b
9eb8d58
27bce50
b900a87
4501b0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,8 @@ | ||
package redis.clients.jedis; | ||
|
||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.util.concurrent.TimeUnit; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -18,10 +21,22 @@ public abstract class JedisClusterCommand<T> { | |
|
||
private final JedisClusterConnectionHandler connectionHandler; | ||
private final int maxAttempts; | ||
private final Duration maxTotalRetriesDuration; | ||
|
||
/**
 * Create a command with the default total retry budget of
 * {@code BinaryJedisCluster.DEFAULT_TIMEOUT * maxAttempts} milliseconds.
 *
 * @param connectionHandler provides connections to the cluster nodes
 * @param maxAttempts maximum number of attempts before giving up
 */
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts) {
  this(connectionHandler, maxAttempts,
      Duration.ofMillis((long) BinaryJedisCluster.DEFAULT_TIMEOUT * maxAttempts));
}
|
||
/**
 * Create a command with an explicit total retry budget.
 *
 * @param connectionHandler provides connections to the cluster nodes
 * @param maxAttempts maximum number of attempts before giving up
 * @param maxTotalRetriesDuration no more attempts after we have been trying for this long
 */
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts,
    Duration maxTotalRetriesDuration) {
  this.maxTotalRetriesDuration = maxTotalRetriesDuration;
  this.maxAttempts = maxAttempts;
  this.connectionHandler = connectionHandler;
}
|
||
public abstract T execute(Jedis connection); | ||
|
@@ -85,7 +100,10 @@ public T runWithAnyNode() { | |
} | ||
|
||
private T runWithRetries(final int slot) { | ||
Instant deadline = Instant.now().plus(maxTotalRetriesDuration); | ||
|
||
JedisRedirectionException redirect = null; | ||
int consecutiveConnectionFailures = 0; | ||
Exception lastException = null; | ||
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) { | ||
Jedis connection = null; | ||
|
@@ -106,15 +124,21 @@ private T runWithRetries(final int slot) { | |
throw jnrcne; | ||
} catch (JedisConnectionException jce) { | ||
lastException = jce; | ||
++consecutiveConnectionFailures; | ||
LOG.debug("Failed connecting to Redis: {}", connection, jce); | ||
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet | ||
handleConnectionProblem(attemptsLeft - 1); | ||
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline); | ||
if (reset) { | ||
consecutiveConnectionFailures = 0; | ||
redirect = null; | ||
} | ||
} catch (JedisRedirectionException jre) { | ||
// avoid updating lastException if it is a connection exception | ||
if (lastException == null || lastException instanceof JedisRedirectionException) { | ||
lastException = jre; | ||
} | ||
LOG.debug("Redirected by server to {}", jre.getTargetNode()); | ||
consecutiveConnectionFailures = 0; | ||
redirect = jre; | ||
// if MOVED redirection occurred, | ||
if (jre instanceof JedisMovedDataException) { | ||
|
@@ -124,6 +148,9 @@ private T runWithRetries(final int slot) { | |
} finally { | ||
releaseConnection(connection); | ||
} | ||
if (Instant.now().isAfter(deadline)) { | ||
throw new JedisClusterOperationException("Cluster retry deadline exceeded."); | ||
} | ||
} | ||
|
||
JedisClusterMaxAttemptsException maxAttemptsException | ||
|
@@ -132,14 +159,60 @@ private T runWithRetries(final int slot) { | |
throw maxAttemptsException; | ||
} | ||
|
||
private void handleConnectionProblem(int attemptsLeft) { | ||
if (attemptsLeft <= 1) { | ||
// We need this because if node is not reachable anymore - we need to finally initiate slots | ||
// renewing, or we can stuck with cluster state without one node in opposite case. | ||
// But now if maxAttempts = [1 or 2] we will do it too often. | ||
// TODO make tracking of successful/unsuccessful operations for node - do renewing only | ||
// if there were no successful responses from this node last few seconds | ||
this.connectionHandler.renewSlotCache(); | ||
/** | ||
* Related values should be reset if <code>TRUE</code> is returned. | ||
* | ||
* @param attemptsLeft | ||
* @param consecutiveConnectionFailures | ||
* @param doneDeadline | ||
* @return true - if some actions are taken | ||
* <br /> false - if no actions are taken | ||
*/ | ||
private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) { | ||
if (this.maxAttempts < 3) { | ||
// Since we only renew the slots cache after two consecutive connection | ||
// failures (see consecutiveConnectionFailures above), we need to special | ||
// case the situation where we max out after two or fewer attempts. | ||
// Otherwise, on two or fewer max attempts, the slots cache would never be | ||
// renewed. | ||
if (attemptsLeft == 0) { | ||
this.connectionHandler.renewSlotCache(); | ||
return true; | ||
walles marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return false; | ||
} | ||
|
||
if (consecutiveConnectionFailures < 2) { | ||
return false; | ||
} | ||
|
||
sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline)); | ||
//We need this because if node is not reachable anymore - we need to finally initiate slots | ||
//renewing, or we can stuck with cluster state without one node in opposite case. | ||
//TODO make tracking of successful/unsuccessful operations for node - do renewing only | ||
//if there were no successful responses from this node last few seconds | ||
this.connectionHandler.renewSlotCache(); | ||
return true; | ||
} | ||
|
||
private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) { | ||
if (attemptsLeft <= 0) { | ||
return 0; | ||
} | ||
|
||
long millisLeft = Duration.between(Instant.now(), deadline).toMillis(); | ||
if (millisLeft < 0) { | ||
throw new JedisClusterOperationException("Cluster retry deadline exceeded."); | ||
} | ||
|
||
return millisLeft / (attemptsLeft * (attemptsLeft + 1)); | ||
} | ||
|
||
protected void sleep(long sleepMillis) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, |
||
try { | ||
TimeUnit.MILLISECONDS.sleep(sleepMillis); | ||
} catch (InterruptedException e) { | ||
throw new JedisClusterOperationException(e); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taking the time on each successful call seems wasteful and might impact performance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gkorland According to https://www.alibabacloud.com/blog/performance-issues-related-to-localdatetime-and-instant-during-serialization-operations_595605
Throughput of Instant.now+atZone+format+DateTimeFormatter.ofPattern is 6816922.578 ops/sec.
Without any formatting, throughput of Instant.now+plus should be much higher. Shouldn't it be enough?