From 090b30922ee26d4a78bc2f35a1717840cbd29f74 Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Sun, 13 Oct 2024 14:49:31 +0530 Subject: [PATCH 1/6] Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands --- libs/resources/RespCommandsDocs.json | 48 +++++ libs/resources/RespCommandsInfo.json | 29 +++ libs/server/PubSub/SubscribeBroker.cs | 160 ++++++++++++++ libs/server/Resp/CmdStrings.cs | 4 + libs/server/Resp/Parser/RespCommand.cs | 34 +++ libs/server/Resp/PubSubCommands.cs | 77 +++++++ libs/server/Resp/RespServerSession.cs | 3 + .../CommandInfoUpdater/SupportedCommand.cs | 6 + test/Garnet.test/Resp/ACL/RespCommandTests.cs | 47 +++- test/Garnet.test/RespPubSubTests.cs | 202 +++++++++++++++++- website/docs/commands/analytics.md | 46 ++++ website/docs/commands/api-compatibility.md | 6 +- 12 files changed, 654 insertions(+), 8 deletions(-) diff --git a/libs/resources/RespCommandsDocs.json b/libs/resources/RespCommandsDocs.json index 5d1e136100..2c1e860029 100644 --- a/libs/resources/RespCommandsDocs.json +++ b/libs/resources/RespCommandsDocs.json @@ -3775,6 +3775,54 @@ } ] }, + { + "Command": "PUBSUB", + "Name": "PUBSUB", + "Summary": "A container for Pub/Sub commands.", + "Group": "PubSub", + "Complexity": "Depends on subcommand.", + "SubCommands": [ + { + "Command": "PUBSUB_NUMPAT", + "Name": "PUBSUB|NUMPAT", + "Summary": "Returns a count of unique pattern subscriptions.", + "Group": "PubSub", + "Complexity": "O(1)" + }, + { + "Command": "PUBSUB_CHANNELS", + "Name": "PUBSUB|CHANNELS", + "Summary": "Returns the active channels.", + "Group": "PubSub", + "Complexity": "O(N) where N is the number of active channels, and assuming constant time pattern matching (relatively short channels and patterns)", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "PATTERN", + "DisplayText": "pattern", + "Type": "Pattern", + "ArgumentFlags": "Optional" + } + ] + }, + { + "Command": "PUBSUB_NUMSUB", + "Name": "PUBSUB|NUMSUB", + "Summary": "Returns a count of subscribers to channels.", + "Group": "PubSub", + "Complexity": "O(N) for the NUMSUB subcommand, where N is the number of requested channels", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "CHANNEL", + "DisplayText": "channel", + "Type": "String", + "ArgumentFlags": "Optional, Multiple" + } + ] + } + ] + }, { "Command": "PUNSUBSCRIBE", "Name": "PUNSUBSCRIBE", diff --git a/libs/resources/RespCommandsInfo.json b/libs/resources/RespCommandsInfo.json index fa010ddf37..ec1d001eb6 100644 --- a/libs/resources/RespCommandsInfo.json +++ b/libs/resources/RespCommandsInfo.json @@ -2747,6 +2747,35 @@ "Flags": "Fast, Loading, PubSub, Stale", "AclCategories": "Fast, PubSub" }, + { + "Command": "PUBSUB", + "Name": "PUBSUB", + "Arity": -2, + "AclCategories": "Slow", + "SubCommands": [ + { + "Command": "PUBSUB_CHANNELS", + "Name": "PUBSUB|CHANNELS", + "Arity": -2, + "Flags": "Loading, PubSub, Stale", + "AclCategories": "PubSub, Slow" + }, + { + "Command": "PUBSUB_NUMPAT", + "Name": "PUBSUB|NUMPAT", + "Arity": 2, + "Flags": "Loading, PubSub, Stale", + "AclCategories": "PubSub, Slow" + }, + { + "Command": "PUBSUB_NUMSUB", + "Name": "PUBSUB|NUMSUB", + "Arity": -2, + "Flags": "Loading, PubSub, Stale", + "AclCategories": "PubSub, Slow" + } + ] + }, { "Command": "PUNSUBSCRIBE", "Name": "PUNSUBSCRIBE", diff --git a/libs/server/PubSub/SubscribeBroker.cs b/libs/server/PubSub/SubscribeBroker.cs index a3985a5eb5..b94f2eb8d2 100644 --- a/libs/server/PubSub/SubscribeBroker.cs +++ b/libs/server/PubSub/SubscribeBroker.cs @@ -2,8 +2,10 @@ // Licensed under the MIT license. using System; +using System.Buffers; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Garnet.common; @@ -408,6 +410,164 @@ public unsafe void Publish(byte* key, byte* value, int valueLength, bool ascii = log.Enqueue(logEntryBytes); } + /// + /// Retrieves the collection of channels that have active subscriptions. + /// + /// The collection of channels. + public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output) + { + var isMemory = false; + MemoryHandle ptrHandle = default; + var ptr = output.SpanByte.ToPointer(); + + var curr = ptr; + var end = curr + output.Length; + + try + { + if (subscriptions is null || subscriptions.Keys.Count == 0) + { + while (!RespWriteUtils.WriteEmptyArray(ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + return; + } + + if (input.parseState.Count == 0) + { + while (!RespWriteUtils.WriteArrayLength(subscriptions.Keys.Count, ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + + foreach (var key in subscriptions.Keys) + { + while (!RespWriteUtils.WriteSimpleString(key.AsSpan().Slice(sizeof(int)), ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + } + return; + } + + var totalArrayHeaderLen = 0; + while (!RespWriteUtils.WriteArrayLength(subscriptions.Keys.Count, ref curr, end, out var _, out totalArrayHeaderLen)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + + var noOfFoundChannels = 0; + var pattern = input.parseState.GetArgSliceByRef(0).SpanByte; + var patternPtr = pattern.ToPointer() - sizeof(int); + *(int*)patternPtr = pattern.Length; + + foreach (var key in subscriptions.Keys) + { + fixed (byte* keyPtr = key) + { + var refKeyPtr = keyPtr; + var _patternPtr = patternPtr; + if (keySerializer.Match(ref keySerializer.ReadKeyByRef(ref refKeyPtr), true, ref keySerializer.ReadKeyByRef(ref _patternPtr), true)) + { + while (!RespWriteUtils.WriteSimpleString(key.AsSpan().Slice(sizeof(int)), ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + noOfFoundChannels++; + } + } + } + + if (noOfFoundChannels == 0) + { + curr = ptr; + while (!RespWriteUtils.WriteEmptyArray(ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + return; + } + + var newTotalArrayHeaderLen = 0; + var _ptr = ptr; + RespWriteUtils.WriteArrayLength(noOfFoundChannels, ref _ptr, end, out var _, out newTotalArrayHeaderLen); // ReallocateOutput is not needed here as there should be always be available space in the output buffer as we have already written the max array length + + Debug.Assert(totalArrayHeaderLen >= newTotalArrayHeaderLen, "newTotalArrayHeaderLen can't be bigger than totalArrayHeaderLen as we have already written max array lenght in the buffer"); + if (totalArrayHeaderLen != newTotalArrayHeaderLen) + { + var remainingLength = (curr - ptr) - totalArrayHeaderLen; + Buffer.MemoryCopy(ptr + totalArrayHeaderLen, ptr + newTotalArrayHeaderLen, remainingLength, remainingLength); + curr = curr - (totalArrayHeaderLen - newTotalArrayHeaderLen); + } + } + finally + { + if (isMemory) + ptrHandle.Dispose(); + output.Length = (int)(curr - ptr); + } + } + + /// + /// Retrieves the number of pattern subscriptions. + /// + /// The number of pattern subscriptions. + public int NumPatternSubscriptions() + { + return prefixSubscriptions?.Count ?? 0; + } + + /// + /// PUBSUB NUMSUB + /// + /// + /// + public unsafe void NumSubscriptions(ref ObjectInput input, ref SpanByteAndMemory output) + { + var isMemory = false; + MemoryHandle ptrHandle = default; + var ptr = output.SpanByte.ToPointer(); + + var curr = ptr; + var end = curr + output.Length; + + try + { + var numOfChannels = input.parseState.Count; + if (subscriptions is null || numOfChannels == 0) + { + while (!RespWriteUtils.WriteEmptyArray(ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + return; + } + + while (!RespWriteUtils.WriteArrayLength(numOfChannels * 2, ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + + var currChannelIdx = input.parseStateStartIdx; + while (currChannelIdx < numOfChannels) + { + var channelArg = input.parseState.GetArgSliceByRef(currChannelIdx); + var channelSpan = channelArg.SpanByte; + var channelPtr = channelSpan.ToPointer() - sizeof(int); // Memory would have been already pinned + *(int*)channelPtr = channelSpan.Length; + + while (!RespWriteUtils.WriteSimpleString(channelArg.ReadOnlySpan, ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + + var channel = new Span(channelPtr, channelSpan.Length + sizeof(int)).ToArray(); + + if (subscriptions.TryGetValue(channel, out var subscriptionDict)) + { + while (!RespWriteUtils.WriteInteger(subscriptionDict.Count, ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + } + else + { + while (!RespWriteUtils.WriteInteger(0, ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + } + + currChannelIdx++; + } + } + finally + { + if (isMemory) + ptrHandle.Dispose(); + output.Length = (int)(curr - ptr); + } + } + /// public void Dispose() { diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs index da69aba555..29fe95fe48 100644 --- a/libs/server/Resp/CmdStrings.cs +++ b/libs/server/Resp/CmdStrings.cs @@ -99,6 +99,10 @@ static partial class CmdStrings public static ReadOnlySpan rank => "rank"u8; public static ReadOnlySpan MAXLEN => "MAXLEN"u8; public static ReadOnlySpan maxlen => "maxlen"u8; + public static ReadOnlySpan PUBSUB => "PUBSUB"u8; + public static ReadOnlySpan CHANNELS => "CHANNELS"u8; + public static ReadOnlySpan NUMPAT => "NUMPAT"u8; + public static ReadOnlySpan NUMSUB => "NUMSUB"u8; /// /// Response strings diff --git a/libs/server/Resp/Parser/RespCommand.cs b/libs/server/Resp/Parser/RespCommand.cs index 81f8ee8cbc..f0a9a03a11 100644 --- a/libs/server/Resp/Parser/RespCommand.cs +++ b/libs/server/Resp/Parser/RespCommand.cs @@ -169,6 +169,11 @@ public enum RespCommand : byte PING, + // Pub/Sub commands + PUBSUB, + PUBSUB_CHANNELS, + PUBSUB_NUMPAT, + PUBSUB_NUMSUB, PUBLISH, SUBSCRIBE, PSUBSCRIBE, @@ -1945,6 +1950,35 @@ private RespCommand SlowParseCommand(ref int count, ref ReadOnlySpan speci return RespCommand.MODULE_LOADCS; } } + else if (command.SequenceEqual(CmdStrings.PUBSUB)) + { + Span subCommand = GetCommand(out var gotSubCommand); + if (!gotSubCommand) + { + success = false; + return RespCommand.NONE; + } + + count--; + AsciiUtils.ToUpperInPlace(subCommand); + if (subCommand.SequenceEqual(CmdStrings.CHANNELS)) + { + return RespCommand.PUBSUB_CHANNELS; + } + else if (subCommand.SequenceEqual(CmdStrings.NUMSUB)) + { + return RespCommand.PUBSUB_NUMSUB; + } + else if (subCommand.SequenceEqual(CmdStrings.NUMPAT)) + { + return RespCommand.PUBSUB_NUMPAT; + } + else + { + success = false; + return RespCommand.NONE; + } + } else { // Custom commands should have never been set when we reach this point diff --git a/libs/server/Resp/PubSubCommands.cs b/libs/server/Resp/PubSubCommands.cs index 2bd16051f5..c6405c30ee 100644 --- a/libs/server/Resp/PubSubCommands.cs +++ b/libs/server/Resp/PubSubCommands.cs @@ -370,5 +370,82 @@ private bool NetworkPUNSUBSCRIBE() } return true; } + + private bool NetworkPUBSUB_CHANNELS() + { + if (parseState.Count > 1) + { + return AbortWithWrongNumberOfArguments(nameof(RespCommand.PUBSUB_CHANNELS)); + } + + if (subscribeBroker is null) + { + while (!RespWriteUtils.WriteError("ERR PUBSUB CHANNELS is disabled, enable it with --pubsub option."u8, ref dcurr, dend)) + SendAndReset(); + return true; + } + + var input = new ObjectInput + { + parseState = parseState, + parseStateStartIdx = 0 + }; + var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)); + subscribeBroker.Channels(ref input, ref output); + + if (!output.IsSpanByte) + SendAndReset(output.Memory, output.Length); + else + dcurr += output.Length; + + return true; + } + + private bool NetworkPUBSUB_NUMPAT() + { + if (parseState.Count > 0) + { + return AbortWithWrongNumberOfArguments(nameof(RespCommand.PUBSUB_NUMPAT)); + } + + if (subscribeBroker is null) + { + while (!RespWriteUtils.WriteError("ERR PUBSUB NUMPAT is disabled, enable it with --pubsub option."u8, ref dcurr, dend)) + SendAndReset(); + return true; + } + + var numPatSubs = subscribeBroker.NumPatternSubscriptions(); + + while (!RespWriteUtils.WriteInteger(numPatSubs, ref dcurr, dend)) + SendAndReset(); + + return true; + } + + private bool NetworkPUBSUB_NUMSUB() + { + if (subscribeBroker is null) + { + while (!RespWriteUtils.WriteError("ERR PUBSUB NUMSUB is disabled, enable it with --pubsub option."u8, ref dcurr, dend)) + SendAndReset(); + return true; + } + + var input = new ObjectInput + { + parseState = parseState, + parseStateStartIdx = 0 + }; + var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)); + subscribeBroker.NumSubscriptions(ref input, ref output); + + if (!output.IsSpanByte) + SendAndReset(output.Memory, output.Length); + else + dcurr += output.Length; + + return true; + } } } \ No newline at end of file diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index ac4df4174a..54fdcdbc11 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -577,6 +577,9 @@ private bool ProcessArrayCommands(RespCommand cmd, ref TGarnetApi st RespCommand.PSUBSCRIBE => NetworkPSUBSCRIBE(), RespCommand.UNSUBSCRIBE => NetworkUNSUBSCRIBE(), RespCommand.PUNSUBSCRIBE => NetworkPUNSUBSCRIBE(), + RespCommand.PUBSUB_CHANNELS => NetworkPUBSUB_CHANNELS(), + RespCommand.PUBSUB_NUMSUB => NetworkPUBSUB_NUMSUB(), + RespCommand.PUBSUB_NUMPAT => NetworkPUBSUB_NUMPAT(), // Custom Object Commands RespCommand.COSCAN => ObjectScan(GarnetObjectType.All, ref storageApi), // Sorted Set commands diff --git a/playground/CommandInfoUpdater/SupportedCommand.cs b/playground/CommandInfoUpdater/SupportedCommand.cs index 5346733d2b..46d7ef89af 100644 --- a/playground/CommandInfoUpdater/SupportedCommand.cs +++ b/playground/CommandInfoUpdater/SupportedCommand.cs @@ -197,6 +197,12 @@ public class SupportedCommand new("PSUBSCRIBE", RespCommand.PSUBSCRIBE), new("PTTL", RespCommand.PTTL), new("PUBLISH", RespCommand.PUBLISH), + new("PUBSUB", RespCommand.PUBSUB, + [ + new("PUBSUB|CHANNELS", RespCommand.PUBSUB_CHANNELS), + new("PUBSUB|NUMPAT", RespCommand.PUBSUB_NUMPAT), + new("PUBSUB|NUMSUB", RespCommand.PUBSUB_NUMSUB), + ]), new("PUNSUBSCRIBE", RespCommand.PUNSUBSCRIBE), new("REGISTERCS", RespCommand.REGISTERCS), new("QUIT", RespCommand.QUIT), diff --git a/test/Garnet.test/Resp/ACL/RespCommandTests.cs b/test/Garnet.test/Resp/ACL/RespCommandTests.cs index 4394f899f4..770081fad0 100644 --- a/test/Garnet.test/Resp/ACL/RespCommandTests.cs +++ b/test/Garnet.test/Resp/ACL/RespCommandTests.cs @@ -83,7 +83,7 @@ public void AllCommandsCovered() ClassicAssert.IsTrue(RespCommandsInfo.TryGetRespCommandNames(out IReadOnlySet advertisedCommands), "Couldn't get advertised RESP commands"); // TODO: See if these commands could be identified programmatically - IEnumerable withOnlySubCommands = ["ACL", "CLIENT", "CLUSTER", "CONFIG", "LATENCY", "MEMORY", "MODULE"]; + IEnumerable withOnlySubCommands = ["ACL", "CLIENT", "CLUSTER", "CONFIG", "LATENCY", "MEMORY", "MODULE", "PUBSUB"]; IEnumerable notCoveredByACLs = allInfo.Where(static x => x.Value.Flags.HasFlag(RespCommandFlags.NoAuth)).Select(static kv => kv.Key); // Check tests against RespCommandsInfo @@ -4398,6 +4398,51 @@ static async Task DoPublishAsync(GarnetClient client) } } + [Test] + public async Task PubSubChannelsACLsAsync() + { + await CheckCommandsAsync( + "PUBSUB CHANNELS", + [DoPubSubChannelsAsync] + ); + + static async Task DoPubSubChannelsAsync(GarnetClient client) + { + var count = await client.ExecuteForStringArrayResultAsync("PUBSUB", ["CHANNELS"]); + CollectionAssert.IsEmpty(count); + } + } + + [Test] + public async Task PubSubNumPatACLsAsync() + { + await CheckCommandsAsync( + "PUBSUB NUMPAT", + [DoPubSubNumPatAsync] + ); + + static async Task DoPubSubNumPatAsync(GarnetClient client) + { + var count = await client.ExecuteForLongResultAsync("PUBSUB", ["NUMPAT"]); + ClassicAssert.AreEqual(0, count); + } + } + + [Test] + public async Task PubSubNumSubACLsAsync() + { + await CheckCommandsAsync( + "PUBSUB NUMSUB", + [DoPubSubNumSubAsync] + ); + + static async Task DoPubSubNumSubAsync(GarnetClient client) + { + var count = await client.ExecuteForStringArrayResultAsync("PUBSUB", ["NUMSUB"]); + CollectionAssert.IsEmpty(count); + } + } + [Test] public async Task ReadOnlyACLsAsync() { diff --git a/test/Garnet.test/RespPubSubTests.cs b/test/Garnet.test/RespPubSubTests.cs index d54cabd527..63b953bcc4 100644 --- a/test/Garnet.test/RespPubSubTests.cs +++ b/test/Garnet.test/RespPubSubTests.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Linq; using System.Threading; using NUnit.Framework; using NUnit.Framework.Legacy; @@ -39,7 +40,7 @@ public void BasicSUBSCRIBE() ManualResetEvent evt = new(false); - sub.Subscribe(RedisChannel.Pattern("messages"), (channel, message) => + sub.Subscribe(RedisChannel.Literal("messages"), (channel, message) => { ClassicAssert.AreEqual("messages", (string)channel); ClassicAssert.AreEqual("published message", (string)message); @@ -57,7 +58,7 @@ public void BasicSUBSCRIBE() repeat--; ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); } - sub.Unsubscribe(RedisChannel.Pattern("messages")); + sub.Unsubscribe(RedisChannel.Literal("messages")); } [Test] @@ -68,8 +69,8 @@ public void BasicPSUBSCRIBE() var sub = subRedis.GetSubscriber(); var db = redis.GetDatabase(0); - string glob = "com.messages.*"; - string actual = "com.messages.testmessage"; + string glob = "messagesA*"; + string actual = "messagesAtest"; string value = "published message"; var channel = new RedisChannel(glob, RedisChannel.PatternMode.Pattern); @@ -97,5 +98,198 @@ public void BasicPSUBSCRIBE() } sub.Unsubscribe(channel); } + + [Test] + public void BasicPUBSUB_CHANNELS() + { + using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var sub = subRedis.GetSubscriber(); + var db = redis.GetDatabase(0); + var server = redis.GetServers()[0]; + + ManualResetEvent evt = new(false); + var isMessagesASubed = false; + var isMessagesBSubed = false; + var channelA = "messagesAtest"; + var channelB = "messagesB"; + + sub.Subscribe(RedisChannel.Literal(channelA), (channel, message) => + { + isMessagesASubed = true; + if (isMessagesBSubed) + evt.Set(); + }); + + sub.Subscribe(RedisChannel.Literal(channelB), (channel, message) => + { + isMessagesBSubed = true; + if (isMessagesASubed) + evt.Set(); + }); + + // Doing publish to make sure the channel is subscribed + // Repeat to work-around bug in StackExchange.Redis subscribe behavior + // where it returns before the SUBSCRIBE call is processed. + int repeat = 5; + while (true) + { + if (!isMessagesASubed) + db.Publish(RedisChannel.Pattern(channelA), "published message"); + if (!isMessagesBSubed) + db.Publish(RedisChannel.Pattern(channelB), "published message"); + var ret = evt.WaitOne(TimeSpan.FromSeconds(1)); + if (ret) break; + repeat--; + ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); + } + + var result = server.SubscriptionChannels(); + string[] expectedResult = [channelA, channelB]; + CollectionAssert.IsSubsetOf(expectedResult, result.Select(x => x.ToString())); + + result = server.SubscriptionChannels(RedisChannel.Pattern("messages*")); + expectedResult = [channelA, channelB]; + CollectionAssert.AreEquivalent(expectedResult, result.Select(x => x.ToString())); + + result = server.SubscriptionChannels(RedisChannel.Pattern("messages?test")); + expectedResult = [channelA]; + CollectionAssert.AreEquivalent(expectedResult, result.Select(x => x.ToString())); + + result = server.SubscriptionChannels(RedisChannel.Pattern("messagesC*")); + ClassicAssert.AreEqual(0, result.Length); + + sub.Unsubscribe(RedisChannel.Literal(channelA)); + sub.Unsubscribe(RedisChannel.Literal(channelB)); + } + + [Test] + public void BasicPUBSUB_NUMPAT() + { + using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var sub = subRedis.GetSubscriber(); + var db = redis.GetDatabase(0); + var server = redis.GetServers()[0]; + + string glob = "com.messages.*"; + string globB = "com.messagesB.*"; + string actual = "com.messages.testmessage"; + string actualB = "com.messagesB.testmessage"; + string value = "published message"; + + var channel = new RedisChannel(glob, RedisChannel.PatternMode.Pattern); + var channelB = new RedisChannel(globB, RedisChannel.PatternMode.Pattern); + + var result = server.SubscriptionPatternCount(); + ClassicAssert.AreEqual(0, result); + + var isMessagesASubed = false; + var isMessagesBSubed = false; + + ManualResetEvent evt = new(false); + + sub.Subscribe(channel, (receivedChannel, message) => + { + isMessagesASubed = true; + if (isMessagesASubed && isMessagesBSubed) + evt.Set(); + }); + + sub.Subscribe(channelB, (receivedChannel, message) => + { + isMessagesBSubed = true; + if (isMessagesASubed && isMessagesBSubed) + evt.Set(); + }); + + // Repeat to work-around bug in StackExchange.Redis subscribe behavior + // where it returns before the SUBSCRIBE call is processed. + int repeat = 5; + while (true) + { + if (!isMessagesASubed) + db.Publish(RedisChannel.Literal(actual), value); + if (!isMessagesBSubed) + db.Publish(RedisChannel.Literal(actualB), value); + var ret = evt.WaitOne(TimeSpan.FromSeconds(1)); + if (ret) break; + repeat--; + ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); + } + + result = server.SubscriptionPatternCount(); + ClassicAssert.AreEqual(2, result); + + sub.Unsubscribe(channel); + sub.Unsubscribe(channelB); + } + + [Test] + public void BasicPUBSUB_NUMSUB() + { + using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var sub = subRedis.GetSubscriber(); + var db = redis.GetDatabase(0); + var server = redis.GetServers()[0]; + + ManualResetEvent evt = new(false); + var isMessagesASubed = false; + var isMessagesBSubed = false; + + var multiChannelResult = server.Execute("PUBSUB", ["NUMSUB"]); + ClassicAssert.AreEqual(0, multiChannelResult.Length); + + multiChannelResult = server.Execute("PUBSUB", ["NUMSUB", "messagesA", "messagesB"]); + ClassicAssert.AreEqual(4, multiChannelResult.Length); + ClassicAssert.AreEqual("messagesA", multiChannelResult[0].ToString()); + ClassicAssert.AreEqual("0", multiChannelResult[1].ToString()); + ClassicAssert.AreEqual("messagesB", multiChannelResult[2].ToString()); + ClassicAssert.AreEqual("0", multiChannelResult[3].ToString()); + + sub.Subscribe(RedisChannel.Literal("messagesA"), (channel, message) => + { + isMessagesASubed = true; + if (isMessagesASubed && isMessagesBSubed) + evt.Set(); + }); + + sub.Subscribe(RedisChannel.Literal("messagesB"), (channel, message) => + { + isMessagesBSubed = true; + if (isMessagesASubed && isMessagesBSubed) + evt.Set(); + }); + + // Doing publish to make sure the channel is subscribed + // Repeat to work-around bug in StackExchange.Redis subscribe behavior + // where it returns before the SUBSCRIBE call is processed. + int repeat = 5; + while (true) + { + if(!isMessagesASubed) + db.Publish(RedisChannel.Pattern("messagesA"), "published message"); + if (!isMessagesBSubed) + db.Publish(RedisChannel.Pattern("messagesB"), "published message"); + var ret = evt.WaitOne(TimeSpan.FromSeconds(1)); + if (ret) break; + repeat--; + ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); + } + + var result = server.SubscriptionSubscriberCount(RedisChannel.Literal("messagesA")); + ClassicAssert.AreEqual(1, result); + + multiChannelResult = server.Execute("PUBSUB", ["NUMSUB", "messagesA", "messagesB"]); + ClassicAssert.AreEqual(4, multiChannelResult.Length); + ClassicAssert.AreEqual("messagesA", multiChannelResult[0].ToString()); + ClassicAssert.AreEqual("1", multiChannelResult[1].ToString()); + ClassicAssert.AreEqual("messagesB", multiChannelResult[2].ToString()); + ClassicAssert.AreEqual("1", multiChannelResult[3].ToString()); + + sub.Unsubscribe(RedisChannel.Literal("messagesA")); + sub.Unsubscribe(RedisChannel.Literal("messagesB")); + } } } \ No newline at end of file diff --git a/website/docs/commands/analytics.md b/website/docs/commands/analytics.md index 7ab10a7c4d..69a1c43851 100644 --- a/website/docs/commands/analytics.md +++ b/website/docs/commands/analytics.md @@ -218,6 +218,52 @@ Posts a message to the given channel. Integer Reply: the number of clients that received the message. --- + +### PUBSUB CHANNELS +#### Syntax + +```bash +PUBSUB CHANNELS [pattern] +``` + +Lists the currently active channels. An active channel is a Pub/Sub channel with one or more subscribers (excluding clients subscribed to patterns). + +#### Resp Reply + +Array reply: a list of active channels, optionally matching the specified pattern. + +--- + +### PUBSUB NUMPAT +#### Syntax + +```bash +PUBSUB NUMPAT +``` + +Returns the number of unique patterns that are subscribed to by clients (that are performed using the PSUBSCRIBE command). + +#### Resp Reply + +Integer reply: the number of patterns all the clients are subscribed to. + +--- + +### PUBSUB NUMSUB +#### Syntax + +```bash +PUBSUB NUMSUB [channel [channel ...]] +``` + +Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels. + +#### Resp Reply + +Array reply: the number of subscribers per channel, each even element (including the 0th) is channel name, each odd element is the number of subscribers + +--- + ### PUNSUBSCRIBE #### Syntax diff --git a/website/docs/commands/api-compatibility.md b/website/docs/commands/api-compatibility.md index cf18f38a5c..04aa8a9569 100644 --- a/website/docs/commands/api-compatibility.md +++ b/website/docs/commands/api-compatibility.md @@ -248,10 +248,10 @@ Note that this list is subject to change as we continue to expand our API comman | | REFCOUNT | ➖ | | | **PUB/SUB** | [PSUBSCRIBE](analytics.md#psubscribe) | ➕ | | | | [PUBLISH](analytics.md#publish) | ➕ | | -| | PUBSUB CHANNELS | ➖ | | +| | [PUBSUB CHANNELS](analytics.md#pubsub-channels) | ➖ | | | | PUBSUB HELP | ➖ | | -| | PUBSUB NUMPAT | ➖ | | -| | PUBSUB NUMSUB | ➖ | | +| | [PUBSUB NUMPAT](analytics.md#pubsub-numpat) | ➖ | | +| | [PUBSUB NUMSUB](analytics.md#pubsub-numsub) | ➖ | | | | PUBSUB SHARDCHANNELS | ➖ | | | | PUBSUB SHARDNUMSUB | ➖ | | | | [PUNSUBSCRIBE](analytics.md#punsubscribe) | ➕ | | From 91404c266401da428b4acf118b139ee1e2937664 Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Sun, 13 Oct 2024 15:35:11 +0530 Subject: [PATCH 2/6] Code style fix --- test/Garnet.test/RespPubSubTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/Garnet.test/RespPubSubTests.cs b/test/Garnet.test/RespPubSubTests.cs index 63b953bcc4..5a1e4caa62 100644 --- a/test/Garnet.test/RespPubSubTests.cs +++ b/test/Garnet.test/RespPubSubTests.cs @@ -235,7 +235,7 @@ public void BasicPUBSUB_NUMSUB() var server = redis.GetServers()[0]; ManualResetEvent evt = new(false); - var isMessagesASubed = false; + var isMessagesASubed = false; var isMessagesBSubed = false; var multiChannelResult = server.Execute("PUBSUB", ["NUMSUB"]); @@ -268,7 +268,7 @@ public void BasicPUBSUB_NUMSUB() int repeat = 5; while (true) { - if(!isMessagesASubed) + if (!isMessagesASubed) db.Publish(RedisChannel.Pattern("messagesA"), "published message"); if (!isMessagesBSubed) db.Publish(RedisChannel.Pattern("messagesB"), "published message"); From 0a00a6afe3a3b364c60cf51e57c93278aafa1844 Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Wed, 16 Oct 2024 12:12:45 +0530 Subject: [PATCH 3/6] Review comment fix --- libs/server/PubSub/SubscribeBroker.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libs/server/PubSub/SubscribeBroker.cs b/libs/server/PubSub/SubscribeBroker.cs index b94f2eb8d2..ce5c6040c5 100644 --- a/libs/server/PubSub/SubscribeBroker.cs +++ b/libs/server/PubSub/SubscribeBroker.cs @@ -425,7 +425,7 @@ public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output) try { - if (subscriptions is null || subscriptions.Keys.Count == 0) + if (subscriptions is null || subscriptions.Count == 0) { while (!RespWriteUtils.WriteEmptyArray(ref curr, end)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); @@ -434,7 +434,7 @@ public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output) if (input.parseState.Count == 0) { - while (!RespWriteUtils.WriteArrayLength(subscriptions.Keys.Count, ref curr, end)) + while (!RespWriteUtils.WriteArrayLength(subscriptions.Count, ref curr, end)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); foreach (var key in subscriptions.Keys) @@ -446,7 +446,7 @@ public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output) } var totalArrayHeaderLen = 0; - while (!RespWriteUtils.WriteArrayLength(subscriptions.Keys.Count, ref curr, end, out var _, out totalArrayHeaderLen)) + while (!RespWriteUtils.WriteArrayLength(subscriptions.Count, ref curr, end, out var _, out totalArrayHeaderLen)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); var noOfFoundChannels = 0; From 6ac09c74078b92e13f64cc6e294fb6896ce1051a Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Thu, 24 Oct 2024 13:52:11 +0530 Subject: [PATCH 4/6] Fixed build issue --- libs/server/PubSub/SubscribeBroker.cs | 2 +- libs/server/Resp/PubSubCommands.cs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libs/server/PubSub/SubscribeBroker.cs b/libs/server/PubSub/SubscribeBroker.cs index ce5c6040c5..338cc1adaa 100644 --- a/libs/server/PubSub/SubscribeBroker.cs +++ b/libs/server/PubSub/SubscribeBroker.cs @@ -533,7 +533,7 @@ public unsafe void NumSubscriptions(ref ObjectInput input, ref SpanByteAndMemory while (!RespWriteUtils.WriteArrayLength(numOfChannels * 2, ref curr, end)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); - var currChannelIdx = input.parseStateStartIdx; + var currChannelIdx = input.parseStateFirstArgIdx; while (currChannelIdx < numOfChannels) { var channelArg = input.parseState.GetArgSliceByRef(currChannelIdx); diff --git a/libs/server/Resp/PubSubCommands.cs b/libs/server/Resp/PubSubCommands.cs index c6405c30ee..da73f2f03f 100644 --- a/libs/server/Resp/PubSubCommands.cs +++ b/libs/server/Resp/PubSubCommands.cs @@ -385,10 +385,10 @@ private bool NetworkPUBSUB_CHANNELS() return true; } - var input = new ObjectInput + var input = new ObjectInput() { parseState = parseState, - parseStateStartIdx = 0 + parseStateFirstArgIdx = 0 }; var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)); subscribeBroker.Channels(ref input, ref output); @@ -435,7 +435,7 @@ private bool NetworkPUBSUB_NUMSUB() var input = new ObjectInput { parseState = parseState, - parseStateStartIdx = 0 + parseStateFirstArgIdx = 0 }; var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)); subscribeBroker.NumSubscriptions(ref input, ref output); From c510e65c8ce149e8dff7f40224da3021b0adffea Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Fri, 25 Oct 2024 14:57:07 +0530 Subject: [PATCH 5/6] Review command fix --- libs/server/PubSub/SubscribeBroker.cs | 21 +++++++-------------- libs/server/Resp/CmdStrings.cs | 1 + libs/server/Resp/PubSubCommands.cs | 6 +++--- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/libs/server/PubSub/SubscribeBroker.cs b/libs/server/PubSub/SubscribeBroker.cs index 338cc1adaa..dbe12d2ca7 100644 --- a/libs/server/PubSub/SubscribeBroker.cs +++ b/libs/server/PubSub/SubscribeBroker.cs @@ -439,7 +439,7 @@ public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output) foreach (var key in subscriptions.Keys) { - while (!RespWriteUtils.WriteSimpleString(key.AsSpan().Slice(sizeof(int)), ref curr, end)) + while (!RespWriteUtils.WriteBulkString(key.AsSpan().Slice(sizeof(int)), ref curr, end)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); } return; @@ -458,9 +458,9 @@ public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output) { fixed (byte* keyPtr = key) { - var refKeyPtr = keyPtr; + var endKeyPtr = keyPtr; var _patternPtr = patternPtr; - if (keySerializer.Match(ref keySerializer.ReadKeyByRef(ref refKeyPtr), true, ref keySerializer.ReadKeyByRef(ref _patternPtr), true)) + if (keySerializer.Match(ref keySerializer.ReadKeyByRef(ref endKeyPtr), true, ref keySerializer.ReadKeyByRef(ref _patternPtr), true)) { while (!RespWriteUtils.WriteSimpleString(key.AsSpan().Slice(sizeof(int)), ref curr, end)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); @@ -541,21 +541,14 @@ public unsafe void NumSubscriptions(ref ObjectInput input, ref SpanByteAndMemory var channelPtr = channelSpan.ToPointer() - sizeof(int); // Memory would have been already pinned *(int*)channelPtr = channelSpan.Length; - while (!RespWriteUtils.WriteSimpleString(channelArg.ReadOnlySpan, ref curr, end)) + while (!RespWriteUtils.WriteBulkString(channelArg.ReadOnlySpan, ref curr, end)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); var channel = new Span(channelPtr, channelSpan.Length + sizeof(int)).ToArray(); - if (subscriptions.TryGetValue(channel, out var subscriptionDict)) - { - while (!RespWriteUtils.WriteInteger(subscriptionDict.Count, ref curr, end)) - ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); - } - else - { - while (!RespWriteUtils.WriteInteger(0, ref curr, end)) - ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); - } + subscriptions.TryGetValue(channel, out var subscriptionDict); + while (!RespWriteUtils.WriteInteger(subscriptionDict is null ? 0 : subscriptionDict.Count, ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); currChannelIdx++; } diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs index baf4cb38f5..4dc7121f11 100644 --- a/libs/server/Resp/CmdStrings.cs +++ b/libs/server/Resp/CmdStrings.cs @@ -211,6 +211,7 @@ static partial class CmdStrings public const string GenericParamShouldBeGreaterThanZero = "ERR {0} should be greater than 0"; public const string GenericUnknownClientType = "ERR Unknown client type '{0}'"; public const string GenericErrDuplicateFilter = "ERR Filter '{0}' defined multiple times"; + public const string GenericPubSubCommandDisabled = "ERR {0} is disabled, enable it with --pubsub option."; /// /// Response errors while scripting diff --git a/libs/server/Resp/PubSubCommands.cs b/libs/server/Resp/PubSubCommands.cs index da73f2f03f..5af312f28a 100644 --- a/libs/server/Resp/PubSubCommands.cs +++ b/libs/server/Resp/PubSubCommands.cs @@ -380,7 +380,7 @@ private bool NetworkPUBSUB_CHANNELS() if (subscribeBroker is null) { - while (!RespWriteUtils.WriteError("ERR PUBSUB CHANNELS is disabled, enable it with --pubsub option."u8, ref dcurr, dend)) + while (!RespWriteUtils.WriteError(string.Format(CmdStrings.GenericPubSubCommandDisabled, "PUBSUB CHANNELS"), ref dcurr, dend)) SendAndReset(); return true; } @@ -410,7 +410,7 @@ private bool NetworkPUBSUB_NUMPAT() if (subscribeBroker is null) { - while (!RespWriteUtils.WriteError("ERR PUBSUB NUMPAT is disabled, enable it with --pubsub option."u8, ref dcurr, dend)) + while (!RespWriteUtils.WriteError(string.Format(CmdStrings.GenericPubSubCommandDisabled, "PUBSUB NUMPAT"), ref dcurr, dend)) SendAndReset(); return true; } @@ -427,7 +427,7 @@ private bool NetworkPUBSUB_NUMSUB() { if (subscribeBroker is null) { - while (!RespWriteUtils.WriteError("ERR PUBSUB NUMSUB is disabled, enable it with --pubsub option."u8, ref dcurr, dend)) + while (!RespWriteUtils.WriteError(string.Format(CmdStrings.GenericPubSubCommandDisabled, "PUBSUB NUMSUB"), ref dcurr, dend)) SendAndReset(); return true; } From 0e42496335438119fc7e33354fdfb7abee49ae0a Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Sat, 26 Oct 2024 14:01:31 +0530 Subject: [PATCH 6/6] Review command fix --- libs/server/PubSub/SubscribeBroker.cs | 9 +- libs/server/Resp/PubSubCommands.cs | 6 +- test/Garnet.test/RespPubSubTests.cs | 157 ++++++-------------------- 3 files changed, 44 insertions(+), 128 deletions(-) diff --git a/libs/server/PubSub/SubscribeBroker.cs b/libs/server/PubSub/SubscribeBroker.cs index dbe12d2ca7..53d073b922 100644 --- a/libs/server/PubSub/SubscribeBroker.cs +++ b/libs/server/PubSub/SubscribeBroker.cs @@ -445,6 +445,8 @@ public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output) return; } + // Below WriteArrayLength is primarily to move the start of the buffer to the max length that is required to write the array length. The actual length is written in the below line. + // This is done to avoid multiple two passes over the subscriptions or new array allocation if we use single pass over the subscriptions var totalArrayHeaderLen = 0; while (!RespWriteUtils.WriteArrayLength(subscriptions.Count, ref curr, end, out var _, out totalArrayHeaderLen)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); @@ -477,9 +479,12 @@ public unsafe void Channels(ref ObjectInput input, ref SpanByteAndMemory output) return; } + // Below code is to write the actual array length in the buffer + // And move the array elements to the start of the new array length if new array length is less than the max array length that we orginally write in the above line var newTotalArrayHeaderLen = 0; var _ptr = ptr; - RespWriteUtils.WriteArrayLength(noOfFoundChannels, ref _ptr, end, out var _, out newTotalArrayHeaderLen); // ReallocateOutput is not needed here as there should be always be available space in the output buffer as we have already written the max array length + // ReallocateOutput is not needed here as there should be always be available space in the output buffer as we have already written the max array length + _ = RespWriteUtils.WriteArrayLength(noOfFoundChannels, ref _ptr, end, out var _, out newTotalArrayHeaderLen); Debug.Assert(totalArrayHeaderLen >= newTotalArrayHeaderLen, "newTotalArrayHeaderLen can't be bigger than totalArrayHeaderLen as we have already written max array lenght in the buffer"); if (totalArrayHeaderLen != newTotalArrayHeaderLen) @@ -533,7 +538,7 @@ public unsafe void NumSubscriptions(ref ObjectInput input, ref SpanByteAndMemory while (!RespWriteUtils.WriteArrayLength(numOfChannels * 2, ref curr, end)) ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); - var currChannelIdx = input.parseStateFirstArgIdx; + var currChannelIdx = 0; while (currChannelIdx < numOfChannels) { var channelArg = input.parseState.GetArgSliceByRef(currChannelIdx); diff --git a/libs/server/Resp/PubSubCommands.cs b/libs/server/Resp/PubSubCommands.cs index 5af312f28a..0c1c666d12 100644 --- a/libs/server/Resp/PubSubCommands.cs +++ b/libs/server/Resp/PubSubCommands.cs @@ -387,8 +387,7 @@ private bool NetworkPUBSUB_CHANNELS() var input = new ObjectInput() { - parseState = parseState, - parseStateFirstArgIdx = 0 + parseState = parseState }; var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)); subscribeBroker.Channels(ref input, ref output); @@ -434,8 +433,7 @@ private bool NetworkPUBSUB_NUMSUB() var input = new ObjectInput { - parseState = parseState, - parseStateFirstArgIdx = 0 + parseState = parseState }; var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)); subscribeBroker.NumSubscriptions(ref input, ref output); diff --git a/test/Garnet.test/RespPubSubTests.cs b/test/Garnet.test/RespPubSubTests.cs index 5a1e4caa62..43d851a72a 100644 --- a/test/Garnet.test/RespPubSubTests.cs +++ b/test/Garnet.test/RespPubSubTests.cs @@ -4,6 +4,7 @@ using System; using System.Linq; using System.Threading; +using System.Threading.Channels; using NUnit.Framework; using NUnit.Framework.Legacy; using StackExchange.Redis; @@ -37,27 +38,17 @@ public void BasicSUBSCRIBE() using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); var sub = subRedis.GetSubscriber(); var db = redis.GetDatabase(0); + string value = "published message"; ManualResetEvent evt = new(false); - sub.Subscribe(RedisChannel.Literal("messages"), (channel, message) => + SubscribeAndPublish(sub, db, RedisChannel.Literal("messages"), RedisChannel.Literal("messages"), value, onSubscribe: (channel, message) => { ClassicAssert.AreEqual("messages", (string)channel); - ClassicAssert.AreEqual("published message", (string)message); + ClassicAssert.AreEqual(value, (string)message); evt.Set(); }); - // Repeat to work-around bug in StackExchange.Redis subscribe behavior - // where it returns before the SUBSCRIBE call is processed. - int repeat = 5; - while (true) - { - db.Publish(RedisChannel.Pattern("messages"), "published message"); - var ret = evt.WaitOne(TimeSpan.FromSeconds(1)); - if (ret) break; - repeat--; - ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); - } sub.Unsubscribe(RedisChannel.Literal("messages")); } @@ -77,7 +68,7 @@ public void BasicPSUBSCRIBE() ManualResetEvent evt = new(false); - sub.Subscribe(channel, (receivedChannel, message) => + SubscribeAndPublish(sub, db, channel, RedisChannel.Pattern(actual), value, (receivedChannel, message) => { ClassicAssert.AreEqual(glob, (string)channel); ClassicAssert.AreEqual(actual, (string)receivedChannel); @@ -85,17 +76,6 @@ public void BasicPSUBSCRIBE() evt.Set(); }); - // Repeat to work-around bug in StackExchange.Redis subscribe behavior - // where it returns before the SUBSCRIBE call is processed. - int repeat = 5; - while (true) - { - db.Publish(RedisChannel.Pattern(actual), value); - var ret = evt.WaitOne(TimeSpan.FromSeconds(1)); - if (ret) break; - repeat--; - ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); - } sub.Unsubscribe(channel); } @@ -108,41 +88,11 @@ public void BasicPUBSUB_CHANNELS() var db = redis.GetDatabase(0); var server = redis.GetServers()[0]; - ManualResetEvent evt = new(false); - var isMessagesASubed = false; - var isMessagesBSubed = false; var channelA = "messagesAtest"; var channelB = "messagesB"; - sub.Subscribe(RedisChannel.Literal(channelA), (channel, message) => - { - isMessagesASubed = true; - if (isMessagesBSubed) - evt.Set(); - }); - - sub.Subscribe(RedisChannel.Literal(channelB), (channel, message) => - { - isMessagesBSubed = true; - if (isMessagesASubed) - evt.Set(); - }); - - // Doing publish to make sure the channel is subscribed - // Repeat to work-around bug in StackExchange.Redis subscribe behavior - // where it returns before the SUBSCRIBE call is processed. - int repeat = 5; - while (true) - { - if (!isMessagesASubed) - db.Publish(RedisChannel.Pattern(channelA), "published message"); - if (!isMessagesBSubed) - db.Publish(RedisChannel.Pattern(channelB), "published message"); - var ret = evt.WaitOne(TimeSpan.FromSeconds(1)); - if (ret) break; - repeat--; - ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); - } + SubscribeAndPublish(sub, db, RedisChannel.Literal(channelA), RedisChannel.Pattern(channelA)); + SubscribeAndPublish(sub, db, RedisChannel.Literal(channelB), RedisChannel.Pattern(channelB)); var result = server.SubscriptionChannels(); string[] expectedResult = [channelA, channelB]; @@ -184,39 +134,8 @@ public void BasicPUBSUB_NUMPAT() var result = server.SubscriptionPatternCount(); ClassicAssert.AreEqual(0, result); - var isMessagesASubed = false; - var isMessagesBSubed = false; - - ManualResetEvent evt = new(false); - - sub.Subscribe(channel, (receivedChannel, message) => - { - isMessagesASubed = true; - if (isMessagesASubed && isMessagesBSubed) - evt.Set(); - }); - - sub.Subscribe(channelB, (receivedChannel, message) => - { - isMessagesBSubed = true; - if (isMessagesASubed && isMessagesBSubed) - evt.Set(); - }); - - // Repeat to work-around bug in StackExchange.Redis subscribe behavior - // where it returns before the SUBSCRIBE call is processed. - int repeat = 5; - while (true) - { - if (!isMessagesASubed) - db.Publish(RedisChannel.Literal(actual), value); - if (!isMessagesBSubed) - db.Publish(RedisChannel.Literal(actualB), value); - var ret = evt.WaitOne(TimeSpan.FromSeconds(1)); - if (ret) break; - repeat--; - ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); - } + SubscribeAndPublish(sub, db, channel, RedisChannel.Literal(actual), value); + SubscribeAndPublish(sub, db, channelB, RedisChannel.Literal(actualB), value); result = server.SubscriptionPatternCount(); ClassicAssert.AreEqual(2, result); @@ -234,10 +153,6 @@ public void BasicPUBSUB_NUMSUB() var db = redis.GetDatabase(0); var server = redis.GetServers()[0]; - ManualResetEvent evt = new(false); - var isMessagesASubed = false; - var isMessagesBSubed = false; - var multiChannelResult = server.Execute("PUBSUB", ["NUMSUB"]); ClassicAssert.AreEqual(0, multiChannelResult.Length); @@ -248,18 +163,32 @@ public void BasicPUBSUB_NUMSUB() ClassicAssert.AreEqual("messagesB", multiChannelResult[2].ToString()); ClassicAssert.AreEqual("0", multiChannelResult[3].ToString()); - sub.Subscribe(RedisChannel.Literal("messagesA"), (channel, message) => - { - isMessagesASubed = true; - if (isMessagesASubed && isMessagesBSubed) - evt.Set(); - }); + SubscribeAndPublish(sub, db, RedisChannel.Literal("messagesA")); + SubscribeAndPublish(sub, db, RedisChannel.Literal("messagesB")); + + var result = server.SubscriptionSubscriberCount(RedisChannel.Literal("messagesA")); + ClassicAssert.AreEqual(1, result); + + multiChannelResult = server.Execute("PUBSUB", ["NUMSUB", "messagesA", "messagesB"]); + ClassicAssert.AreEqual(4, multiChannelResult.Length); + ClassicAssert.AreEqual("messagesA", multiChannelResult[0].ToString()); + ClassicAssert.AreEqual("1", multiChannelResult[1].ToString()); + ClassicAssert.AreEqual("messagesB", multiChannelResult[2].ToString()); + ClassicAssert.AreEqual("1", multiChannelResult[3].ToString()); + + sub.Unsubscribe(RedisChannel.Literal("messagesA")); + sub.Unsubscribe(RedisChannel.Literal("messagesB")); + } - sub.Subscribe(RedisChannel.Literal("messagesB"), (channel, message) => + private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel channel, RedisChannel? publishChannel = null, string message = null, Action onSubscribe = null) + { + message ??= "published message"; + publishChannel ??= channel; + ManualResetEvent evt = new(false); + sub.Subscribe(channel, (receivedChannel, receivedMessage) => { - isMessagesBSubed = true; - if (isMessagesASubed && isMessagesBSubed) - evt.Set(); + onSubscribe?.Invoke(receivedChannel, receivedMessage); + evt.Set(); }); // Doing publish to make sure the channel is subscribed @@ -268,28 +197,12 @@ public void BasicPUBSUB_NUMSUB() int repeat = 5; while (true) { - if (!isMessagesASubed) - db.Publish(RedisChannel.Pattern("messagesA"), "published message"); - if (!isMessagesBSubed) - db.Publish(RedisChannel.Pattern("messagesB"), "published message"); + db.Publish(publishChannel.Value, message); var ret = evt.WaitOne(TimeSpan.FromSeconds(1)); if (ret) break; repeat--; - ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subsciption receive"); + ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subscription receive"); } - - var result = server.SubscriptionSubscriberCount(RedisChannel.Literal("messagesA")); - ClassicAssert.AreEqual(1, result); - - multiChannelResult = server.Execute("PUBSUB", ["NUMSUB", "messagesA", "messagesB"]); - ClassicAssert.AreEqual(4, multiChannelResult.Length); - ClassicAssert.AreEqual("messagesA", multiChannelResult[0].ToString()); - ClassicAssert.AreEqual("1", multiChannelResult[1].ToString()); - ClassicAssert.AreEqual("messagesB", multiChannelResult[2].ToString()); - ClassicAssert.AreEqual("1", multiChannelResult[3].ToString()); - - sub.Unsubscribe(RedisChannel.Literal("messagesA")); - sub.Unsubscribe(RedisChannel.Literal("messagesB")); } } } \ No newline at end of file