Skip to content

Commit

Permalink
No reuse of the previous reply callback
Browse files Browse the repository at this point in the history
When multiple replies are parsed from a socket in one read
a previously found callback might get reused when the current
reply has no known callback.

This can be triggered by the added testcase which unsubscribe to
subscribed (A,B) and a non-subscribed channel (X).
Without this correction a callback for wrong channel is called.
-  In 'unsubscribe B X A', B's callback is called when handling X.
-  Now this is not done, i.e. there is no callback called for X.
  • Loading branch information
bjosv committed Jan 21, 2022
1 parent e73ab2f commit 5950921
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 1 deletion.
3 changes: 2 additions & 1 deletion async.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ static int redisIsSubscribeReply(redisReply *reply) {

void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL};
void *reply = NULL;
int status;

Expand All @@ -524,6 +523,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {

/* If monitor mode, repush callback */
if(c->flags & REDIS_MONITORING) {
redisCallback cb = {NULL, NULL, 0, NULL};
__redisPushCallback(&ac->replies,&cb);
}

Expand All @@ -547,6 +547,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {

/* Even if the context is subscribed, pending regular
* callbacks will get a reply before pub/sub messages arrive. */
redisCallback cb = {NULL, NULL, 0, NULL};
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/*
* A spontaneous reply in a not-subscribed context can be the error
Expand Down
93 changes: 93 additions & 0 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,98 @@ static void test_command_timeout_during_pubsub(struct config config) {
/* Verify test checkpoints */
test_cond(state.checkpoint == 5);
}

/* Subscribe callback for test_pubsub_multiple_channels */
void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;

assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 3);

if (strcmp(reply->element[0]->str,"subscribe") == 0) {
assert(strcmp(reply->element[1]->str,"A") == 0);
publish_msg(state->options,"A","Hello!");
state->checkpoint++;
} else if (strcmp(reply->element[0]->str,"message") == 0) {
assert(strcmp(reply->element[1]->str,"A") == 0 &&
strcmp(reply->element[2]->str,"Hello!") == 0);
state->checkpoint++;

/* Unsubscribe to channels, including a channel X which we don't subscribe to */
redisAsyncCommand(ac,unexpected_cb,
(void*)"unsubscribe should not call unexpected_cb()",
"unsubscribe B X A");
/* Send a regular command after unsubscribing, then disconnect */
state->disconnect = 1;
redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");
} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"A") == 0);
state->checkpoint++;
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
}
}

/* Subscribe callback for test_pubsub_multiple_channels */
void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;

assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 3);

if (strcmp(reply->element[0]->str,"subscribe") == 0) {
assert(strcmp(reply->element[1]->str,"B") == 0);
state->checkpoint++;
} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"B") == 0);
state->checkpoint++;
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
}
}

/* Test handling of multiple channels
* - subscribe to channel A and B
* - a published message on A triggers an unsubscribe of channel B, X and A
* where channel X is not subscribed to.
* - a command sent after unsubscribe triggers a disconnect */
static void test_pubsub_multiple_channels(struct config config) {
test("Subscribe to multiple channels: ");
/* Setup event dispatcher with a testcase timeout */
base = event_base_new();
struct event *timeout = evtimer_new(base,timeout_cb,NULL);
assert(timeout != NULL);

evtimer_assign(timeout,base,timeout_cb,NULL);
struct timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout,&timeout_tv);

/* Connect */
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);

/* Not expecting any push messages in this test */
redisAsyncSetPushCallback(ac,unexpected_push_cb);

/* Start subscribing to two channels */
TestState state = {.options = &options};
redisAsyncCommand(ac,subscribe_channel_a_cb,&state,"subscribe A");
redisAsyncCommand(ac,subscribe_channel_b_cb,&state,"subscribe B");

/* Start event dispatching loop */
assert(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);

/* Verify test checkpoints */
test_cond(state.checkpoint == 6);
}
#endif /* HIREDIS_TEST_ASYNC */

int main(int argc, char **argv) {
Expand Down Expand Up @@ -1841,6 +1933,7 @@ int main(int argc, char **argv) {
disconnect(c, 0);

test_pubsub_handling(cfg);
test_pubsub_multiple_channels(cfg);
if (major >= 6) {
test_pubsub_handling_resp3(cfg);
test_command_timeout_during_pubsub(cfg);
Expand Down

0 comments on commit 5950921

Please sign in to comment.