From 59509219aeb07313afd48de35e9c91f8756ca05f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Thu, 20 Jan 2022 23:06:16 +0100 Subject: [PATCH 1/2] No reuse of the previous reply callback When multiple replies are parsed from a socket in one read a previously found callback might get reused when the current reply has no known callback. This can be triggered by the added testcase which unsubscribe to subscribed (A,B) and a non-subscribed channel (X). Without this correction a callback for wrong channel is called. - In 'unsubscribe B X A', B's callback is called when handling X. - Now this is not done, i.e. there is no callback called for X. --- async.c | 3 +- test.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/async.c b/async.c index 8614b2058..2ea2e485f 100644 --- a/async.c +++ b/async.c @@ -508,7 +508,6 @@ static int redisIsSubscribeReply(redisReply *reply) { void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb = {NULL, NULL, 0, NULL}; void *reply = NULL; int status; @@ -524,6 +523,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* If monitor mode, repush callback */ if(c->flags & REDIS_MONITORING) { + redisCallback cb = {NULL, NULL, 0, NULL}; __redisPushCallback(&ac->replies,&cb); } @@ -547,6 +547,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Even if the context is subscribed, pending regular * callbacks will get a reply before pub/sub messages arrive. */ + redisCallback cb = {NULL, NULL, 0, NULL}; if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { /* * A spontaneous reply in a not-subscribed context can be the error diff --git a/test.c b/test.c index 4ef47f61f..ffdc8205f 100644 --- a/test.c +++ b/test.c @@ -1711,6 +1711,98 @@ static void test_command_timeout_during_pubsub(struct config config) { /* Verify test checkpoints */ test_cond(state.checkpoint == 5); } + +/* Subscribe callback for test_pubsub_multiple_channels */ +void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY && + reply->elements == 3); + + if (strcmp(reply->element[0]->str,"subscribe") == 0) { + assert(strcmp(reply->element[1]->str,"A") == 0); + publish_msg(state->options,"A","Hello!"); + state->checkpoint++; + } else if (strcmp(reply->element[0]->str,"message") == 0) { + assert(strcmp(reply->element[1]->str,"A") == 0 && + strcmp(reply->element[2]->str,"Hello!") == 0); + state->checkpoint++; + + /* Unsubscribe to channels, including a channel X which we don't subscribe to */ + redisAsyncCommand(ac,unexpected_cb, + (void*)"unsubscribe should not call unexpected_cb()", + "unsubscribe B X A"); + /* Send a regular command after unsubscribing, then disconnect */ + state->disconnect = 1; + redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo"); + } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) { + assert(strcmp(reply->element[1]->str,"A") == 0); + state->checkpoint++; + } else { + printf("Unexpected pubsub command: %s\n", reply->element[0]->str); + exit(1); + } +} + +/* Subscribe callback for test_pubsub_multiple_channels */ +void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY && + reply->elements == 3); + + if (strcmp(reply->element[0]->str,"subscribe") == 0) { + assert(strcmp(reply->element[1]->str,"B") == 0); + state->checkpoint++; + } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) { + assert(strcmp(reply->element[1]->str,"B") == 0); + state->checkpoint++; + } else { + printf("Unexpected pubsub command: %s\n", reply->element[0]->str); + exit(1); + } +} + +/* Test handling of multiple channels + * - subscribe to channel A and B + * - a published message on A triggers an unsubscribe of channel B, X and A + * where channel X is not subscribed to. + * - a command sent after unsubscribe triggers a disconnect */ +static void test_pubsub_multiple_channels(struct config config) { + test("Subscribe to multiple channels: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base,timeout_cb,NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout,&timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac,unexpected_push_cb); + + /* Start subscribing to two channels */ + TestState state = {.options = &options}; + redisAsyncCommand(ac,subscribe_channel_a_cb,&state,"subscribe A"); + redisAsyncCommand(ac,subscribe_channel_b_cb,&state,"subscribe B"); + + /* Start event dispatching loop */ + assert(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + test_cond(state.checkpoint == 6); +} #endif /* HIREDIS_TEST_ASYNC */ int main(int argc, char **argv) { @@ -1841,6 +1933,7 @@ int main(int argc, char **argv) { disconnect(c, 0); test_pubsub_handling(cfg); + test_pubsub_multiple_channels(cfg); if (major >= 6) { test_pubsub_handling_resp3(cfg); test_command_timeout_during_pubsub(cfg); From 5f29d3938755f6ec394ad6dbb2bd66d653a48faa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 21 Jan 2022 00:53:33 +0100 Subject: [PATCH 2/2] Re-push monitor callback for each reply MONITORING used the same callback for all replies while parsing multiple responses. This handling was changed to avoid calling the wrong callback in some scenarios. Now also change monitorings repush to work with this change. Includes an added async monitoring testcase. --- async.c | 18 ++++++-------- test.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/async.c b/async.c index 2ea2e485f..65551142b 100644 --- a/async.c +++ b/async.c @@ -520,13 +520,6 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - - /* If monitor mode, repush callback */ - if(c->flags & REDIS_MONITORING) { - redisCallback cb = {NULL, NULL, 0, NULL}; - __redisPushCallback(&ac->replies,&cb); - } - /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ break; @@ -571,9 +564,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ - assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); - if(c->flags & REDIS_SUBSCRIBED) + /* No more regular callbacks and no errors, the context *must* be subscribed. */ + assert(c->flags & REDIS_SUBSCRIBED); + if (c->flags & REDIS_SUBSCRIBED) __redisGetSubscribeCallback(ac,reply,&cb); } @@ -595,6 +588,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * doesn't know what the server will spit out over the wire. */ c->reader->fn->freeObject(reply); } + + /* If in monitor mode, repush the callback */ + if (c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } } /* Disconnect when there was an error reading the reply */ diff --git a/test.c b/test.c index ffdc8205f..f991ef1e7 100644 --- a/test.c +++ b/test.c @@ -1803,6 +1803,82 @@ static void test_pubsub_multiple_channels(struct config config) { /* Verify test checkpoints */ test_cond(state.checkpoint == 6); } + +/* Command callback for test_monitor() */ +void monitor_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + /* NULL reply is received when BYE triggers a disconnect. */ + if (reply == NULL) { + event_base_loopbreak(base); + return; + } + + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS); + state->checkpoint++; + + if (state->checkpoint == 1) { + /* Response from MONITOR */ + redisContext *c = redisConnectWithOptions(state->options); + assert(c != NULL); + redisReply *reply = redisCommand(c,"SET first 1"); + assert(reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + redisFree(c); + } else if (state->checkpoint == 2) { + /* Response for monitored command 'SET first 1' */ + assert(strstr(reply->str,"first") != NULL); + redisContext *c = redisConnectWithOptions(state->options); + assert(c != NULL); + redisReply *reply = redisCommand(c,"SET second 2"); + assert(reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + redisFree(c); + } else if (state->checkpoint == 3) { + /* Response for monitored command 'SET second 2' */ + assert(strstr(reply->str,"second") != NULL); + /* Send QUIT to disconnect */ + redisAsyncCommand(ac,NULL,NULL,"QUIT"); + } +} + +/* Test handling of the monitor command + * - sends MONITOR to enable monitoring. + * - sends SET commands via separate clients to be monitored. + * - sends QUIT to stop monitoring and disconnect. */ +static void test_monitor(struct config config) { + test("Enable monitoring: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac,unexpected_push_cb); + + /* Start monitor */ + TestState state = {.options = &options}; + redisAsyncCommand(ac,monitor_cb,&state,"monitor"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 3); +} #endif /* HIREDIS_TEST_ASYNC */ int main(int argc, char **argv) { @@ -1934,6 +2010,7 @@ int main(int argc, char **argv) { test_pubsub_handling(cfg); test_pubsub_multiple_channels(cfg); + test_monitor(cfg); if (major >= 6) { test_pubsub_handling_resp3(cfg); test_command_timeout_during_pubsub(cfg);