From a60e6445cd5ee31eeaead0fb1402d4b3b07c12f3 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 17 Dec 2018 10:52:46 +0000 Subject: [PATCH] Review comments --- src/rust/engine/Cargo.lock | 9 ++++ src/rust/engine/serverset/src/lib.rs | 81 +++++++++++++++++++--------- 2 files changed, 64 insertions(+), 26 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 04c0aa31f2e..49554840888 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -601,6 +601,14 @@ dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-timer" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "gcc" version = "0.3.55" @@ -2355,6 +2363,7 @@ dependencies = [ "checksum futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)" = "49e7653e374fe0d0c12de4250f0bdb60680b8c80eed558c5c7538eec9c89e21b" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" "checksum futures-timer 0.1.1 (git+https://github.com/pantsbuild/futures-timer?rev=0b747e565309a58537807ab43c674d8951f9e5a0)" = "" +"checksum futures-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a5cedfe9b6dc756220782cc1ba5bcb1fa091cdcba155e40d3556159c3db58043" "checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" "checksum generic-array 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3c0f28c2f5bfb5960175af447a2da7c18900693738343dc896ffbcabd9839592" "checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb" diff --git a/src/rust/engine/serverset/src/lib.rs b/src/rust/engine/serverset/src/lib.rs index c518d6e5ab3..f150d5e927d 100644 --- a/src/rust/engine/serverset/src/lib.rs +++ b/src/rust/engine/serverset/src/lib.rs @@ -60,15 +60,16 @@ impl Clone for Serverset { } /// -/// An opaque value which can be passed to Serverset::callback to indicate for which server the -/// callback is being made. +/// An opaque value which can be passed to Serverset::report_health to indicate for which server the +/// report is being made. /// /// Do not rely on any implementation details of this type, including its Debug representation. /// It is liable to change at any time (though will continue to implement the traits which it /// implements in some way which may not be stable). /// #[derive(Clone, Copy, Debug)] -pub struct CallbackToken { +#[must_use] +pub struct HealthReportToken { index: usize, } @@ -79,6 +80,8 @@ struct Inner { pub(crate) next: AtomicUsize, backoff_config: BackoffConfig, + + timer_handle: futures_timer::TimerHandle, } #[derive(Clone, Copy, Debug)] @@ -180,7 +183,11 @@ impl BackoffConfig { } impl Serverset { - pub fn new(servers: Vec, backoff_config: BackoffConfig) -> Result { + pub fn new( + servers: Vec, + backoff_config: BackoffConfig, + timer_handle: futures_timer::TimerHandle, + ) -> Result { if servers.is_empty() { return Err("Must supply some servers".to_owned()); } @@ -195,6 +202,7 @@ impl Serverset { }).collect(), next: AtomicUsize::new(0), backoff_config, + timer_handle, }), }) } @@ -202,10 +210,10 @@ impl Serverset { /// /// Get the next (probably) healthy backend to use. /// - /// The caller will be given a backend to use, and should call ServerSet::callback with the + /// The caller will be given a backend to use, and should call Serverset::report_health with the /// supplied token, and the observed health of that backend. /// - /// If the callback is not called, the health status of the server will not be changed from its + /// If report_health is not called, the health status of the server will not be changed from its /// last known status. /// /// If all resources are unhealthy, the returned Future will delay until a resource becomes @@ -214,7 +222,7 @@ impl Serverset { /// No efforts are currently made to avoid a thundering heard at few healthy servers (or the the /// first server to become healthy after all are unhealthy). /// - pub fn next(&self) -> BoxFuture<(T, CallbackToken), String> { + pub fn next(&self) -> BoxFuture<(T, HealthReportToken), String> { let now = Instant::now(); let server_count = self.inner.servers.len(); @@ -243,22 +251,21 @@ impl Serverset { } } // A healthy server! Use it! - return futures::future::ok((server.server.clone(), CallbackToken { index: i })).to_boxed(); + return futures::future::ok((server.server.clone(), HealthReportToken { index: i })) + .to_boxed(); } // Unwrap is safe because if we hadn't populated earliest_future, we would already have returned. let (index, instant) = earliest_future.unwrap(); let server = self.inner.servers[index].server.clone(); // Note that Delay::new_at(time in the past) gets immediately scheduled. - Delay::new_at(instant) + Delay::new_handle(instant, self.inner.timer_handle.clone()) .map_err(|err| format!("Error delaying for serverset: {}", err)) - .map(move |()| (server, CallbackToken { index })) + .map(move |()| (server, HealthReportToken { index })) .to_boxed() } - pub fn callback(&self, callback_token: CallbackToken, health: Health) { - let mut unhealthy_info = self.inner.servers[callback_token.index] - .unhealthy_info - .lock(); + pub fn report_health(&self, token: HealthReportToken, health: Health) { + let mut unhealthy_info = self.inner.servers[token.index].unhealthy_info.lock(); match health { Health::Unhealthy => { if unhealthy_info.is_some() { @@ -292,6 +299,7 @@ impl std::fmt::Debug for Serverset { mod tests { use super::{BackoffConfig, Health, Serverset}; use futures::{self, Future}; + use futures_timer::TimerHandle; use parking_lot::Mutex; use std; use std::collections::HashSet; @@ -306,19 +314,28 @@ mod tests { #[test] fn no_servers_is_error() { let servers: Vec = vec![]; - Serverset::new(servers, backoff_config()).expect_err("Want error constructing with no servers"); + Serverset::new(servers, backoff_config(), TimerHandle::default()) + .expect_err("Want error constructing with no servers"); } #[test] fn round_robins() { - let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap(); + let s = Serverset::new( + vec!["good", "bad"], + backoff_config(), + TimerHandle::default(), + ).unwrap(); expect_both(&s, 2); } #[test] fn handles_overflow_internally() { - let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap(); + let s = Serverset::new( + vec!["good", "bad"], + backoff_config(), + TimerHandle::default(), + ).unwrap(); s.inner.next.store(std::usize::MAX, Ordering::SeqCst); // 3 because we may skip some values if the number of servers isn't a factor of @@ -334,7 +351,11 @@ mod tests { #[test] fn skips_unhealthy() { - let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap(); + let s = Serverset::new( + vec!["good", "bad"], + backoff_config(), + TimerHandle::default(), + ).unwrap(); mark_bad_as_bad(&s, Health::Unhealthy); @@ -343,7 +364,11 @@ mod tests { #[test] fn reattempts_unhealthy() { - let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap(); + let s = Serverset::new( + vec!["good", "bad"], + backoff_config(), + TimerHandle::default(), + ).unwrap(); mark_bad_as_bad(&s, Health::Unhealthy); @@ -354,7 +379,11 @@ mod tests { #[test] fn backoff_when_unhealthy() { - let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap(); + let s = Serverset::new( + vec!["good", "bad"], + backoff_config(), + TimerHandle::default(), + ).unwrap(); mark_bad_as_bad(&s, Health::Unhealthy); @@ -384,11 +413,11 @@ mod tests { #[test] fn waits_if_all_unhealthy() { let backoff_config = backoff_config(); - let s = Serverset::new(vec!["good", "bad"], backoff_config).unwrap(); + let s = Serverset::new(vec!["good", "bad"], backoff_config, TimerHandle::default()).unwrap(); for _ in 0..2 { s.next() - .map(|(_server, token)| s.callback(token, Health::Unhealthy)) + .map(|(_server, token)| s.report_health(token, Health::Unhealthy)) .wait() .unwrap(); } @@ -411,7 +440,7 @@ mod tests { let s = s.clone(); s.next().map(move |(server, token)| { saw.lock().insert(server); - s.callback(token, Health::Healthy) + s.report_health(token, Health::Healthy) }) }).collect::>(), ).wait() @@ -428,9 +457,9 @@ mod tests { .map(|(server, token)| { if server == "bad" { mark_bad_as_baded_bad = true; - s.callback(token, health); + s.report_health(token, health); } else { - s.callback(token, Health::Healthy); + s.report_health(token, Health::Healthy); } }).wait() .unwrap(); @@ -446,7 +475,7 @@ mod tests { s.next() .map(|(server, token)| { assert_eq!("good", server); - s.callback(token, Health::Healthy); + s.report_health(token, Health::Healthy); }).wait() .unwrap(); }