Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle array response during subscribe in RESP3 #1014

Merged
merged 1 commit into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion async.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* Match reply with the expected format of a pushed message.
* The type and number of elements (3 to 4) are specified at:
* https://redis.io/topics/pubsub#format-of-pushed-messages */
if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) ||
if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) ||
reply->type == REDIS_REPLY_PUSH) {
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
Expand Down Expand Up @@ -525,6 +525,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
break;
}

/* Keep track of push message support for subscribe handling */
if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;

/* Send any non-subscribe related PUSH messages to our PUSH handler
* while allowing subscribe related PUSH messages to pass through.
* This allows existing code to be backward compatible and work in
Expand Down
3 changes: 3 additions & 0 deletions hiredis.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ typedef long long ssize_t;
/* Flag that is set when we should set SO_REUSEADDR before calling bind() */
#define REDIS_REUSEADDR 0x80

/* Flag that is set when the async connection supports push replies. */
#define REDIS_SUPPORTS_PUSH 0x100

/**
* Flag that indicates the user does not want the context to
* be automatically freed upon error
Expand Down
18 changes: 17 additions & 1 deletion test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,15 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
exit(1);
}

/* Expect a reply of type INTEGER */
void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
state->checkpoint++;
}

static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
Expand Down Expand Up @@ -1594,13 +1603,20 @@ static void test_pubsub_handling_resp3(struct config config) {
TestState state = {.options = &options, .resp3 = 1};
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");

/* Make sure non-subscribe commands are handled in RESP3 */
redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
/* Handle an array with 3 elements as a non-subscribe command */
redisAsyncCommand(ac,array_cb,&state,"LRANGE mylist 0 2");

/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 1);
assert(state.checkpoint == 5);
}
#endif

Expand Down