Skip to content

Commit

Permalink
Adding Ping Pong Support to BinaryJedisPubSub . Github issue 1446 red…
Browse files Browse the repository at this point in the history
  • Loading branch information
Peeyush Chandel committed Dec 27, 2016
1 parent 8f20746 commit 573d840
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 18 deletions.
16 changes: 14 additions & 2 deletions src/main/java/redis/clients/jedis/BinaryJedisPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static redis.clients.jedis.Protocol.Keyword.MESSAGE;
import static redis.clients.jedis.Protocol.Keyword.PMESSAGE;
import static redis.clients.jedis.Protocol.Keyword.PONG;
import static redis.clients.jedis.Protocol.Keyword.PSUBSCRIBE;
import static redis.clients.jedis.Protocol.Keyword.PUNSUBSCRIBE;
import static redis.clients.jedis.Protocol.Keyword.SUBSCRIBE;
Expand All @@ -13,8 +14,8 @@
import redis.clients.jedis.exceptions.JedisException;

public abstract class BinaryJedisPubSub {
private int subscribedChannels = 0;
private Client client;
protected int subscribedChannels = 0;
protected volatile Client client;

public void onMessage(byte[] channel, byte[] message) {
}
Expand All @@ -34,6 +35,9 @@ public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}

public void onPong(byte[] pattern) {
}

public void unsubscribe() {
client.unsubscribe();
client.flush();
Expand Down Expand Up @@ -64,6 +68,11 @@ public void punsubscribe(byte[]... patterns) {
client.flush();
}

public void ping() {
client.ping();
client.flush();
}

public boolean isSubscribed() {
return subscribedChannels > 0;
}
Expand Down Expand Up @@ -115,6 +124,9 @@ private void process(Client client) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bpattern = (byte[]) reply.get(1);
onPUnsubscribe(bpattern, subscribedChannels);
} else if (Arrays.equals(PONG.raw, resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
onPong(bpattern);
} else {
throw new JedisException("Unknown message type: " + firstObj);
}
Expand Down
12 changes: 1 addition & 11 deletions src/main/java/redis/clients/jedis/JedisPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.SafeEncoder;

public abstract class JedisPubSub {
public abstract class JedisPubSub extends BinaryJedisPubSub {

private static final String JEDIS_SUBSCRIPTION_MESSAGE = "JedisPubSub is not subscribed to a Jedis instance.";
private int subscribedChannels = 0;
private volatile Client client;

public void onMessage(String channel, String message) {
}
Expand Down Expand Up @@ -91,14 +89,6 @@ public void punsubscribe(String... patterns) {
client.flush();
}

public void ping() {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.ping();
client.flush();
}

public boolean isSubscribed() {
return subscribedChannels > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public void onUnsubscribe(String channel, int subscribedChannels) {

@Test
public void pubSubChannels() {
final List<String> expectedActiveChannels = Arrays
.asList("testchan1", "testchan2", "testchan3");
final List<String> expectedActiveChannels = Arrays.asList("testchan1", "testchan2",
"testchan3");
jedis.subscribe(new JedisPubSub() {
private int count = 0;

Expand All @@ -85,6 +85,7 @@ public void onSubscribe(String channel, int subscribedChannels) {
public void pubSubChannelWithPingPong() throws InterruptedException {
final CountDownLatch latchUnsubscribed = new CountDownLatch(1);
final CountDownLatch latchReceivedPong = new CountDownLatch(1);

jedis.subscribe(new JedisPubSub() {

@Override
Expand Down Expand Up @@ -112,6 +113,38 @@ public void onUnsubscribe(String channel, int subscribedChannels) {
assertEquals(0L, latchUnsubscribed.getCount());
}

@Test
public void binaryPubSubChannelWithPingPong() throws InterruptedException {
final CountDownLatch latchUnsubscribed = new CountDownLatch(1);
final CountDownLatch latchReceivedPong = new CountDownLatch(1);

jedis.subscribe(new BinaryJedisPubSub() {

@Override
public void onSubscribe(byte[] channel, int subscribedChannels) {
publishOne("testchan1", "hello");
}

@Override
public void onMessage(byte[] channel, byte[] message) {
this.ping();
}

@Override
public void onPong(byte[] pattern) {
latchReceivedPong.countDown();
unsubscribe();
}

@Override
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
latchUnsubscribed.countDown();
}
}, SafeEncoder.encode("testchan1"));
assertEquals(0L, latchReceivedPong.getCount());
assertEquals(0L, latchUnsubscribed.getCount());
}

@Test
public void pubSubNumPat() {
jedis.psubscribe(new JedisPubSub() {
Expand Down Expand Up @@ -289,7 +322,8 @@ public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
}

@Test
public void binaryPsubscribeMany() throws UnknownHostException, IOException, InterruptedException {
public void binaryPsubscribeMany()
throws UnknownHostException, IOException, InterruptedException {
jedis.psubscribe(new BinaryJedisPubSub() {
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
publishOne(SafeEncoder.encode(pattern).replace("*", "123"), "exit");
Expand All @@ -302,8 +336,8 @@ public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
}

@Test
public void binarySubscribeLazily() throws UnknownHostException, IOException,
InterruptedException {
public void binarySubscribeLazily()
throws UnknownHostException, IOException, InterruptedException {
final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
unsubscribe(channel);
Expand Down

0 comments on commit 573d840

Please sign in to comment.