Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split JedisClusterCommand into multiple methods #2355

Merged
merged 9 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
Expand Down
108 changes: 58 additions & 50 deletions src/main/java/redis/clients/jedis/JedisClusterCommand.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.util.function.Supplier;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
Expand All @@ -22,7 +23,7 @@ public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int
public abstract T execute(Jedis connection);

public T run(String key) {
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
return runWithRetries(JedisClusterCRC16.getSlot(key));
}

public T run(int keyCount, String... keys) {
Expand All @@ -42,11 +43,11 @@ public T run(int keyCount, String... keys) {
}
}

return runWithRetries(slot, this.maxAttempts, false, null);
return runWithRetries(slot);
}

public T runBinary(byte[] key) {
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
return runWithRetries(JedisClusterCRC16.getSlot(key));
}

public T runBinary(int keyCount, byte[]... keys) {
Expand All @@ -66,7 +67,7 @@ public T runBinary(int keyCount, byte[]... keys) {
}
}

return runWithRetries(slot, this.maxAttempts, false, null);
return runWithRetries(slot);
}

public T runWithAnyNode() {
Expand All @@ -79,62 +80,69 @@ public T runWithAnyNode() {
}
}

private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
if (attempts <= 0) {
throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
}
private T runWithRetries(final int slot) {
Supplier<Jedis> connectionSupplier = () -> connectionHandler.getConnectionFromSlot(slot);

Jedis connection = null;
try {
// If we got one redirection, stick with that and don't try anything else
Supplier<Jedis> redirectionSupplier = null;

if (redirect != null) {
connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
Exception lastException = null;
for (int currentAttempt = 0; currentAttempt < this.maxAttempts; currentAttempt++) {
Jedis connection = null;
try {
if (redirectionSupplier != null) {
connection = redirectionSupplier.get();
} else {
connection = connectionHandler.getConnectionFromSlot(slot);
connection = connectionSupplier.get();
}
return execute(connection);
} catch (JedisNoReachableClusterNodeException e) {
throw e;
} catch (JedisConnectionException e) {
lastException = e;
connectionSupplier = handleConnectionProblem(slot, currentAttempt);
} catch (JedisRedirectionException e) {
lastException = e;
redirectionSupplier = handleRedirection(connection, e);
} finally {
releaseConnection(connection);
}
}

return execute(connection);

} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
// release current connection before recursion
releaseConnection(connection);
connection = null;

if (attempts <= 1) {
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//But now if maxAttempts = [1 or 2] we will do it too often.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
this.connectionHandler.renewSlotCache();
}
throw new JedisClusterMaxAttemptsException("No more cluster attempts left.", lastException);
walles marked this conversation as resolved.
Show resolved Hide resolved
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
}

return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
private Supplier<Jedis> handleConnectionProblem(final int slot, int currentAttempt) {
int attemptsLeft = (maxAttempts - currentAttempt) - 1;
if (attemptsLeft <= 1) {
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//But now if maxAttempts = [1 or 2] we will do it too often.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
this.connectionHandler.renewSlotCache();
}

// release current connection before recursion
releaseConnection(connection);
connection = null;
return () -> connectionHandler.getConnectionFromSlot(slot);
}

return runWithRetries(slot, attempts - 1, false, jre);
} finally {
releaseConnection(connection);
private Supplier<Jedis> handleRedirection(Jedis connection, final JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}


return () -> {
Jedis redirectedConnection = connectionHandler.getConnectionFromNode(jre.getTargetNode());
if (jre instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
redirectedConnection.asking();
}

return redirectedConnection;
};
}

private void releaseConnection(Jedis connection) {
Expand Down