diff --git a/src/main/java/redis/clients/jedis/BinaryJedisPubSub.java b/src/main/java/redis/clients/jedis/BinaryJedisPubSub.java index e7c8f9f1b4..1b7a6996b9 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedisPubSub.java +++ b/src/main/java/redis/clients/jedis/BinaryJedisPubSub.java @@ -117,8 +117,12 @@ private void process() { onUnsubscribe(bchannel, subscribedChannels); } else if (Arrays.equals(MESSAGE.getRaw(), resp)) { final byte[] bchannel = (byte[]) reply.get(1); - final byte[] bmesg = (byte[]) reply.get(2); - onMessage(bchannel, bmesg); + final Object mesg = reply.get(2); + if (mesg instanceof List) { + ((List) mesg).forEach(bmesg -> onMessage(bchannel, bmesg)); + } else { + onMessage(bchannel, (byte[]) mesg); + } } else if (Arrays.equals(PMESSAGE.getRaw(), resp)) { final byte[] bpattern = (byte[]) reply.get(1); final byte[] bchannel = (byte[]) reply.get(2); diff --git a/src/main/java/redis/clients/jedis/JedisPubSub.java b/src/main/java/redis/clients/jedis/JedisPubSub.java index e34ea39c6f..d74a667909 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSub.java +++ b/src/main/java/redis/clients/jedis/JedisPubSub.java @@ -150,10 +150,13 @@ private void process() { onUnsubscribe(strchannel, subscribedChannels); } else if (Arrays.equals(MESSAGE.getRaw(), resp)) { final byte[] bchannel = (byte[]) reply.get(1); - final byte[] bmesg = (byte[]) reply.get(2); + final Object mesg = reply.get(2); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); - final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg); - onMessage(strchannel, strmesg); + if (mesg instanceof List) { + ((List) mesg).forEach(bmesg -> onMessage(strchannel, SafeEncoder.encode(bmesg))); + } else { + onMessage(strchannel, (mesg == null) ? null : SafeEncoder.encode((byte[]) mesg)); + } } else if (Arrays.equals(PMESSAGE.getRaw(), resp)) { final byte[] bpattern = (byte[]) reply.get(1); final byte[] bchannel = (byte[]) reply.get(2); diff --git a/src/test/java/redis/clients/jedis/commands/jedis/PublishSubscribeCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/PublishSubscribeCommandsTest.java index 6e4b2b22b3..0fc61d0099 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/PublishSubscribeCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/PublishSubscribeCommandsTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static redis.clients.jedis.Protocol.Command.CLIENT; import java.io.IOException; import java.net.UnknownHostException; @@ -15,6 +16,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import org.junit.Test; @@ -532,4 +534,69 @@ private String makeLargeString(int size) { return sb.toString(); } + + @Test(timeout = 5000) + public void subscribeCacheInvalidateChannel() { + final String cacheInvalidate = "__redis__:invalidate"; + final AtomicBoolean onMessage = new AtomicBoolean(false); + final JedisPubSub pubsub = new JedisPubSub() { + @Override public void onMessage(String channel, String message) { + onMessage.set(true); + assertEquals(cacheInvalidate, channel); + if (message != null) { + assertEquals("foo", message); + consumeJedis(j -> j.flushAll()); + } else { + unsubscribe(channel); + } + } + + @Override public void onSubscribe(String channel, int subscribedChannels) { + assertEquals(cacheInvalidate, channel); + consumeJedis(j -> j.set("foo", "bar")); + } + }; + + try (Jedis subscriber = createJedis()) { + long clientId = subscriber.clientId(); + subscriber.sendCommand(CLIENT, "TRACKING", "ON", "REDIRECT", Long.toString(clientId), "BCAST"); + subscriber.subscribe(pubsub, cacheInvalidate); + assertTrue("Subscriber didn't get any message.", onMessage.get()); + } + } + + @Test(timeout = 5000) + public void subscribeCacheInvalidateChannelBinary() { + final byte[] cacheInvalidate = "__redis__:invalidate".getBytes(); + final AtomicBoolean onMessage = new AtomicBoolean(false); + final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() { + @Override public void onMessage(byte[] channel, byte[] message) { + onMessage.set(true); + assertArrayEquals(cacheInvalidate, channel); + if (message != null) { + assertArrayEquals("foo".getBytes(), message); + consumeJedis(j -> j.flushAll()); + } else { + unsubscribe(channel); + } + } + + @Override public void onSubscribe(byte[] channel, int subscribedChannels) { + assertArrayEquals(cacheInvalidate, channel); + consumeJedis(j -> j.set("foo".getBytes(), "bar".getBytes())); + } + }; + + try (Jedis subscriber = createJedis()) { + long clientId = subscriber.clientId(); + subscriber.sendCommand(CLIENT, "TRACKING", "ON", "REDIRECT", Long.toString(clientId), "BCAST"); + subscriber.subscribe(pubsub, cacheInvalidate); + assertTrue("Subscriber didn't get any message.", onMessage.get()); + } + } + + private void consumeJedis(Consumer consumer) { + Thread t = new Thread(() -> consumer.accept(jedis)); + t.start(); + } }