From 40c1445a5a75bb7dbdfd355ce269bad10ed8486b Mon Sep 17 00:00:00 2001 From: zakstucke <44890343+zakstucke@users.noreply.github.com> Date: Thu, 13 Jun 2024 08:47:02 +0200 Subject: [PATCH] Some new utilities (#43) --- rust/Cargo.lock | 31 ++- rust/Cargo.toml | 13 +- rust/bitbazaar/lib.rs | 2 + rust/bitbazaar/misc/bytes.rs | 30 +++ rust/bitbazaar/misc/mod.rs | 3 + rust/bitbazaar/threads/batch_futures.rs | 278 ++++++++++++++++++++++++ rust/bitbazaar/threads/mod.rs | 3 + 7 files changed, 352 insertions(+), 8 deletions(-) create mode 100644 rust/bitbazaar/misc/bytes.rs create mode 100644 rust/bitbazaar/threads/batch_futures.rs create mode 100644 rust/bitbazaar/threads/mod.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 959debab..2853ce7c 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -53,6 +53,15 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" +[[package]] +name = "async-semaphore" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "538c756e85eb6ffdefaec153804afb6da84b033e2e5ec3e9d459c34b4bf4d3f6" +dependencies = [ + "event-listener", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -160,6 +169,7 @@ dependencies = [ name = "bitbazaar" version = "0.0.52" dependencies = [ + "async-semaphore", "chrono", "chrono-humanize", "colored", @@ -169,6 +179,7 @@ dependencies = [ "deadpool-redis", "error-stack", "futures", + "gloo-timers 0.3.0", "homedir", "hostname", "http 1.0.0", @@ -551,6 +562,12 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fastrand" version = "2.1.0" @@ -710,7 +727,7 @@ dependencies = [ "gloo-net", "gloo-render", "gloo-storage", - "gloo-timers", + "gloo-timers 0.2.6", "gloo-utils", "gloo-worker", ] @@ -832,6 +849,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "gloo-utils" version = "0.1.7" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 900a777c..46ba7acf 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -30,14 +30,15 @@ parking_lot = { version = "0.12", features = ["deadlock_detection", "serde"] } serde = { version = "1", features = ["derive", "rc"] } time = { version = "0.3", features = ["local-offset"] } colored = '2' +futures = { version = "0.3", features = [] } +async-semaphore = "1.2" +gloo-timers = { version = "0.3", features = ["futures"] } # Not in default, but randomly useful in features: strum = { version = "0.25", features = ["derive"], optional = true } serde_json = { version = "1.0", optional = true } rand = { version = "0.8", optional = true } -futures = { version = "0.3", optional = true } uuid = { version = "1.6", features = ["v4"], optional = true } -tokio = { version = '1', optional = true } # FEAT: chrono: (but also sometimes enabled by other features) chrono = { version = '0.4', optional = true } @@ -83,19 +84,19 @@ sysinfo = { version = "0.30", optional = true } tracing-subscriber-wasm = "0.1.0" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] 
-tracing-appender = '0.2' # This includes threading (non-blocking stuff that can't be used in wasm) +tracing-appender = '0.2' # This includes threading (non-blocking stuff that can't be used in wasm) hostname = "0.3.1" +tokio = { version = '1', features = ["time"] } [dev-dependencies] rstest = "0.18" criterion = { version = "0.3", features = ["html_reports", "async_futures"] } portpicker = '0.1.1' tempfile = '3.8' -tokio = { version = '1', features = ["rt-multi-thread", "macros"] } +tokio = { version = '1', features = ["time", "rt-multi-thread", "macros"] } uuid = { version = "1.6", features = ["v4"] } regex = "1" serde_json = "1" -futures = "0.3" # When adding new benches, they should be added like this with the name of the file in benches/: (obviously uncommented) # [[bench]] @@ -109,14 +110,12 @@ timing = ['dep:comfy-table', 'chrono'] cli = ['dep:normpath', 'dep:conch-parser', 'dep:homedir', 'chrono', 'dep:strum'] system = ['dep:sysinfo'] redis = [ - 'dep:tokio', 'dep:deadpool-redis', 'dep:redis', "dep:redis-macros", 'dep:sha1_smol', 'dep:serde_json', 'dep:rand', - 'dep:futures', 'chrono', 'dep:uuid', ] diff --git a/rust/bitbazaar/lib.rs b/rust/bitbazaar/lib.rs index 9059bead..582e40a1 100644 --- a/rust/bitbazaar/lib.rs +++ b/rust/bitbazaar/lib.rs @@ -25,6 +25,8 @@ pub mod misc; #[cfg(feature = "redis")] /// Redis utilities pub mod redis; +/// Concurrency/parallelism utilities +pub mod threads; #[cfg(feature = "timing")] /// Timing utilities pub mod timing; diff --git a/rust/bitbazaar/misc/bytes.rs b/rust/bitbazaar/misc/bytes.rs new file mode 100644 index 00000000..8d7ec960 --- /dev/null +++ b/rust/bitbazaar/misc/bytes.rs @@ -0,0 +1,30 @@ +/// Prettify bytes into a string for a user using the 1000 base. +pub fn bytes_to_pretty_1000(bytes: u64) -> String { + static UNITS: [&str; 9] = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]; + static BASE: f64 = 1000.0; + let mut size = bytes as f64; + let mut unit = 0; + while size >= BASE { + size /= BASE; + unit += 1; + } + format!("{:.1}{}", size, UNITS[unit]) +} + +/// Prettify bytes into a string for a user using the 1024 base. +pub fn bytes_to_pretty_1024(bytes: u64) -> String { + static UNITS: [&str; 9] = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]; + static BASE: f64 = 1024.0; + let mut size = bytes as f64; + let mut unit = 0; + while size >= BASE { + size /= BASE; + unit += 1; + } + format!("{:.1}{}", size, UNITS[unit]) +} + +/// Convert bytes to megabits per second. +pub fn bytes_to_mbps(bytes: u64) -> f64 { + bytes as f64 / 1024.0 / 1024.0 * 8.0 +} diff --git a/rust/bitbazaar/misc/mod.rs b/rust/bitbazaar/misc/mod.rs index 76547178..e32bcc07 100644 --- a/rust/bitbazaar/misc/mod.rs +++ b/rust/bitbazaar/misc/mod.rs @@ -1,3 +1,6 @@ +/// Byte manipulation utilities, e.g. transfer speed. +pub mod bytes; + mod in_ci; mod is_tcp_port_listening; diff --git a/rust/bitbazaar/threads/batch_futures.rs b/rust/bitbazaar/threads/batch_futures.rs new file mode 100644 index 00000000..3c843adf --- /dev/null +++ b/rust/bitbazaar/threads/batch_futures.rs @@ -0,0 +1,278 @@ +use std::collections::HashMap; +#[cfg(target_arch = "wasm32")] +use std::rc::Rc; +#[cfg(not(target_arch = "wasm32"))] +use std::sync::Arc; + +use async_semaphore::Semaphore; +use futures::{ + select, + stream::{FuturesOrdered, FuturesUnordered}, + Future, FutureExt, StreamExt, +}; + +macro_rules! 
batch_futures_flat_impl {
+    ($limit:expr, $fut_cbs:expr, |$result:ident| $call_cb:expr) => {{
+        let mut return_index = 0;
+        let mut result_cache: HashMap<usize, _> = HashMap::new();
+
+        macro_rules! manage_results {
+            ($index:expr, $current_result:expr) => {
+                // Handle sending a result out if one avail:
+                if $index == return_index {
+                    // Was the next in line to return, send without touching the cache:
+                    return_index += 1; // Must be before callback in case async!
+                    let $result = $current_result;
+                    $call_cb
+                } else {
+                    // Something else needs to be sent next, cache until ready:
+                    result_cache.insert($index, $current_result);
+                }
+                // The next result might be in the cache, drain until we're waiting on something:
+                while let Some($result) = result_cache.remove(&return_index) {
+                    return_index += 1; // Must be before callback in case async!
+                    $call_cb
+                }
+            };
+        }
+
+        // Can't use FuturesOrdered here: we use the length of the stream to decide when something new can be added, and if it were ordered we'd have to wait for specifically the first added future to finish before adding more.
+        // With unordered, we can add more the moment a permit is available, regardless of the order.
+        // The cache is used to maintain ordering.
+        let mut stream = FuturesUnordered::new();
+        for (index, fut_cb) in $fut_cbs.into_iter().enumerate() {
+            stream.push(async move { (index, fut_cb().await) });
+            // If full, wait for one to finish.
+            if stream.len() >= $limit {
+                if let Some((index, result)) = stream.next().await {
+                    manage_results!(index, result);
+                }
+            }
+        }
+
+        // Wait for the remaining to finish.
+        while let Some((index, result)) = stream.next().await {
+            manage_results!(index, result);
+        }
+
+        Ok(())
+    }};
+}
+
+/// A simple and performant runner for an iterator of future creators.
+/// The futures are run in parallel up to the supplied limit.
+/// The results are returned in the same order as the inputted futures.
+///
+/// Returns:
+/// - Vec of results in the same order as the inputted futures.
+pub async fn batch_futures_flat<R, Fut: Future<Output = R>>(
+    limit: usize,
+    fut_cbs: impl IntoIterator<Item = impl FnOnce() -> Fut>,
+) -> Vec<R> {
+    let mut results = vec![];
+    let _ = batch_futures_flat_stream_sync_cb(limit, fut_cbs, |result| {
+        results.push(result);
+        Ok::<(), ()>(())
+    })
+    .await;
+    results
+}
+
+/// A more performant version of [`batch_futures_descendants`] due to no descendant requirement.
+/// By removing descendant limiting, semaphores no longer need to be used.
+pub async fn batch_futures_flat_stream_async_cb<
+    R,
+    Fut: Future<Output = R>,
+    E,
+    CbFut: Future<Output = Result<(), E>>,
+>(
+    limit: usize,
+    fut_cbs: impl IntoIterator<Item = impl FnOnce() -> Fut>,
+    result_sync_cb: impl Fn(R) -> CbFut,
+) -> Result<(), E> {
+    batch_futures_flat_impl!(limit, fut_cbs, |result| {
+        result_sync_cb(result).await?;
+    })
+}
+
+/// A more performant version of [`batch_futures_descendants`] due to no descendant requirement.
+/// By removing descendant limiting, semaphores no longer need to be used.
+pub async fn batch_futures_flat_stream_sync_cb<R, Fut: Future<Output = R>, E>(
+    limit: usize,
+    fut_cbs: impl IntoIterator<Item = impl FnOnce() -> Fut>,
+    mut result_sync_cb: impl FnMut(R) -> Result<(), E>,
+) -> Result<(), E> {
+    batch_futures_flat_impl!(limit, fut_cbs, |result| {
+        result_sync_cb(result)?;
+    })
+}
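As an illustrative sketch only, not part of the patch: a minimal caller of `batch_futures_flat`, assuming a Tokio runtime as in the dev-dependencies; the closures and numbers are hypothetical.

use bitbazaar::threads::batch_futures_flat;

#[tokio::main]
async fn main() {
    // 100 future *creators*: each closure is only invoked when its future is
    // about to be polled, so nothing is buffered up front.
    let fut_cbs = (0..100u64).map(|i| {
        move || async move {
            // Stand-in for real async work (I/O, an RPC, ...):
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            i * 2
        }
    });

    // At most 8 futures in flight at once; results come back in input order.
    let results = batch_futures_flat(8, fut_cbs).await;
    assert_eq!(results.len(), 100);
    assert_eq!(results[3], 6);
}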
+/// How batched futures are limited, either using a parent's limit to prevent concurrency explosion, or direct.
+#[derive(Debug, Clone)]
+pub enum BatchLimit {
+    /// The entrypoint, where the total limit is set.
+    Direct(usize),
+    /// This level is limited by the parent's config.
+    #[cfg(not(target_arch = "wasm32"))]
+    Parent(Arc<Semaphore>),
+    #[cfg(target_arch = "wasm32")]
+    Parent(Rc<Semaphore>),
+}
+
+/// Batch run futures but with a limiter on parents and descendants.
+/// IF HIGHEST PERFORMANCE IS NEEDED AND YOU DON'T NEED DESCENDANT LIMITING, USE [`batch_futures_flat_stream_async_cb`] or just normal [`futures::StreamExt::buffer_unordered`].
+///
+/// Key specialised features:
+/// - Limits can be shared with descendants/parents, preventing concurrency explosion, but also internally making sure there are no deadlocks.
+/// - Light on memory: fut_cbs are only called when imminently being processed, so they don't all need to be in memory at once.
+/// - Despite batching, the vec of results/callbacks of results are in the same order as inputted (finished out-of-order futures are kept in a buffer until they're next in line).
+/// - The future callback will only be called when the future will imminently be polled, allowing sync setup.
+///
+/// If you need to start processing the results before all the futures are done, use [`batch_futures_descendants_stream_async_cb`] or [`batch_futures_descendants_stream_sync_cb`].
+///
+/// If neither of the above features is needed, you may as well use `buffer_unordered` (https://users.rust-lang.org/t/batch-execution-of-futures-in-the-tokio-runtime-or-max-number-of-active-futures-at-a-time/47659).
+pub async fn batch_futures_descendants<R, Fut: Future<Output = R>>(
+    batch_limit: &BatchLimit,
+    fut_cbs: impl IntoIterator<Item = impl FnOnce(BatchLimit) -> Fut>,
+) -> Vec<R> {
+    let mut results = vec![];
+    let _ = batch_futures_descendants_stream_sync_cb(batch_limit, fut_cbs, |result| {
+        results.push(result);
+        Ok::<(), ()>(())
+    })
+    .await;
+    results
+}
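Another illustrative sketch, not from the patch, of what the descendant limiting is for: a nested batch that shares one overall budget by passing the handed-down `BatchLimit::Parent` to its inner call. A Tokio runtime is assumed and the arithmetic work is hypothetical.

use bitbazaar::threads::{batch_futures_descendants, BatchLimit};

#[tokio::main]
async fn main() {
    // One shared budget of 10 concurrently-running futures, covering the outer
    // batch AND every nested batch spawned from inside it:
    let limit = BatchLimit::Direct(10);

    let outer = batch_futures_descendants(
        &limit,
        (0..5u64).map(|i| {
            move |child_limit: BatchLimit| async move {
                // Each outer future runs its own inner batch; reusing the
                // handed-down `BatchLimit::Parent` keeps the whole tree under
                // the single budget of 10 instead of multiplying limits.
                let inner = batch_futures_descendants(
                    &child_limit,
                    (0..4u64).map(move |j| move |_: BatchLimit| async move { i * 10 + j }),
                )
                .await;
                inner.iter().sum::<u64>()
            }
        }),
    )
    .await;

    // Results keep input order, so outer[1] is the sum for i == 1:
    assert_eq!(outer[1], 10 + 11 + 12 + 13);
}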
+macro_rules! batch_futures_descendants_impl {
+    ($batch_limit:expr, $fut_cbs:expr, |$result:ident| $call_cb:expr) => {{
+        // The main semaphore: either the parent's, or a new one with the specified limit:
+        let main_sem = match $batch_limit {
+            BatchLimit::Parent(parent) => parent.clone(),
+            #[cfg(not(target_arch = "wasm32"))]
+            BatchLimit::Direct(max) => Arc::new(Semaphore::new(*max)),
+            #[cfg(target_arch = "wasm32")]
+            BatchLimit::Direct(max) => Rc::new(Semaphore::new(*max)),
+        };
+
+        // The crux of this manual implementation: if there's a parent, we already have 1 in-built permit, so we can use a direct route (i.e. a semaphore of 1) as well as getting parallelization permits from the parent.
+        let local_highway_sem = if matches!($batch_limit, BatchLimit::Parent(_)) {
+            Some(Semaphore::new(1))
+        } else {
+            None
+        };
+
+        // Needs to be ordered due to a key req that the results are returned in the same order as the input futures.
+        // (Can be created automatically from the futures with collect())
+        let mut stream: FuturesOrdered<_> = FuturesOrdered::new();
+        for fut_cb in $fut_cbs.into_iter() {
+            macro_rules! stream_poller_fut {
+                () => {
+                    async {
+                        let mut seen_stream_empty = false;
+                        let mut told_logic_unexpected = false;
+                        loop {
+                            if let Some($result) = stream.next().await {
+                                if seen_stream_empty && !told_logic_unexpected {
+                                    told_logic_unexpected = true;
+                                    tracing::error!("Logic unexpected, never expected to get a result from stream here after seeing the stream is empty. See comments in file.");
+                                }
+                                $call_cb
+                            } else {
+                                // Here if the stream is empty, meaning all the permits must have been taken by parents or descendants, so this block effectively could sleep infinitely, as the event that will break the select!{} will be a permit becoming available from a parent or descendant.
+                                // Doing 100ms and the error!() log so we know if this isn't true and our understanding is incorrect. 100ms is slow enough to not cause performance issues.
+                                seen_stream_empty = true;
+
+                                #[cfg(target_arch = "wasm32")]
+                                gloo_timers::future::TimeoutFuture::new(100).await;
+                                #[cfg(not(target_arch = "wasm32"))]
+                                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+                            }
+                        }
+                        #[allow(unreachable_code)]
+                        Ok::<_, E>(unreachable!())
+                    }
+                    .fuse()
+                };
+            }
+            let permit = if let Some(local_highway_sem) = &local_highway_sem {
+                // Always try using the local highway first, to prevent taking a permit from the parent semaphore unnecessarily (or polling the stream) when we've obviously already got 1 in-built and ready to go:
+                if let Some(permit) = local_highway_sem.try_acquire() {
+                    permit
+                } else if let Some(permit) = main_sem.try_acquire() {
+                    // Then try getting one from main sem, to prevent spinning up the select!{} if we can avoid it:
+                    permit
+                } else {
+                    // Otherwise, select on:
+                    // - local highway, still want to use if it comes avail
+                    // - parent's semaphore, can be used to parallelize instead of local highway
+                    // - stream poller, if results become ready, their callbacks should be run (this also wakes unpolled futures in the stream)
+
+                    // Using the fuse thing to avoid pin: (probs not needed, but obvs a core fn so any perf gain is good)
+                    // https://users.rust-lang.org/t/why-macro-select-needs-to-work-with-unpin-futures/70898
+                    select! {
+                        permit = {local_highway_sem.acquire().fuse()} => permit,
+                        permit = {main_sem.acquire().fuse()} => permit,
+                        permit = {stream_poller_fut!()} => permit?,
+                    }
+                }
+            } else {
+                // Try getting first from main_sem, to avoid spinning up the select!{} if we can avoid it:
+                if let Some(permit) = main_sem.try_acquire() {
+                    permit
+                } else {
+                    // This is top-level, no local highway, select on:
+                    // - main sem which is the same as parent sem in other block
+                    // - stream poller, if results become ready, their callbacks should be run (this also wakes unpolled futures in the stream)
+                    select! {
+                        permit = {main_sem.acquire().fuse()} => permit,
+                        permit = {stream_poller_fut!()} => permit?,
+                    }
+                }
+            };
+
+            // Got a permit, spawn off the task, only releasing the permit once the task is done.
+            // Passing down the current main_sem as the batch limit to allow descendants to limit themselves by it:
+            let fut = fut_cb(BatchLimit::Parent(main_sem.clone()));
+            stream.push_back(async move {
+                let result = fut.await;
+                drop(permit);
+                result
+            });
+        }
+
+        // All the tasks spawned off, process the remaining in the stream:
+        while let Some($result) = stream.next().await {
+            $call_cb
+        }
+
+        Ok(())
+    }};
+}
+
+/// Underlying of [`batch_futures_descendants`], use if you need to process results in order during execution.
+pub async fn batch_futures_descendants_stream_sync_cb<R, Fut: Future<Output = R>, E>(
+    batch_limit: &BatchLimit,
+    fut_cbs: impl IntoIterator<Item = impl FnOnce(BatchLimit) -> Fut>,
+    mut result_sync_cb: impl FnMut(R) -> Result<(), E>,
+) -> Result<(), E> {
+    batch_futures_descendants_impl!(batch_limit, fut_cbs, |result| {
+        result_sync_cb(result)?;
+    })
+}
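A further hypothetical sketch, not part of the patch, of the streaming sync-callback variant: results are handed to the callback in input order as they become ready, and returning `Err` stops everything early. A Tokio runtime is assumed; the squaring work and the 250 cutoff are made up.

use bitbazaar::threads::{batch_futures_descendants_stream_sync_cb, BatchLimit};

#[tokio::main]
async fn main() {
    let limit = BatchLimit::Direct(4);

    // Stream results through a callback, in input order, instead of collecting
    // them all into a Vec at the end.
    let res = batch_futures_descendants_stream_sync_cb(
        &limit,
        (0..20u32).map(|i| move |_: BatchLimit| async move { i * i }),
        |square| {
            if square > 250 {
                // Returning Err short-circuits the whole run.
                return Err(format!("stopping early at {}", square));
            }
            println!("got {}", square);
            Ok(())
        },
    )
    .await;

    // The Err here is just the early-stop signal from the callback:
    assert!(res.is_err());
}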
+/// Underlying of [`batch_futures_descendants`], use if you need to process results in order during execution.
+pub async fn batch_futures_descendants_stream_async_cb<
+    R,
+    Fut: Future<Output = R>,
+    E,
+    CbFut: Future<Output = Result<(), E>>,
+>(
+    batch_limit: &BatchLimit,
+    fut_cbs: impl IntoIterator<Item = impl FnOnce(BatchLimit) -> Fut>,
+    result_async_cb: impl Fn(R) -> CbFut,
+) -> Result<(), E> {
+    batch_futures_descendants_impl!(batch_limit, fut_cbs, |result| {
+        result_async_cb(result).await?;
+    })
+}
diff --git a/rust/bitbazaar/threads/mod.rs b/rust/bitbazaar/threads/mod.rs
new file mode 100644
index 00000000..25bda0b1
--- /dev/null
+++ b/rust/bitbazaar/threads/mod.rs
@@ -0,0 +1,3 @@
+mod batch_futures;
+
+pub use batch_futures::*;
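Finally, a hypothetical sketch of the async-callback flat variant, called through the new `bitbazaar::threads` re-exports; a Tokio runtime is assumed, and the sleep plus the collected vector are stand-ins for a real async side effect.

use std::sync::Mutex;

use bitbazaar::threads::batch_futures_flat_stream_async_cb;

#[tokio::main]
async fn main() {
    let seen = Mutex::new(Vec::new());
    let seen_ref = &seen;

    // The per-result callback is itself async, so each in-order result can be
    // post-processed asynchronously (imagine a write to a store) before the
    // next one is delivered.
    batch_futures_flat_stream_async_cb(
        4,
        (0..10u32).map(|i| move || async move { i * 3 }),
        |n| async move {
            // Stand-in for an async side effect:
            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
            seen_ref.lock().unwrap().push(n);
            Ok::<(), ()>(())
        },
    )
    .await
    .unwrap();

    // Delivery order matches input order:
    assert_eq!(*seen.lock().unwrap(), (0..10u32).map(|i| i * 3).collect::<Vec<_>>());
}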