From 7d19724696ec8e3a27fda5713d196ae6026eb0e3 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Tue, 13 Aug 2024 20:17:39 +0300 Subject: [PATCH 1/4] No more mutable requirement for redis conns, much more ergonomic --- rust/Cargo.toml | 8 +- rust/benches/bench_default.rs | 56 +++++++++++++ rust/benches/bench_tester.rs | 25 ------ rust/bitbazaar/redis/batch.rs | 18 ++--- rust/bitbazaar/redis/conn.rs | 127 +++++++++++++++++------------- rust/bitbazaar/redis/dlock.rs | 10 +-- rust/bitbazaar/redis/mod.rs | 4 +- rust/bitbazaar/redis/temp_list.rs | 26 +----- 8 files changed, 153 insertions(+), 121 deletions(-) create mode 100644 rust/benches/bench_default.rs delete mode 100644 rust/benches/bench_tester.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 39835505..5711f382 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -144,10 +144,10 @@ rstest = "0.18" criterion = { version = "0.3", features = ["html_reports", "async_tokio"] } tokio = { version = '1', features = ["full"] } -# When adding new benches, they should be added like this with the name of the file in benches/: (obviously uncommented) -# [[bench]] -# name = "bench_tester" -# harness = false +# When adding new benches, they should be added like this with the name of the file in benches/: +[[bench]] +name = "bench_default" +harness = false [features] collector = ["dep:reqwest", "dep:tempfile", "tarball"] diff --git a/rust/benches/bench_default.rs b/rust/benches/bench_default.rs new file mode 100644 index 00000000..17b267f6 --- /dev/null +++ b/rust/benches/bench_default.rs @@ -0,0 +1,56 @@ +#![allow(unused_imports)] +use criterion::{black_box, criterion_group, criterion_main, Criterion}; + +// <--- EXAMPLE: + +fn fibonacci(n: u64) -> u64 { + let mut a = 0; + let mut b = 1; + + match n { + 0 => b, + _ => { + for _ in 0..n { + let c = a + b; + a = b; + b = c; + } + b + } + } +} + +async fn async_fibonacci(n: u64) -> u64 { + fibonacci(n) +} + +// SYNC EXAMPLE +pub fn bench_sync(c: &mut Criterion) { + c.bench_function("sync: fib 20", |b| b.iter(|| fibonacci(black_box(20)))); +} + +// ASYNC EXAMPLE +pub fn bench_async(c: &mut Criterion) { + c.bench_function("async: fib 20", |b| { + b.to_async(&get_tokio_rt()) + .iter(|| async_fibonacci(black_box(20))) + }); +} + +// CUSTOM CONFIG EXAMPLE +pub fn bench_config(c: &mut Criterion) { + let mut group = c.benchmark_group("small-sample-size"); + group.sample_size(10).significance_level(0.01); + group.bench_function("config: fib 20", |b| b.iter(|| fibonacci(black_box(20)))); + group.finish(); +} + +criterion_group!(benches, bench_sync, bench_async, bench_config); +criterion_main!(benches); + +fn get_tokio_rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} diff --git a/rust/benches/bench_tester.rs b/rust/benches/bench_tester.rs deleted file mode 100644 index 44289562..00000000 --- a/rust/benches/bench_tester.rs +++ /dev/null @@ -1,25 +0,0 @@ -use criterion::{black_box, criterion_group, criterion_main, Criterion}; - -fn fibonacci(n: u64) -> u64 { - let mut a = 0; - let mut b = 1; - - match n { - 0 => b, - _ => { - for _ in 0..n { - let c = a + b; - a = b; - b = c; - } - b - } - } -} - -pub fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("fib 20", |b| b.iter(|| fibonacci(black_box(20)))); -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches); diff --git a/rust/bitbazaar/redis/batch.rs b/rust/bitbazaar/redis/batch.rs index a9be6b71..22d7aae0 100644 --- a/rust/bitbazaar/redis/batch.rs +++ 
b/rust/bitbazaar/redis/batch.rs @@ -27,14 +27,14 @@ static MSET_WITH_EXPIRY_SCRIPT: LazyLock = /// Note each command may be run twice, if scripts needed caching to redis. pub struct RedisBatch<'a, 'c, ConnType: RedisConnLike, ReturnType> { _returns: PhantomData, - redis_conn: &'a mut ConnType, + redis_conn: &'a ConnType, pipe: Pipeline, /// Need to keep a reference to used scripts, these will all be reloaded to redis errors because one wasn't cached on the server. used_scripts: HashSet<&'c RedisScript>, } impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, ReturnType> { - pub(crate) fn new(redis_conn: &'a mut ConnType) -> Self { + pub(crate) fn new(redis_conn: &'a ConnType) -> Self { Self { _returns: PhantomData, redis_conn, @@ -43,8 +43,8 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R } } - async fn inner_fire(&mut self) -> Option { - if let Some(conn) = self.redis_conn.get_inner_conn().await { + async fn inner_fire(&self) -> Option { + if let Some(mut conn) = self.redis_conn.get_inner_conn().await { // Handling retryable errors internally: let retry = Retry::::fibonacci(chrono::Duration::milliseconds(10)) // Will cumulatively delay for up to 6 seconds. @@ -68,7 +68,7 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R _ => Some(info.last_error), }); - match retry_flexi!(retry.clone(), { self.pipe.query_async(conn).await }) { + match retry_flexi!(retry.clone(), { self.pipe.query_async(&mut conn).await }) { Ok(v) => Some(v), Err(e) => { // Load the scripts into Redis if the any of the scripts weren't there before. @@ -90,10 +90,10 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R load_pipe.add_command(script.load_cmd()); } match retry_flexi!(retry, { - load_pipe.query_async::(conn).await + load_pipe.query_async::(&mut conn).await }) { // Now loaded the scripts, rerun the batch: - Ok(_) => match self.pipe.query_async(conn).await { + Ok(_) => match self.pipe.query_async(&mut conn).await { Ok(result) => Some(result), Err(err) => { record_exception("Redis batch failed. Pipe returned NoScriptError, but we've just loaded all scripts.", format!("{:?}", err)); @@ -411,7 +411,7 @@ impl<'a, 'c, ConnType: RedisConnLike, R: FromRedisValue + RedisFuzzyUnwrap> Redi { type ReturnType = R; - async fn fire(mut self) -> as RedisFuzzyUnwrap>::Output { + async fn fire(self) -> as RedisFuzzyUnwrap>::Output { self.inner_fire::<(R,)>().await.map(|(r,)| r).fuzzy_unwrap() } } @@ -421,7 +421,7 @@ macro_rules! impl_batch_fire { impl<'a, 'c, ConnType: RedisConnLike, $($tup_item: FromRedisValue + RedisFuzzyUnwrap),*> RedisBatchFire for RedisBatch<'a, 'c, ConnType, ($($tup_item,)*)> { type ReturnType = ($($tup_item,)*); - async fn fire(mut self) -> as RedisFuzzyUnwrap>::Output { + async fn fire(self) -> as RedisFuzzyUnwrap>::Output { self.inner_fire::<($($tup_item,)*)>().await.fuzzy_unwrap() } } diff --git a/rust/bitbazaar/redis/conn.rs b/rust/bitbazaar/redis/conn.rs index bdd08657..3b026798 100644 --- a/rust/bitbazaar/redis/conn.rs +++ b/rust/bitbazaar/redis/conn.rs @@ -11,20 +11,22 @@ use deadpool_redis::redis::{FromRedisValue, ToRedisArgs}; use super::batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}; use crate::{errors::prelude::*, log::record_exception, redis::RedisScript}; -/// Wrapper around a lazy redis connection. +/// A lazy redis connection. 
pub struct RedisConn<'a> { pub(crate) prefix: &'a str, pool: &'a deadpool_redis::Pool, - conn: Option, + // We used to cache the [`deadpool_redis::Connection`] conn in here, + // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. + // so getting for each usage is fine, given: + // - most conns will probably be used once anyway, e.g. get a conn in a handler and do some caching or whatever in a batch(). + // - prevents needing mutable references to the conn anymore, much nicer ergonomics. + // - prevents the chance of a stale cached connection, had issues before with this, deadpool handles internally. + // conn: Option, } impl<'a> RedisConn<'a> { pub(crate) fn new(pool: &'a deadpool_redis::Pool, prefix: &'a str) -> Self { - Self { - pool, - prefix, - conn: None, - } + Self { pool, prefix } } } @@ -34,20 +36,23 @@ impl<'a> Clone for RedisConn<'a> { Self { prefix: self.prefix, pool: self.pool, - conn: None, } } } -/// An owned variant of [`RedisConn`]. Useful when parent struct lifetimes get out of hand. -/// [`RedisConn`] is better, so keeping local in crate until a real need for it outside. -/// (requires pool arc cloning, and prefix string cloning, so slightly less efficient). +/// An owned variant of [`RedisConn`]. +/// Just requires a couple of Arc clones, so still quite lightweight. pub struct RedisConnOwned { // Prefix and pool both Arced now at top level for easy cloning. - // The conn will be reset to None on each clone, so it's a very heavy object I think. pub(crate) prefix: Arc, pool: deadpool_redis::Pool, - conn: Option, + // We used to cache the [`deadpool_redis::Connection`] conn in here, + // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. + // so getting for each usage is fine, given: + // - most conns will probably be used once anyway, e.g. get a conn in a handler and do some caching or whatever in a batch(). + // - prevents needing mutable references to the conn anymore, much nicer ergonomics. + // - prevents the chance of a stale cached connection, had issues before with this, deadpool handles internally. + // conn: Option, } impl Clone for RedisConnOwned { @@ -55,7 +60,6 @@ impl Clone for RedisConnOwned { Self { prefix: self.prefix.clone(), pool: self.pool.clone(), - conn: None, } } } @@ -67,7 +71,6 @@ macro_rules! impl_debug_for_conn { f.debug_struct($name) .field("prefix", &self.prefix) .field("pool", &self.pool) - .field("conn", &self.conn.is_some()) .finish() } } @@ -79,39 +82,62 @@ impl_debug_for_conn!(RedisConnOwned, "RedisConnOwned"); /// Generic methods over the RedisConn and RedisConnOwned types. pub trait RedisConnLike: std::fmt::Debug + Send + Sized { - /// Get an internal connection from the pool, connections are kept in the pool for reuse. + /// Get an internal connection from the pool. + /// Despite returning an owned object, the underlying real redis connection will be reused after this user drops it. /// If redis is acting up and unavailable, this will return None. /// NOTE: this mainly is used internally, but provides a fallback to the underlying connection, if the exposed interface does not provide options that fit an external user need (which could definitely happen). - async fn get_inner_conn(&mut self) -> Option<&mut deadpool_redis::Connection>; + async fn get_inner_conn(&self) -> Option; /// Get the redis configured prefix. fn prefix(&self) -> &str; /// Convert to the owned variant. 
- fn into_owned(self) -> RedisConnOwned; + fn to_owned(&self) -> RedisConnOwned; - /// Ping redis, returning true if it's up. - async fn ping(&mut self) -> bool { - if let Some(conn) = self.get_inner_conn().await { - redis::cmd("PING").query_async::(conn).await.is_ok() + // TODONOW test. + // TODONOW for this, should implement with a fallback on batch, saves touching underlying like this. + /// Ping redis, returning true if it's up and responsive. + async fn ping(&self) -> bool { + if let Some(mut conn) = self.get_inner_conn().await { + redis::cmd("PING") + .query_async::(&mut conn) + .await + .is_ok() } else { false } } + // async fn pubsub(self) { + // if let Some(conn) = self.get_inner_conn().await { + // let mut pubsub = deadpool_redis::Connection::take(conn).into_pubsub(); + // let mut pubsub = conn.publish().await; + // conn.subscribe(channel_name); + // conn.into_pubsub(); + // loop { + // let msg = pubsub.get_message().await.unwrap(); + // let payload: String = msg.get_payload().unwrap(); + // println!("channel '{}': {}", msg.get_channel_name(), payload); + // if payload == "exit" { + // break; + // } + // } + // } + // } + // Commented out as untested, not sure if works. // /// Get all data from redis, only really useful during testing. // /// - // async fn dev_all_data(&mut self) -> HashMap { - // if let Some(conn) = self.get_inner_conn().await { + // async fn dev_all_data(&self) -> HashMap { + // if let Some(mut conn) = self.get_inner_conn().await { // let mut cmd = redis::cmd("SCAN"); // cmd.arg(0); // let mut data = HashMap::new(); // loop { - // let (next_cursor, keys): (i64, Vec) = cmd.query_async(conn).await.unwrap(); + // let (next_cursor, keys): (i64, Vec) = cmd.query_async(&mut conn).await.unwrap(); // for key in keys { // let val: redis::Value = - // redis::cmd("GET").arg(&key).query_async(conn).await.unwrap(); + // redis::cmd("GET").arg(&key).query_async(&mut conn).await.unwrap(); // data.insert(key, val); // } // if next_cursor == 0 { @@ -126,15 +152,15 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { // } /// Flush the whole redis cache, will delete all data. - async fn dev_flushall(&mut self, sync: bool) -> Option { - if let Some(conn) = self.get_inner_conn().await { + async fn dev_flushall(&self, sync: bool) -> Option { + if let Some(mut conn) = self.get_inner_conn().await { let mut cmd = redis::cmd("FLUSHALL"); if sync { cmd.arg("SYNC"); } else { cmd.arg("ASYNC"); } - match cmd.query_async::(conn).await { + match cmd.query_async::(&mut conn).await { Ok(s) => Some(s), Err(e) => { record_exception("Failed to reset redis cache.", format!("{:?}", e)); @@ -162,7 +188,7 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// - `None`: Continue with the operation. /// - `Some`: Retry after the duration. async fn rate_limiter( - &mut self, + &self, namespace: &str, caller_identifier: &str, start_delaying_after_attempt: usize, @@ -199,7 +225,7 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { } /// Get a new [`RedisBatch`] for this connection that commands can be piped together with. - fn batch(&mut self) -> RedisBatch<'_, '_, Self, ()> { + fn batch(&self) -> RedisBatch<'_, '_, Self, ()> { RedisBatch::new(self) } @@ -224,7 +250,7 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// Expiry accurate to a millisecond. 
#[inline] async fn cached_fn<'b, T, Fut, K: Into>>( - &mut self, + &self, namespace: &str, key: K, expiry: Option, @@ -253,51 +279,44 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { } impl<'a> RedisConnLike for RedisConn<'a> { - async fn get_inner_conn(&mut self) -> Option<&mut deadpool_redis::Connection> { - if self.conn.is_none() { - match self.pool.get().await { - Ok(conn) => self.conn = Some(conn), - Err(e) => { - record_exception("Failed to get redis connection.", format!("{:?}", e)); - return None; - } + async fn get_inner_conn(&self) -> Option { + match self.pool.get().await { + Ok(conn) => Some(conn), + Err(e) => { + record_exception("Failed to get redis connection.", format!("{:?}", e)); + None } } - self.conn.as_mut() } fn prefix(&self) -> &str { self.prefix } - fn into_owned(self) -> RedisConnOwned { + fn to_owned(&self) -> RedisConnOwned { RedisConnOwned { prefix: Arc::new(self.prefix.to_string()), pool: self.pool.clone(), - conn: self.conn, } } } impl RedisConnLike for RedisConnOwned { - async fn get_inner_conn(&mut self) -> Option<&mut deadpool_redis::Connection> { - if self.conn.is_none() { - match self.pool.get().await { - Ok(conn) => self.conn = Some(conn), - Err(e) => { - record_exception("Failed to get redis connection.", format!("{:?}", e)); - return None; - } + async fn get_inner_conn(&self) -> Option { + match self.pool.get().await { + Ok(conn) => Some(conn), + Err(e) => { + record_exception("Failed to get redis connection.", format!("{:?}", e)); + None } } - self.conn.as_mut() } fn prefix(&self) -> &str { &self.prefix } - fn into_owned(self) -> RedisConnOwned { - self + fn to_owned(&self) -> RedisConnOwned { + self.clone() } } diff --git a/rust/bitbazaar/redis/dlock.rs b/rust/bitbazaar/redis/dlock.rs index 7aacf154..aca9da23 100644 --- a/rust/bitbazaar/redis/dlock.rs +++ b/rust/bitbazaar/redis/dlock.rs @@ -111,18 +111,18 @@ impl<'a> RedisLock<'a> { // Need to actually lock for the first time: let lock_id = lock.lock_id.clone(); let val = lock.val.clone(); - lock.exec_or_retry(ttl, move |mut conn| { + lock.exec_or_retry(ttl, move |conn| { let lock_id = lock_id.clone(); let val = val.clone(); async move { - if let Some(conn) = conn.get_inner_conn().await { + if let Some(mut conn) = conn.get_inner_conn().await { let result: RedisResult = redis::cmd("SET") .arg(lock_id) .arg(val) .arg("NX") .arg("PX") .arg(ttl.as_millis() as usize) - .query_async(conn) + .query_async(&mut conn) .await; match result { @@ -225,7 +225,7 @@ impl<'a> RedisLock<'a> { let lock_id = self.lock_id.clone(); let val = self.val.clone(); - self.exec_or_retry(new_ttl, move |mut conn| { + self.exec_or_retry(new_ttl, move |conn| { let lock_id = lock_id.clone(); let val = val.clone(); async move { @@ -260,7 +260,7 @@ impl<'a> RedisLock<'a> { pub async fn unlock(&mut self) -> bool { let result = futures::future::join_all(self.redis.get_conn_to_each_server().into_iter().map( - |mut conn| { + |conn| { let lock_id = self.lock_id.clone(); let val = self.val.clone(); async move { diff --git a/rust/bitbazaar/redis/mod.rs b/rust/bitbazaar/redis/mod.rs index 07ad94e0..ca7586c6 100644 --- a/rust/bitbazaar/redis/mod.rs +++ b/rust/bitbazaar/redis/mod.rs @@ -460,7 +460,7 @@ mod tests { ) -> RResult<(), AnyErr> { let server = RedisStandalone::new_no_persistence().await?; let r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; - let mut rconn = r.conn(); + let rconn = r.conn(); // THIS ONLY ACTUALLY WORKS FOR STRINGS and false, OTHERS ARE NONE, DUE TO REDIS LIMITATION OF RETURNING NIL FOR EMPTY 
ARRS AND NONE ETC. // None::, empty vec![] and "" should all work fine as real stored values, @@ -585,7 +585,7 @@ mod tests { let server = RedisStandalone::new_no_persistence().await?; let r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; - let mut rconn = r.conn(); + let rconn = r.conn(); macro_rules! call { () => { diff --git a/rust/bitbazaar/redis/temp_list.rs b/rust/bitbazaar/redis/temp_list.rs index 7c60c8f0..06c50dae 100644 --- a/rust/bitbazaar/redis/temp_list.rs +++ b/rust/bitbazaar/redis/temp_list.rs @@ -130,7 +130,7 @@ impl serde::Deserialize<'de>> RedisTempListItem RedisTempListItemWithConn { RedisTempListItemWithConn { item: Mutex::new(self), - conn: Mutex::new(conn.into_owned()), + conn: Mutex::new(conn.to_owned()), } } @@ -382,9 +382,6 @@ impl RedisTempList { Some(self.item_inactive_ttl_chrono()), ) // Cleanup old members that have now expired: - // (set member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 .zremrangebyscore( &self.namespace, &self.key, @@ -480,11 +477,7 @@ impl RedisTempList { let item_info = conn .batch() - // NOTE: cleaning up first as don't want these to be included in the read. - // Cleanup old members that have now expired: - // (member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 + // Cleanup old members that have now expired first because don't want these to be included in the read: .zremrangebyscore( &self.namespace, &self.key, @@ -582,11 +575,7 @@ impl RedisTempList { ) -> RedisTempListItem { let item = conn .batch() - // NOTE: cleaning up first as don't want these to be included in the read. - // Cleanup old members that have now expired: - // (member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 + // Cleanup old members that have now expired first because don't want these to be included in the read: .zremrangebyscore( &self.namespace, &self.key, @@ -627,11 +616,7 @@ impl RedisTempList { uids: impl IntoIterator, ) { conn.batch() - // NOTE: cleaning up first as don't want these to be included in the read. 
- // Cleanup old members that have now expired: - // (member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 + // Cleanup old members that have now expired first because don't want these to be included in the read: .zremrangebyscore( &self.namespace, &self.key, @@ -684,9 +669,6 @@ impl RedisTempList { Some(self.item_inactive_ttl_chrono()), ) // Cleanup old members that have now expired: - // (member expiry is a logical process, not currently part of redis but could be soon) - // https://github.com/redis/redis/issues/135#issuecomment-2361996 - // https://github.com/redis/redis/pull/13172 .zremrangebyscore( &self.namespace, &self.key, From a91dd21138a69b3e1c83b2b9839f48789fe99925 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Tue, 13 Aug 2024 20:40:26 +0300 Subject: [PATCH 2/4] Custom fallbacks to underlying redis interface without having to leave higher level interface --- rust/bitbazaar/redis/batch.rs | 33 +++++++++++++++++++++++++++ rust/bitbazaar/redis/conn.rs | 42 +++++++++++++---------------------- rust/bitbazaar/redis/mod.rs | 13 +++++++++++ 3 files changed, 62 insertions(+), 26 deletions(-) diff --git a/rust/bitbazaar/redis/batch.rs b/rust/bitbazaar/redis/batch.rs index 22d7aae0..851ea369 100644 --- a/rust/bitbazaar/redis/batch.rs +++ b/rust/bitbazaar/redis/batch.rs @@ -132,6 +132,26 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R } } + /// Low-level backdoor. Pass in a custom redis command to run, but don't expect a return value. + /// After calling this, custom_arg() can be used to add arguments. + /// + /// E.g. `batch.custom_no_return("SET").custom_arg("key").custom_arg("value").fire().await;` + pub fn custom_no_return(mut self, cmd: &str) -> Self { + self.pipe.cmd(cmd).ignore(); + RedisBatch { + _returns: PhantomData, + redis_conn: self.redis_conn, + pipe: self.pipe, + used_scripts: self.used_scripts, + } + } + + /// Low-level backdoor. Add a custom argument to the last custom command added with either `custom_no_return()` or `custom()`. + pub fn custom_arg(mut self, arg: impl ToRedisArgs) -> Self { + self.pipe.arg(arg); + self + } + /// Expire an existing key with a new/updated ttl. /// /// @@ -447,6 +467,9 @@ pub trait RedisBatchReturningOps<'c> { script_invokation: RedisScriptInvoker<'c>, ) -> Self::NextType>; + /// Low-level backdoor. Pass in a custom redis command to run, specifying the return value to coerce to. + fn custom(self, cmd: &str) -> Self::NextType; + /// Check if a key exists. fn exists(self, namespace: &str, key: &str) -> Self::NextType; @@ -543,6 +566,16 @@ macro_rules! 
impl_batch_ops { self.script_no_decode_protection(script_invokation) } + fn custom(mut self, cmd: &str) -> Self::NextType { + self.pipe.cmd(cmd); + RedisBatch { + _returns: PhantomData, + redis_conn: self.redis_conn, + pipe: self.pipe, + used_scripts: self.used_scripts + } + } + fn exists(mut self, namespace: &str, key: &str) -> Self::NextType { self.pipe.exists(self.redis_conn.final_key(namespace, key.into())); RedisBatch { diff --git a/rust/bitbazaar/redis/conn.rs b/rust/bitbazaar/redis/conn.rs index 3b026798..8b02867a 100644 --- a/rust/bitbazaar/redis/conn.rs +++ b/rust/bitbazaar/redis/conn.rs @@ -8,7 +8,10 @@ use std::{ use deadpool_redis::redis::{FromRedisValue, ToRedisArgs}; -use super::batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}; +use super::{ + batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}, + fuzzy::RedisFuzzy, +}; use crate::{errors::prelude::*, log::record_exception, redis::RedisScript}; /// A lazy redis connection. @@ -94,18 +97,14 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// Convert to the owned variant. fn to_owned(&self) -> RedisConnOwned; - // TODONOW test. - // TODONOW for this, should implement with a fallback on batch, saves touching underlying like this. /// Ping redis, returning true if it's up and responsive. async fn ping(&self) -> bool { - if let Some(mut conn) = self.get_inner_conn().await { - redis::cmd("PING") - .query_async::(&mut conn) - .await - .is_ok() - } else { - false - } + self.batch() + .custom::>("PING") + .fire() + .await + .flatten() + .is_some() } // async fn pubsub(self) { @@ -152,24 +151,15 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { // } /// Flush the whole redis cache, will delete all data. + /// Returns the resulting string from the command, or None if failed for some reason. async fn dev_flushall(&self, sync: bool) -> Option { - if let Some(mut conn) = self.get_inner_conn().await { - let mut cmd = redis::cmd("FLUSHALL"); - if sync { - cmd.arg("SYNC"); - } else { - cmd.arg("ASYNC"); - } - match cmd.query_async::(&mut conn).await { - Ok(s) => Some(s), - Err(e) => { - record_exception("Failed to reset redis cache.", format!("{:?}", e)); - None - } - } + let mut batch = self.batch().custom::>("FLUSHALL"); + if sync { + batch = batch.custom_arg("SYNC"); } else { - None + batch = batch.custom_arg("ASYNC"); } + batch.fire().await.flatten() } /// A simple rate_limiter/backoff helper. 
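For orientation, here is a minimal sketch of how the new low-level backdoor helpers compose from calling code. It is hypothetical and not part of the patch: the function, command, key and value names are illustrative only, and it is written as if it lived alongside `ping()` in conn.rs so the existing `RedisFuzzy`, `RedisConnLike`, `RedisBatchFire` and `RedisBatchReturningOps` imports there cover it (`RedisFuzzy` is crate-internal).

async fn backdoor_demo(conn: &impl RedisConnLike) -> Option<String> {
    // Untyped variant: run a raw command and ignore its reply.
    // Note: unlike the higher-level helpers (e.g. exists()), raw args are passed
    // through as-is, so no namespace prefixing is applied to "some_raw_key".
    conn.batch()
        .custom_no_return("SET")
        .custom_arg("some_raw_key")
        .custom_arg("some_raw_value")
        .fire()
        .await;
    // Typed variant: decode the reply, yielding None if redis is down or decoding fails.
    conn.batch()
        .custom::<RedisFuzzy<String>>("ECHO")
        .custom_arg("hello")
        .fire()
        .await
        .flatten()
}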
diff --git a/rust/bitbazaar/redis/mod.rs b/rust/bitbazaar/redis/mod.rs index ca7586c6..5094fc5f 100644 --- a/rust/bitbazaar/redis/mod.rs +++ b/rust/bitbazaar/redis/mod.rs @@ -57,6 +57,19 @@ mod tests { Ok((server, work_r, fail_r)) } + #[rstest] + #[tokio::test] + async fn test_redis_ping(#[allow(unused_variables)] logging: ()) -> RResult<(), AnyErr> { + let (_server, work_r, fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + let fail_conn = fail_r.conn(); + + assert!(work_conn.ping().await); + assert!(!(fail_conn.ping().await)); + + Ok(()) + } + #[rstest] #[tokio::test] async fn test_redis_scripts(#[allow(unused_variables)] logging: ()) -> RResult<(), AnyErr> { From 9589518a4070b59b1c12a709710d797f1a4d1f91 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Wed, 14 Aug 2024 21:21:12 +0300 Subject: [PATCH 3/4] Redis pubsub --- rust/Cargo.toml | 2 +- rust/bitbazaar/misc/retry.rs | 6 + rust/bitbazaar/redis/batch.rs | 50 +- rust/bitbazaar/redis/conn.rs | 94 ++- rust/bitbazaar/redis/mod.rs | 3 + .../redis/pubsub/channel_listener.rs | 46 ++ rust/bitbazaar/redis/pubsub/mod.rs | 4 + rust/bitbazaar/redis/pubsub/pubsub_global.rs | 640 ++++++++++++++++++ rust/bitbazaar/redis/redis_retry.rs | 25 + rust/bitbazaar/redis/wrapper.rs | 23 +- 10 files changed, 801 insertions(+), 92 deletions(-) create mode 100644 rust/bitbazaar/redis/pubsub/channel_listener.rs create mode 100644 rust/bitbazaar/redis/pubsub/mod.rs create mode 100644 rust/bitbazaar/redis/pubsub/pubsub_global.rs create mode 100644 rust/bitbazaar/redis/redis_retry.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 5711f382..9959a973 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -127,7 +127,7 @@ features = ["sync"] # These are included on top of above features when not wasm: [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] version = "1" -features = ["time", "fs", "process", "rt", "io-util"] +features = ["time", "fs", "process", "rt", "io-util", "macros"] [target.'cfg(target_arch = "wasm32")'.dependencies] tracing-subscriber-wasm = "0.1.0" diff --git a/rust/bitbazaar/misc/retry.rs b/rust/bitbazaar/misc/retry.rs index 510ae4e8..eb7ac8be 100644 --- a/rust/bitbazaar/misc/retry.rs +++ b/rust/bitbazaar/misc/retry.rs @@ -84,6 +84,12 @@ impl<'a, E> Retry<'a, E> { self } + /// Never stop retrying. + pub fn until_forever(mut self) -> Self { + self.until = RetryUntil::TotalAttempts(usize::MAX); + self + } + /// Stop retrying after the total delay reaches the given duration. 
pub fn until_total_delay(mut self, max_total_delay: Duration) -> Self { self.until = RetryUntil::TotalDelay(max_total_delay); diff --git a/rust/bitbazaar/redis/batch.rs b/rust/bitbazaar/redis/batch.rs index 851ea369..69975817 100644 --- a/rust/bitbazaar/redis/batch.rs +++ b/rust/bitbazaar/redis/batch.rs @@ -4,11 +4,12 @@ use std::{collections::HashSet, marker::PhantomData, sync::LazyLock}; use deadpool_redis::redis::{FromRedisValue, Pipeline, ToRedisArgs}; -use crate::{log::record_exception, misc::Retry, retry_flexi}; +use crate::{log::record_exception, retry_flexi}; use super::{ conn::RedisConnLike, fuzzy::{RedisFuzzy, RedisFuzzyUnwrap}, + redis_retry::redis_retry_config, RedisScript, RedisScriptInvoker, }; @@ -46,29 +47,9 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R async fn inner_fire(&self) -> Option { if let Some(mut conn) = self.redis_conn.get_inner_conn().await { // Handling retryable errors internally: - let retry = Retry::::fibonacci(chrono::Duration::milliseconds(10)) - // Will cumulatively delay for up to 6 seconds. - // SHOULDN'T BE LONGER, considering outer code may then handle the redis failure, - // in e.g. web request, any longer would then be harmful. - .until_total_delay(chrono::Duration::seconds(5)) - .on_retry(move |info| match info.last_error.kind() { - // These should all be automatically retried: - redis::ErrorKind::BusyLoadingError - | redis::ErrorKind::TryAgain - | redis::ErrorKind::MasterDown => { - tracing::warn!( - "Redis command failed with retryable error, retrying in {}. Last attempt no: '{}'.\nErr:\n{:?}.", - info.delay_till_next_attempt, - info.last_attempt_no, - info.last_error - ); - None - }, - // Everything else should just exit straight away, no point retrying internally. - _ => Some(info.last_error), - }); - - match retry_flexi!(retry.clone(), { self.pipe.query_async(&mut conn).await }) { + match retry_flexi!(redis_retry_config(), { + self.pipe.query_async(&mut conn).await + }) { Ok(v) => Some(v), Err(e) => { // Load the scripts into Redis if the any of the scripts weren't there before. @@ -89,7 +70,7 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R for script in &self.used_scripts { load_pipe.add_command(script.load_cmd()); } - match retry_flexi!(retry, { + match retry_flexi!(redis_retry_config(), { load_pipe.query_async::(&mut conn).await }) { // Now loaded the scripts, rerun the batch: @@ -138,12 +119,7 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R /// E.g. `batch.custom_no_return("SET").custom_arg("key").custom_arg("value").fire().await;` pub fn custom_no_return(mut self, cmd: &str) -> Self { self.pipe.cmd(cmd).ignore(); - RedisBatch { - _returns: PhantomData, - redis_conn: self.redis_conn, - pipe: self.pipe, - used_scripts: self.used_scripts, - } + self } /// Low-level backdoor. Add a custom argument to the last custom command added with either `custom_no_return()` or `custom()`. @@ -152,6 +128,18 @@ impl<'a, 'c, ConnType: RedisConnLike, ReturnType> RedisBatch<'a, 'c, ConnType, R self } + /// Publish a message to a pubsub channel. + /// Use [`RedisConnLike::pubsub_listen`] to listen for messages. + pub fn publish(mut self, namespace: &str, channel: &str, message: impl ToRedisArgs) -> Self { + self.pipe + .publish( + self.redis_conn.final_key(namespace, channel.into()), + message, + ) + .ignore(); + self + } + /// Expire an existing key with a new/updated ttl. 
/// /// diff --git a/rust/bitbazaar/redis/conn.rs b/rust/bitbazaar/redis/conn.rs index 8b02867a..81a863d3 100644 --- a/rust/bitbazaar/redis/conn.rs +++ b/rust/bitbazaar/redis/conn.rs @@ -11,13 +11,17 @@ use deadpool_redis::redis::{FromRedisValue, ToRedisArgs}; use super::{ batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}, fuzzy::RedisFuzzy, + pubsub::{pubsub_global::RedisPubSubGlobal, RedisChannelListener}, }; use crate::{errors::prelude::*, log::record_exception, redis::RedisScript}; /// A lazy redis connection. +#[derive(Debug, Clone)] pub struct RedisConn<'a> { pub(crate) prefix: &'a str, pool: &'a deadpool_redis::Pool, + // It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection: + pubsub_global: &'a Arc, // We used to cache the [`deadpool_redis::Connection`] conn in here, // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. // so getting for each usage is fine, given: @@ -28,27 +32,28 @@ pub struct RedisConn<'a> { } impl<'a> RedisConn<'a> { - pub(crate) fn new(pool: &'a deadpool_redis::Pool, prefix: &'a str) -> Self { - Self { pool, prefix } - } -} - -// Cloning is still technically heavy for the un-owned, as the active connection can't be reused. -impl<'a> Clone for RedisConn<'a> { - fn clone(&self) -> Self { + pub(crate) fn new( + pool: &'a deadpool_redis::Pool, + prefix: &'a str, + pubsub_global: &'a Arc, + ) -> Self { Self { - prefix: self.prefix, - pool: self.pool, + pool, + prefix, + pubsub_global, } } } /// An owned variant of [`RedisConn`]. /// Just requires a couple of Arc clones, so still quite lightweight. +#[derive(Debug, Clone)] pub struct RedisConnOwned { // Prefix and pool both Arced now at top level for easy cloning. pub(crate) prefix: Arc, pool: deadpool_redis::Pool, + // It uses it's own global connection so needed down here too, to abstract away and use from the same higher-order connection: + pubsub_global: Arc, // We used to cache the [`deadpool_redis::Connection`] conn in here, // but after benching it literally costs about 20us to get a connection from deadpool because it rotates them internally. // so getting for each usage is fine, given: @@ -58,31 +63,6 @@ pub struct RedisConnOwned { // conn: Option, } -impl Clone for RedisConnOwned { - fn clone(&self) -> Self { - Self { - prefix: self.prefix.clone(), - pool: self.pool.clone(), - } - } -} - -macro_rules! impl_debug_for_conn { - ($conn_type:ty, $name:literal) => { - impl std::fmt::Debug for $conn_type { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct($name) - .field("prefix", &self.prefix) - .field("pool", &self.pool) - .finish() - } - } - }; -} - -impl_debug_for_conn!(RedisConn<'_>, "RedisConn"); -impl_debug_for_conn!(RedisConnOwned, "RedisConnOwned"); - /// Generic methods over the RedisConn and RedisConnOwned types. pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// Get an internal connection from the pool. @@ -94,6 +74,9 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { /// Get the redis configured prefix. fn prefix(&self) -> &str; + /// Get the redis pubsub global manager. + fn _pubsub_global(&self) -> &Arc; + /// Convert to the owned variant. 
fn to_owned(&self) -> RedisConnOwned; @@ -107,23 +90,23 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { .is_some() } - // async fn pubsub(self) { - // if let Some(conn) = self.get_inner_conn().await { - // let mut pubsub = deadpool_redis::Connection::take(conn).into_pubsub(); - // let mut pubsub = conn.publish().await; - // conn.subscribe(channel_name); - // conn.into_pubsub(); - // loop { - // let msg = pubsub.get_message().await.unwrap(); - // let payload: String = msg.get_payload().unwrap(); - // println!("channel '{}': {}", msg.get_channel_name(), payload); - // if payload == "exit" { - // break; - // } - // } - // } - // } + /// Subscribe to a channel via pubsub, receiving messages through the returned receiver. + /// The subscription will be dropped when the receiver is dropped. + /// + /// Sending can be done via normal batches using [`RedisBatch::publish`]. + /// + /// Returns None when redis unavailable for some reason, after a few seconds of trying to connect. + async fn subscribe( + &self, + namespace: &str, + channel: &str, + ) -> Option> { + self._pubsub_global() + .subscribe(self.final_key(namespace, channel.into())) + .await + } + /// TODONOW update and test. // Commented out as untested, not sure if works. // /// Get all data from redis, only really useful during testing. // /// @@ -283,10 +266,15 @@ impl<'a> RedisConnLike for RedisConn<'a> { self.prefix } + fn _pubsub_global(&self) -> &Arc { + self.pubsub_global + } + fn to_owned(&self) -> RedisConnOwned { RedisConnOwned { prefix: Arc::new(self.prefix.to_string()), pool: self.pool.clone(), + pubsub_global: self.pubsub_global.clone(), } } } @@ -306,6 +294,10 @@ impl RedisConnLike for RedisConnOwned { &self.prefix } + fn _pubsub_global(&self) -> &Arc { + &self.pubsub_global + } + fn to_owned(&self) -> RedisConnOwned { self.clone() } diff --git a/rust/bitbazaar/redis/mod.rs b/rust/bitbazaar/redis/mod.rs index 5094fc5f..167249c9 100644 --- a/rust/bitbazaar/redis/mod.rs +++ b/rust/bitbazaar/redis/mod.rs @@ -3,6 +3,8 @@ mod conn; mod dlock; mod fuzzy; mod json; +mod pubsub; +mod redis_retry; mod script; mod temp_list; mod wrapper; @@ -15,6 +17,7 @@ pub use batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps}; pub use conn::{RedisConn, RedisConnLike}; pub use dlock::{RedisLock, RedisLockErr}; pub use json::{RedisJson, RedisJsonBorrowed}; +pub use pubsub::RedisChannelListener; // Re-exporting redis and deadpool_redis to be used outside if needed: pub use deadpool_redis; pub use redis; diff --git a/rust/bitbazaar/redis/pubsub/channel_listener.rs b/rust/bitbazaar/redis/pubsub/channel_listener.rs new file mode 100644 index 00000000..987c9301 --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/channel_listener.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use redis::{from_owned_redis_value, FromRedisValue, ToRedisArgs}; + +use crate::log::record_exception; + +/// A listener to receive messages from a redis channel via pubsub. +pub struct RedisChannelListener { + pub(crate) on_drop_tx: Arc>, + pub(crate) key: u64, + pub(crate) channel: String, + pub(crate) rx: tokio::sync::mpsc::UnboundedReceiver, + pub(crate) _t: std::marker::PhantomData, +} + +impl RedisChannelListener { + /// Get a new message from the channel. + /// The outer None indicates the channel has been closed erroneously, or the internal data could not be coerced to the target type. + /// In either case, something's gone wrong, an exception will probably have been recorded too. 
+ pub async fn recv(&mut self) -> Option { + if let Some(v) = self.rx.recv().await { + match from_owned_redis_value(v) { + Ok(v) => Some(v), + Err(e) => { + record_exception( + format!( + "Failed to convert redis value to target type '{}'", + std::any::type_name::() + ), + format!("{:?}", e), + ); + None + } + } + } else { + None + } + } +} + +/// Tell the global pubsub manager this listener is being dropped. +impl Drop for RedisChannelListener { + fn drop(&mut self) { + let _ = self.on_drop_tx.send((self.channel.clone(), self.key)); + } +} diff --git a/rust/bitbazaar/redis/pubsub/mod.rs b/rust/bitbazaar/redis/pubsub/mod.rs new file mode 100644 index 00000000..80da002e --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/mod.rs @@ -0,0 +1,4 @@ +mod channel_listener; +pub(crate) mod pubsub_global; + +pub use channel_listener::*; diff --git a/rust/bitbazaar/redis/pubsub/pubsub_global.rs b/rust/bitbazaar/redis/pubsub/pubsub_global.rs new file mode 100644 index 00000000..10c6d339 --- /dev/null +++ b/rust/bitbazaar/redis/pubsub/pubsub_global.rs @@ -0,0 +1,640 @@ +use std::{ + collections::HashMap, + fmt::Debug, + sync::{atomic::AtomicBool, Arc}, +}; + +use chrono::TimeDelta; +use dashmap::DashMap; +use redis::{aio::MultiplexedConnection, from_owned_redis_value, FromRedisValue, ToRedisArgs}; + +use crate::{ + log::record_exception, + misc::{random_u64_rolling, Retry}, + prelude::*, + redis::redis_retry::redis_retry_config, +}; + +use super::RedisChannelListener; + +/// The lazy pubsub manager. +pub struct RedisPubSubGlobal { + client: redis::Client, + config: redis::AsyncConnectionConfig, + /// Unlike deadpool these aren't pooled, so definitely need to store and reuse until it becomes invalid, only then get a new one. + active_conn: tokio::sync::RwLock>, + /// The downstream configured listeners for different channels, messages will be pushed to all active listeners. + /// Putting in a nested hashmap for easy cleanup when listeners are dropped. + pub(crate) listeners: + DashMap>>, + + /// The global receiver of messages hooked directly into the redis connection. + /// This will be taken when the main listener is spawned. + rx: tokio::sync::Mutex>>, + /// Below used to trigger unsubscriptions and listeners dashmap cleanup when listeners are dropped. + /// (The tx is called when a listener is dropped, and the spawned process listens for these and does the cleanup.) 
+ listener_drop_tx: Arc>, + listener_drop_rx: + tokio::sync::Mutex>>, + spawned: AtomicBool, +} + +impl Debug for RedisPubSubGlobal { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RedisPubSubGlobal") + .field("client", &self.client) + // .field("config", &self.config) + .field("active_conn", &self.active_conn) + .field("listeners", &self.listeners) + .field("rx", &self.rx) + .field("spawned", &self.spawned) + .finish() + } +} + +impl RedisPubSubGlobal { + pub(crate) fn new(redis_conn_str: impl Into) -> RResult { + let client = redis::Client::open(format!("{}?protocol=resp3", redis_conn_str.into())) + .change_context(AnyErr)?; + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (listener_drop_tx, listener_drop_rx) = tokio::sync::mpsc::unbounded_channel(); + let config = redis::AsyncConnectionConfig::new().set_push_sender(tx); + Ok(Self { + client, + config, + active_conn: tokio::sync::RwLock::new(None), + listeners: DashMap::new(), + rx: tokio::sync::Mutex::new(Some(rx)), + listener_drop_tx: Arc::new(listener_drop_tx), + listener_drop_rx: tokio::sync::Mutex::new(Some(listener_drop_rx)), + spawned: AtomicBool::new(false), + }) + } + + pub(crate) async fn unsubscribe(&self, channel: impl Into) { + let channel = channel.into(); + self.listeners.remove(&channel); + + let force_new_connection = AtomicBool::new(false); + match redis_retry_config() + .call(|| async { + if let Some(mut conn) = self + .get_conn( + // Means on second attempt onwards, will always get new connections. + force_new_connection.swap(true, std::sync::atomic::Ordering::Relaxed), + ) + .await + { + conn.unsubscribe(&channel).await + } else { + // Doing nothing when None as that'll have been logged lower down. + Ok(()) + } + }) + .await + { + Ok(()) => {} + Err(e) => { + record_exception( + "Pubsub: failed to unsubscribe from channel.", + format!("{:?}", e), + ); + } + } + } + + /// Returns None when redis down/acting up and couldn't get over a few seconds. + pub(crate) async fn subscribe( + self: &Arc, + channel: impl Into, + ) -> Option> { + let channel = channel.into(); + + let force_new_connection = AtomicBool::new(false); + match redis_retry_config() + .call(|| async { + if let Some(mut conn) = self + .get_conn( + // Means on second attempt onwards, will always get new connections. + force_new_connection.swap(true, std::sync::atomic::Ordering::Relaxed), + ) + .await + { + conn.subscribe(&channel).await + } else { + // Doing nothing when None as that'll have been logged lower down. + Err(redis::RedisError::from(std::io::Error::new( + std::io::ErrorKind::Other, + "Couldn't get a connection to redis.", + ))) + } + }) + .await + { + Ok(()) => {} + Err(e) => { + record_exception( + "Pubsub: failed to subscribe to channel.", + format!("{:?}", e), + ); + return None; + } + } + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let listener_key = random_u64_rolling(); + self.listeners + .entry(channel.clone()) + .or_default() + .insert(listener_key, tx); + + if !self + .spawned + .swap(true, std::sync::atomic::Ordering::Relaxed) + { + let arc_self = self.clone(); + let mut rx = self + .rx + .lock() + .await + .take() + .expect("rx should only be taken once"); + let mut listener_drop_rx = self + .listener_drop_rx + .lock() + .await + .take() + .expect("listener_drop_rx should only be taken once"); + + tokio::spawn(async move { + loop { + tokio::select! { + // Adding this means the listener fut will always be polled first, i.e. has higher priority. 
+ // This is what we want as it cleans up dead listeners, so avoids the second fut ideally hitting any dead listeners. + biased; + + result = listener_drop_rx.recv() => { + arc_self.spawned_handle_listener_dropped(result).await; + } + result = rx.recv() => { + arc_self.spawned_handle_message(result).await; + } + } + } + }); + } + + Some(RedisChannelListener { + key: listener_key, + on_drop_tx: self.listener_drop_tx.clone(), + channel, + rx, + _t: std::marker::PhantomData, + }) + } + + /// None returned when redis seemingly down/erroring and can't get a connection. + async fn get_conn(&self, force_new_connection: bool) -> Option { + // Inside rwlock so read only if already in there and not forcing new, to avoid getting a write lock when not needed: + if !force_new_connection { + if let Some(conn) = self.active_conn.read().await.as_ref() { + return Some(conn.clone()); + } + } + + // If couldn't return above, we need a new conn: + let mut maybe_conn = self.active_conn.write().await; + match redis_retry_config() + .call(move || { + // WARNING: unlike deadpool for the rest of redis, this is very heavy as it's not pooled. + self.client + .get_multiplexed_async_connection_with_config(&self.config) + }) + .await + { + Ok(mut conn) => { + // Need to re-subscribe to all actively listened to channels for the new connection: + for entry in self.listeners.iter() { + let channel = entry.key(); + match conn.subscribe(channel).await { + Ok(()) => {} + Err(e) => { + record_exception( + format!("Pubsub: failed to re-subscribe to channel '{}' with newly acquired connection, discarding.", channel), + format!("{:?}", e), + ); + *maybe_conn = None; + return None; + } + } + } + *maybe_conn = Some(conn); + } + Err(e) => { + record_exception( + "Pubsub: creation of a new Redis connection failed.", + format!("{:?}", e), + ); + *maybe_conn = None; + return None; + } + } + + let conn = maybe_conn + .as_ref() + .expect("conn should be Some given just created if needed."); + + Some(conn.clone()) + } + + /// Handle cleaning up the listeners dashmap, and calling redis's unsubscribe method when no more listeners for a given channel. + /// The cleanup channel gets called in the drop fn of each [`RedisChannelListener`]. + async fn spawned_handle_listener_dropped( + self: &Arc, + channel_and_key: Option<(String, u64)>, + ) { + match channel_and_key { + Some((channel, key)) => { + let unsub = if let Some(mut listeners) = self.listeners.get_mut(&channel) { + listeners.remove(&key); + listeners.is_empty() + } else { + true + }; + // Need to come after otherwise dashmap could deadlock. + if unsub { + self.unsubscribe(&channel).await; + } + } + None => { + record_exception( + "Pubsub: redis cleanup channel died. Tx sender supposedly dropped.", + "", + ); + } + } + } + + /// Handle redis pubsub messages coming into subscriptions. 
+ async fn spawned_handle_message(self: &Arc, message: Option) { + match message { + Some(push_info) => { + match push_info.kind.clone() { + redis::PushKind::Subscribe => { + // Example received: + // PushInfo { kind: Subscribe, data: [bulk-string('"foo"'), int(1)] } + + // Don't actually need to do anything for these methods: + + // match from_owned_redis_value::<(String, i64)>( + // redis::Value::Array(push_info.data), + // ) { + // Ok((channel, sub_count)) => { + // tracing::info!( + // "Subscribed to channel: '{}', sub_count: {}", + // channel, + // sub_count + // ); + // } + // Err(e) => { + // record_exception( + // "Pubsub: failed to decode redis::PushKind::Subscribe.", + // format!("{:?}", e), + // ); + // } + // } + } + redis::PushKind::Unsubscribe => { + // Example received: + // PushInfo { kind: Unsubscribe, data: [bulk-string('"49878c28-c7ef-4f4c-b196-9956942bbe95:n1:foo"'), int(1)] } + + // Don't actually need to do anything for these methods: + + // match from_owned_redis_value::<(String, i64)>( + // redis::Value::Array(push_info.data), + // ) { + // Ok((client_and_channel, sub_count)) => { + // tracing::info!( + // "Client unsubscribed from channel: '{}', sub_count: {}", + // client_and_channel, + // sub_count + // ); + // } + // Err(e) => { + // record_exception( + // "Pubsub: failed to decode redis::PushKind::Unsubscribe.", + // format!("{:?}", e), + // ); + // } + // } + } + redis::PushKind::Disconnection => { + tracing::warn!( + "Pubsub: redis disconnected, attempting to get new connection, retrying every 100ms until success..." + ); + let result = Retry::fixed(TimeDelta::milliseconds(100)) + .until_forever() + .call(|| async { + match self.get_conn(true).await { + Some(_) => { + tracing::info!("Pubsub: redis reconnected."); + Ok(()) + } + None => Err(()), + } + }) + .await; + if result.is_err() { + panic!("Should be impossible, above retry loop should go infinitely until success"); + } + } + redis::PushKind::Message => { + // Example received: + // PushInfo { kind: Message, data: [bulk-string('"foo"'), bulk-string('"bar"')] } + + match from_owned_redis_value::<(String, redis::Value)>(redis::Value::Array( + push_info.data, + )) { + Ok((channel, msg)) => { + if let Some(listeners) = self.listeners.get(&channel) { + for tx in listeners.values() { + // Given we have a separate future for cleaning up, + // this shouldn't be a big issue if this ever errors with dead listeners, + // as they should immediately be cleaned up by the cleanup future. + let _ = tx.send(msg.clone()); + } + } + } + Err(e) => { + record_exception( + "Pubsub: failed to decode redis::PushKind::Message.", + format!("{:?}", e), + ); + } + } + } + _ => { + record_exception( + "Pubsub: unsupported/unexpected message received by global listener.", + format!("{:?}", push_info), + ); + } + } + } + None => { + record_exception( + "Pubsub: redis listener channel died. Tx sender supposedly dropped.", + "", + ); + } + } + } +} + +// TESTS: +// - redis prefix still used. +// - DONE: sub to same channel twice with same name but 2 different fns, each should be called once, not first twice, second twice, first dropped etc. +// - Redis down then backup: +// - If just during listening, msgs, shld come in after back up. +// - if happened before subscribe, subscribe still recorded and applied after redis back up +// - After lots of random channels, lots of random listeners, once all dropped the hashmap should be empty. 
+ +// Redis server can't be run on windows: +#[cfg(not(target_os = "windows"))] +#[cfg(test)] +mod tests { + + use chrono::TimeDelta; + + use crate::misc::with_timeout; + use crate::redis::{Redis, RedisBatchFire, RedisConnLike, RedisStandalone}; + use crate::testing::prelude::*; + + use super::*; + + async fn setup_conns() -> RResult<(RedisStandalone, Redis, Redis), AnyErr> { + let server = RedisStandalone::new_no_persistence().await?; + let work_r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; + // Also create a fake version on a random port, this will be used to check failure cases. + let fail_r = Redis::new( + "redis://FAKKEEEE:6372", + format!("test_{}", uuid::Uuid::new_v4()), + )?; + Ok((server, work_r, fail_r)) + } + + // The basics: + // - Listeners receive messages. + // - Listeners receive only their own messages. + // - Listeners clean themselves up. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_simple( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + for (mut rx, namespace) in [ + ( + work_conn.subscribe::("n1", "foo").await.unwrap(), + "n1", + ), + ( + work_conn.subscribe::("n2", "foo").await.unwrap(), + "n2", + ), + ] { + assert!(work_conn + .batch() + .publish(namespace, "foo", format!("{}_first_msg", namespace)) + .publish(namespace, "foo", format!("{}_second_msg", namespace)) + .fire() + .await + .is_some()); + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some(format!("{}_first_msg", namespace)), rx.recv().await); + assert_eq!(Some(format!("{}_second_msg", namespace)), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + } + + // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert_eq!(work_r.pubsub_listener.listeners.len(), 0); + + Ok(()) + } + + // Multiple listeners on the same channel: + // - Each gets data + // - Each gets data only once + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_single_channel_multiple_listeners( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + let rx1 = work_conn.subscribe::("n1", "foo").await.unwrap(); + let rx2 = work_conn.subscribe::("n1", "foo").await.unwrap(); + let rx3 = work_conn.subscribe::("n1", "foo").await.unwrap(); + + // All 3 receivers should receive these messages: + assert!(work_conn + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + for mut rx in [rx1, rx2, rx3] { + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("first_msg".to_string()), rx.recv().await); + assert_eq!(Some("second_msg".to_string()), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + } + + // Given everything's 
dropped now we're out of the loop, internals should've been cleaned up after a short delay: + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert_eq!(work_r.pubsub_listener.listeners.len(), 0); + + Ok(()) + } + + /// - pubsub should be able to continue after redis goes down and back up. + /// - subscribe() and publish() should work even if redis literally just coming back alive. + /// - subscriptions should automatically resubscribe when the connection has to be restarted on new redis. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_redis_sketchiness( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + // Start a server to get a static port then instantly shutdown but keep the redis instance (client): + let (client, port) = { + let server = RedisStandalone::new_no_persistence().await?; + let client = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4().to_string())?; + (client, server.port) + }; + + let restart_server = move || { + // Same as new_no_persistence, but have to use underlying for port: + RedisStandalone::new_with_opts(port, Some(&["--appendonly", "no", "--save", "\"\""])) + }; + + // subscribe() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. + let mut rx = { + let _server = restart_server().await?; + client + .conn() + .subscribe::("n1", "foo") + .await + .unwrap() + }; + + // publish() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. + let _server = restart_server().await?; + // This is separate, just confirming publish works straight away, + // slight delay needed for actual publish as redis needs time to resubscribe to the channel on the new connection, + // otherwise won't see the published event. + assert!(client + .conn() + .batch() + .publish("lah", "loo", "baz") + .fire() + .await + .is_some()); + + // Short delay, see above comment for redis to resubscribe before proper publish to check: + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + assert!(client + .conn() + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + // Despite all the madness messages should come through: + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("first_msg".to_string()), rx.recv().await); + assert_eq!(Some("second_msg".to_string()), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + + Ok(()) + } + + // Nothing should break when no ones subscribed to a channel when a message is published. 
+ #[rstest] + #[tokio::test] + async fn test_redis_pubsub_no_listener( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + assert!(work_conn + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + Ok(()) + } +} diff --git a/rust/bitbazaar/redis/redis_retry.rs b/rust/bitbazaar/redis/redis_retry.rs new file mode 100644 index 00000000..3e0c8306 --- /dev/null +++ b/rust/bitbazaar/redis/redis_retry.rs @@ -0,0 +1,25 @@ +use crate::misc::Retry; + +pub(crate) fn redis_retry_config() -> Retry<'static, redis::RedisError> { + Retry::::fibonacci(chrono::Duration::milliseconds(10)) + // Will cumulatively delay for up to about 5ish seconds. + // SHOULDN'T BE LONGER, considering downstream user code may then handle the redis failure, + // in e.g. web request, any longer would then be harmful. + .until_total_delay(chrono::Duration::seconds(5)) + .on_retry(move |info| match info.last_error.kind() { + // These should all be automatically retried: + redis::ErrorKind::BusyLoadingError + | redis::ErrorKind::TryAgain + | redis::ErrorKind::MasterDown => { + tracing::warn!( + "Redis action failed with retryable error, retrying in {}. Last attempt no: '{}'.\nErr:\n{:?}.", + info.delay_till_next_attempt, + info.last_attempt_no, + info.last_error + ); + None + }, + // Everything else should just exit straight away, no point retrying internally. + _ => Some(info.last_error), + }) +} diff --git a/rust/bitbazaar/redis/wrapper.rs b/rust/bitbazaar/redis/wrapper.rs index bef6f812..a5123cee 100644 --- a/rust/bitbazaar/redis/wrapper.rs +++ b/rust/bitbazaar/redis/wrapper.rs @@ -1,9 +1,9 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use deadpool_redis::{Config, Runtime}; use futures::Future; -use super::{RedisConn, RedisLock, RedisLockErr}; +use super::{pubsub::pubsub_global::RedisPubSubGlobal, RedisConn, RedisLock, RedisLockErr}; use crate::errors::prelude::*; /// A wrapper around redis to make it more concise to use and not need redis in the downstream Cargo.toml. @@ -12,7 +12,9 @@ use crate::errors::prelude::*; /// All redis errors (availability, unexpected content) will be logged as errors and results returned as `None` (or similar) where possible. #[derive(Debug, Clone)] pub struct Redis { + // deadpool arced internally. pool: deadpool_redis::Pool, + pub(crate) pubsub_listener: Arc, prefix: String, } @@ -20,24 +22,27 @@ impl Redis { /// Create a new global redis wrapper from the given Redis URL (like `redis://127.0.0.1`). /// /// Note this should only be done once at startup. - pub fn new, B: Into>( - redis_conn_str: A, - prefix: B, + pub fn new( + redis_conn_str: impl Into, + prefix: impl Into, ) -> RResult { - let cfg = Config::from_url(redis_conn_str); + let redis_conn_str = redis_conn_str.into(); + let prefix = prefix.into(); + let cfg = Config::from_url(&redis_conn_str); let pool = cfg .create_pool(Some(Runtime::Tokio1)) .change_context(AnyErr)?; - + let pubsub_listener = Arc::new(RedisPubSubGlobal::new(&redis_conn_str)?); Ok(Self { pool, - prefix: prefix.into(), + prefix, + pubsub_listener, }) } /// Get a [`RedisConn`] redis can be called with. pub fn conn(&self) -> RedisConn<'_> { - RedisConn::new(&self.pool, &self.prefix) + RedisConn::new(&self.pool, &self.prefix, &self.pubsub_listener) } /// Get a distributed redis lock. 
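To tie patch 3 together, a minimal end-to-end sketch of the intended pubsub flow, mirroring the tests above. It is hypothetical rather than part of the patch: the namespace and channel names are made up, and it is written as crate-internal code (like the tests) so the imports resolve.

use crate::redis::{Redis, RedisBatchFire, RedisConnLike};

async fn pubsub_demo(r: &Redis) -> Option<()> {
    let conn = r.conn();
    // Returns None if redis stays unavailable after a few seconds of internal retrying.
    let mut listener = conn.subscribe::<String>("my_namespace", "my_channel").await?;
    // Publishing is just another batch command (the channel name gets the usual prefixing).
    conn.batch()
        .publish("my_namespace", "my_channel", "hello")
        .fire()
        .await?;
    // Each message is decoded to the type requested at subscribe time.
    assert_eq!(listener.recv().await.as_deref(), Some("hello"));
    // Dropping the listener unsubscribes and lets the global manager clean up its state.
    Some(())
}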
From a98ac0d2fa6a0161f724e624ad9b42460c02f94d Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Mon, 19 Aug 2024 15:32:49 +0300 Subject: [PATCH 4/4] Error resistant, more tests, lazy clone. --- rust/bitbazaar/log/mod.rs | 2 +- rust/bitbazaar/misc/lazy_clone.rs | 98 ++++++ rust/bitbazaar/misc/mod.rs | 2 + rust/bitbazaar/misc/retry.rs | 5 +- rust/bitbazaar/redis/conn.rs | 1 - rust/bitbazaar/redis/pubsub/mod.rs | 299 ++++++++++++++++ rust/bitbazaar/redis/pubsub/pubsub_global.rs | 339 ++++--------------- 7 files changed, 464 insertions(+), 282 deletions(-) create mode 100644 rust/bitbazaar/misc/lazy_clone.rs diff --git a/rust/bitbazaar/log/mod.rs b/rust/bitbazaar/log/mod.rs index a3b9eac4..d7b79350 100644 --- a/rust/bitbazaar/log/mod.rs +++ b/rust/bitbazaar/log/mod.rs @@ -407,7 +407,7 @@ mod tests { // Sleeping after each, to try and ensure the correct debug output: log.with_tmp_global(|| { // On windows this needs to be really long to get static record ordering for testing: - let delay = if cfg!(windows) { 100 } else { 10 }; + let delay = if cfg!(windows) { 100 } else { 30 }; debug!("BEFORE"); std::thread::sleep(std::time::Duration::from_millis(delay)); diff --git a/rust/bitbazaar/misc/lazy_clone.rs b/rust/bitbazaar/misc/lazy_clone.rs new file mode 100644 index 00000000..769b5a08 --- /dev/null +++ b/rust/bitbazaar/misc/lazy_clone.rs @@ -0,0 +1,98 @@ +/// Efficient way to clone an item for each element in an iterator. +/// The final iteration will consume the original item, so no unnecessary clones are made. +pub trait IterWithCloneLazy { + /// The return type of the iterator. + type IterT; + + /// Efficient way to pass an owned clone of an item to each element in an iterator. + /// Will pass the final item by value without cloning, so no unnecessary clones are made. + fn with_clone_lazy( + self, + item: ItemT, + ) -> impl Iterator + where + Self: Sized; +} + +impl> IterWithCloneLazy for I { + type IterT = IterT; + + fn with_clone_lazy( + self, + item: ItemT, + ) -> impl Iterator + where + Self: Sized, + { + let mut iter = self.into_iter(); + LazyCloneIter { + item: Some(item), + next_in_iter: iter.next(), + iter, + } + } +} + +struct LazyCloneIter { + // Will consume when next_in_iter is None, as on last iteration. + item: Option, + iter: I, + next_in_iter: Option, +} + +impl Iterator for LazyCloneIter { + type Item = (ItemT, I::Item); + + fn next(&mut self) -> Option { + self.next_in_iter.take().map(|next| { + self.next_in_iter = self.iter.next(); + if self.next_in_iter.is_none() { + (self.item.take().unwrap(), next) + } else { + (self.item.clone().unwrap(), next) + } + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::{atomic::AtomicUsize, Arc}; + + use super::*; + + #[test] + fn test_lazy_clone_with_clone_lazy() { + struct Test { + tot_clones: Arc, + } + impl Clone for Test { + fn clone(&self) -> Self { + self.tot_clones + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Test { + tot_clones: self.tot_clones.clone(), + } + } + } + + // Try for 0..10 iterator length, main things to check are 0, 1 and >1. + // For all but final iteration, should clone, then pass by value. 
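+        // Worked example (illustrative): with count == 3, indices 0 and 1 each trigger one
+        // clone (running totals 1 then 2), while index 2 receives the original by move, so
+        // the final total is count - 1 == 2; with count == 0 or 1 no clone is made at all.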
+ for count in 0..10 { + let tot_clones = Arc::new(AtomicUsize::new(0)); + let test = Test { + tot_clones: tot_clones.clone(), + }; + for (t, index) in (0..count).with_clone_lazy(test) { + assert_eq!( + t.tot_clones.load(std::sync::atomic::Ordering::Relaxed), + if index < count - 1 { index + 1 } else { index } + ); + } + assert_eq!( + tot_clones.load(std::sync::atomic::Ordering::Relaxed), + count.max(1) - 1 + ); + } + } +} diff --git a/rust/bitbazaar/misc/mod.rs b/rust/bitbazaar/misc/mod.rs index 70dddcda..ae7266ea 100644 --- a/rust/bitbazaar/misc/mod.rs +++ b/rust/bitbazaar/misc/mod.rs @@ -8,6 +8,7 @@ mod binary_search; mod flexi_logger; mod global_lock; mod is_tcp_port_listening; +mod lazy_clone; mod looper; mod main_wrapper; mod periodic_updater; @@ -26,6 +27,7 @@ pub use binary_search::*; pub use flexi_logger::*; pub use global_lock::*; pub use is_tcp_port_listening::is_tcp_port_listening; +pub use lazy_clone::*; pub use looper::*; pub use main_wrapper::*; pub use periodic_updater::*; diff --git a/rust/bitbazaar/misc/retry.rs b/rust/bitbazaar/misc/retry.rs index eb7ac8be..01cb601f 100644 --- a/rust/bitbazaar/misc/retry.rs +++ b/rust/bitbazaar/misc/retry.rs @@ -86,7 +86,7 @@ impl<'a, E> Retry<'a, E> { /// Never stop retrying. pub fn until_forever(mut self) -> Self { - self.until = RetryUntil::TotalAttempts(usize::MAX); + self.until = RetryUntil::Forever; self } @@ -199,6 +199,8 @@ pub enum RetryUntil { TotalDelay(Duration), /// UNSTABLE: ONLY PUBLIC FOR MACRO USE. Delay(Duration), + /// UNSTABLE: ONLY PUBLIC FOR MACRO USE. + Forever, } impl RetryUntil { @@ -229,6 +231,7 @@ impl RetryUntil { return true; } } + RetryUntil::Forever => return false, } false } diff --git a/rust/bitbazaar/redis/conn.rs b/rust/bitbazaar/redis/conn.rs index 81a863d3..7e8409e7 100644 --- a/rust/bitbazaar/redis/conn.rs +++ b/rust/bitbazaar/redis/conn.rs @@ -106,7 +106,6 @@ pub trait RedisConnLike: std::fmt::Debug + Send + Sized { .await } - /// TODONOW update and test. // Commented out as untested, not sure if works. // /// Get all data from redis, only really useful during testing. // /// diff --git a/rust/bitbazaar/redis/pubsub/mod.rs b/rust/bitbazaar/redis/pubsub/mod.rs index 80da002e..47d1db89 100644 --- a/rust/bitbazaar/redis/pubsub/mod.rs +++ b/rust/bitbazaar/redis/pubsub/mod.rs @@ -2,3 +2,302 @@ mod channel_listener; pub(crate) mod pubsub_global; pub use channel_listener::*; + +// Redis server can't be run on windows: +#[cfg(not(target_os = "windows"))] +#[cfg(test)] +mod tests { + + use chrono::TimeDelta; + + use crate::misc::with_timeout; + use crate::redis::{Redis, RedisBatchFire, RedisConnLike, RedisStandalone}; + use crate::testing::prelude::*; + + async fn setup_conns() -> RResult<(RedisStandalone, Redis, Redis), AnyErr> { + let server = RedisStandalone::new_no_persistence().await?; + let work_r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; + // Also create a fake version on a random port, this will be used to check failure cases. + let fail_r = Redis::new( + "redis://FAKE:6372", + format!("test_{}", uuid::Uuid::new_v4()), + )?; + Ok((server, work_r, fail_r)) + } + + // The basics: + // - Listeners receive messages. + // - Listeners receive only their own messages. + // - Listeners clean themselves up. 
+ #[rstest] + #[tokio::test] + async fn test_redis_pubsub_simple( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + for (mut rx, namespace) in [ + ( + work_conn.subscribe::("n1", "foo").await.unwrap(), + "n1", + ), + ( + work_conn.subscribe::("n2", "foo").await.unwrap(), + "n2", + ), + ] { + assert!(work_conn + .batch() + .publish(namespace, "foo", format!("{}_first_msg", namespace)) + .publish(namespace, "foo", format!("{}_second_msg", namespace)) + .fire() + .await + .is_some()); + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some(format!("{}_first_msg", namespace)), rx.recv().await); + assert_eq!(Some(format!("{}_second_msg", namespace)), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + } + + // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert_eq!(work_r.pubsub_listener.listeners.len(), 0); + + Ok(()) + } + + // The redis prefix should be respected for channels + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_prefix_respected( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let server = RedisStandalone::new_no_persistence().await?; + let work_r_1 = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; + let work_r_2 = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; + let work_conn_1 = work_r_1.conn(); + let work_conn_2 = work_r_2.conn(); + + // Given we're using 2 different prefixes, each should not be impacted by the other: + let mut rx1 = work_conn_1.subscribe::("", "foo").await.unwrap(); + let mut rx2 = work_conn_2.subscribe::("", "foo").await.unwrap(); + + assert!(work_conn_1 + .batch() + .publish("", "foo", "conn_1_msg") + .fire() + .await + .is_some()); + assert!(work_conn_2 + .batch() + .publish("", "foo", "conn_2_msg") + .fire() + .await + .is_some()); + + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("conn_1_msg".to_string()), rx1.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx1.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + assert_eq!(Some("conn_2_msg".to_string()), rx2.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx2.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + Ok(()) + } + + // Multiple listeners on the same channel: + // - Each gets data + // - Each gets data only once + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_single_channel_multiple_listeners( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + let rx1 = work_conn.subscribe::("n1", "foo").await.unwrap(); + let rx2 = work_conn.subscribe::("n1", "foo").await.unwrap(); + let rx3 = work_conn.subscribe::("n1", 
"foo").await.unwrap(); + + // All 3 receivers should receive these messages: + assert!(work_conn + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + for mut rx in [rx1, rx2, rx3] { + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("first_msg".to_string()), rx.recv().await); + assert_eq!(Some("second_msg".to_string()), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + } + + // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert_eq!(work_r.pubsub_listener.listeners.len(), 0); + + Ok(()) + } + + /// - pubsub should be able to continue after redis goes down and back up. + /// - subscribe() and publish() should work even if redis literally just coming back alive. + /// - subscriptions should automatically resubscribe when the connection has to be restarted on new redis. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_redis_sketchiness( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + // Start a server to get a static port then instantly shutdown but keep the redis instance (client): + let (client, port) = { + let server = RedisStandalone::new_no_persistence().await?; + let client = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4().to_string())?; + (client, server.port) + }; + + let restart_server = move || { + // Same as new_no_persistence, but have to use underlying for port: + RedisStandalone::new_with_opts(port, Some(&["--appendonly", "no", "--save", "\"\""])) + }; + + // subscribe() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. + let mut rx = { + let _server = restart_server().await?; + client + .conn() + .subscribe::("n1", "foo") + .await + .unwrap() + }; + + // publish() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. + let _server = restart_server().await?; + // This is separate, just confirming publish works straight away, + // slight delay needed for actual publish as redis needs time to resubscribe to the channel on the new connection, + // otherwise won't see the published event. 
+ assert!(client + .conn() + .batch() + .publish("lah", "loo", "baz") + .fire() + .await + .is_some()); + + // Short delay, see above comment for redis to resubscribe before proper publish to check: + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + assert!(client + .conn() + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + // Despite all the madness messages should come through: + with_timeout( + TimeDelta::seconds(3), + || { + panic!("Timeout waiting for pubsub message"); + }, + async move { + assert_eq!(Some("first_msg".to_string()), rx.recv().await); + assert_eq!(Some("second_msg".to_string()), rx.recv().await); + with_timeout( + TimeDelta::milliseconds(100), + || Ok::<_, Report>(()), + async { + let msg = rx.recv().await; + panic!("Shouldn't have received any more messages, got: {:?}", msg); + }, + ) + .await?; + Ok::<_, Report>(()) + }, + ) + .await?; + + Ok(()) + } + + // Nothing should break when no ones subscribed to a channel when a message is published. + #[rstest] + #[tokio::test] + async fn test_redis_pubsub_no_listener( + #[allow(unused_variables)] logging: (), + ) -> RResult<(), AnyErr> { + let (_server, work_r, _fail_r) = setup_conns().await?; + let work_conn = work_r.conn(); + + assert!(work_conn + .batch() + .publish("n1", "foo", "first_msg") + .publish("n1", "foo", "second_msg") + .fire() + .await + .is_some()); + + Ok(()) + } +} diff --git a/rust/bitbazaar/redis/pubsub/pubsub_global.rs b/rust/bitbazaar/redis/pubsub/pubsub_global.rs index 10c6d339..6350b242 100644 --- a/rust/bitbazaar/redis/pubsub/pubsub_global.rs +++ b/rust/bitbazaar/redis/pubsub/pubsub_global.rs @@ -10,7 +10,7 @@ use redis::{aio::MultiplexedConnection, from_owned_redis_value, FromRedisValue, use crate::{ log::record_exception, - misc::{random_u64_rolling, Retry}, + misc::{random_u64_rolling, IterWithCloneLazy, Retry}, prelude::*, redis::redis_retry::redis_retry_config, }; @@ -28,15 +28,38 @@ pub struct RedisPubSubGlobal { pub(crate) listeners: DashMap>>, - /// The global receiver of messages hooked directly into the redis connection. - /// This will be taken when the main listener is spawned. - rx: tokio::sync::Mutex>>, /// Below used to trigger unsubscriptions and listeners dashmap cleanup when listeners are dropped. /// (The tx is called when a listener is dropped, and the spawned process listens for these and does the cleanup.) listener_drop_tx: Arc>, - listener_drop_rx: - tokio::sync::Mutex>>, + + /// Will be taken when the listener is lazily spawned. + spawn_init: tokio::sync::Mutex>, spawned: AtomicBool, + + /// Will be sent on Redis drop to kill the spawned listener. + on_drop_tx: Option>, +} + +impl Drop for RedisPubSubGlobal { + fn drop(&mut self) { + // This will kill the spawned listener, which will in turn kill the spawned process. + if let Some(on_drop_tx) = self.on_drop_tx.take() { + let _ = on_drop_tx.send(()); + }; + } +} + +#[derive(Debug)] +struct SpawnInit { + /// The global receiver of messages hooked directly into the redis connection. + /// This will be taken when the main listener is spawned. + rx: tokio::sync::mpsc::UnboundedReceiver, + + // Will receive whenever a listener is dropped: + listener_drop_rx: tokio::sync::mpsc::UnboundedReceiver<(String, u64)>, + + // Received when the redis instance dropped, meaning the spawned listener should shutdown. 
+ on_drop_rx: tokio::sync::oneshot::Receiver<()>, } impl Debug for RedisPubSubGlobal { @@ -46,7 +69,8 @@ impl Debug for RedisPubSubGlobal { // .field("config", &self.config) .field("active_conn", &self.active_conn) .field("listeners", &self.listeners) - .field("rx", &self.rx) + .field("listener_drop_tx", &self.listener_drop_tx) + .field("spawn_init", &self.spawn_init) .field("spawned", &self.spawned) .finish() } @@ -59,15 +83,20 @@ impl RedisPubSubGlobal { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let (listener_drop_tx, listener_drop_rx) = tokio::sync::mpsc::unbounded_channel(); let config = redis::AsyncConnectionConfig::new().set_push_sender(tx); + let (on_drop_tx, on_drop_rx) = tokio::sync::oneshot::channel(); Ok(Self { client, config, active_conn: tokio::sync::RwLock::new(None), listeners: DashMap::new(), - rx: tokio::sync::Mutex::new(Some(rx)), listener_drop_tx: Arc::new(listener_drop_tx), - listener_drop_rx: tokio::sync::Mutex::new(Some(listener_drop_rx)), + spawn_init: tokio::sync::Mutex::new(Some(SpawnInit { + rx, + listener_drop_rx, + on_drop_rx, + })), spawned: AtomicBool::new(false), + on_drop_tx: Some(on_drop_tx), }) } @@ -154,33 +183,32 @@ impl RedisPubSubGlobal { .swap(true, std::sync::atomic::Ordering::Relaxed) { let arc_self = self.clone(); - let mut rx = self - .rx + let mut init = self + .spawn_init .lock() .await .take() - .expect("rx should only be taken once"); - let mut listener_drop_rx = self - .listener_drop_rx - .lock() - .await - .take() - .expect("listener_drop_rx should only be taken once"); + .expect("init should only be taken once"); tokio::spawn(async move { - loop { - tokio::select! { - // Adding this means the listener fut will always be polled first, i.e. has higher priority. - // This is what we want as it cleans up dead listeners, so avoids the second fut ideally hitting any dead listeners. - biased; - - result = listener_drop_rx.recv() => { - arc_self.spawned_handle_listener_dropped(result).await; - } - result = rx.recv() => { - arc_self.spawned_handle_message(result).await; + // Spawned task will exit only when the on_drop_rx is sent, i.e. when the redis instance is dropped. + tokio::select! { + _ = init.on_drop_rx => {} + _ = async { + loop { + tokio::select! { + // Adding this means the listener fut will always be polled first, i.e. has higher priority. + // This is what we want as it cleans up dead listeners, so avoids the second fut ideally hitting any dead listeners. + biased; + result = init.listener_drop_rx.recv() => { + arc_self.spawned_handle_listener_dropped(result).await; + } + result = init.rx.recv() => { + arc_self.spawned_handle_message(result).await; + } + } } - } + } => {} } }); } @@ -358,11 +386,11 @@ impl RedisPubSubGlobal { )) { Ok((channel, msg)) => { if let Some(listeners) = self.listeners.get(&channel) { - for tx in listeners.values() { + for (msg, tx) in listeners.values().with_clone_lazy(msg) { // Given we have a separate future for cleaning up, // this shouldn't be a big issue if this ever errors with dead listeners, // as they should immediately be cleaned up by the cleanup future. - let _ = tx.send(msg.clone()); + let _ = tx.send(msg); } } } @@ -391,250 +419,3 @@ impl RedisPubSubGlobal { } } } - -// TESTS: -// - redis prefix still used. -// - DONE: sub to same channel twice with same name but 2 different fns, each should be called once, not first twice, second twice, first dropped etc. -// - Redis down then backup: -// - If just during listening, msgs, shld come in after back up. 
-// - if happened before subscribe, subscribe still recorded and applied after redis back up -// - After lots of random channels, lots of random listeners, once all dropped the hashmap should be empty. - -// Redis server can't be run on windows: -#[cfg(not(target_os = "windows"))] -#[cfg(test)] -mod tests { - - use chrono::TimeDelta; - - use crate::misc::with_timeout; - use crate::redis::{Redis, RedisBatchFire, RedisConnLike, RedisStandalone}; - use crate::testing::prelude::*; - - use super::*; - - async fn setup_conns() -> RResult<(RedisStandalone, Redis, Redis), AnyErr> { - let server = RedisStandalone::new_no_persistence().await?; - let work_r = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4())?; - // Also create a fake version on a random port, this will be used to check failure cases. - let fail_r = Redis::new( - "redis://FAKKEEEE:6372", - format!("test_{}", uuid::Uuid::new_v4()), - )?; - Ok((server, work_r, fail_r)) - } - - // The basics: - // - Listeners receive messages. - // - Listeners receive only their own messages. - // - Listeners clean themselves up. - #[rstest] - #[tokio::test] - async fn test_redis_pubsub_simple( - #[allow(unused_variables)] logging: (), - ) -> RResult<(), AnyErr> { - let (_server, work_r, _fail_r) = setup_conns().await?; - let work_conn = work_r.conn(); - - for (mut rx, namespace) in [ - ( - work_conn.subscribe::("n1", "foo").await.unwrap(), - "n1", - ), - ( - work_conn.subscribe::("n2", "foo").await.unwrap(), - "n2", - ), - ] { - assert!(work_conn - .batch() - .publish(namespace, "foo", format!("{}_first_msg", namespace)) - .publish(namespace, "foo", format!("{}_second_msg", namespace)) - .fire() - .await - .is_some()); - with_timeout( - TimeDelta::seconds(3), - || { - panic!("Timeout waiting for pubsub message"); - }, - async move { - assert_eq!(Some(format!("{}_first_msg", namespace)), rx.recv().await); - assert_eq!(Some(format!("{}_second_msg", namespace)), rx.recv().await); - with_timeout( - TimeDelta::milliseconds(100), - || Ok::<_, Report>(()), - async { - let msg = rx.recv().await; - panic!("Shouldn't have received any more messages, got: {:?}", msg); - }, - ) - .await?; - Ok::<_, Report>(()) - }, - ) - .await?; - } - - // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - assert_eq!(work_r.pubsub_listener.listeners.len(), 0); - - Ok(()) - } - - // Multiple listeners on the same channel: - // - Each gets data - // - Each gets data only once - #[rstest] - #[tokio::test] - async fn test_redis_pubsub_single_channel_multiple_listeners( - #[allow(unused_variables)] logging: (), - ) -> RResult<(), AnyErr> { - let (_server, work_r, _fail_r) = setup_conns().await?; - let work_conn = work_r.conn(); - - let rx1 = work_conn.subscribe::("n1", "foo").await.unwrap(); - let rx2 = work_conn.subscribe::("n1", "foo").await.unwrap(); - let rx3 = work_conn.subscribe::("n1", "foo").await.unwrap(); - - // All 3 receivers should receive these messages: - assert!(work_conn - .batch() - .publish("n1", "foo", "first_msg") - .publish("n1", "foo", "second_msg") - .fire() - .await - .is_some()); - - for mut rx in [rx1, rx2, rx3] { - with_timeout( - TimeDelta::seconds(3), - || { - panic!("Timeout waiting for pubsub message"); - }, - async move { - assert_eq!(Some("first_msg".to_string()), rx.recv().await); - assert_eq!(Some("second_msg".to_string()), rx.recv().await); - with_timeout( - TimeDelta::milliseconds(100), - || Ok::<_, Report>(()), 
- async { - let msg = rx.recv().await; - panic!("Shouldn't have received any more messages, got: {:?}", msg); - }, - ) - .await?; - Ok::<_, Report>(()) - }, - ) - .await?; - } - - // Given everything's dropped now we're out of the loop, internals should've been cleaned up after a short delay: - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - assert_eq!(work_r.pubsub_listener.listeners.len(), 0); - - Ok(()) - } - - /// - pubsub should be able to continue after redis goes down and back up. - /// - subscribe() and publish() should work even if redis literally just coming back alive. - /// - subscriptions should automatically resubscribe when the connection has to be restarted on new redis. - #[rstest] - #[tokio::test] - async fn test_redis_pubsub_redis_sketchiness( - #[allow(unused_variables)] logging: (), - ) -> RResult<(), AnyErr> { - // Start a server to get a static port then instantly shutdown but keep the redis instance (client): - let (client, port) = { - let server = RedisStandalone::new_no_persistence().await?; - let client = Redis::new(server.client_conn_str(), uuid::Uuid::new_v4().to_string())?; - (client, server.port) - }; - - let restart_server = move || { - // Same as new_no_persistence, but have to use underlying for port: - RedisStandalone::new_with_opts(port, Some(&["--appendonly", "no", "--save", "\"\""])) - }; - - // subscribe() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. - let mut rx = { - let _server = restart_server().await?; - client - .conn() - .subscribe::("n1", "foo") - .await - .unwrap() - }; - - // publish() should work even if redis is justttt coming back up, i.e. it should wait around for a connection. - let _server = restart_server().await?; - // This is separate, just confirming publish works straight away, - // slight delay needed for actual publish as redis needs time to resubscribe to the channel on the new connection, - // otherwise won't see the published event. - assert!(client - .conn() - .batch() - .publish("lah", "loo", "baz") - .fire() - .await - .is_some()); - - // Short delay, see above comment for redis to resubscribe before proper publish to check: - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - assert!(client - .conn() - .batch() - .publish("n1", "foo", "first_msg") - .publish("n1", "foo", "second_msg") - .fire() - .await - .is_some()); - - // Despite all the madness messages should come through: - with_timeout( - TimeDelta::seconds(3), - || { - panic!("Timeout waiting for pubsub message"); - }, - async move { - assert_eq!(Some("first_msg".to_string()), rx.recv().await); - assert_eq!(Some("second_msg".to_string()), rx.recv().await); - with_timeout( - TimeDelta::milliseconds(100), - || Ok::<_, Report>(()), - async { - let msg = rx.recv().await; - panic!("Shouldn't have received any more messages, got: {:?}", msg); - }, - ) - .await?; - Ok::<_, Report>(()) - }, - ) - .await?; - - Ok(()) - } - - // Nothing should break when no ones subscribed to a channel when a message is published. - #[rstest] - #[tokio::test] - async fn test_redis_pubsub_no_listener( - #[allow(unused_variables)] logging: (), - ) -> RResult<(), AnyErr> { - let (_server, work_r, _fail_r) = setup_conns().await?; - let work_conn = work_r.conn(); - - assert!(work_conn - .batch() - .publish("n1", "foo", "first_msg") - .publish("n1", "foo", "second_msg") - .fire() - .await - .is_some()); - - Ok(()) - } -}
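As a closing illustration of the fan-out change in `spawned_handle_message` above (where `listeners.values().with_clone_lazy(msg)` replaces a `msg.clone()` per send), here is the new `IterWithCloneLazy` helper in isolation. A minimal sketch, assuming the `bitbazaar::misc` re-export; the payload and listener ids are purely illustrative:

```rust
use bitbazaar::misc::IterWithCloneLazy;

fn main() {
    let payload = vec![0u8; 1024 * 1024]; // something expensive to clone
    let listener_ids = [1, 2, 3];

    // Yields (clone_of_payload, id) for the first two ids and moves `payload` into
    // the final tuple: 2 clones instead of the 3 a plain `payload.clone()` per send would cost.
    for (buf, id) in listener_ids.iter().with_clone_lazy(payload) {
        println!("listener {id} got {} bytes", buf.len());
    }
}
```

With a single element the item is simply moved, so no clone happens at all.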