diff --git a/async.c b/async.c index 2ea2e485f..5d2d63e22 100644 --- a/async.c +++ b/async.c @@ -520,13 +520,6 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - - /* If monitor mode, repush callback */ - if(c->flags & REDIS_MONITORING) { - redisCallback cb = {NULL, NULL, 0, NULL}; - __redisPushCallback(&ac->replies,&cb); - } - /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ break; @@ -595,6 +588,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * doesn't know what the server will spit out over the wire. */ c->reader->fn->freeObject(reply); } + + /* If in monitor mode, repush the callback */ + if (c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } } /* Disconnect when there was an error reading the reply */ diff --git a/test.c b/test.c index ffdc8205f..f991ef1e7 100644 --- a/test.c +++ b/test.c @@ -1803,6 +1803,82 @@ static void test_pubsub_multiple_channels(struct config config) { /* Verify test checkpoints */ test_cond(state.checkpoint == 6); } + +/* Command callback for test_monitor() */ +void monitor_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + /* NULL reply is received when BYE triggers a disconnect. */ + if (reply == NULL) { + event_base_loopbreak(base); + return; + } + + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS); + state->checkpoint++; + + if (state->checkpoint == 1) { + /* Response from MONITOR */ + redisContext *c = redisConnectWithOptions(state->options); + assert(c != NULL); + redisReply *reply = redisCommand(c,"SET first 1"); + assert(reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + redisFree(c); + } else if (state->checkpoint == 2) { + /* Response for monitored command 'SET first 1' */ + assert(strstr(reply->str,"first") != NULL); + redisContext *c = redisConnectWithOptions(state->options); + assert(c != NULL); + redisReply *reply = redisCommand(c,"SET second 2"); + assert(reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + redisFree(c); + } else if (state->checkpoint == 3) { + /* Response for monitored command 'SET second 2' */ + assert(strstr(reply->str,"second") != NULL); + /* Send QUIT to disconnect */ + redisAsyncCommand(ac,NULL,NULL,"QUIT"); + } +} + +/* Test handling of the monitor command + * - sends MONITOR to enable monitoring. + * - sends SET commands via separate clients to be monitored. + * - sends QUIT to stop monitoring and disconnect. */ +static void test_monitor(struct config config) { + test("Enable monitoring: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac,unexpected_push_cb); + + /* Start monitor */ + TestState state = {.options = &options}; + redisAsyncCommand(ac,monitor_cb,&state,"monitor"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 3); +} #endif /* HIREDIS_TEST_ASYNC */ int main(int argc, char **argv) { @@ -1934,6 +2010,7 @@ int main(int argc, char **argv) { test_pubsub_handling(cfg); test_pubsub_multiple_channels(cfg); + test_monitor(cfg); if (major >= 6) { test_pubsub_handling_resp3(cfg); test_command_timeout_during_pubsub(cfg);