From 5608454bca7437386bce7a7b7f45526d95eb44d7 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Mon, 17 Apr 2023 09:00:42 +0300 Subject: [PATCH 1/2] Add RedisModule adapter --- adapters/redismoduleapi.h | 100 ++++++++++++++++++++++++++++++ examples/example-redismoduleapi.c | 80 ++++++++++++++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 adapters/redismoduleapi.h create mode 100644 examples/example-redismoduleapi.c diff --git a/adapters/redismoduleapi.h b/adapters/redismoduleapi.h new file mode 100644 index 000000000..e77460530 --- /dev/null +++ b/adapters/redismoduleapi.h @@ -0,0 +1,100 @@ +#ifndef __HIREDIS_REDISMODULEAPI_H__ +#define __HIREDIS_REDISMODULEAPI_H__ + +#include "redismodule.h" + +#include "../async.h" +#include "../hiredis.h" + +#include + +typedef struct redisModuleEvents { + redisAsyncContext *context; + int fd; + int reading, writing; +} redisModuleEvents; + +static void redisModuleReadEvent(int fd, void *privdata, int mask) { + (void) fd; + (void) mask; + + redisModuleEvents *e = (redisModuleEvents*)privdata; + redisAsyncHandleRead(e->context); +} + +static void redisModuleWriteEvent(int fd, void *privdata, int mask) { + (void) fd; + (void) mask; + + redisModuleEvents *e = (redisModuleEvents*)privdata; + redisAsyncHandleWrite(e->context); +} + +static void redisModuleAddRead(void *privdata) { + redisModuleEvents *e = (redisModuleEvents*)privdata; + if (!e->reading) { + e->reading = 1; + RedisModule_EventLoopAdd(e->fd, REDISMODULE_EVENTLOOP_READABLE, redisModuleReadEvent, e); + } +} + +static void redisModuleDelRead(void *privdata) { + redisModuleEvents *e = (redisModuleEvents*)privdata; + if (e->reading) { + e->reading = 0; + RedisModule_EventLoopDel(e->fd, REDISMODULE_EVENTLOOP_READABLE); + } +} + +static void redisModuleAddWrite(void *privdata) { + redisModuleEvents *e = (redisModuleEvents*)privdata; + if (!e->writing) { + e->writing = 1; + RedisModule_EventLoopAdd(e->fd, REDISMODULE_EVENTLOOP_WRITABLE, redisModuleWriteEvent, e); + } +} + +static void redisModuleDelWrite(void *privdata) { + redisModuleEvents *e = (redisModuleEvents*)privdata; + if (e->writing) { + e->writing = 0; + RedisModule_EventLoopDel(e->fd, REDISMODULE_EVENTLOOP_WRITABLE); + } +} + +static void redisModuleCleanup(void *privdata) { + redisModuleEvents *e = (redisModuleEvents*)privdata; + redisModuleDelRead(privdata); + redisModuleDelWrite(privdata); + hi_free(e); +} + +static int redisModuleAttach(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisModuleEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->ev.data != NULL) + return REDIS_ERR; + + /* Create container for context and r/w events */ + e = (redisModuleEvents*)hi_malloc(sizeof(*e)); + if (e == NULL) + return REDIS_ERR; + + e->context = ac; + e->fd = c->fd; + e->reading = e->writing = 0; + + /* Register functions to start/stop listening for events */ + ac->ev.addRead = redisModuleAddRead; + ac->ev.delRead = redisModuleDelRead; + ac->ev.addWrite = redisModuleAddWrite; + ac->ev.delWrite = redisModuleDelWrite; + ac->ev.cleanup = redisModuleCleanup; + ac->ev.data = e; + + return REDIS_OK; +} + +#endif diff --git a/examples/example-redismoduleapi.c b/examples/example-redismoduleapi.c new file mode 100644 index 000000000..166a746f1 --- /dev/null +++ b/examples/example-redismoduleapi.c @@ -0,0 +1,80 @@ +#include +#include +#include +#include + +#include +#include +#include + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + if (reply == NULL) { + if (c->errstr) { + printf("errstr: %s\n", c->errstr); + } + return; + } + printf("argv[%s]: %s\n", (char*)privdata, reply->str); + + /* Disconnect after receiving the reply to GET */ + redisAsyncDisconnect(c); +} + +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Disconnected...\n"); +} + +/* + * This example requires Redis 7.0 or above. + * + * 1- Compile this file as a shared library. Directory of "redismodule.h" must + * be in the include path. + * gcc -fPIC -shared -I../../redis/src/ -I.. example-redismoduleapi.c -o example-redismoduleapi.so + * + * 2- Load module: + * redis-server --loadmodule ./example-redismoduleapi.so value + */ +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + + int ret = RedisModule_Init(ctx, "example-redismoduleapi", 1, REDISMODULE_APIVER_1); + if (ret != REDISMODULE_OK) { + printf("error module init \n"); + return REDISMODULE_ERR; + } + + if (!RedisModule_GetServerVersion || + RedisModule_GetServerVersion() < 0x00070000) { + printf("Redis 7.0 or above is required! \n"); + return REDISMODULE_ERR; + } + + redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); + if (c->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", c->errstr); + return 1; + } + + size_t len; + const char *val = RedisModule_StringPtrLen(argv[argc-1], &len); + + redisModuleAttach(c); + redisAsyncSetConnectCallback(c,connectCallback); + redisAsyncSetDisconnectCallback(c,disconnectCallback); + redisAsyncCommand(c, NULL, NULL, "SET key %b", val, len); + redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); + return 0; +} From 8d3e1d9554882f1a654d4e20893f8c7899f92a97 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Mon, 17 Apr 2023 15:56:24 +0300 Subject: [PATCH 2/2] add timer callback, add compatibility helper --- adapters/redismoduleapi.h | 60 ++++++++++++++++++++++++++----- examples/example-redismoduleapi.c | 31 +++++++++++++--- 2 files changed, 78 insertions(+), 13 deletions(-) diff --git a/adapters/redismoduleapi.h b/adapters/redismoduleapi.h index e77460530..8a076fe46 100644 --- a/adapters/redismoduleapi.h +++ b/adapters/redismoduleapi.h @@ -10,11 +10,14 @@ typedef struct redisModuleEvents { redisAsyncContext *context; + RedisModuleCtx *module_ctx; int fd; int reading, writing; + int timer_active; + RedisModuleTimerID timer_id; } redisModuleEvents; -static void redisModuleReadEvent(int fd, void *privdata, int mask) { +static inline void redisModuleReadEvent(int fd, void *privdata, int mask) { (void) fd; (void) mask; @@ -22,7 +25,7 @@ static void redisModuleReadEvent(int fd, void *privdata, int mask) { redisAsyncHandleRead(e->context); } -static void redisModuleWriteEvent(int fd, void *privdata, int mask) { +static inline void redisModuleWriteEvent(int fd, void *privdata, int mask) { (void) fd; (void) mask; @@ -30,7 +33,7 @@ static void redisModuleWriteEvent(int fd, void *privdata, int mask) { redisAsyncHandleWrite(e->context); } -static void redisModuleAddRead(void *privdata) { +static inline void redisModuleAddRead(void *privdata) { redisModuleEvents *e = (redisModuleEvents*)privdata; if (!e->reading) { e->reading = 1; @@ -38,7 +41,7 @@ static void redisModuleAddRead(void *privdata) { } } -static void redisModuleDelRead(void *privdata) { +static inline void redisModuleDelRead(void *privdata) { redisModuleEvents *e = (redisModuleEvents*)privdata; if (e->reading) { e->reading = 0; @@ -46,7 +49,7 @@ static void redisModuleDelRead(void *privdata) { } } -static void redisModuleAddWrite(void *privdata) { +static inline void redisModuleAddWrite(void *privdata) { redisModuleEvents *e = (redisModuleEvents*)privdata; if (!e->writing) { e->writing = 1; @@ -54,7 +57,7 @@ static void redisModuleAddWrite(void *privdata) { } } -static void redisModuleDelWrite(void *privdata) { +static inline void redisModuleDelWrite(void *privdata) { redisModuleEvents *e = (redisModuleEvents*)privdata; if (e->writing) { e->writing = 0; @@ -62,14 +65,52 @@ static void redisModuleDelWrite(void *privdata) { } } -static void redisModuleCleanup(void *privdata) { +static inline void redisModuleStopTimer(void *privdata) { + redisModuleEvents *e = (redisModuleEvents*)privdata; + if (e->timer_active) { + RedisModule_StopTimer(e->module_ctx, e->timer_id, NULL); + } + e->timer_active = 0; +} + +static inline void redisModuleCleanup(void *privdata) { redisModuleEvents *e = (redisModuleEvents*)privdata; redisModuleDelRead(privdata); redisModuleDelWrite(privdata); + redisModuleStopTimer(privdata); hi_free(e); } -static int redisModuleAttach(redisAsyncContext *ac) { +static inline void redisModuleTimeout(RedisModuleCtx *ctx, void *privdata) { + (void) ctx; + + redisModuleEvents *e = (redisModuleEvents*)privdata; + e->timer_active = 0; + redisAsyncHandleTimeout(e->context); +} + +static inline void redisModuleSetTimeout(void *privdata, struct timeval tv) { + redisModuleEvents* e = (redisModuleEvents*)privdata; + + redisModuleStopTimer(privdata); + + mstime_t millis = tv.tv_sec * 1000 + tv.tv_usec / 1000.0; + e->timer_id = RedisModule_CreateTimer(e->module_ctx, millis, redisModuleTimeout, e); + e->timer_active = 1; +} + +/* Check if Redis version is compatible with the adapter. */ +static inline int redisModuleCompatibilityCheck(void) { + if (!RedisModule_EventLoopAdd || + !RedisModule_EventLoopDel || + !RedisModule_CreateTimer || + !RedisModule_StopTimer) { + return REDIS_ERR; + } + return REDIS_OK; +} + +static inline int redisModuleAttach(redisAsyncContext *ac, RedisModuleCtx *module_ctx) { redisContext *c = &(ac->c); redisModuleEvents *e; @@ -83,8 +124,10 @@ static int redisModuleAttach(redisAsyncContext *ac) { return REDIS_ERR; e->context = ac; + e->module_ctx = module_ctx; e->fd = c->fd; e->reading = e->writing = 0; + e->timer_active = 0; /* Register functions to start/stop listening for events */ ac->ev.addRead = redisModuleAddRead; @@ -92,6 +135,7 @@ static int redisModuleAttach(redisAsyncContext *ac) { ac->ev.addWrite = redisModuleAddWrite; ac->ev.delWrite = redisModuleDelWrite; ac->ev.cleanup = redisModuleCleanup; + ac->ev.scheduleTimer = redisModuleSetTimeout; ac->ev.data = e; return REDIS_OK; diff --git a/examples/example-redismoduleapi.c b/examples/example-redismoduleapi.c index 166a746f1..7d12f8a06 100644 --- a/examples/example-redismoduleapi.c +++ b/examples/example-redismoduleapi.c @@ -7,6 +7,18 @@ #include #include +void debugCallback(redisAsyncContext *c, void *r, void *privdata) { + (void)privdata; //unused + redisReply *reply = r; + if (reply == NULL) { + /* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */ + printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error"); + return; + } + /* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/ + redisAsyncDisconnect(c); +} + void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = r; if (reply == NULL) { @@ -17,8 +29,8 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { } printf("argv[%s]: %s\n", (char*)privdata, reply->str); - /* Disconnect after receiving the reply to GET */ - redisAsyncDisconnect(c); + /* start another request that demonstrate timeout */ + redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5); } void connectCallback(const redisAsyncContext *c, int status) { @@ -55,8 +67,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } - if (!RedisModule_GetServerVersion || - RedisModule_GetServerVersion() < 0x00070000) { + if (redisModuleCompatibilityCheck() != REDIS_OK) { printf("Redis 7.0 or above is required! \n"); return REDISMODULE_ERR; } @@ -71,9 +82,19 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) size_t len; const char *val = RedisModule_StringPtrLen(argv[argc-1], &len); - redisModuleAttach(c); + RedisModuleCtx *module_ctx = RedisModule_GetDetachedThreadSafeContext(ctx); + redisModuleAttach(c, module_ctx); redisAsyncSetConnectCallback(c,connectCallback); redisAsyncSetDisconnectCallback(c,disconnectCallback); + redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0}); + + /* + In this demo, we first `set key`, then `get key` to demonstrate the basic usage of the adapter. + Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request. + Because we have set a 1 second timeout to the connection, the command will always fail with a + timeout error, which is shown in the `debugCallback`. + */ + redisAsyncCommand(c, NULL, NULL, "SET key %b", val, len); redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); return 0;