Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added API to register to server events. #267

Merged
merged 25 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
556f8d1
Added API to register to server events.
MeirShpilraien Jan 23, 2023
55d4371
upgrade rust to latest stable
gkorland Jan 24, 2023
62c224f
Update system-setup.py
gkorland Jan 24, 2023
855fb1f
Merge branch 'master' into server_events_api
gkorland Jan 24, 2023
0a8d3d8
Update system-setup.py
gkorland Jan 24, 2023
fb78c60
Merge branch 'master' into server_events_api
MeirShpilraien Jan 26, 2023
7563d79
Checkout system-setup to master
MeirShpilraien Jan 26, 2023
037c8db
Apply suggestions from code review
MeirShpilraien Jan 26, 2023
f971b20
Fix tests
MeirShpilraien Jan 26, 2023
70fa6d6
Merge branch 'master' into server_events_api
MeirShpilraien Jan 31, 2023
c82b5d0
Added publish steps for redismodule-rs-derive
MeirShpilraien Feb 1, 2023
5e50dac
Merge branch 'master' into server_events_api
MeirShpilraien Feb 7, 2023
f59de1b
Fix annotations issue on cratesio-publish.yml
MeirShpilraien Feb 7, 2023
8c9d5b0
Maybe fix.
MeirShpilraien Feb 7, 2023
dbe3850
Merge branch 'master' into server_events_api
MeirShpilraien Feb 14, 2023
cda417f
fixing syntax in action
chayim Feb 14, 2023
4068f28
Merge branch 'master' into server_events_api
MeirShpilraien Mar 12, 2023
a92bac3
Review fixes
MeirShpilraien Mar 19, 2023
9b5e355
Format fixes
MeirShpilraien Mar 19, 2023
e013323
Update tests/integration.rs
MeirShpilraien Mar 20, 2023
32665a7
Merge branch 'master' into server_events_api
MeirShpilraien Mar 21, 2023
a93d785
Review fixes.
MeirShpilraien Mar 21, 2023
56acad4
Review fixes
MeirShpilraien Mar 21, 2023
c64794b
Merge branch 'master' into server_events_api
MeirShpilraien Mar 21, 2023
b373330
Format fixes
MeirShpilraien Mar 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions .github/workflows/cratesio-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,22 @@ jobs:
key: "package.version"
value: "${{ steps.get_version.outputs.VERSION }}"

- uses: katyo/publish-crates@v1
- name: Set the version for publishing on derive crate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably need to also edit redismodule-rs-derive?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is editing redismodule-rs-derive

uses: ciiiii/toml-editor@1.0.0
with:
file: "redismodule-rs-derive/Cargo.toml"
key: "package.version"
value: "${{ steps.get_version.outputs.VERSION }}"

- name: Publishing redismodule-rs
uses: katyo/publish-crates@v1
with:
registry-token: ${{ secrets.CARGO_REGISTRY_TOKEN }}
args: --allow-dirty

- name: Publishing redismodule-rs-derive
uses: katyo/publish-crates@v1
with:
registry-token: ${{ secrets.CARGO_REGISTRY_TOKEN }}
args: --allow-dirty
path: './redismodule-rs-derive'
MeirShpilraien marked this conversation as resolved.
Show resolved Hide resolved
args: --allow-dirty
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ crate-type = ["cdylib"]
name = "ctx_flags"
crate-type = ["cdylib"]

[[example]]
name = "server_events"
crate-type = ["cdylib"]

[[example]]
name = "events"
crate-type = ["cdylib"]
Expand Down Expand Up @@ -94,11 +98,13 @@ regex = "1"
strum_macros = "0.24"
#failure = "0.1"
backtrace = "0.3"
linkme = "0.3"

[dev-dependencies]
anyhow = "1.0.38"
redis = "0.22.1"
lazy_static = "1.4.0"
redis-module-derive = { path = "./redismodule-rs-derive"}

[build-dependencies]
bindgen = "0.64"
Expand Down
31 changes: 31 additions & 0 deletions examples/server_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#[macro_use]
extern crate redis_module;

use std::sync::atomic::{AtomicI64, Ordering};

use redis_module::{server_events::FlushSubevent, Context, RedisResult, RedisString, RedisValue};
use redis_module_derive::flush_event_handler;

static NUM_FLUSHES: AtomicI64 = AtomicI64::new(0);

#[flush_event_handler]
fn flushed_event_handler(_ctx: &Context, flush_event: FlushSubevent) {
if let FlushSubevent::Started = flush_event {
NUM_FLUSHES.fetch_add(1, Ordering::SeqCst);
}
}

fn num_flushed(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
Ok(RedisValue::Integer(NUM_FLUSHES.load(Ordering::SeqCst)))
}

//////////////////////////////////////////////////////

redis_module! {
name: "server_events",
version: 1,
data_types: [],
commands: [
["num_flushed", num_flushed, "readonly", 0, 0, 0],
],
}
21 changes: 21 additions & 0 deletions redismodule-rs-derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "redis-module-derive"
version = "99.99.99"
authors = ["Meir Shpilraien <meir@redis.com>"]
edition = "2021"
description = "A derive crate for redismodule-rs"
license = "BSD-3-Clause"
repository = "https://github.com/RedisLabsModules/redismodule-rs"
keywords = ["redis", "plugin"]
categories = ["database", "api-bindings"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
syn = { version="1.0", features = ["full"]}
quote = "1.0"

[lib]
name = "redis_module_derive"
path = "src/lib.rs"
proc-macro = true
57 changes: 57 additions & 0 deletions redismodule-rs-derive/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
extern crate proc_macro;
use proc_macro::TokenStream;
use quote::quote;
use syn;
use syn::ItemFn;

#[proc_macro_attribute]
pub fn role_changed_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
Ok(res) => res,
Err(e) => return e.to_compile_error().into(),
};
let gen = quote! {
#[linkme::distributed_slice(redis_module::server_events::ROLE_CHANGED_SERVER_EVENTS_LIST)]
#ast
};
gen.into()
}

#[proc_macro_attribute]
pub fn loading_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
Ok(res) => res,
Err(e) => return e.to_compile_error().into(),
};
let gen = quote! {
#[linkme::distributed_slice(redis_module::server_events::LOADING_SERVER_EVENTS_LIST)]
#ast
};
gen.into()
}

#[proc_macro_attribute]
pub fn flush_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
Ok(res) => res,
Err(e) => return e.to_compile_error().into(),
};
let gen = quote! {
#[linkme::distributed_slice(redis_module::server_events::FLUSH_SERVER_EVENTS_LIST)]
#ast
};
gen.into()
}

#[proc_macro_attribute]
pub fn module_changed_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
iddm marked this conversation as resolved.
Show resolved Hide resolved
Ok(res) => res,
Err(e) => return e.to_compile_error().into(),
};
let gen = quote! {
#[linkme::distributed_slice(redis_module::server_events::MODULE_CHANGED_SERVER_EVENTS_LIST)]
#ast
};
gen.into()
}
2 changes: 2 additions & 0 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub mod blocked;

pub mod info;

pub mod server_events;

pub mod keys_cursor;

/// `Context` is a structure that's designed to give us a high-level interface to
Expand Down
174 changes: 174 additions & 0 deletions src/context/server_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
use crate::raw;
use crate::{context::Context, RedisError};
use linkme::distributed_slice;

#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub enum ServerRole {
Primary,
Replica,
}

#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub enum LoadingSubevent {
RdbStarted,
AofStarted,
ReplStarted,
Ended,
Failed,
}

#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub enum FlushSubevent {
Started,
Ended,
}

#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub enum ModuleChangeSubevent {
Loaded,
Unloaded,
}

#[derive(Clone)]
pub enum ServerEventHandler {
iddm marked this conversation as resolved.
Show resolved Hide resolved
RuleChanged(fn(&Context, ServerRole)),
Loading(fn(&Context, LoadingSubevent)),
Flush(fn(&Context, FlushSubevent)),
ModuleChange(fn(&Context, ModuleChangeSubevent)),
}

#[distributed_slice()]
pub static ROLE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ServerRole)] = [..];

#[distributed_slice()]
pub static LOADING_SERVER_EVENTS_LIST: [fn(&Context, LoadingSubevent)] = [..];

#[distributed_slice()]
pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..];

#[distributed_slice()]
pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..];

extern "C" fn role_changed_callback(
ctx: *mut raw::RedisModuleCtx,
_eid: raw::RedisModuleEvent,
subevent: u64,
_data: *mut ::std::os::raw::c_void,
) {
let new_role = if subevent == raw::REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER {
ServerRole::Primary
} else {
ServerRole::Replica
};
let ctx = Context::new(ctx);
ROLE_CHANGED_SERVER_EVENTS_LIST.iter().for_each(|callback| {
callback(&ctx, new_role);
});
}

extern "C" fn loading_event_callback(
ctx: *mut raw::RedisModuleCtx,
_eid: raw::RedisModuleEvent,
subevent: u64,
_data: *mut ::std::os::raw::c_void,
) {
let loading_sub_event = match subevent {
raw::REDISMODULE_SUBEVENT_LOADING_RDB_START => LoadingSubevent::RdbStarted,
raw::REDISMODULE_SUBEVENT_LOADING_REPL_START => LoadingSubevent::ReplStarted,
raw::REDISMODULE_SUBEVENT_LOADING_ENDED => LoadingSubevent::Ended,
_ => LoadingSubevent::Failed,
};
let ctx = Context::new(ctx);
LOADING_SERVER_EVENTS_LIST.iter().for_each(|callback| {
callback(&ctx, loading_sub_event);
});
}

extern "C" fn flush_event_callback(
ctx: *mut raw::RedisModuleCtx,
_eid: raw::RedisModuleEvent,
subevent: u64,
_data: *mut ::std::os::raw::c_void,
) {
let flush_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_FLUSHDB_START {
FlushSubevent::Started
} else {
FlushSubevent::Ended
};
let ctx = Context::new(ctx);
FLUSH_SERVER_EVENTS_LIST.iter().for_each(|callback| {
callback(&ctx, flush_sub_event);
});
}

extern "C" fn module_change_event_callback(
ctx: *mut raw::RedisModuleCtx,
_eid: raw::RedisModuleEvent,
subevent: u64,
_data: *mut ::std::os::raw::c_void,
) {
let module_changed_sub_event = if subevent == raw::REDISMODULE_SUBEVENT_MODULE_LOADED {
ModuleChangeSubevent::Loaded
} else {
ModuleChangeSubevent::Unloaded
};
let ctx = Context::new(ctx);
MODULE_CHANGED_SERVER_EVENTS_LIST
.iter()
.for_each(|callback| {
callback(&ctx, module_changed_sub_event);
});
}

fn register_single_server_event_type<T>(
ctx: &Context,
callbacks: &[fn(&Context, T)],
server_event: u64,
inner_callback: raw::RedisModuleEventCallback,
) -> Result<(), RedisError> {
if !callbacks.is_empty() {
let res = unsafe {
raw::RedisModule_SubscribeToServerEvent.unwrap()(
ctx.ctx,
raw::RedisModuleEvent {
id: server_event,
dataver: 1,
},
inner_callback,
)
};
if res != raw::REDISMODULE_OK as i32 {
return Err(RedisError::Str("Failed subscribing to server event"));
}
}

Ok(())
}

pub fn register_server_events(ctx: &Context) -> Result<(), RedisError> {
register_single_server_event_type(
ctx,
&ROLE_CHANGED_SERVER_EVENTS_LIST,
raw::REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
Some(role_changed_callback),
)?;
register_single_server_event_type(
ctx,
&LOADING_SERVER_EVENTS_LIST,
raw::REDISMODULE_EVENT_LOADING,
Some(loading_event_callback),
)?;
register_single_server_event_type(
ctx,
&FLUSH_SERVER_EVENTS_LIST,
raw::REDISMODULE_EVENT_FLUSHDB,
Some(flush_event_callback),
)?;
register_single_server_event_type(
ctx,
&MODULE_CHANGED_SERVER_EVENTS_LIST,
raw::REDISMODULE_EVENT_MODULE_CHANGE,
Some(module_change_event_callback),
)?;
Ok(())
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub use crate::raw::NotifyEvent;
pub use crate::context::keys_cursor::KeysCursor;
pub use crate::context::thread_safe::RedisGILGuard;
pub use crate::context::AclPermissions;
pub use crate::context::server_events;
pub use crate::context::Context;
pub use crate::context::ContextFlags;
pub use crate::raw::*;
Expand Down
6 changes: 6 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ macro_rules! redis_module {

use $crate::raw;
use $crate::RedisString;
use $crate::server_events::register_server_events;

// We use a statically sized buffer to avoid allocating.
// This is needed since we use a custom allocator that relies on the Redis allocator,
Expand Down Expand Up @@ -181,6 +182,11 @@ macro_rules! redis_module {

raw::register_info_function(ctx, Some(__info_func));

if let Err(e) = register_server_events(&context) {
context.log_warning(&format!("{e}"));
return raw::Status::Err as c_int;
}

raw::Status::Ok as c_int
}

Expand Down
Loading