Skip to content

Commit

Permalink
Add asynchronous test for pubsub using RESP2
Browse files Browse the repository at this point in the history
The testcase will subscribe to a channel, and via a second client
a message is published to the channel. After receiving the message
the testcase will unsubscribe and disconnect.
  • Loading branch information
bjosv committed Oct 26, 2021
1 parent 648763c commit 0b1af42
Showing 1 changed file with 107 additions and 0 deletions.
107 changes: 107 additions & 0 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#ifdef HIREDIS_TEST_SSL
#include "hiredis_ssl.h"
#endif
#ifdef HIREDIS_TEST_ASYNC
#include "adapters/libevent.h"
#include <event.h>
#endif
#include "net.h"
#include "win32.h"

Expand Down Expand Up @@ -1443,6 +1447,104 @@ static void test_throughput(struct config config) {
// redisFree(c);
// }

#ifdef HIREDIS_TEST_ASYNC
struct event_base *base;

/* Testcase timeout, will trigger a failure */
void timeout_cb(int fd, short event, void *arg) {
(void) fd; (void) event; (void) arg;
printf("Timeout in async testing!\n");
exit(1);
}

/* Unexpected call, will trigger a failure */
void unexpected_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac; (void) r;
printf("Unexpected call: %s\n",(char*)privdata);
exit(1);
}

/* Timer callback to publish a message,
arg contains connection options */
void publish_cb(int fd, short event, void *arg) {
(void) fd; (void) event;
redisContext *c;
assert((c = redisConnectWithOptions((redisOptions *)arg)) != NULL);

/* Only publishing one message here since receiving a message after */
/* a sent unsubscribe triggers assert in async.c (possibly issue #490) */
redisReply *reply = redisCommand(c,"PUBLISH mychannel Hello!");
assert(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1);
freeReplyObject(reply);
disconnect(c, 0);
}

/* Subscribe callback for test_pubsub_handling:
- a published message triggers an unsubscribe
- an unsubscribe response triggers a disconnect */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) privdata;
redisReply *reply = r;

assert(reply != NULL &&
reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 3);

if (strcmp(reply->element[0]->str,"subscribe") == 0) {
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
reply->element[2]->str == NULL);
} else if (strcmp(reply->element[0]->str,"message") == 0) {
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
strcmp(reply->element[2]->str,"Hello!") == 0);

/* Unsubscribe after receiving the published message. Send unsubscribe
which should call the callback registered during subscribe */
redisAsyncCommand(ac,unexpected_cb,
(void*)"unsubscribe should call pubsub_cb()",
"unsubscribe");
} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
reply->element[2]->str == NULL);

// Disconnect after unsubscribe
redisAsyncDisconnect(ac);
event_base_loopbreak(base);
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
}
}

static void test_pubsub_handling(struct config config) {
test("Subscribe, handle published message and unsubscribe: ");
/* Setup event dispatcher with a testcase timeout */
base = event_base_new();
struct event timeout;
evtimer_assign(&timeout,base,timeout_cb,NULL);
struct timeval timeout_tv = {.tv_sec = 10};
evtimer_add(&timeout, &timeout_tv);

/* Connect */
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);

/* Start subscribe */
redisAsyncCommand(ac,subscribe_cb,NULL,"subscribe mychannel");

/* Publish a message via another client. */
struct event publish;
evtimer_assign(&publish,base,publish_cb,(void*)&options);
struct timeval publish_tv = {.tv_usec = 100000};
evtimer_add(&publish,&publish_tv);

/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_base_free(base);
}
#endif

int main(int argc, char **argv) {
struct config cfg = {
.tcp = {
Expand Down Expand Up @@ -1561,6 +1663,11 @@ int main(int argc, char **argv) {
}
#endif

#ifdef HIREDIS_TEST_ASYNC
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
test_pubsub_handling(cfg);
#endif

if (test_inherit_fd) {
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
if (test_unix_socket) {
Expand Down

0 comments on commit 0b1af42

Please sign in to comment.