Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RedisModule adapter #1182

Merged
merged 2 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions adapters/redismoduleapi.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#ifndef __HIREDIS_REDISMODULEAPI_H__
#define __HIREDIS_REDISMODULEAPI_H__

#include "redismodule.h"

#include "../async.h"
#include "../hiredis.h"

#include <sys/types.h>

typedef struct redisModuleEvents {
redisAsyncContext *context;
RedisModuleCtx *module_ctx;
int fd;
int reading, writing;
int timer_active;
RedisModuleTimerID timer_id;
} redisModuleEvents;

static inline void redisModuleReadEvent(int fd, void *privdata, int mask) {
(void) fd;
(void) mask;

redisModuleEvents *e = (redisModuleEvents*)privdata;
redisAsyncHandleRead(e->context);
}

static inline void redisModuleWriteEvent(int fd, void *privdata, int mask) {
(void) fd;
(void) mask;

redisModuleEvents *e = (redisModuleEvents*)privdata;
redisAsyncHandleWrite(e->context);
}

static inline void redisModuleAddRead(void *privdata) {
redisModuleEvents *e = (redisModuleEvents*)privdata;
if (!e->reading) {
e->reading = 1;
RedisModule_EventLoopAdd(e->fd, REDISMODULE_EVENTLOOP_READABLE, redisModuleReadEvent, e);
}
}

static inline void redisModuleDelRead(void *privdata) {
redisModuleEvents *e = (redisModuleEvents*)privdata;
if (e->reading) {
e->reading = 0;
RedisModule_EventLoopDel(e->fd, REDISMODULE_EVENTLOOP_READABLE);
}
}

static inline void redisModuleAddWrite(void *privdata) {
redisModuleEvents *e = (redisModuleEvents*)privdata;
if (!e->writing) {
e->writing = 1;
RedisModule_EventLoopAdd(e->fd, REDISMODULE_EVENTLOOP_WRITABLE, redisModuleWriteEvent, e);
}
}

static inline void redisModuleDelWrite(void *privdata) {
redisModuleEvents *e = (redisModuleEvents*)privdata;
if (e->writing) {
e->writing = 0;
RedisModule_EventLoopDel(e->fd, REDISMODULE_EVENTLOOP_WRITABLE);
}
}

static inline void redisModuleStopTimer(void *privdata) {
redisModuleEvents *e = (redisModuleEvents*)privdata;
if (e->timer_active) {
RedisModule_StopTimer(e->module_ctx, e->timer_id, NULL);
}
e->timer_active = 0;
}

static inline void redisModuleCleanup(void *privdata) {
redisModuleEvents *e = (redisModuleEvents*)privdata;
redisModuleDelRead(privdata);
redisModuleDelWrite(privdata);
redisModuleStopTimer(privdata);
hi_free(e);
}

static inline void redisModuleTimeout(RedisModuleCtx *ctx, void *privdata) {
(void) ctx;

redisModuleEvents *e = (redisModuleEvents*)privdata;
e->timer_active = 0;
redisAsyncHandleTimeout(e->context);
}

static inline void redisModuleSetTimeout(void *privdata, struct timeval tv) {
redisModuleEvents* e = (redisModuleEvents*)privdata;

redisModuleStopTimer(privdata);

mstime_t millis = tv.tv_sec * 1000 + tv.tv_usec / 1000.0;
e->timer_id = RedisModule_CreateTimer(e->module_ctx, millis, redisModuleTimeout, e);
e->timer_active = 1;
}

/* Check if Redis version is compatible with the adapter. */
static inline int redisModuleCompatibilityCheck(void) {
if (!RedisModule_EventLoopAdd ||
!RedisModule_EventLoopDel ||
!RedisModule_CreateTimer ||
!RedisModule_StopTimer) {
return REDIS_ERR;
}
return REDIS_OK;
}

static inline int redisModuleAttach(redisAsyncContext *ac, RedisModuleCtx *module_ctx) {
redisContext *c = &(ac->c);
redisModuleEvents *e;

/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
return REDIS_ERR;

/* Create container for context and r/w events */
e = (redisModuleEvents*)hi_malloc(sizeof(*e));
if (e == NULL)
return REDIS_ERR;

e->context = ac;
e->module_ctx = module_ctx;
e->fd = c->fd;
e->reading = e->writing = 0;
e->timer_active = 0;

/* Register functions to start/stop listening for events */
ac->ev.addRead = redisModuleAddRead;
ac->ev.delRead = redisModuleDelRead;
ac->ev.addWrite = redisModuleAddWrite;
ac->ev.delWrite = redisModuleDelWrite;
ac->ev.cleanup = redisModuleCleanup;
ac->ev.scheduleTimer = redisModuleSetTimeout;
ac->ev.data = e;

return REDIS_OK;
}

#endif
101 changes: 101 additions & 0 deletions examples/example-redismoduleapi.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <hiredis.h>
#include <async.h>
#include <adapters/redismoduleapi.h>

void debugCallback(redisAsyncContext *c, void *r, void *privdata) {
(void)privdata; //unused
redisReply *reply = r;
if (reply == NULL) {
/* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error");
return;
}
/* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/
redisAsyncDisconnect(c);
}

void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) {
if (c->errstr) {
printf("errstr: %s\n", c->errstr);
}
return;
}
printf("argv[%s]: %s\n", (char*)privdata, reply->str);

/* start another request that demonstrate timeout */
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5);
}

void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
return;
}
printf("Disconnected...\n");
}

/*
* This example requires Redis 7.0 or above.
*
* 1- Compile this file as a shared library. Directory of "redismodule.h" must
* be in the include path.
* gcc -fPIC -shared -I../../redis/src/ -I.. example-redismoduleapi.c -o example-redismoduleapi.so
*
* 2- Load module:
* redis-server --loadmodule ./example-redismoduleapi.so value
*/
tezc marked this conversation as resolved.
Show resolved Hide resolved
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {

int ret = RedisModule_Init(ctx, "example-redismoduleapi", 1, REDISMODULE_APIVER_1);
if (ret != REDISMODULE_OK) {
printf("error module init \n");
return REDISMODULE_ERR;
}

if (redisModuleCompatibilityCheck() != REDIS_OK) {
printf("Redis 7.0 or above is required! \n");
return REDISMODULE_ERR;
}

redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return 1;
}

size_t len;
const char *val = RedisModule_StringPtrLen(argv[argc-1], &len);

RedisModuleCtx *module_ctx = RedisModule_GetDetachedThreadSafeContext(ctx);
redisModuleAttach(c, module_ctx);
tezc marked this conversation as resolved.
Show resolved Hide resolved
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0});

/*
In this demo, we first `set key`, then `get key` to demonstrate the basic usage of the adapter.
Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request.
Because we have set a 1 second timeout to the connection, the command will always fail with a
timeout error, which is shown in the `debugCallback`.
*/

redisAsyncCommand(c, NULL, NULL, "SET key %b", val, len);
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
return 0;
}