Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry with backoff on cluster connection failures #2358

Merged
merged 46 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
6e2f312
Split JedisClusterCommand into multiple methods
Jan 25, 2021
5a4fdbd
Drop redundant null check
Jan 25, 2021
fc3728a
Bump JDK version to 1.8
Jan 25, 2021
cdf56b2
Replace ConnectionGetters with lambdas
Jan 25, 2021
d99ef7b
Retrigger CI
Jan 25, 2021
8978ca5
Add backoff to Redis connections
Jan 28, 2021
85fa21c
Add unit tests for backoff logic
Jan 22, 2021
f8d09c2
Add retries logging
Jan 29, 2021
9c7ef1d
Always use the user requested timeout
Jan 28, 2021
9bce8eb
Remedy review feedback
Feb 2, 2021
67a062a
Consider connection exceptions and disregard random nodes
sazzad16 Feb 11, 2021
7e4abac
Revert "Consider connection exceptions and disregard random nodes"
Feb 11, 2021
eabf10b
Add another backoff test case
Feb 11, 2021
3223404
consider connection exceptions and disregard random nodes
sazzad16 Feb 10, 2021
fd17343
reset redirection
sazzad16 Feb 11, 2021
bf56639
Fix test failure
walles Feb 12, 2021
c7ae6b5
Merge pull request #3 from sazzad16/backoff-walles
Feb 12, 2021
569afe7
Merge branch 'master' into j/backoff
walles Feb 12, 2021
1638603
Apply suggestions from code review
Feb 12, 2021
68e8fdc
update documentation
sazzad16 Feb 12, 2021
c665dc1
Improve a comment
Feb 14, 2021
d9f2596
Update src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java
Feb 17, 2021
a23d602
Add change from another branch
Feb 25, 2021
6dd86db
Merge branch 'master' into j/backoff
walles Feb 25, 2021
fed69b0
Merge remote-tracking branch 'origin/master' into j/backoff
Feb 26, 2021
791180c
Move JedisClusterCommandTest out of commands package
sazzad16 Feb 26, 2021
4bf345b
Use JedisClusterOperationException
sazzad16 Feb 26, 2021
62a619e
Reduce sleep time, especially when few attempts left
sazzad16 Feb 26, 2021
f1c307d
Update src/main/java/redis/clients/jedis/JedisClusterCommand.java
sazzad16 Feb 26, 2021
8a9e0a8
Merge remote-tracking branch 'origin/master' into j/backoff
Mar 3, 2021
25c63a4
Merge branch 'master' into j/backoff
sazzad16 Mar 10, 2021
9b6242f
merge fix
sazzad16 Mar 10, 2021
73d74d3
Merge branch 'master' into j/backoff
gkorland Mar 18, 2021
d14174d
merge fix
sazzad16 Mar 20, 2021
af5d1f7
Merge remote-tracking branch 'redis/master' into j/backoff
sazzad16 Mar 20, 2021
ddd4038
Merge branch 'master' into j/backoff
sazzad16 Mar 25, 2021
7aa0b74
Merge branch 'master' into j/backoff
sazzad16 Mar 29, 2021
0ef36d3
Use maxAttempts
sazzad16 Mar 29, 2021
25303b7
format import
sazzad16 Mar 29, 2021
9e3fbcc
Re-add missing codes due to merge
sazzad16 Mar 29, 2021
882dd49
avoid NPE while zero max attempts
sazzad16 Mar 29, 2021
7430b9b
Remove zero attempts test
sazzad16 Mar 29, 2021
9eb8d58
More cluster constructors and customizability
sazzad16 Mar 29, 2021
27bce50
Use maxTotalRetriesDuration everywhere
sazzad16 Mar 29, 2021
b900a87
Merge remote-tracking branch 'redis/master' into j/backoff
sazzad16 Mar 31, 2021
4501b0d
more missing maxTotalRetriesDuration after merge
sazzad16 Mar 31, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@
<version>2.3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.7.7</version>
<scope>test</scope>
</dependency>
</dependencies>

<distributionManagement>
Expand Down Expand Up @@ -129,8 +135,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ public BinaryJedis(final JedisSocketFactory jedisSocketFactory) {
client = new Client(jedisSocketFactory);
}

@Override
public String toString() {
return "BinaryJedis{" + client + '}';
}

private void initializeClientFromURI(URI uri) {
initializeClientFromURI(uri, null, null, null);
}
Expand Down
476 changes: 254 additions & 222 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Connection(final JedisSocketFactory jedisSocketFactory) {
this.jedisSocketFactory = jedisSocketFactory;
}

@Override
public String toString() {
return "Connection{" + jedisSocketFactory + "}";
}

public Socket getSocket() {
return socket;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,9 @@ public int getSoTimeout() {
public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
}

@Override
public String toString() {
return "DefaultJedisSocketFactory{" + host + ":" + +port + "}";
walles marked this conversation as resolved.
Show resolved Hide resolved
}
}
441 changes: 221 additions & 220 deletions src/main/java/redis/clients/jedis/JedisCluster.java

Large diffs are not rendered by default.

171 changes: 123 additions & 48 deletions src/main/java/redis/clients/jedis/JedisClusterCommand.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package redis.clients.jedis;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.exceptions.JedisAskDataException;
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
import redis.clients.jedis.exceptions.JedisClusterException;
import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisConnectionException;
Expand All @@ -11,18 +18,33 @@

public abstract class JedisClusterCommand<T> {

private static final Logger LOG = LoggerFactory.getLogger(JedisClusterCommand.class);

private static final Duration DEFAULT_MAX_TOTAL_RETRIES_DURATION = Duration.ofMillis(
BinaryJedisCluster.DEFAULT_TIMEOUT * BinaryJedisCluster.DEFAULT_MAX_ATTEMPTS);

sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
private final JedisClusterConnectionHandler connectionHandler;
private final int maxAttempts;
private final Duration maxTotalRetriesDuration;

public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts) {
this(connectionHandler, maxAttempts, DEFAULT_MAX_TOTAL_RETRIES_DURATION);
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @param maxTotalRetriesDuration No more attempts after we have been trying for this long.
*/
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts,
Duration maxTotalRetriesDuration) {
this.connectionHandler = connectionHandler;
this.maxAttempts = maxAttempts;
this.maxTotalRetriesDuration = maxTotalRetriesDuration;
}

public abstract T execute(Jedis connection);

public T run(String key) {
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
return runWithRetries(JedisClusterCRC16.getSlot(key));
}

public T run(int keyCount, String... keys) {
Expand All @@ -42,11 +64,11 @@ public T run(int keyCount, String... keys) {
}
}

return runWithRetries(slot, this.maxAttempts, false, null);
return runWithRetries(slot);
}

public T runBinary(byte[] key) {
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
return runWithRetries(JedisClusterCRC16.getSlot(key));
}

public T runBinary(int keyCount, byte[]... keys) {
Expand All @@ -66,7 +88,7 @@ public T runBinary(int keyCount, byte[]... keys) {
}
}

return runWithRetries(slot, this.maxAttempts, false, null);
return runWithRetries(slot);
}

public T runWithAnyNode() {
Expand All @@ -79,61 +101,114 @@ public T runWithAnyNode() {
}
}

private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
if (attempts <= 0) {
throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
}

Jedis connection = null;
try {

if (redirect != null) {
connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
private T runWithRetries(final int slot) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking the the time on each successful call seems like a waste and might impact the performance.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gkorland According to https://www.alibabacloud.com/blog/performance-issues-related-to-localdatetime-and-instant-during-serialization-operations_595605

Throughput of Instant.now+atZone+format+DateTimeFormatter.ofPattern is 6816922.578 ops/sec.
Without any formatting, throughput of Instant.now+plus should be much higher. Shouldn't it be enough?


JedisRedirectionException redirect = null;
int consecutiveConnectionFailures = 0;
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
Jedis connection = null;
try {
if (redirect != null) {
connection = connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
connection = connectionHandler.getConnectionFromSlot(slot);
}

return execute(connection);

} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
++consecutiveConnectionFailures;
LOG.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
if (reset) {
consecutiveConnectionFailures = 0;
redirect = null;
}
} catch (JedisRedirectionException jre) {
LOG.debug("Redirected by server to {}", jre.getTargetNode());
consecutiveConnectionFailures = 0;
redirect = jre;
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
} finally {
releaseConnection(connection);
}
if (Instant.now().isAfter(deadline)) {
// TODO: change to JedisClusterOperationException or a new sub-class of it
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
throw new JedisClusterMaxAttemptsException("Deadline exceeded");
}
}

return execute(connection);
throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
}

} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
// release current connection before recursion
releaseConnection(connection);
connection = null;

if (attempts <= 1) {
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//But now if maxAttempts = [1 or 2] we will do it too often.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
/**
* Related values should be reset if <code>TRUE</code> is returned.
*
* @param attemptsLeft
* @param consecutiveConnectionFailures
* @param doneDeadline
* @return true - if some actions are taken
* <br /> false - if no actions are taken
*/
private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) {
if (this.maxAttempts < 3) {
// Since we only renew the slots cache after two consecutive connection
// failures (see consecutiveConnectionFailures above), we need to special
// case the situation where we max out after two or fewer attempts.
//
// Otherwise, on two or fewer max attempts, the slots cache would never be
// renewed.
if (attemptsLeft == 0) {
this.connectionHandler.renewSlotCache();
return true;
walles marked this conversation as resolved.
Show resolved Hide resolved
}
return false;
}

return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
if (consecutiveConnectionFailures < 2) {
return false;
}

// release current connection before recursion
releaseConnection(connection);
connection = null;
sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline));
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
this.connectionHandler.renewSlotCache();
return true;
}

return runWithRetries(slot, attempts - 1, false, jre);
} finally {
releaseConnection(connection);
private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) {
if (attemptsLeft <= 0) {
return 0;
}

long millisLeft = Duration.between(Instant.now(), deadline).toMillis();
if (millisLeft < 0) {
// TODO: change to JedisClusterOperationException or a new sub-class of it
throw new JedisClusterMaxAttemptsException("Deadline exceeded");
}

return millisLeft / (attemptsLeft * attemptsLeft);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why divide by the square of attempts? (Just wondering if it is specific or an arbitrary decision?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of arbitrary. Not squared would work as well. The important thing is that the sleeps get longer and longer the fewer iterations that you have left, and that the last sleep uses up all of the time remaining.

Example: Let's say you have max 10s and max 5 attempts.

At time 0.0, 5 attempts left, sleep (10s/(5*5)) = 0.4s
At time 0.4, 4 attempts left, sleep (9.6s/(4*4)) = 0.6s
At time 1.0, 3 attempts left, sleep (9s/(3*3)) = 1.0s
At time 2.0, 2 attempts left, sleep (8/(2*2)) = 2.0s
At time 6.0, 1 attempts left, sleep (4/(1*1)) = 4.0s

}

protected void sleep(long sleepMillis) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, protected enables overriding in unit tests.

try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,11 @@ public void testReturnConnectionOnJedisConnectionException() throws InterruptedE
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(1);
JedisCluster jc = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,

// Otherwise the test can time out before we are done
int shorterThanTheTestTimeoutMs = DEFAULT_TIMEOUT / 2;

JedisCluster jc = new JedisCluster(jedisClusterNode, shorterThanTheTestTimeoutMs, shorterThanTheTestTimeoutMs,
DEFAULT_REDIRECTIONS, "cluster", config);

Jedis j = jc.getClusterNodes().get("127.0.0.1:7380").getResource();
Expand Down
Loading