Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis PubSub, RedisConn methods no longer require mutable, custom backdoor to underlying redis commands in a batch #57

Merged
merged 4 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ features = ["sync"]
# These are included on top of above features when not wasm:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
version = "1"
features = ["time", "fs", "process", "rt", "io-util"]
features = ["time", "fs", "process", "rt", "io-util", "macros"]

[target.'cfg(target_arch = "wasm32")'.dependencies]
tracing-subscriber-wasm = "0.1.0"
Expand All @@ -144,10 +144,10 @@ rstest = "0.18"
criterion = { version = "0.3", features = ["html_reports", "async_tokio"] }
tokio = { version = '1', features = ["full"] }

# When adding new benches, they should be added like this with the name of the file in benches/: (obviously uncommented)
# [[bench]]
# name = "bench_tester"
# harness = false
# When adding new benches, they should be added like this with the name of the file in benches/:
[[bench]]
name = "bench_default"
harness = false

[features]
collector = ["dep:reqwest", "dep:tempfile", "tarball"]
Expand Down
56 changes: 56 additions & 0 deletions rust/benches/bench_default.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#![allow(unused_imports)]
use criterion::{black_box, criterion_group, criterion_main, Criterion};

// <--- EXAMPLE:

fn fibonacci(n: u64) -> u64 {
let mut a = 0;
let mut b = 1;

match n {
0 => b,
_ => {
for _ in 0..n {
let c = a + b;
a = b;
b = c;
}
b
}
}
}

async fn async_fibonacci(n: u64) -> u64 {
fibonacci(n)
}

// SYNC EXAMPLE
pub fn bench_sync(c: &mut Criterion) {
c.bench_function("sync: fib 20", |b| b.iter(|| fibonacci(black_box(20))));
}

// ASYNC EXAMPLE
pub fn bench_async(c: &mut Criterion) {
c.bench_function("async: fib 20", |b| {
b.to_async(&get_tokio_rt())
.iter(|| async_fibonacci(black_box(20)))
});
}

// CUSTOM CONFIG EXAMPLE
pub fn bench_config(c: &mut Criterion) {
let mut group = c.benchmark_group("small-sample-size");
group.sample_size(10).significance_level(0.01);
group.bench_function("config: fib 20", |b| b.iter(|| fibonacci(black_box(20))));
group.finish();
}

criterion_group!(benches, bench_sync, bench_async, bench_config);
criterion_main!(benches);

fn get_tokio_rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
}
25 changes: 0 additions & 25 deletions rust/benches/bench_tester.rs

This file was deleted.

2 changes: 1 addition & 1 deletion rust/bitbazaar/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ mod tests {
// Sleeping after each, to try and ensure the correct debug output:
log.with_tmp_global(|| {
// On windows this needs to be really long to get static record ordering for testing:
let delay = if cfg!(windows) { 100 } else { 10 };
let delay = if cfg!(windows) { 100 } else { 30 };

debug!("BEFORE");
std::thread::sleep(std::time::Duration::from_millis(delay));
Expand Down
98 changes: 98 additions & 0 deletions rust/bitbazaar/misc/lazy_clone.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/// Efficient way to clone an item for each element in an iterator.
/// The final iteration will consume the original item, so no unnecessary clones are made.
pub trait IterWithCloneLazy {
/// The return type of the iterator.
type IterT;

/// Efficient way to pass an owned clone of an item to each element in an iterator.
/// Will pass the final item by value without cloning, so no unnecessary clones are made.
fn with_clone_lazy<ItemT: Clone>(
self,
item: ItemT,
) -> impl Iterator<Item = (ItemT, Self::IterT)>
where
Self: Sized;
}

impl<IterT, I: IntoIterator<Item = IterT>> IterWithCloneLazy for I {
type IterT = IterT;

fn with_clone_lazy<ItemT: Clone>(
self,
item: ItemT,
) -> impl Iterator<Item = (ItemT, Self::IterT)>
where
Self: Sized,
{
let mut iter = self.into_iter();
LazyCloneIter {
item: Some(item),
next_in_iter: iter.next(),
iter,
}
}
}

struct LazyCloneIter<I: Iterator, ItemT: Clone> {
// Will consume when next_in_iter is None, as on last iteration.
item: Option<ItemT>,
iter: I,
next_in_iter: Option<I::Item>,
}

impl<I: Iterator, ItemT: Clone> Iterator for LazyCloneIter<I, ItemT> {
type Item = (ItemT, I::Item);

fn next(&mut self) -> Option<Self::Item> {
self.next_in_iter.take().map(|next| {
self.next_in_iter = self.iter.next();
if self.next_in_iter.is_none() {
(self.item.take().unwrap(), next)
} else {
(self.item.clone().unwrap(), next)
}
})
}
}

#[cfg(test)]
mod tests {
use std::sync::{atomic::AtomicUsize, Arc};

use super::*;

#[test]
fn test_lazy_clone_with_clone_lazy() {
struct Test {
tot_clones: Arc<AtomicUsize>,
}
impl Clone for Test {
fn clone(&self) -> Self {
self.tot_clones
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Test {
tot_clones: self.tot_clones.clone(),
}
}
}

// Try for 0..10 iterator length, main things to check are 0, 1 and >1.
// For all but final iteration, should clone, then pass by value.
for count in 0..10 {
let tot_clones = Arc::new(AtomicUsize::new(0));
let test = Test {
tot_clones: tot_clones.clone(),
};
for (t, index) in (0..count).with_clone_lazy(test) {
assert_eq!(
t.tot_clones.load(std::sync::atomic::Ordering::Relaxed),
if index < count - 1 { index + 1 } else { index }
);
}
assert_eq!(
tot_clones.load(std::sync::atomic::Ordering::Relaxed),
count.max(1) - 1
);
}
}
}
2 changes: 2 additions & 0 deletions rust/bitbazaar/misc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod binary_search;
mod flexi_logger;
mod global_lock;
mod is_tcp_port_listening;
mod lazy_clone;
mod looper;
mod main_wrapper;
mod periodic_updater;
Expand All @@ -26,6 +27,7 @@ pub use binary_search::*;
pub use flexi_logger::*;
pub use global_lock::*;
pub use is_tcp_port_listening::is_tcp_port_listening;
pub use lazy_clone::*;
pub use looper::*;
pub use main_wrapper::*;
pub use periodic_updater::*;
Expand Down
9 changes: 9 additions & 0 deletions rust/bitbazaar/misc/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ impl<'a, E> Retry<'a, E> {
self
}

/// Never stop retrying.
pub fn until_forever(mut self) -> Self {
self.until = RetryUntil::Forever;
self
}

/// Stop retrying after the total delay reaches the given duration.
pub fn until_total_delay(mut self, max_total_delay: Duration) -> Self {
self.until = RetryUntil::TotalDelay(max_total_delay);
Expand Down Expand Up @@ -193,6 +199,8 @@ pub enum RetryUntil {
TotalDelay(Duration),
/// UNSTABLE: ONLY PUBLIC FOR MACRO USE.
Delay(Duration),
/// UNSTABLE: ONLY PUBLIC FOR MACRO USE.
Forever,
}

impl RetryUntil {
Expand Down Expand Up @@ -223,6 +231,7 @@ impl RetryUntil {
return true;
}
}
RetryUntil::Forever => return false,
}
false
}
Expand Down
Loading
Loading