From 033e1aa635c07f864a05fbb3cb304baa3417988a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 26 Oct 2021 09:22:57 +0200 Subject: [PATCH 1/2] Include `unsubscribe` as a subscribe reply in RESP3 By providing the (p)unsubscribe message via the subscribe callback, instead of via the push callback, we get the same behavior in RESP3 as in RESP2. --- async.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async.c b/async.c index e37afbde7..4459c19eb 100644 --- a/async.c +++ b/async.c @@ -496,8 +496,8 @@ static int redisIsSubscribeReply(redisReply *reply) { len = reply->element[0]->len - off; return !strncasecmp(str, "subscribe", len) || - !strncasecmp(str, "message", len); - + !strncasecmp(str, "message", len) || + !strncasecmp(str, "unsubscribe", len); } void redisProcessCallbacks(redisAsyncContext *ac) { From 2ca4b7f00e468af401c3cc626f90a2a720efa863 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 26 Oct 2021 09:32:21 +0200 Subject: [PATCH 2/2] Add asynchronous test for pubsub using RESP3 The testcase will subscribe to a channel, and via a second client a message is published to the channel. After receiving the message the testcase will unsubscribe and disconnect. This RESP3 testcase reuses the subscribe callback from the RESP2 testcase to make sure we have a common behavior. --- test.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/test.c b/test.c index 04d519797..7af9beed0 100644 --- a/test.c +++ b/test.c @@ -1453,6 +1453,7 @@ struct event_base *base; typedef struct TestState { redisOptions *options; int checkpoint; + int resp3; } TestState; /* Testcase timeout, will trigger a failure */ @@ -1479,7 +1480,7 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) { disconnect(c, 0); } -/* Subscribe callback for test_pubsub_handling: +/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3: * - a published message triggers an unsubscribe * - an unsubscribe response triggers a disconnect */ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { @@ -1487,7 +1488,7 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { TestState *state = privdata; assert(reply != NULL && - reply->type == REDIS_REPLY_ARRAY && + reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) && reply->elements == 3); if (strcmp(reply->element[0]->str,"subscribe") == 0) { @@ -1546,6 +1547,49 @@ static void test_pubsub_handling(struct config config) { /* Verify test checkpoints */ assert(state.checkpoint == 1); } + +/* Unexpected push message, will trigger a failure */ +void unexpected_push_cb(redisAsyncContext *ac, void *r) { + (void) ac; (void) r; + printf("Unexpected call to the PUSH callback!\n"); + exit(1); +} + +static void test_pubsub_handling_resp3(struct config config) { + test("Subscribe, handle published message and unsubscribe using RESP3: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac, unexpected_push_cb); + + /* Switch protocol */ + redisAsyncCommand(ac,NULL,NULL,"HELLO 3"); + + /* Start subscribe */ + TestState state = {.options = &options, .resp3 = 1}; + redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 1); +} #endif int main(int argc, char **argv) { @@ -1668,7 +1712,15 @@ int main(int argc, char **argv) { #ifdef HIREDIS_TEST_ASYNC printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); + cfg.type = CONN_TCP; + + int major; + redisContext *c = do_connect(cfg); + get_redis_version(c, &major, NULL); + disconnect(c, 0); + test_pubsub_handling(cfg); + if (major >= 6) test_pubsub_handling_resp3(cfg); #endif if (test_inherit_fd) {