-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added API to register to server events. #267
Merged
Merged
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
556f8d1
Added API to register to server events.
MeirShpilraien 55d4371
upgrade rust to latest stable
gkorland 62c224f
Update system-setup.py
gkorland 855fb1f
Merge branch 'master' into server_events_api
gkorland 0a8d3d8
Update system-setup.py
gkorland fb78c60
Merge branch 'master' into server_events_api
MeirShpilraien 7563d79
Checkout system-setup to master
MeirShpilraien 037c8db
Apply suggestions from code review
MeirShpilraien f971b20
Fix tests
MeirShpilraien 70fa6d6
Merge branch 'master' into server_events_api
MeirShpilraien c82b5d0
Added publish steps for redismodule-rs-derive
MeirShpilraien 5e50dac
Merge branch 'master' into server_events_api
MeirShpilraien f59de1b
Fix annotations issue on cratesio-publish.yml
MeirShpilraien 8c9d5b0
Maybe fix.
MeirShpilraien dbe3850
Merge branch 'master' into server_events_api
MeirShpilraien cda417f
fixing syntax in action
chayim 4068f28
Merge branch 'master' into server_events_api
MeirShpilraien a92bac3
Review fixes
MeirShpilraien 9b5e355
Format fixes
MeirShpilraien e013323
Update tests/integration.rs
MeirShpilraien 32665a7
Merge branch 'master' into server_events_api
MeirShpilraien a93d785
Review fixes.
MeirShpilraien 56acad4
Review fixes
MeirShpilraien c64794b
Merge branch 'master' into server_events_api
MeirShpilraien b373330
Format fixes
MeirShpilraien File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
#[macro_use] | ||
extern crate redis_module; | ||
|
||
use std::sync::atomic::{AtomicI64, Ordering}; | ||
|
||
use redis_module::{server_events::FlushSubevent, Context, RedisResult, RedisString, RedisValue}; | ||
use redis_module_derive::flush_event_handler; | ||
|
||
static NUM_FLUSHES: AtomicI64 = AtomicI64::new(0); | ||
|
||
#[flush_event_handler] | ||
fn flushed_event_handler(_ctx: &Context, flush_event: FlushSubevent) { | ||
if let FlushSubevent::Started = flush_event { | ||
NUM_FLUSHES.fetch_add(1, Ordering::SeqCst); | ||
} | ||
} | ||
|
||
fn num_flushed(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult { | ||
Ok(RedisValue::Integer(NUM_FLUSHES.load(Ordering::SeqCst))) | ||
} | ||
|
||
////////////////////////////////////////////////////// | ||
|
||
redis_module! { | ||
name: "server_events", | ||
version: 1, | ||
data_types: [], | ||
commands: [ | ||
["num_flushed", num_flushed, "readonly", 0, 0, 0], | ||
], | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
[package] | ||
name = "redis-module-derive" | ||
version = "99.99.99" | ||
authors = ["Meir Shpilraien <meir@redis.com>"] | ||
edition = "2021" | ||
description = "A derive crate for redismodule-rs" | ||
license = "BSD-3-Clause" | ||
repository = "https://github.com/RedisLabsModules/redismodule-rs" | ||
keywords = ["redis", "plugin"] | ||
categories = ["database", "api-bindings"] | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
syn = { version="1.0", features = ["full"]} | ||
quote = "1.0" | ||
|
||
[lib] | ||
name = "redis_module_derive" | ||
path = "src/lib.rs" | ||
proc-macro = true | ||
MeirShpilraien marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
extern crate proc_macro; | ||
use proc_macro::TokenStream; | ||
use quote::quote; | ||
use syn; | ||
use syn::ItemFn; | ||
|
||
#[proc_macro_attribute] | ||
pub fn role_changed_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream { | ||
let ast: ItemFn = match syn::parse(item) { | ||
Ok(res) => res, | ||
Err(e) => return e.to_compile_error().into(), | ||
}; | ||
let gen = quote! { | ||
#[linkme::distributed_slice(redis_module::server_events::ROLE_CHANGED_SERVER_EVENTS_LIST)] | ||
#ast | ||
}; | ||
gen.into() | ||
} | ||
|
||
#[proc_macro_attribute] | ||
pub fn loading_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream { | ||
let ast: ItemFn = match syn::parse(item) { | ||
Ok(res) => res, | ||
Err(e) => return e.to_compile_error().into(), | ||
}; | ||
let gen = quote! { | ||
#[linkme::distributed_slice(redis_module::server_events::LOADING_SERVER_EVENTS_LIST)] | ||
#ast | ||
}; | ||
gen.into() | ||
} | ||
|
||
#[proc_macro_attribute] | ||
pub fn flush_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream { | ||
let ast: ItemFn = match syn::parse(item) { | ||
Ok(res) => res, | ||
Err(e) => return e.to_compile_error().into(), | ||
}; | ||
let gen = quote! { | ||
#[linkme::distributed_slice(redis_module::server_events::FLUSH_SERVER_EVENTS_LIST)] | ||
#ast | ||
}; | ||
gen.into() | ||
} | ||
|
||
#[proc_macro_attribute] | ||
pub fn module_changed_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream { | ||
let ast: ItemFn = match syn::parse(item) { | ||
iddm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Ok(res) => res, | ||
Err(e) => return e.to_compile_error().into(), | ||
}; | ||
let gen = quote! { | ||
#[linkme::distributed_slice(redis_module::server_events::MODULE_CHANGED_SERVER_EVENTS_LIST)] | ||
#ast | ||
}; | ||
gen.into() | ||
} | ||
MeirShpilraien marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
use crate::raw; | ||
use crate::{context::Context, RedisError}; | ||
use linkme::distributed_slice; | ||
|
||
#[derive(Clone)] | ||
iddm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub enum ServerRole { | ||
Primary, | ||
Replica, | ||
} | ||
|
||
#[derive(Clone)] | ||
pub enum LoadingSubevent { | ||
RdbStarted, | ||
AofStarted, | ||
ReplStarted, | ||
Ended, | ||
Failed, | ||
} | ||
|
||
#[derive(Clone)] | ||
pub enum FlushSubevent { | ||
Started, | ||
Ended, | ||
} | ||
|
||
#[derive(Clone)] | ||
pub enum ModuleChangeSubevent { | ||
Loaded, | ||
Unloaded, | ||
} | ||
|
||
pub enum ServerEventHandler { | ||
iddm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
RuleChanged(fn(&Context, ServerRole)), | ||
Loading(fn(&Context, LoadingSubevent)), | ||
Flush(fn(&Context, FlushSubevent)), | ||
ModuleChange(fn(&Context, ModuleChangeSubevent)), | ||
} | ||
|
||
#[distributed_slice()] | ||
pub static ROLE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ServerRole)] = [..]; | ||
|
||
#[distributed_slice()] | ||
pub static LOADING_SERVER_EVENTS_LIST: [fn(&Context, LoadingSubevent)] = [..]; | ||
|
||
#[distributed_slice()] | ||
pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..]; | ||
|
||
#[distributed_slice()] | ||
pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..]; | ||
|
||
extern "C" fn role_changed_callback( | ||
ctx: *mut raw::RedisModuleCtx, | ||
_eid: raw::RedisModuleEvent, | ||
subevent: u64, | ||
_data: *mut ::std::os::raw::c_void, | ||
) { | ||
let new_role = if subevent == raw::REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER { | ||
ServerRole::Primary | ||
} else { | ||
ServerRole::Replica | ||
}; | ||
let ctx = Context::new(ctx); | ||
ROLE_CHANGED_SERVER_EVENTS_LIST.iter().for_each(|callback| { | ||
callback(&ctx, new_role.clone()); | ||
iddm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}); | ||
} | ||
|
||
extern "C" fn loading_event_callback( | ||
ctx: *mut raw::RedisModuleCtx, | ||
_eid: raw::RedisModuleEvent, | ||
subevent: u64, | ||
_data: *mut ::std::os::raw::c_void, | ||
) { | ||
let loading_sub_event = match subevent { | ||
raw::REDISMODULE_SUBEVENT_LOADING_RDB_START => LoadingSubevent::RdbStarted, | ||
raw::REDISMODULE_SUBEVENT_LOADING_REPL_START => LoadingSubevent::ReplStarted, | ||
raw::REDISMODULE_SUBEVENT_LOADING_ENDED => LoadingSubevent::Ended, | ||
_ => LoadingSubevent::Failed, | ||
}; | ||
let ctx = Context::new(ctx); | ||
LOADING_SERVER_EVENTS_LIST.iter().for_each(|callback| { | ||
callback(&ctx, loading_sub_event.clone()); | ||
}); | ||
} | ||
|
||
extern "C" fn flush_event_callback( | ||
ctx: *mut raw::RedisModuleCtx, | ||
_eid: raw::RedisModuleEvent, | ||
subevent: u64, | ||
_data: *mut ::std::os::raw::c_void, | ||
) { | ||
let flush_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_FLUSHDB_START { | ||
FlushSubevent::Started | ||
} else { | ||
FlushSubevent::Ended | ||
}; | ||
let ctx = Context::new(ctx); | ||
FLUSH_SERVER_EVENTS_LIST.iter().for_each(|callback| { | ||
callback(&ctx, flush_sub_event.clone()); | ||
}); | ||
} | ||
|
||
extern "C" fn module_change_event_callback( | ||
ctx: *mut raw::RedisModuleCtx, | ||
_eid: raw::RedisModuleEvent, | ||
subevent: u64, | ||
_data: *mut ::std::os::raw::c_void, | ||
) { | ||
let module_changed_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_MODULE_LOADED { | ||
ModuleChangeSubevent::Loaded | ||
} else { | ||
ModuleChangeSubevent::Unloaded | ||
}; | ||
let ctx = Context::new(ctx); | ||
MODULE_CHANGED_SERVER_EVENTS_LIST | ||
.iter() | ||
.for_each(|callback| { | ||
callback(&ctx, module_changed_sub_event.clone()); | ||
}); | ||
} | ||
|
||
pub fn register_server_events(ctx: &Context) -> Result<(), RedisError> { | ||
if !ROLE_CHANGED_SERVER_EVENTS_LIST.is_empty() { | ||
let res = unsafe { | ||
raw::RedisModule_SubscribeToServerEvent.unwrap()( | ||
ctx.ctx, | ||
raw::RedisModuleEvent { | ||
id: raw::REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, | ||
dataver: 1, | ||
}, | ||
Some(role_changed_callback), | ||
) | ||
}; | ||
if res != raw::REDISMODULE_OK as i32 { | ||
return Err(RedisError::Str( | ||
"Failed subscribing to role changed server event", | ||
)); | ||
} | ||
} | ||
|
||
if !LOADING_SERVER_EVENTS_LIST.is_empty() { | ||
iddm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let res = unsafe { | ||
raw::RedisModule_SubscribeToServerEvent.unwrap()( | ||
ctx.ctx, | ||
raw::RedisModuleEvent { | ||
id: raw::REDISMODULE_EVENT_LOADING, | ||
dataver: 1, | ||
}, | ||
Some(loading_event_callback), | ||
) | ||
}; | ||
if res != raw::REDISMODULE_OK as i32 { | ||
return Err(RedisError::Str( | ||
"Failed subscribing to loading server event", | ||
)); | ||
} | ||
} | ||
|
||
if !FLUSH_SERVER_EVENTS_LIST.is_empty() { | ||
let res = unsafe { | ||
raw::RedisModule_SubscribeToServerEvent.unwrap()( | ||
ctx.ctx, | ||
raw::RedisModuleEvent { | ||
id: raw::REDISMODULE_EVENT_FLUSHDB, | ||
dataver: 1, | ||
}, | ||
Some(flush_event_callback), | ||
) | ||
}; | ||
if res != raw::REDISMODULE_OK as i32 { | ||
return Err(RedisError::Str("Failed subscribing to flush server event")); | ||
} | ||
} | ||
|
||
if !MODULE_CHANGED_SERVER_EVENTS_LIST.is_empty() { | ||
let res = unsafe { | ||
raw::RedisModule_SubscribeToServerEvent.unwrap()( | ||
ctx.ctx, | ||
raw::RedisModuleEvent { | ||
id: raw::REDISMODULE_EVENT_MODULE_CHANGE, | ||
dataver: 1, | ||
}, | ||
Some(module_change_event_callback), | ||
) | ||
}; | ||
if res != raw::REDISMODULE_OK as i32 { | ||
return Err(RedisError::Str( | ||
"Failed subscribing to module changed server event", | ||
)); | ||
} | ||
} | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably need to also edit redismodule-rs-derive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is editing redismodule-rs-derive