Skip to content

Commit

Permalink
Add timeout support to libhv adapter. (#1109)
Browse files Browse the repository at this point in the history
Add timeout support to libhv adapter.

See: #904
  • Loading branch information
michael-grunder authored Sep 21, 2022
1 parent 722e340 commit 68b29e1
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 12 deletions.
78 changes: 66 additions & 12 deletions adapters/libhv.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
#include "../hiredis.h"
#include "../async.h"

typedef struct redisLibhvEvents {
hio_t *io;
htimer_t *timer;
} redisLibhvEvents;

static void redisLibhvHandleEvents(hio_t* io) {
redisAsyncContext* context = (redisAsyncContext*)hevent_userdata(io);
int events = hio_events(io);
Expand All @@ -18,51 +23,100 @@ static void redisLibhvHandleEvents(hio_t* io) {
}

static void redisLibhvAddRead(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_add(io, redisLibhvHandleEvents, HV_READ);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
hio_add(events->io, redisLibhvHandleEvents, HV_READ);
}

static void redisLibhvDelRead(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_del(io, HV_READ);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
hio_del(events->io, HV_READ);
}

static void redisLibhvAddWrite(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_add(io, redisLibhvHandleEvents, HV_WRITE);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
hio_add(events->io, redisLibhvHandleEvents, HV_WRITE);
}

static void redisLibhvDelWrite(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_del(io, HV_WRITE);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
hio_del(events->io, HV_WRITE);
}

static void redisLibhvCleanup(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_close(io);
hevent_set_userdata(io, NULL);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;

if (events->timer)
htimer_del(events->timer);

hio_close(events->io);
hevent_set_userdata(events->io, NULL);

hi_free(events);
}

static void redisLibhvTimeout(htimer_t* timer) {
hio_t* io = (hio_t*)hevent_userdata(timer);
redisAsyncHandleTimeout(hevent_userdata(io));
}

static void redisLibhvSetTimeout(void *privdata, struct timeval tv) {
redisLibhvEvents* events;
uint32_t millis;
hloop_t* loop;

events = (redisLibhvEvents*)privdata;
millis = tv.tv_sec * 1000 + tv.tv_usec / 1000;

if (millis == 0) {
/* Libhv disallows zero'd timers so treat this as a delete or NO OP */
if (events->timer) {
htimer_del(events->timer);
events->timer = NULL;
}
} else if (events->timer == NULL) {
/* Add new timer */
loop = hevent_loop(events->io);
events->timer = htimer_add(loop, redisLibhvTimeout, millis, 1);
hevent_set_userdata(events->timer, events->io);
} else {
/* Update existing timer */
htimer_reset(events->timer, millis);
}
}

static int redisLibhvAttach(redisAsyncContext* ac, hloop_t* loop) {
redisContext *c = &(ac->c);
redisLibhvEvents *events;
hio_t* io = NULL;

if (ac->ev.data != NULL) {
return REDIS_ERR;
}

/* Create container struct to keep track of our io and any timer */
events = hi_malloc(sizeof(*events));
if (events == NULL) {
return REDIS_ERR;
}

io = hio_get(loop, c->fd);
if (io == NULL) {
hi_free(events);
return REDIS_ERR;
}

hevent_set_userdata(io, ac);

events->io = io;
events->timer = NULL;

ac->ev.addRead = redisLibhvAddRead;
ac->ev.delRead = redisLibhvDelRead;
ac->ev.addWrite = redisLibhvAddWrite;
ac->ev.delWrite = redisLibhvDelWrite;
ac->ev.cleanup = redisLibhvCleanup;
ac->ev.data = io;
ac->ev.scheduleTimer = redisLibhvSetTimeout;
ac->ev.data = events;

return REDIS_OK;
}
Expand Down
15 changes: 15 additions & 0 deletions examples/example-libhv.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisAsyncDisconnect(c);
}

void debugCallback(redisAsyncContext *c, void *r, void *privdata) {
(void)privdata;
redisReply *reply = r;

if (reply == NULL) {
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error");
return;
}

redisAsyncDisconnect(c);
}

void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
Expand Down Expand Up @@ -46,10 +58,13 @@ int main (int argc, char **argv) {

hloop_t* loop = hloop_new(HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS);
redisLibhvAttach(c, loop);
redisAsyncSetTimeout(c, (struct timeval){.tv_sec = 0, .tv_usec = 500000});
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %d", 1);
hloop_run(loop);
hloop_free(&loop);
return 0;
}

0 comments on commit 68b29e1

Please sign in to comment.