diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 72c3f14c57..b3131b6ea8 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -27,6 +27,7 @@ import javax.net.ssl.SSLSocketFactory; import redis.clients.jedis.Protocol.Keyword; +import redis.clients.jedis.args.UnblockType; import redis.clients.jedis.params.*; import redis.clients.jedis.util.SafeEncoder; @@ -1232,6 +1233,14 @@ public void clientId() { sendCommand(CLIENT, Keyword.ID.getRaw()); } + public void clientUnblock(final long clientId, final UnblockType unblockType) { + if (unblockType == null) { + sendCommand(CLIENT, Keyword.UNBLOCK.getRaw(), toByteArray(clientId)); + } else { + sendCommand(CLIENT, Keyword.UNBLOCK.getRaw(), toByteArray(clientId), unblockType.getRaw()); + } + } + public void time() { sendCommand(TIME); } diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index c937c5aad6..a7398586fb 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -20,6 +20,7 @@ import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocketFactory; +import redis.clients.jedis.args.UnblockType; import redis.clients.jedis.commands.AdvancedBinaryJedisCommands; import redis.clients.jedis.commands.BasicCommands; import redis.clients.jedis.commands.BinaryJedisCommands; @@ -4076,6 +4077,20 @@ public Long clientId() { return client.getIntegerReply(); } + /** + * Unblock a client blocked in a blocking command from a different connection. + * @param clientId + * @param unblockType could be {@code null} by default the client is unblocked as if the timeout + * of the command was reached + * @return + */ + @Override + public Long clientUnblock(final long clientId, final UnblockType unblockType) { + checkIsInMultiOrPipeline(); + client.clientUnblock(clientId, unblockType); + return client.getIntegerReply(); + } + public String clientPause(final long timeout) { checkIsInMultiOrPipeline(); client.clientPause(timeout); diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 5407b64d01..f78effee72 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -13,6 +13,7 @@ import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocketFactory; +import redis.clients.jedis.args.UnblockType; import redis.clients.jedis.commands.AdvancedJedisCommands; import redis.clients.jedis.commands.BasicCommands; import redis.clients.jedis.commands.ClusterCommands; @@ -3428,6 +3429,20 @@ public Long clientId() { return client.getIntegerReply(); } + /** + * Unblock a client blocked in a blocking command from a different connection. + * @param clientId + * @param unblockType could be {@code null} by default the client is unblocked as if the timeout + * of the command was reached + * @return + */ + @Override + public Long clientUnblock(final long clientId, final UnblockType unblockType) { + checkIsInMultiOrPipeline(); + client.clientUnblock(clientId, unblockType); + return client.getIntegerReply(); + } + @Override public String migrate(final String host, final int port, final String key, final int destinationDb, final int timeout) { diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 097bed0e9e..a5b3ef205a 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -283,7 +283,7 @@ public static enum Keyword implements Rawable { GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR, BLOCK, NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, ID, IDLE, TIME, RETRYCOUNT, FORCE, USAGE, SAMPLES, STREAM, GROUPS, CONSUMERS, HELP, FREQ, SETUSER, - GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS, LOG, INCR, SAVE, JUSTID, WITHVALUES; + GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS, LOG, INCR, SAVE, JUSTID, WITHVALUES, UNBLOCK; /** * @deprecated This will be private in future. Use {@link #getRaw()}. diff --git a/src/main/java/redis/clients/jedis/args/UnblockType.java b/src/main/java/redis/clients/jedis/args/UnblockType.java new file mode 100644 index 0000000000..a69dd4105d --- /dev/null +++ b/src/main/java/redis/clients/jedis/args/UnblockType.java @@ -0,0 +1,21 @@ +package redis.clients.jedis.args; + +import redis.clients.jedis.util.SafeEncoder; + +/** + * Unblock type for {@code CLIENT UNBLOCK} command. + */ +public enum UnblockType implements Rawable { + TIMEOUT, ERROR; + + private final byte[] raw; + + UnblockType() { + raw = SafeEncoder.encode(this.name()); + } + + @Override + public byte[] getRaw() { + return raw; + } +} diff --git a/src/main/java/redis/clients/jedis/commands/AdvancedBinaryJedisCommands.java b/src/main/java/redis/clients/jedis/commands/AdvancedBinaryJedisCommands.java index 33f6278558..55a923ca22 100644 --- a/src/main/java/redis/clients/jedis/commands/AdvancedBinaryJedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/AdvancedBinaryJedisCommands.java @@ -3,6 +3,7 @@ import java.util.List; import redis.clients.jedis.AccessControlUser; +import redis.clients.jedis.args.UnblockType; import redis.clients.jedis.params.MigrateParams; import redis.clients.jedis.params.ClientKillParams; @@ -41,6 +42,8 @@ String migrate(String host, int port, int destinationDB, int timeout, MigratePar Long clientKill(ClientKillParams params); + Long clientUnblock(long clientId, UnblockType unblockType); + byte[] clientGetnameBinary(); byte[] clientListBinary(); diff --git a/src/main/java/redis/clients/jedis/commands/AdvancedJedisCommands.java b/src/main/java/redis/clients/jedis/commands/AdvancedJedisCommands.java index 3407e5dfe8..a47b580d78 100644 --- a/src/main/java/redis/clients/jedis/commands/AdvancedJedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/AdvancedJedisCommands.java @@ -4,6 +4,7 @@ import redis.clients.jedis.AccessControlLogEntry; import redis.clients.jedis.AccessControlUser; +import redis.clients.jedis.args.UnblockType; import redis.clients.jedis.params.MigrateParams; import redis.clients.jedis.params.ClientKillParams; import redis.clients.jedis.util.Slowlog; @@ -50,6 +51,8 @@ String migrate(String host, int port, int destinationDB, int timeout, MigratePar Long clientId(); + Long clientUnblock(long clientId, UnblockType unblockType); + String memoryDoctor(); Long memoryUsage(String key); diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index 1a2b7ac55a..b50835da6e 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -8,6 +8,7 @@ import redis.clients.jedis.ListPosition; import redis.clients.jedis.ScanParams; import redis.clients.jedis.SortingParams; +import redis.clients.jedis.args.UnblockType; import redis.clients.jedis.ZParams; import redis.clients.jedis.params.GetExParams; import redis.clients.jedis.params.MigrateParams; @@ -430,6 +431,8 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) { void clientId(); + void clientUnblock(long clientId, UnblockType unblockType); + void memoryDoctor(); void xadd(String key, StreamEntryID id, Map hash, long maxLen, boolean approximateLength); diff --git a/src/test/java/redis/clients/jedis/tests/commands/ClientCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/ClientCommandsTest.java index d39932a2f8..b7fd128229 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/ClientCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/ClientCommandsTest.java @@ -7,6 +7,11 @@ import static redis.clients.jedis.params.ClientKillParams.Type; import static redis.clients.jedis.params.ClientKillParams.SkipMe; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -16,6 +21,7 @@ import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; +import redis.clients.jedis.args.UnblockType; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.params.ClientKillParams; @@ -93,6 +99,22 @@ public void clientIdReconnect() { assertTrue(clientIdInitial < clientIdAfterReconnect); } + @Test + public void clientUnblock() throws InterruptedException, TimeoutException { + long clientId = client.clientId(); + assertEquals(0, jedis.clientUnblock(clientId, UnblockType.ERROR).longValue()); + Future future = Executors.newSingleThreadExecutor().submit(() -> client.brpop(100000, "foo")); + + try { + // to make true command already executed + TimeUnit.MILLISECONDS.sleep(500); + assertEquals(1, jedis.clientUnblock(clientId, UnblockType.ERROR).longValue()); + future.get(1, TimeUnit.SECONDS); + } catch (ExecutionException e) { + assertEquals("redis.clients.jedis.exceptions.JedisDataException: UNBLOCKED client unblocked via CLIENT UNBLOCK", e.getMessage()); + } + } + @Test public void killIdString() { String info = findInClientList();