Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout support to libhv adapter. #1109

Merged
merged 3 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 66 additions & 12 deletions adapters/libhv.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
#include "../hiredis.h"
#include "../async.h"

typedef struct redisLibhvEvents {
hio_t *io;
htimer_t *timer;
} redisLibhvEvents;

static void redisLibhvHandleEvents(hio_t* io) {
redisAsyncContext* context = (redisAsyncContext*)hevent_userdata(io);
int events = hio_events(io);
Expand All @@ -18,51 +23,100 @@ static void redisLibhvHandleEvents(hio_t* io) {
}

static void redisLibhvAddRead(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_add(io, redisLibhvHandleEvents, HV_READ);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
hio_add(events->io, redisLibhvHandleEvents, HV_READ);
}

static void redisLibhvDelRead(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_del(io, HV_READ);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
hio_del(events->io, HV_READ);
}

static void redisLibhvAddWrite(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_add(io, redisLibhvHandleEvents, HV_WRITE);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
hio_add(events->io, redisLibhvHandleEvents, HV_WRITE);
}

static void redisLibhvDelWrite(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_del(io, HV_WRITE);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;
hio_del(events->io, HV_WRITE);
}

static void redisLibhvCleanup(void *privdata) {
hio_t* io = (hio_t*)privdata;
hio_close(io);
hevent_set_userdata(io, NULL);
redisLibhvEvents* events = (redisLibhvEvents*)privdata;

if (events->timer)
htimer_del(events->timer);

hio_close(events->io);
hevent_set_userdata(events->io, NULL);

hi_free(events);
}

static void redisLibhvTimeout(htimer_t* timer) {
hio_t* io = (hio_t*)hevent_userdata(timer);
redisAsyncHandleTimeout(hevent_userdata(io));
}

static void redisLibhvSetTimeout(void *privdata, struct timeval tv) {
redisLibhvEvents* events;
uint32_t millis;
hloop_t* loop;

events = (redisLibhvEvents*)privdata;
millis = tv.tv_sec * 1000 + tv.tv_usec / 1000;

if (millis == 0) {
/* Libhv disallows zero'd timers so treat this as a delete or NO OP */
if (events->timer) {
htimer_del(events->timer);
events->timer = NULL;
}
} else if (events->timer == NULL) {
/* Add new timer */
loop = hevent_loop(events->io);
events->timer = htimer_add(loop, redisLibhvTimeout, millis, 1);
hevent_set_userdata(events->timer, events->io);
} else {
/* Update existing timer */
htimer_reset(events->timer, millis);
}
}

static int redisLibhvAttach(redisAsyncContext* ac, hloop_t* loop) {
redisContext *c = &(ac->c);
redisLibhvEvents *events;
hio_t* io = NULL;

if (ac->ev.data != NULL) {
return REDIS_ERR;
}

/* Create container struct to keep track of our io and any timer */
events = hi_malloc(sizeof(*events));
if (events == NULL) {
return REDIS_ERR;
}

io = hio_get(loop, c->fd);
if (io == NULL) {
hi_free(events);
return REDIS_ERR;
}

hevent_set_userdata(io, ac);

events->io = io;
events->timer = NULL;

ac->ev.addRead = redisLibhvAddRead;
ac->ev.delRead = redisLibhvDelRead;
ac->ev.addWrite = redisLibhvAddWrite;
ac->ev.delWrite = redisLibhvDelWrite;
ac->ev.cleanup = redisLibhvCleanup;
ac->ev.data = io;
ac->ev.scheduleTimer = redisLibhvSetTimeout;
ac->ev.data = events;

return REDIS_OK;
}
Expand Down
15 changes: 15 additions & 0 deletions examples/example-libhv.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisAsyncDisconnect(c);
}

void debugCallback(redisAsyncContext *c, void *r, void *privdata) {
(void)privdata;
redisReply *reply = r;

if (reply == NULL) {
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error");
return;
}

redisAsyncDisconnect(c);
}

void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
Expand Down Expand Up @@ -46,10 +58,13 @@ int main (int argc, char **argv) {

hloop_t* loop = hloop_new(HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS);
redisLibhvAttach(c, loop);
redisAsyncSetTimeout(c, (struct timeval){.tv_sec = 0, .tv_usec = 500000});
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %d", 1);
hloop_run(loop);
hloop_free(&loop);
return 0;
}