Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
illicitonion committed Dec 17, 2018
1 parent 17825d2 commit a60e644
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 26 deletions.
9 changes: 9 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 55 additions & 26 deletions src/rust/engine/serverset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,16 @@ impl<T> Clone for Serverset<T> {
}

///
/// An opaque value which can be passed to Serverset::callback to indicate for which server the
/// callback is being made.
/// An opaque value which can be passed to Serverset::report_health to indicate for which server the
/// report is being made.
///
/// Do not rely on any implementation details of this type, including its Debug representation.
/// It is liable to change at any time (though will continue to implement the traits which it
/// implements in some way which may not be stable).
///
#[derive(Clone, Copy, Debug)]
pub struct CallbackToken {
#[must_use]
pub struct HealthReportToken {
index: usize,
}

Expand All @@ -79,6 +80,8 @@ struct Inner<T> {
pub(crate) next: AtomicUsize,

backoff_config: BackoffConfig,

timer_handle: futures_timer::TimerHandle,
}

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -180,7 +183,11 @@ impl BackoffConfig {
}

impl<T: Clone + Send + Sync + 'static> Serverset<T> {
pub fn new(servers: Vec<T>, backoff_config: BackoffConfig) -> Result<Self, String> {
pub fn new(
servers: Vec<T>,
backoff_config: BackoffConfig,
timer_handle: futures_timer::TimerHandle,
) -> Result<Self, String> {
if servers.is_empty() {
return Err("Must supply some servers".to_owned());
}
Expand All @@ -195,17 +202,18 @@ impl<T: Clone + Send + Sync + 'static> Serverset<T> {
}).collect(),
next: AtomicUsize::new(0),
backoff_config,
timer_handle,
}),
})
}

///
/// Get the next (probably) healthy backend to use.
///
/// The caller will be given a backend to use, and should call ServerSet::callback with the
/// The caller will be given a backend to use, and should call Serverset::report_health with the
/// supplied token, and the observed health of that backend.
///
/// If the callback is not called, the health status of the server will not be changed from its
/// If report_health is not called, the health status of the server will not be changed from its
/// last known status.
///
/// If all resources are unhealthy, the returned Future will delay until a resource becomes
Expand All @@ -214,7 +222,7 @@ impl<T: Clone + Send + Sync + 'static> Serverset<T> {
/// No efforts are currently made to avoid a thundering heard at few healthy servers (or the the
/// first server to become healthy after all are unhealthy).
///
pub fn next(&self) -> BoxFuture<(T, CallbackToken), String> {
pub fn next(&self) -> BoxFuture<(T, HealthReportToken), String> {
let now = Instant::now();
let server_count = self.inner.servers.len();

Expand Down Expand Up @@ -243,22 +251,21 @@ impl<T: Clone + Send + Sync + 'static> Serverset<T> {
}
}
// A healthy server! Use it!
return futures::future::ok((server.server.clone(), CallbackToken { index: i })).to_boxed();
return futures::future::ok((server.server.clone(), HealthReportToken { index: i }))
.to_boxed();
}
// Unwrap is safe because if we hadn't populated earliest_future, we would already have returned.
let (index, instant) = earliest_future.unwrap();
let server = self.inner.servers[index].server.clone();
// Note that Delay::new_at(time in the past) gets immediately scheduled.
Delay::new_at(instant)
Delay::new_handle(instant, self.inner.timer_handle.clone())
.map_err(|err| format!("Error delaying for serverset: {}", err))
.map(move |()| (server, CallbackToken { index }))
.map(move |()| (server, HealthReportToken { index }))
.to_boxed()
}

pub fn callback(&self, callback_token: CallbackToken, health: Health) {
let mut unhealthy_info = self.inner.servers[callback_token.index]
.unhealthy_info
.lock();
pub fn report_health(&self, token: HealthReportToken, health: Health) {
let mut unhealthy_info = self.inner.servers[token.index].unhealthy_info.lock();
match health {
Health::Unhealthy => {
if unhealthy_info.is_some() {
Expand Down Expand Up @@ -292,6 +299,7 @@ impl<T: std::fmt::Debug> std::fmt::Debug for Serverset<T> {
mod tests {
use super::{BackoffConfig, Health, Serverset};
use futures::{self, Future};
use futures_timer::TimerHandle;
use parking_lot::Mutex;
use std;
use std::collections::HashSet;
Expand All @@ -306,19 +314,28 @@ mod tests {
#[test]
fn no_servers_is_error() {
let servers: Vec<String> = vec![];
Serverset::new(servers, backoff_config()).expect_err("Want error constructing with no servers");
Serverset::new(servers, backoff_config(), TimerHandle::default())
.expect_err("Want error constructing with no servers");
}

#[test]
fn round_robins() {
let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap();
let s = Serverset::new(
vec!["good", "bad"],
backoff_config(),
TimerHandle::default(),
).unwrap();

expect_both(&s, 2);
}

#[test]
fn handles_overflow_internally() {
let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap();
let s = Serverset::new(
vec!["good", "bad"],
backoff_config(),
TimerHandle::default(),
).unwrap();
s.inner.next.store(std::usize::MAX, Ordering::SeqCst);

// 3 because we may skip some values if the number of servers isn't a factor of
Expand All @@ -334,7 +351,11 @@ mod tests {

#[test]
fn skips_unhealthy() {
let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap();
let s = Serverset::new(
vec!["good", "bad"],
backoff_config(),
TimerHandle::default(),
).unwrap();

mark_bad_as_bad(&s, Health::Unhealthy);

Expand All @@ -343,7 +364,11 @@ mod tests {

#[test]
fn reattempts_unhealthy() {
let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap();
let s = Serverset::new(
vec!["good", "bad"],
backoff_config(),
TimerHandle::default(),
).unwrap();

mark_bad_as_bad(&s, Health::Unhealthy);

Expand All @@ -354,7 +379,11 @@ mod tests {

#[test]
fn backoff_when_unhealthy() {
let s = Serverset::new(vec!["good", "bad"], backoff_config()).unwrap();
let s = Serverset::new(
vec!["good", "bad"],
backoff_config(),
TimerHandle::default(),
).unwrap();

mark_bad_as_bad(&s, Health::Unhealthy);

Expand Down Expand Up @@ -384,11 +413,11 @@ mod tests {
#[test]
fn waits_if_all_unhealthy() {
let backoff_config = backoff_config();
let s = Serverset::new(vec!["good", "bad"], backoff_config).unwrap();
let s = Serverset::new(vec!["good", "bad"], backoff_config, TimerHandle::default()).unwrap();

for _ in 0..2 {
s.next()
.map(|(_server, token)| s.callback(token, Health::Unhealthy))
.map(|(_server, token)| s.report_health(token, Health::Unhealthy))
.wait()
.unwrap();
}
Expand All @@ -411,7 +440,7 @@ mod tests {
let s = s.clone();
s.next().map(move |(server, token)| {
saw.lock().insert(server);
s.callback(token, Health::Healthy)
s.report_health(token, Health::Healthy)
})
}).collect::<Vec<_>>(),
).wait()
Expand All @@ -428,9 +457,9 @@ mod tests {
.map(|(server, token)| {
if server == "bad" {
mark_bad_as_baded_bad = true;
s.callback(token, health);
s.report_health(token, health);
} else {
s.callback(token, Health::Healthy);
s.report_health(token, Health::Healthy);
}
}).wait()
.unwrap();
Expand All @@ -446,7 +475,7 @@ mod tests {
s.next()
.map(|(server, token)| {
assert_eq!("good", server);
s.callback(token, Health::Healthy);
s.report_health(token, Health::Healthy);
}).wait()
.unwrap();
}
Expand Down

0 comments on commit a60e644

Please sign in to comment.