Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid incorrect call to the previous reply's callback #1040

Merged
merged 2 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ static int redisIsSubscribeReply(redisReply *reply) {

void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL};
void *reply = NULL;
int status;

Expand All @@ -521,12 +520,6 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
return;
}

/* If monitor mode, repush callback */
if(c->flags & REDIS_MONITORING) {
__redisPushCallback(&ac->replies,&cb);
}

/* When the connection is not being disconnected, simply stop
* trying to get replies and wait for the next loop tick. */
break;
Expand All @@ -547,6 +540,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {

/* Even if the context is subscribed, pending regular
* callbacks will get a reply before pub/sub messages arrive. */
redisCallback cb = {NULL, NULL, 0, NULL};
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/*
* A spontaneous reply in a not-subscribed context can be the error
Expand All @@ -570,9 +564,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
return;
}
/* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
if(c->flags & REDIS_SUBSCRIBED)
/* No more regular callbacks and no errors, the context *must* be subscribed. */
assert(c->flags & REDIS_SUBSCRIBED);
if (c->flags & REDIS_SUBSCRIBED)
__redisGetSubscribeCallback(ac,reply,&cb);
}

Expand All @@ -594,6 +588,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
* doesn't know what the server will spit out over the wire. */
c->reader->fn->freeObject(reply);
}

/* If in monitor mode, repush the callback */
if (c->flags & REDIS_MONITORING) {
__redisPushCallback(&ac->replies,&cb);
}
}

/* Disconnect when there was an error reading the reply */
Expand Down
170 changes: 170 additions & 0 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,174 @@ static void test_command_timeout_during_pubsub(struct config config) {
/* Verify test checkpoints */
test_cond(state.checkpoint == 5);
}

/* Subscribe callback for test_pubsub_multiple_channels */
void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;

assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 3);

if (strcmp(reply->element[0]->str,"subscribe") == 0) {
assert(strcmp(reply->element[1]->str,"A") == 0);
publish_msg(state->options,"A","Hello!");
state->checkpoint++;
} else if (strcmp(reply->element[0]->str,"message") == 0) {
assert(strcmp(reply->element[1]->str,"A") == 0 &&
strcmp(reply->element[2]->str,"Hello!") == 0);
state->checkpoint++;

/* Unsubscribe to channels, including a channel X which we don't subscribe to */
redisAsyncCommand(ac,unexpected_cb,
(void*)"unsubscribe should not call unexpected_cb()",
"unsubscribe B X A");
/* Send a regular command after unsubscribing, then disconnect */
state->disconnect = 1;
redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");
} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"A") == 0);
state->checkpoint++;
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
}
}

/* Subscribe callback for test_pubsub_multiple_channels */
void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;

assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 3);

if (strcmp(reply->element[0]->str,"subscribe") == 0) {
assert(strcmp(reply->element[1]->str,"B") == 0);
state->checkpoint++;
} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"B") == 0);
state->checkpoint++;
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
}
}

/* Test handling of multiple channels
* - subscribe to channel A and B
* - a published message on A triggers an unsubscribe of channel B, X and A
* where channel X is not subscribed to.
* - a command sent after unsubscribe triggers a disconnect */
static void test_pubsub_multiple_channels(struct config config) {
test("Subscribe to multiple channels: ");
/* Setup event dispatcher with a testcase timeout */
base = event_base_new();
struct event *timeout = evtimer_new(base,timeout_cb,NULL);
assert(timeout != NULL);

evtimer_assign(timeout,base,timeout_cb,NULL);
struct timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout,&timeout_tv);

/* Connect */
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);

/* Not expecting any push messages in this test */
redisAsyncSetPushCallback(ac,unexpected_push_cb);

/* Start subscribing to two channels */
TestState state = {.options = &options};
redisAsyncCommand(ac,subscribe_channel_a_cb,&state,"subscribe A");
redisAsyncCommand(ac,subscribe_channel_b_cb,&state,"subscribe B");

/* Start event dispatching loop */
assert(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);

/* Verify test checkpoints */
test_cond(state.checkpoint == 6);
}

/* Command callback for test_monitor() */
void monitor_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;

/* NULL reply is received when BYE triggers a disconnect. */
if (reply == NULL) {
event_base_loopbreak(base);
return;
}

assert(reply != NULL && reply->type == REDIS_REPLY_STATUS);
state->checkpoint++;

if (state->checkpoint == 1) {
/* Response from MONITOR */
redisContext *c = redisConnectWithOptions(state->options);
assert(c != NULL);
redisReply *reply = redisCommand(c,"SET first 1");
assert(reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
redisFree(c);
} else if (state->checkpoint == 2) {
/* Response for monitored command 'SET first 1' */
assert(strstr(reply->str,"first") != NULL);
redisContext *c = redisConnectWithOptions(state->options);
assert(c != NULL);
redisReply *reply = redisCommand(c,"SET second 2");
assert(reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
redisFree(c);
} else if (state->checkpoint == 3) {
/* Response for monitored command 'SET second 2' */
assert(strstr(reply->str,"second") != NULL);
/* Send QUIT to disconnect */
redisAsyncCommand(ac,NULL,NULL,"QUIT");
}
}

/* Test handling of the monitor command
* - sends MONITOR to enable monitoring.
* - sends SET commands via separate clients to be monitored.
* - sends QUIT to stop monitoring and disconnect. */
static void test_monitor(struct config config) {
test("Enable monitoring: ");
/* Setup event dispatcher with a testcase timeout */
base = event_base_new();
struct event *timeout = evtimer_new(base, timeout_cb, NULL);
assert(timeout != NULL);

evtimer_assign(timeout,base,timeout_cb,NULL);
struct timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout, &timeout_tv);

/* Connect */
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);

/* Not expecting any push messages in this test */
redisAsyncSetPushCallback(ac,unexpected_push_cb);

/* Start monitor */
TestState state = {.options = &options};
redisAsyncCommand(ac,monitor_cb,&state,"monitor");

/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 3);
}
#endif /* HIREDIS_TEST_ASYNC */

int main(int argc, char **argv) {
Expand Down Expand Up @@ -1841,6 +2009,8 @@ int main(int argc, char **argv) {
disconnect(c, 0);

test_pubsub_handling(cfg);
test_pubsub_multiple_channels(cfg);
test_monitor(cfg);
if (major >= 6) {
test_pubsub_handling_resp3(cfg);
test_command_timeout_during_pubsub(cfg);
Expand Down