Skip to content

Commit

Permalink
Re-push monitor callback for each reply
Browse files Browse the repository at this point in the history
MONITORING used the same callback for all replies while parsing
multiple responses. This handling was changed to avoid calling
the wrong callback in some scenarios.
Now also change monitorings repush to work with this change.

Includes an added async monitoring testcase.
  • Loading branch information
bjosv committed Jan 21, 2022
1 parent 5950921 commit e56b410
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 7 deletions.
12 changes: 5 additions & 7 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -520,13 +520,6 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
return;
}

/* If monitor mode, repush callback */
if(c->flags & REDIS_MONITORING) {
redisCallback cb = {NULL, NULL, 0, NULL};
__redisPushCallback(&ac->replies,&cb);
}

/* When the connection is not being disconnected, simply stop
* trying to get replies and wait for the next loop tick. */
break;
Expand Down Expand Up @@ -595,6 +588,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
* doesn't know what the server will spit out over the wire. */
c->reader->fn->freeObject(reply);
}

/* If in monitor mode, repush the callback */
if (c->flags & REDIS_MONITORING) {
__redisPushCallback(&ac->replies,&cb);
}
}

/* Disconnect when there was an error reading the reply */
Expand Down
77 changes: 77 additions & 0 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,82 @@ static void test_pubsub_multiple_channels(struct config config) {
/* Verify test checkpoints */
test_cond(state.checkpoint == 6);
}

/* Command callback for test_monitor() */
void monitor_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;

/* NULL reply is received when BYE triggers a disconnect. */
if (reply == NULL) {
event_base_loopbreak(base);
return;
}

assert(reply != NULL && reply->type == REDIS_REPLY_STATUS);
state->checkpoint++;

if (state->checkpoint == 1) {
/* Response from MONITOR */
redisContext *c = redisConnectWithOptions(state->options);
assert(c != NULL);
redisReply *reply = redisCommand(c,"SET first 1");
assert(reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
redisFree(c);
} else if (state->checkpoint == 2) {
/* Response for monitored command 'SET first 1' */
assert(strstr(reply->str,"first") != NULL);
redisContext *c = redisConnectWithOptions(state->options);
assert(c != NULL);
redisReply *reply = redisCommand(c,"SET second 2");
assert(reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
redisFree(c);
} else if (state->checkpoint == 3) {
/* Response for monitored command 'SET second 2' */
assert(strstr(reply->str,"second") != NULL);
/* Send QUIT to disconnect */
redisAsyncCommand(ac,NULL,NULL,"QUIT");
}
}

/* Test handling of the monitor command
* - sends MONITOR to enable monitoring.
* - sends SET commands via separate clients to be monitored.
* - sends QUIT to stop monitoring and disconnect. */
static void test_monitor(struct config config) {
test("Enable monitoring: ");
/* Setup event dispatcher with a testcase timeout */
base = event_base_new();
struct event *timeout = evtimer_new(base, timeout_cb, NULL);
assert(timeout != NULL);

evtimer_assign(timeout,base,timeout_cb,NULL);
struct timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout, &timeout_tv);

/* Connect */
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);

/* Not expecting any push messages in this test */
redisAsyncSetPushCallback(ac,unexpected_push_cb);

/* Start monitor */
TestState state = {.options = &options};
redisAsyncCommand(ac,monitor_cb,&state,"monitor");

/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 3);
}
#endif /* HIREDIS_TEST_ASYNC */

int main(int argc, char **argv) {
Expand Down Expand Up @@ -1934,6 +2010,7 @@ int main(int argc, char **argv) {

test_pubsub_handling(cfg);
test_pubsub_multiple_channels(cfg);
test_monitor(cfg);
if (major >= 6) {
test_pubsub_handling_resp3(cfg);
test_command_timeout_during_pubsub(cfg);
Expand Down

0 comments on commit e56b410

Please sign in to comment.