Skip to content

Commit

Permalink
CLIENT PAUSE command with mode option (#2601)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Jul 26, 2021
1 parent 468bf8f commit 6554807
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.Protocol.SentinelKeyword;
import redis.clients.jedis.args.ClientPauseMode;
import redis.clients.jedis.args.ClientType;
import redis.clients.jedis.args.ListDirection;
import redis.clients.jedis.args.FlushMode;
Expand Down Expand Up @@ -1387,6 +1388,10 @@ public void clientUnblock(final long clientId, final UnblockType unblockType) {
}
}

public void clientPause(final long timeout, final ClientPauseMode mode) {
sendCommand(CLIENT, Keyword.PAUSE.getRaw(), toByteArray(timeout), mode.getRaw());
}

public void time() {
sendCommand(TIME);
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4313,12 +4313,20 @@ public long clientUnblock(final long clientId, final UnblockType unblockType) {
return client.getIntegerReply();
}

@Override
public String clientPause(final long timeout) {
checkIsInMultiOrPipeline();
client.clientPause(timeout);
return client.getBulkReply();
}

@Override
public String clientPause(final long timeout, final ClientPauseMode mode) {
checkIsInMultiOrPipeline();
client.clientPause(timeout, mode);
return client.getBulkReply();
}

public List<String> time() {
checkIsInMultiOrPipeline();
client.time();
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/redis/clients/jedis/args/ClientPauseMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package redis.clients.jedis.args;

import redis.clients.jedis.util.SafeEncoder;

public enum ClientPauseMode implements Rawable {

ALL, WRITE;

private final byte[] raw;

private ClientPauseMode() {
raw = SafeEncoder.encode(name());
}

@Override
public byte[] getRaw() {
return raw;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;

import redis.clients.jedis.AccessControlUser;
import redis.clients.jedis.args.ClientPauseMode;
import redis.clients.jedis.args.ClientType;
import redis.clients.jedis.args.UnblockType;
import redis.clients.jedis.params.MigrateParams;
Expand Down Expand Up @@ -69,6 +70,10 @@ String migrate(String host, int port, int destinationDB, int timeout, MigratePar

long clientUnblock(long clientId, UnblockType unblockType);

String clientPause(long timeout);

String clientPause(long timeout, ClientPauseMode mode);

byte[] memoryDoctorBinary();

Long memoryUsage(byte[] key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import redis.clients.jedis.AccessControlLogEntry;
import redis.clients.jedis.AccessControlUser;
import redis.clients.jedis.args.ClientPauseMode;
import redis.clients.jedis.args.ClientType;
import redis.clients.jedis.args.UnblockType;
import redis.clients.jedis.params.MigrateParams;
Expand Down Expand Up @@ -63,6 +64,10 @@ String migrate(String host, int port, int destinationDB, int timeout, MigratePar

long clientUnblock(long clientId, UnblockType unblockType);

String clientPause(long timeout);

String clientPause(long timeout, ClientPauseMode mode);

String memoryDoctor();

Long memoryUsage(String key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisMonitor;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.args.ClientPauseMode;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.util.SafeEncoder;

Expand Down Expand Up @@ -204,6 +205,77 @@ public Long call() throws Exception {
}
}

@Test
public void clientPauseAll() throws InterruptedException, ExecutionException {
final int pauseMillis = 1250;
final int pauseMillisDelta = 100;

ExecutorService executorService = Executors.newFixedThreadPool(1);
try (Jedis jedisPause = createJedis()) {

jedis.clientPause(pauseMillis, ClientPauseMode.ALL);

Future<Long> latency = executorService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
long startMillis = System.currentTimeMillis();
jedisPause.get("key");
return System.currentTimeMillis() - startMillis;
}
});

long latencyMillis = latency.get();
assertTrue(pauseMillis <= latencyMillis && latencyMillis <= pauseMillis + pauseMillisDelta);

} finally {
executorService.shutdown();
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
}
}

@Test
public void clientPauseWrite() throws InterruptedException, ExecutionException {
final int pauseMillis = 1250;
final int pauseMillisDelta = 100;

ExecutorService executorService = Executors.newFixedThreadPool(2);
try (Jedis jedisRead = createJedis(); Jedis jedisWrite = createJedis();) {

jedis.clientPause(pauseMillis, ClientPauseMode.WRITE);

Future<Long> latencyRead = executorService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
long startMillis = System.currentTimeMillis();
jedisRead.get("key");
return System.currentTimeMillis() - startMillis;
}
});
Future<Long> latencyWrite = executorService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
long startMillis = System.currentTimeMillis();
jedisWrite.set("key", "value");
return System.currentTimeMillis() - startMillis;
}
});

long latencyReadMillis = latencyRead.get();
assertTrue(0 <= latencyReadMillis && latencyReadMillis <= pauseMillisDelta);

long latencyWriteMillis = latencyWrite.get();
assertTrue(pauseMillis <= latencyWriteMillis && latencyWriteMillis <= pauseMillis + pauseMillisDelta);

} finally {
executorService.shutdown();
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
}
}

@Test
public void memoryDoctorString() {
String memoryInfo = jedis.memoryDoctor();
Expand Down

0 comments on commit 6554807

Please sign in to comment.