Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add asynchronous test for pubsub using RESP3 #1012

Merged
merged 2 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ static int redisIsSubscribeReply(redisReply *reply) {
len = reply->element[0]->len - off;

return !strncasecmp(str, "subscribe", len) ||
!strncasecmp(str, "message", len);

!strncasecmp(str, "message", len) ||
!strncasecmp(str, "unsubscribe", len);
}

void redisProcessCallbacks(redisAsyncContext *ac) {
Expand Down
56 changes: 54 additions & 2 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,7 @@ struct event_base *base;
typedef struct TestState {
redisOptions *options;
int checkpoint;
int resp3;
} TestState;

/* Testcase timeout, will trigger a failure */
Expand All @@ -1479,15 +1480,15 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {
disconnect(c, 0);
}

/* Subscribe callback for test_pubsub_handling:
/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
* - a published message triggers an unsubscribe
* - an unsubscribe response triggers a disconnect */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;

assert(reply != NULL &&
reply->type == REDIS_REPLY_ARRAY &&
reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
reply->elements == 3);

if (strcmp(reply->element[0]->str,"subscribe") == 0) {
Expand Down Expand Up @@ -1546,6 +1547,49 @@ static void test_pubsub_handling(struct config config) {
/* Verify test checkpoints */
assert(state.checkpoint == 1);
}

/* Unexpected push message, will trigger a failure */
void unexpected_push_cb(redisAsyncContext *ac, void *r) {
(void) ac; (void) r;
printf("Unexpected call to the PUSH callback!\n");
exit(1);
}

static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
base = event_base_new();
struct event *timeout = evtimer_new(base, timeout_cb, NULL);
assert(timeout != NULL);

evtimer_assign(timeout,base,timeout_cb,NULL);
struct timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout, &timeout_tv);

/* Connect */
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);

/* Not expecting any push messages in this test */
redisAsyncSetPushCallback(ac, unexpected_push_cb);

/* Switch protocol */
redisAsyncCommand(ac,NULL,NULL,"HELLO 3");

/* Start subscribe */
TestState state = {.options = &options, .resp3 = 1};
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");

/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);

/* Verify test checkpoints */
assert(state.checkpoint == 1);
}
#endif

int main(int argc, char **argv) {
Expand Down Expand Up @@ -1668,7 +1712,15 @@ int main(int argc, char **argv) {

#ifdef HIREDIS_TEST_ASYNC
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;

int major;
redisContext *c = do_connect(cfg);
get_redis_version(c, &major, NULL);
disconnect(c, 0);

test_pubsub_handling(cfg);
if (major >= 6) test_pubsub_handling_resp3(cfg);
#endif

if (test_inherit_fd) {
Expand Down