Skip to content

Commit

Permalink
Handle array response during subscribe in RESP3
Browse files Browse the repository at this point in the history
RESP3 accepts sending commands during subscribe and commands can
respond with a REDIS_REPLY_ARRAY (like `SORT`). This conflicts with
the pubsub response handling for RESP2, and triggers asserts when
finding the correct subscribe callback.

Add functionality to keep track of PUSH/RESP3 support on the connection
and only expect the message type REDIS_REPLY_PUSH as subscribe messages
when once seen.
  • Loading branch information
bjosv committed Dec 2, 2021
1 parent f458d1a commit 7b34aab
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
6 changes: 5 additions & 1 deletion async.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,

/* Custom reply functions are not supported for pub/sub. This will fail
* very hard when they are used... */
if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH)) ||
reply->type == REDIS_REPLY_PUSH) {
assert(reply->elements >= 2);
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
Expand Down Expand Up @@ -524,6 +525,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
break;
}

/* Keep track of push message support for subscribe handling */
if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;

/* Send any non-subscribe related PUSH messages to our PUSH handler
* while allowing subscribe related PUSH messages to pass through.
* This allows existing code to be backward compatible and work in
Expand Down
3 changes: 3 additions & 0 deletions hiredis.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ typedef long long ssize_t;
/* Flag that is set when we should set SO_REUSEADDR before calling bind() */
#define REDIS_REUSEADDR 0x80

/* Flag that is set when the async connection supports push replies. */
#define REDIS_SUPPORTS_PUSH 0x100

/**
* Flag that indicates the user does not want the context to
* be automatically freed upon error
Expand Down
24 changes: 23 additions & 1 deletion test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1555,6 +1555,24 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
exit(1);
}

/* Expect a reply with type NIL */
void nil_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_NIL);
state->checkpoint++;
}

/* Expect a reply with type ARRAY */
void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
state->checkpoint++;
}

static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
Expand Down Expand Up @@ -1582,13 +1600,17 @@ static void test_pubsub_handling_resp3(struct config config) {
TestState state = {.options = &options, .resp3 = 1};
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");

/* Make sure non-subscribe commands works in RESP3 */
redisAsyncCommand(ac,nil_cb,&state,"GET nonexisting");
redisAsyncCommand(ac,array_cb,&state,"SORT nonexisting");

/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 1);
assert(state.checkpoint == 3);
}
#endif

Expand Down

0 comments on commit 7b34aab

Please sign in to comment.