diff --git a/async.c b/async.c index f5942484a..4d5a1f1f5 100644 --- a/async.c +++ b/async.c @@ -419,7 +419,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, /* Custom reply functions are not supported for pub/sub. This will fail * very hard when they are used... */ - if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { + if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH)) || + reply->type == REDIS_REPLY_PUSH) { assert(reply->elements >= 2); assert(reply->element[0]->type == REDIS_REPLY_STRING); stype = reply->element[0]->str; @@ -524,6 +525,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) { break; } + /* Keep track of push message support for subscribe handling */ + if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH; + /* Send any non-subscribe related PUSH messages to our PUSH handler * while allowing subscribe related PUSH messages to pass through. * This allows existing code to be backward compatible and work in diff --git a/hiredis.h b/hiredis.h index a093abea2..7dd44ee42 100644 --- a/hiredis.h +++ b/hiredis.h @@ -80,6 +80,9 @@ typedef long long ssize_t; /* Flag that is set when we should set SO_REUSEADDR before calling bind() */ #define REDIS_REUSEADDR 0x80 +/* Flag that is set when the async connection supports push replies. */ +#define REDIS_SUPPORTS_PUSH 0x100 + /** * Flag that indicates the user does not want the context to * be automatically freed upon error diff --git a/test.c b/test.c index 7af9beed0..c01f21648 100644 --- a/test.c +++ b/test.c @@ -1555,6 +1555,24 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) { exit(1); } +/* Expect a reply with type NIL */ +void nil_cb(redisAsyncContext *ac, void *r, void *privdata) { + (void) ac; + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_NIL); + state->checkpoint++; +} + +/* Expect a reply with type ARRAY */ +void array_cb(redisAsyncContext *ac, void *r, void *privdata) { + (void) ac; + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY); + state->checkpoint++; +} + static void test_pubsub_handling_resp3(struct config config) { test("Subscribe, handle published message and unsubscribe using RESP3: "); /* Setup event dispatcher with a testcase timeout */ @@ -1582,13 +1600,17 @@ static void test_pubsub_handling_resp3(struct config config) { TestState state = {.options = &options, .resp3 = 1}; redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + /* Make sure non-subscribe commands works in RESP3 */ + redisAsyncCommand(ac,nil_cb,&state,"GET nonexisting"); + redisAsyncCommand(ac,array_cb,&state,"SORT nonexisting"); + /* Start event dispatching loop */ test_cond(event_base_dispatch(base) == 0); event_free(timeout); event_base_free(base); /* Verify test checkpoints */ - assert(state.checkpoint == 1); + assert(state.checkpoint == 3); } #endif