Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command "Client Capa subv2" to change behavior for SUBSCRIBE and SSUBSCRIBE commands: one call with only one response #962

Open
wants to merge 9 commits into
base: unstable
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4018,6 +4018,8 @@ void clientCommand(client *c) {
for (int i = 2; i < c->argc; i++) {
if (!strcasecmp(c->argv[i]->ptr, "redirect")) {
c->capa |= CLIENT_CAPA_REDIRECT;
} else if (!strcasecmp(c->argv[i]->ptr, "subv2")) {
c->capa |= CLIENT_CAPA_SUBV2;
}
}
addReply(c, shared.ok);
Expand Down
48 changes: 38 additions & 10 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,14 @@ void unmarkClientAsPubSub(client *c) {

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
void pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
dictEntry *de, *existing;
dict *clients = NULL;
int retval = 0;
unsigned int slot = 0;

/* Add the channel to the client -> channels hash table */
void *position = dictFindPositionForInsert(type.clientPubSubChannels(c), channel, NULL);
if (position) { /* Not yet subscribed to this channel */
retval = 1;
/* Add the client to the channel -> list of clients hash table */
if (server.cluster_enabled && type.shard) {
slot = getKeySlot(channel->ptr);
Expand All @@ -287,9 +285,11 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
serverAssert(dictInsertAtPosition(type.clientPubSubChannels(c), channel, position));
incrRefCount(channel);
}
/* Notify the client */
addReplyPubsubSubscribed(c, channel, type);
return retval;

if (!(c->capa & CLIENT_CAPA_SUBV2)) {
/* Notify the client */
addReplyPubsubSubscribed(c, channel, type);
}
}

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
Expand Down Expand Up @@ -538,9 +538,29 @@ int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
* Pubsub commands implementation
*----------------------------------------------------------------------------*/

void addPubSubChannel(client *c, pubsubtype type) {
int j;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
int number = (c->argc - 1) * 3;

if (c->resp == 2)
addReply(c, shared.mbulkhdr[number]);
else
addReplyPushLen(c, number);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So with RESP3, this is still no normal in-band replies and one push message (instead of one per channel). This is still the main problem of [UN][P|S]SUBSCRIBE IMO.

To fix this behaviour, SUBSCRIBE should return an in-band reply. Either the list of channels as an array or map-reply, or just +OK and send the channels as push.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not want to import this change in Valkey 8.0. I wish we could do this change on Valkey 8.X or 9 version. Error need to be correct, never late ^_^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we cannot fix it ever. Only maybe with a CLIENT CAPA or RESP4.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree with this being a potentially significant breaking change. I think maybe CLIENT CAPA could fix it (or RESP4).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I update the design, add a new parameter for client capa command, but the parameter name is just temporarily. If you have better name, please let me know or we can discuss it in the meeting


for (j = 1; j < c->argc; j++) {
pubsubSubscribeChannel(c, c->argv[j], type);
addReply(c, *type.subscribeMsg);
addReplyBulk(c, c->argv[j]);
addReplyLongLong(c, type.subscriptionCount(c));
}
if (!old_flags.pushing) c->flag.pushing = 0;
}


/* SUBSCRIBE channel [channel ...] */
void subscribeCommand(client *c) {
int j;
if (c->flag.deny_blocking && !c->flag.multi) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
Expand All @@ -552,7 +572,13 @@ void subscribeCommand(client *c) {
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubType);

if (c->capa & CLIENT_CAPA_SUBV2) {
addPubSubChannel(c, pubSubType);
} else {
for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubType);
}

markClientAsPubSub(c);
}

Expand Down Expand Up @@ -719,8 +745,10 @@ void ssubscribeCommand(client *c) {
return;
}

for (int j = 1; j < c->argc; j++) {
pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
if (c->capa & CLIENT_CAPA_SUBV2) {
addPubSubChannel(c, pubSubShardType);
} else {
for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
}
markClientAsPubSub(c);
}
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];

/* Client capabilities */
#define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */
#define CLIENT_CAPA_SUBV2 (1 << 1) /* Indicate that the client can handle pubsub v2 version */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
17 changes: 16 additions & 1 deletion src/valkey-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ static struct config {
char *test_hint_file;
int prefer_ipv4; /* Prefer IPv4 over IPv6 on DNS lookup. */
int prefer_ipv6; /* Prefer IPv6 over IPv4 on DNS lookup. */
int pubsub_version;
} config;

/* User preferences. */
Expand Down Expand Up @@ -2345,6 +2346,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
config.output = OUTPUT_RAW;
}

if (!strcasecmp(command, "client") && argc >= 3 && !strcasecmp(argv[1], "capa")) {
for (int index = 2; index < argc; index++) {
if (!strcasecmp(argv[index], "subv2")) {
config.pubsub_version = 2;
break;
}
}
}

/* Setup argument length */
argvlen = zmalloc(argc * sizeof(size_t));
for (j = 0; j < argc; j++) argvlen[j] = sdslen(argv[j]);
Expand Down Expand Up @@ -2375,7 +2385,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
* an in-band message is received, but these commands are confirmed
* using push replies only. There is one push reply per channel if
* channels are specified, otherwise at least one. */
num_expected_pubsub_push = argc > 1 ? argc - 1 : 1;
if (config.pubsub_version == 2) {
num_expected_pubsub_push = 1;
} else {
num_expected_pubsub_push = argc > 1 ? argc - 1 : 1;
}
/* Unset our default PUSH handler so this works in RESP2/RESP3 */
redisSetPushCallback(context, NULL);
}
Expand Down Expand Up @@ -9535,6 +9549,7 @@ int main(int argc, char **argv) {
config.server_version = NULL;
config.prefer_ipv4 = 0;
config.prefer_ipv6 = 0;
config.pubsub_version = 1;
config.cluster_manager_command.name = NULL;
config.cluster_manager_command.argc = 0;
config.cluster_manager_command.argv = NULL;
Expand Down
30 changes: 28 additions & 2 deletions tests/integration/valkey-cli.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ start_server {tags {"cli"}} {
assert_equal "bar" [r get key]
}

test_interactive_cli "Subscribed mode" {
if {$::force_resp3} {
test_interactive_cli "Subscribed mode -- deprecated" {
if {$::force_resp3} {
run_command $fd "hello 3"
}

Expand All @@ -209,6 +209,32 @@ start_server {tags {"cli"}} {
assert_equal $sub1$sub2$sub3$reading \
[run_command $fd "subscribe ch1 ch2 ch3"]

# Unsubscribe all.
set unsub1 "1) \"unsubscribe\"\n2) \"ch1\"\n3) (integer) 2\n"
set unsub2 "1) \"unsubscribe\"\n2) \"ch2\"\n3) (integer) 1\n"
set unsub3 "1) \"unsubscribe\"\n2) \"ch3\"\n3) (integer) 0\n"
assert_equal $erase$unsub1$unsub2$unsub3$reading \
[run_command $fd "unsubscribe ch1 ch2 ch3"]

}

test_interactive_cli "Subscribed mode" {
if {$::force_resp3} {
run_command $fd "hello 3"
}

set reading "Reading messages... (press Ctrl-C to quit or any key to type command)\r"
set erase "\033\[K"; # Erases the "Reading messages..." line.

run_command $fd "client capa subv2"

# Subscribe to some channels.
set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n"
set sub2 "4) \"subscribe\"\n5) \"ch2\"\n6) (integer) 2\n"
set sub3 "7) \"subscribe\"\n8) \"ch3\"\n9) (integer) 3\n"
assert_equal $sub1$sub2$sub3$reading \
[run_command $fd "subscribe ch1 ch2 ch3"]

# Receive pubsub message.
r publish ch2 hello
set message "1) \"message\"\n2) \"ch2\"\n3) \"hello\"\n"
Expand Down
1 change: 1 addition & 0 deletions tests/unit/cluster/pubsubshard.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ test "sunsubscribe without specifying any channel would unsubscribe all shard ch

set sub_res [ssubscribe $subscribeclient [list "\{channel.0\}1" "\{channel.0\}2" "\{channel.0\}3"]]
assert_equal [list 1 2 3] $sub_res

sunsubscribe $subscribeclient

# wait for the unsubscribe to take effect
Expand Down
17 changes: 7 additions & 10 deletions tests/unit/pubsub.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ start_server {tags {"pubsub network"}} {
set rd1 [valkey_deferring_client]

# subscribe to two channels
assert_equal {1 2} [subscribe $rd1 {chan1 chan2}]
assert_equal {1 2} [subscribe $rd1 {chan1 chan2}]
assert_equal 1 [r publish chan1 hello]
assert_equal 1 [r publish chan2 world]
assert_equal {message chan1 hello} [$rd1 read]
Expand Down Expand Up @@ -83,7 +83,7 @@ start_server {tags {"pubsub network"}} {

test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" {
set rd1 [valkey_deferring_client]
assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}]
assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}]
unsubscribe $rd1
# wait for the unsubscribe to take effect
wait_for_condition 50 100 {
Expand All @@ -101,7 +101,7 @@ start_server {tags {"pubsub network"}} {

test "SUBSCRIBE to one channel more than once" {
set rd1 [valkey_deferring_client]
assert_equal {1 1 1} [subscribe $rd1 {chan1 chan1 chan1}]
assert_equal {1 1 1} [subscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r publish chan1 hello]
assert_equal {message chan1 hello} [$rd1 read]

Expand Down Expand Up @@ -479,13 +479,10 @@ start_server {tags {"pubsub network"}} {
r hello 3

# Note: SUBSCRIBE and UNSUBSCRIBE with multiple channels in the same command,
# breaks the multi response, see Redis OSS issue: https://github.com/redis/redis/issues/12207
# this is just a temporary sanity test to detect unintended breakage.

# subscribe for 3 channels actually emits 3 "responses"
assert_equal "subscribe foo 1" [r subscribe foo bar baz]
assert_equal "subscribe bar 2" [r read]
assert_equal "subscribe baz 3" [r read]
# Only one response is returned
# This update matches with Redis response: one command always returns one response
r client capa subv2
assert_equal "subscribe foo 1 subscribe bar 2 subscribe baz 3" [r subscribe foo bar baz]

r multi
r ping abc
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/pubsubshard.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ start_server {tags {"pubsubshard external:skip"}} {

test "SSUBSCRIBE to one channel more than once" {
set rd1 [valkey_deferring_client]
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r SPUBLISH chan1 hello]
assert_equal {smessage chan1 hello} [$rd1 read]

Expand Down
Loading