-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add RedisModule adapter * add timer callback, add compatibility helper
- Loading branch information
Showing
2 changed files
with
245 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
#ifndef __HIREDIS_REDISMODULEAPI_H__ | ||
#define __HIREDIS_REDISMODULEAPI_H__ | ||
|
||
#include "redismodule.h" | ||
|
||
#include "../async.h" | ||
#include "../hiredis.h" | ||
|
||
#include <sys/types.h> | ||
|
||
typedef struct redisModuleEvents { | ||
redisAsyncContext *context; | ||
RedisModuleCtx *module_ctx; | ||
int fd; | ||
int reading, writing; | ||
int timer_active; | ||
RedisModuleTimerID timer_id; | ||
} redisModuleEvents; | ||
|
||
static inline void redisModuleReadEvent(int fd, void *privdata, int mask) { | ||
(void) fd; | ||
(void) mask; | ||
|
||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
redisAsyncHandleRead(e->context); | ||
} | ||
|
||
static inline void redisModuleWriteEvent(int fd, void *privdata, int mask) { | ||
(void) fd; | ||
(void) mask; | ||
|
||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
redisAsyncHandleWrite(e->context); | ||
} | ||
|
||
static inline void redisModuleAddRead(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (!e->reading) { | ||
e->reading = 1; | ||
RedisModule_EventLoopAdd(e->fd, REDISMODULE_EVENTLOOP_READABLE, redisModuleReadEvent, e); | ||
} | ||
} | ||
|
||
static inline void redisModuleDelRead(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (e->reading) { | ||
e->reading = 0; | ||
RedisModule_EventLoopDel(e->fd, REDISMODULE_EVENTLOOP_READABLE); | ||
} | ||
} | ||
|
||
static inline void redisModuleAddWrite(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (!e->writing) { | ||
e->writing = 1; | ||
RedisModule_EventLoopAdd(e->fd, REDISMODULE_EVENTLOOP_WRITABLE, redisModuleWriteEvent, e); | ||
} | ||
} | ||
|
||
static inline void redisModuleDelWrite(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (e->writing) { | ||
e->writing = 0; | ||
RedisModule_EventLoopDel(e->fd, REDISMODULE_EVENTLOOP_WRITABLE); | ||
} | ||
} | ||
|
||
static inline void redisModuleStopTimer(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
if (e->timer_active) { | ||
RedisModule_StopTimer(e->module_ctx, e->timer_id, NULL); | ||
} | ||
e->timer_active = 0; | ||
} | ||
|
||
static inline void redisModuleCleanup(void *privdata) { | ||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
redisModuleDelRead(privdata); | ||
redisModuleDelWrite(privdata); | ||
redisModuleStopTimer(privdata); | ||
hi_free(e); | ||
} | ||
|
||
static inline void redisModuleTimeout(RedisModuleCtx *ctx, void *privdata) { | ||
(void) ctx; | ||
|
||
redisModuleEvents *e = (redisModuleEvents*)privdata; | ||
e->timer_active = 0; | ||
redisAsyncHandleTimeout(e->context); | ||
} | ||
|
||
static inline void redisModuleSetTimeout(void *privdata, struct timeval tv) { | ||
redisModuleEvents* e = (redisModuleEvents*)privdata; | ||
|
||
redisModuleStopTimer(privdata); | ||
|
||
mstime_t millis = tv.tv_sec * 1000 + tv.tv_usec / 1000.0; | ||
e->timer_id = RedisModule_CreateTimer(e->module_ctx, millis, redisModuleTimeout, e); | ||
e->timer_active = 1; | ||
} | ||
|
||
/* Check if Redis version is compatible with the adapter. */ | ||
static inline int redisModuleCompatibilityCheck(void) { | ||
if (!RedisModule_EventLoopAdd || | ||
!RedisModule_EventLoopDel || | ||
!RedisModule_CreateTimer || | ||
!RedisModule_StopTimer) { | ||
return REDIS_ERR; | ||
} | ||
return REDIS_OK; | ||
} | ||
|
||
static inline int redisModuleAttach(redisAsyncContext *ac, RedisModuleCtx *module_ctx) { | ||
redisContext *c = &(ac->c); | ||
redisModuleEvents *e; | ||
|
||
/* Nothing should be attached when something is already attached */ | ||
if (ac->ev.data != NULL) | ||
return REDIS_ERR; | ||
|
||
/* Create container for context and r/w events */ | ||
e = (redisModuleEvents*)hi_malloc(sizeof(*e)); | ||
if (e == NULL) | ||
return REDIS_ERR; | ||
|
||
e->context = ac; | ||
e->module_ctx = module_ctx; | ||
e->fd = c->fd; | ||
e->reading = e->writing = 0; | ||
e->timer_active = 0; | ||
|
||
/* Register functions to start/stop listening for events */ | ||
ac->ev.addRead = redisModuleAddRead; | ||
ac->ev.delRead = redisModuleDelRead; | ||
ac->ev.addWrite = redisModuleAddWrite; | ||
ac->ev.delWrite = redisModuleDelWrite; | ||
ac->ev.cleanup = redisModuleCleanup; | ||
ac->ev.scheduleTimer = redisModuleSetTimeout; | ||
ac->ev.data = e; | ||
|
||
return REDIS_OK; | ||
} | ||
|
||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
#include <stdio.h> | ||
#include <stdlib.h> | ||
#include <string.h> | ||
#include <signal.h> | ||
|
||
#include <hiredis.h> | ||
#include <async.h> | ||
#include <adapters/redismoduleapi.h> | ||
|
||
void debugCallback(redisAsyncContext *c, void *r, void *privdata) { | ||
(void)privdata; //unused | ||
redisReply *reply = r; | ||
if (reply == NULL) { | ||
/* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */ | ||
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error"); | ||
return; | ||
} | ||
/* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/ | ||
redisAsyncDisconnect(c); | ||
} | ||
|
||
void getCallback(redisAsyncContext *c, void *r, void *privdata) { | ||
redisReply *reply = r; | ||
if (reply == NULL) { | ||
if (c->errstr) { | ||
printf("errstr: %s\n", c->errstr); | ||
} | ||
return; | ||
} | ||
printf("argv[%s]: %s\n", (char*)privdata, reply->str); | ||
|
||
/* start another request that demonstrate timeout */ | ||
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5); | ||
} | ||
|
||
void connectCallback(const redisAsyncContext *c, int status) { | ||
if (status != REDIS_OK) { | ||
printf("Error: %s\n", c->errstr); | ||
return; | ||
} | ||
printf("Connected...\n"); | ||
} | ||
|
||
void disconnectCallback(const redisAsyncContext *c, int status) { | ||
if (status != REDIS_OK) { | ||
printf("Error: %s\n", c->errstr); | ||
return; | ||
} | ||
printf("Disconnected...\n"); | ||
} | ||
|
||
/* | ||
* This example requires Redis 7.0 or above. | ||
* | ||
* 1- Compile this file as a shared library. Directory of "redismodule.h" must | ||
* be in the include path. | ||
* gcc -fPIC -shared -I../../redis/src/ -I.. example-redismoduleapi.c -o example-redismoduleapi.so | ||
* | ||
* 2- Load module: | ||
* redis-server --loadmodule ./example-redismoduleapi.so value | ||
*/ | ||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
|
||
int ret = RedisModule_Init(ctx, "example-redismoduleapi", 1, REDISMODULE_APIVER_1); | ||
if (ret != REDISMODULE_OK) { | ||
printf("error module init \n"); | ||
return REDISMODULE_ERR; | ||
} | ||
|
||
if (redisModuleCompatibilityCheck() != REDIS_OK) { | ||
printf("Redis 7.0 or above is required! \n"); | ||
return REDISMODULE_ERR; | ||
} | ||
|
||
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); | ||
if (c->err) { | ||
/* Let *c leak for now... */ | ||
printf("Error: %s\n", c->errstr); | ||
return 1; | ||
} | ||
|
||
size_t len; | ||
const char *val = RedisModule_StringPtrLen(argv[argc-1], &len); | ||
|
||
RedisModuleCtx *module_ctx = RedisModule_GetDetachedThreadSafeContext(ctx); | ||
redisModuleAttach(c, module_ctx); | ||
redisAsyncSetConnectCallback(c,connectCallback); | ||
redisAsyncSetDisconnectCallback(c,disconnectCallback); | ||
redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0}); | ||
|
||
/* | ||
In this demo, we first `set key`, then `get key` to demonstrate the basic usage of the adapter. | ||
Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request. | ||
Because we have set a 1 second timeout to the connection, the command will always fail with a | ||
timeout error, which is shown in the `debugCallback`. | ||
*/ | ||
|
||
redisAsyncCommand(c, NULL, NULL, "SET key %b", val, len); | ||
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); | ||
return 0; | ||
} |