diff --git a/src/networking.c b/src/networking.c index dda86e4d66..72dff65d26 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4018,6 +4018,8 @@ void clientCommand(client *c) { for (int i = 2; i < c->argc; i++) { if (!strcasecmp(c->argv[i]->ptr, "redirect")) { c->capa |= CLIENT_CAPA_REDIRECT; + } else if (!strcasecmp(c->argv[i]->ptr, "subv2")) { + c->capa |= CLIENT_CAPA_SUBV2; } } addReply(c, shared.ok); diff --git a/src/pubsub.c b/src/pubsub.c index 5b037b5721..250867ecd2 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -257,16 +257,14 @@ void unmarkClientAsPubSub(client *c) { /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ -int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { +void pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { dictEntry *de, *existing; dict *clients = NULL; - int retval = 0; unsigned int slot = 0; /* Add the channel to the client -> channels hash table */ void *position = dictFindPositionForInsert(type.clientPubSubChannels(c), channel, NULL); if (position) { /* Not yet subscribed to this channel */ - retval = 1; /* Add the client to the channel -> list of clients hash table */ if (server.cluster_enabled && type.shard) { slot = getKeySlot(channel->ptr); @@ -287,9 +285,11 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { serverAssert(dictInsertAtPosition(type.clientPubSubChannels(c), channel, position)); incrRefCount(channel); } - /* Notify the client */ - addReplyPubsubSubscribed(c, channel, type); - return retval; + + if (!(c->capa & CLIENT_CAPA_SUBV2)) { + /* Notify the client */ + addReplyPubsubSubscribed(c, channel, type); + } } /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or @@ -538,9 +538,29 @@ int pubsubPublishMessage(robj *channel, robj *message, int sharded) { * Pubsub commands implementation *----------------------------------------------------------------------------*/ +void addPubSubChannel(client *c, pubsubtype type) { + int j; + struct ClientFlags old_flags = c->flag; + c->flag.pushing = 1; + int number = (c->argc - 1) * 3; + + if (c->resp == 2) + addReply(c, shared.mbulkhdr[number]); + else + addReplyPushLen(c, number); + + for (j = 1; j < c->argc; j++) { + pubsubSubscribeChannel(c, c->argv[j], type); + addReply(c, *type.subscribeMsg); + addReplyBulk(c, c->argv[j]); + addReplyLongLong(c, type.subscriptionCount(c)); + } + if (!old_flags.pushing) c->flag.pushing = 0; +} + + /* SUBSCRIBE channel [channel ...] */ void subscribeCommand(client *c) { - int j; if (c->flag.deny_blocking && !c->flag.multi) { /** * A client that has CLIENT_DENY_BLOCKING flag on @@ -552,7 +572,13 @@ void subscribeCommand(client *c) { addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client"); return; } - for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubType); + + if (c->capa & CLIENT_CAPA_SUBV2) { + addPubSubChannel(c, pubSubType); + } else { + for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubType); + } + markClientAsPubSub(c); } @@ -719,8 +745,10 @@ void ssubscribeCommand(client *c) { return; } - for (int j = 1; j < c->argc; j++) { - pubsubSubscribeChannel(c, c->argv[j], pubSubShardType); + if (c->capa & CLIENT_CAPA_SUBV2) { + addPubSubChannel(c, pubSubShardType); + } else { + for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubShardType); } markClientAsPubSub(c); } diff --git a/src/server.h b/src/server.h index 1b8f08833f..73a38fc1fa 100644 --- a/src/server.h +++ b/src/server.h @@ -343,6 +343,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* Client capabilities */ #define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */ +#define CLIENT_CAPA_SUBV2 (1 << 1) /* Indicate that the client can handle pubsub v2 version */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 2af3c4c352..09833b3f09 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -272,6 +272,7 @@ static struct config { char *test_hint_file; int prefer_ipv4; /* Prefer IPv4 over IPv6 on DNS lookup. */ int prefer_ipv6; /* Prefer IPv6 over IPv4 on DNS lookup. */ + int pubsub_version; } config; /* User preferences. */ @@ -2345,6 +2346,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) { config.output = OUTPUT_RAW; } + if (!strcasecmp(command, "client") && argc >= 3 && !strcasecmp(argv[1], "capa")) { + for (int index = 2; index < argc; index++) { + if (!strcasecmp(argv[index], "subv2")) { + config.pubsub_version = 2; + break; + } + } + } + /* Setup argument length */ argvlen = zmalloc(argc * sizeof(size_t)); for (j = 0; j < argc; j++) argvlen[j] = sdslen(argv[j]); @@ -2375,7 +2385,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) { * an in-band message is received, but these commands are confirmed * using push replies only. There is one push reply per channel if * channels are specified, otherwise at least one. */ - num_expected_pubsub_push = argc > 1 ? argc - 1 : 1; + if (config.pubsub_version == 2) { + num_expected_pubsub_push = 1; + } else { + num_expected_pubsub_push = argc > 1 ? argc - 1 : 1; + } /* Unset our default PUSH handler so this works in RESP2/RESP3 */ redisSetPushCallback(context, NULL); } @@ -9535,6 +9549,7 @@ int main(int argc, char **argv) { config.server_version = NULL; config.prefer_ipv4 = 0; config.prefer_ipv6 = 0; + config.pubsub_version = 1; config.cluster_manager_command.name = NULL; config.cluster_manager_command.argc = 0; config.cluster_manager_command.argv = NULL; diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index 6344215a25..d659fa4f95 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -194,8 +194,8 @@ start_server {tags {"cli"}} { assert_equal "bar" [r get key] } - test_interactive_cli "Subscribed mode" { - if {$::force_resp3} { + test_interactive_cli "Subscribed mode -- deprecated" { + if {$::force_resp3} { run_command $fd "hello 3" } @@ -209,6 +209,32 @@ start_server {tags {"cli"}} { assert_equal $sub1$sub2$sub3$reading \ [run_command $fd "subscribe ch1 ch2 ch3"] + # Unsubscribe all. + set unsub1 "1) \"unsubscribe\"\n2) \"ch1\"\n3) (integer) 2\n" + set unsub2 "1) \"unsubscribe\"\n2) \"ch2\"\n3) (integer) 1\n" + set unsub3 "1) \"unsubscribe\"\n2) \"ch3\"\n3) (integer) 0\n" + assert_equal $erase$unsub1$unsub2$unsub3$reading \ + [run_command $fd "unsubscribe ch1 ch2 ch3"] + + } + + test_interactive_cli "Subscribed mode" { + if {$::force_resp3} { + run_command $fd "hello 3" + } + + set reading "Reading messages... (press Ctrl-C to quit or any key to type command)\r" + set erase "\033\[K"; # Erases the "Reading messages..." line. + + run_command $fd "client capa subv2" + + # Subscribe to some channels. + set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n" + set sub2 "4) \"subscribe\"\n5) \"ch2\"\n6) (integer) 2\n" + set sub3 "7) \"subscribe\"\n8) \"ch3\"\n9) (integer) 3\n" + assert_equal $sub1$sub2$sub3$reading \ + [run_command $fd "subscribe ch1 ch2 ch3"] + # Receive pubsub message. r publish ch2 hello set message "1) \"message\"\n2) \"ch2\"\n3) \"hello\"\n" diff --git a/tests/unit/cluster/pubsubshard.tcl b/tests/unit/cluster/pubsubshard.tcl index d38c22dedb..1f68952aa4 100644 --- a/tests/unit/cluster/pubsubshard.tcl +++ b/tests/unit/cluster/pubsubshard.tcl @@ -61,6 +61,7 @@ test "sunsubscribe without specifying any channel would unsubscribe all shard ch set sub_res [ssubscribe $subscribeclient [list "\{channel.0\}1" "\{channel.0\}2" "\{channel.0\}3"]] assert_equal [list 1 2 3] $sub_res + sunsubscribe $subscribeclient # wait for the unsubscribe to take effect diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index 68dc79a4a4..8925afe985 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -45,7 +45,7 @@ start_server {tags {"pubsub network"}} { set rd1 [valkey_deferring_client] # subscribe to two channels - assert_equal {1 2} [subscribe $rd1 {chan1 chan2}] + assert_equal {1 2} [subscribe $rd1 {chan1 chan2}] assert_equal 1 [r publish chan1 hello] assert_equal 1 [r publish chan2 world] assert_equal {message chan1 hello} [$rd1 read] @@ -83,7 +83,7 @@ start_server {tags {"pubsub network"}} { test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" { set rd1 [valkey_deferring_client] - assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}] + assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}] unsubscribe $rd1 # wait for the unsubscribe to take effect wait_for_condition 50 100 { @@ -101,7 +101,7 @@ start_server {tags {"pubsub network"}} { test "SUBSCRIBE to one channel more than once" { set rd1 [valkey_deferring_client] - assert_equal {1 1 1} [subscribe $rd1 {chan1 chan1 chan1}] + assert_equal {1 1 1} [subscribe $rd1 {chan1 chan1 chan1}] assert_equal 1 [r publish chan1 hello] assert_equal {message chan1 hello} [$rd1 read] @@ -479,13 +479,10 @@ start_server {tags {"pubsub network"}} { r hello 3 # Note: SUBSCRIBE and UNSUBSCRIBE with multiple channels in the same command, - # breaks the multi response, see Redis OSS issue: https://github.com/redis/redis/issues/12207 - # this is just a temporary sanity test to detect unintended breakage. - - # subscribe for 3 channels actually emits 3 "responses" - assert_equal "subscribe foo 1" [r subscribe foo bar baz] - assert_equal "subscribe bar 2" [r read] - assert_equal "subscribe baz 3" [r read] + # Only one response is returned + # This update matches with Redis response: one command always returns one response + r client capa subv2 + assert_equal "subscribe foo 1 subscribe bar 2 subscribe baz 3" [r subscribe foo bar baz] r multi r ping abc diff --git a/tests/unit/pubsubshard.tcl b/tests/unit/pubsubshard.tcl index d62a415705..ee565f4177 100644 --- a/tests/unit/pubsubshard.tcl +++ b/tests/unit/pubsubshard.tcl @@ -64,7 +64,7 @@ start_server {tags {"pubsubshard external:skip"}} { test "SSUBSCRIBE to one channel more than once" { set rd1 [valkey_deferring_client] - assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}] + assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}] assert_equal 1 [r SPUBLISH chan1 hello] assert_equal {smessage chan1 hello} [$rd1 read]