diff --git a/Cargo.toml b/Cargo.toml index bbdc4b2..479b4f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ rand_core = "0.6" # for limiters lru = "0.9.0" +rand = "0.8.5" [build-dependencies] git-version = "0.3.5" diff --git a/src/peer/socket.rs b/src/peer/socket.rs index 019136a..a110744 100644 --- a/src/peer/socket.rs +++ b/src/peer/socket.rs @@ -101,7 +101,11 @@ async fn retainer( let (ws, _) = match tokio_tungstenite::connect_async(&u).await { Ok(v) => v, Err(err) => { - log::error!("failed to establish connection: {:#}", err); + log::error!( + "failed to establish connection to {:?} : {:#}", + u.domain(), + err + ); tokio::time::sleep(Duration::from_secs(2)).await; log::info!("retrying connection"); continue; diff --git a/src/relay/federation/mod.rs b/src/relay/federation/mod.rs index b76de3b..a539a7f 100644 --- a/src/relay/federation/mod.rs +++ b/src/relay/federation/mod.rs @@ -12,6 +12,7 @@ use bb8_redis::{ use prometheus::{IntCounterVec, Opts, Registry}; use workers::WorkerPool; +pub mod ranker; mod router; lazy_static::lazy_static! { diff --git a/src/relay/federation/ranker.rs b/src/relay/federation/ranker.rs new file mode 100644 index 0000000..4791528 --- /dev/null +++ b/src/relay/federation/ranker.rs @@ -0,0 +1,170 @@ +use std::cell::RefCell; +use std::cmp::Ordering; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::time::{Duration, SystemTime}; + +use anyhow::Result; +use rand::Rng; + +pub const HOUR: Duration = Duration::from_secs(3600); +#[derive(Debug, Clone)] +struct RelayStats { + pub failure_times: Vec, +} + +impl RelayStats { + fn new() -> RelayStats { + RelayStats { + failure_times: Vec::new(), + } + } + + fn _clean(&mut self, retain: Duration) { + let count = self.failure_times.len(); + self.failure_times.retain(|t| { + t.elapsed().unwrap_or({ + log::warn!("the system experience system time error, ranker may malfunction."); + retain.saturating_add(Duration::from_secs(1)) + }) < retain + }); + log::trace!("cleaning {:?} entires", count - &self.failure_times.len()); + } + + fn add_failure(&mut self, retain: Duration) { + self.failure_times.push(SystemTime::now()); + self._clean(retain); + } + + fn failures_last(&self, period: Duration) -> Result { + let mut count = 0; + for failure_time in &self.failure_times { + if failure_time + .elapsed() + .map_err(|e| anyhow::anyhow!(e.to_string()))? + < period + { + break; + } + count += 1; + } + Ok(&self.failure_times.len() - count) + } + + fn mean_failure_rate(&self, period: Duration) -> Result { + let failures = self.failures_last(period)?; + + if failures == 0 { + return Ok(0.0); + } + Ok(failures as f64 / (period.as_secs_f64() / 3600.0)) + } +} + +#[derive(Debug, Clone)] +pub struct RelayRanker { + domain_stats: Arc>>>, + max_duration: Duration, +} + +impl RelayRanker { + pub fn new(retain: Duration) -> RelayRanker { + RelayRanker { + domain_stats: Arc::new(Mutex::new(RefCell::new(HashMap::new()))), + max_duration: retain, + } + } + + pub fn downvote(&self, domain: impl Into) -> Result<()> { + let guard = match self.domain_stats.lock() { + Ok(guard) => guard, + Err(_) => return Err(anyhow::anyhow!("failed to acquire lock")), + }; + let mut inner = guard.borrow_mut(); + let stats = inner.entry(domain.into()).or_insert(RelayStats::new()); + stats.add_failure(self.max_duration); + Ok(()) + } + + pub fn reorder(&self, domains: &mut Vec<&str>) -> Result<()> { + let guard = match self.domain_stats.lock() { + Ok(guard) => guard, + Err(_) => return Err(anyhow::anyhow!("failed to acquire lock")), + }; + + domains.sort_by(|a, b| { + let a_failure_rate = self._mean_failure_rate(*a, &guard).unwrap_or_default(); + let b_failure_rate = self._mean_failure_rate(*b, &guard).unwrap_or_default(); + if a_failure_rate == b_failure_rate { + let mut rng = rand::thread_rng(); + rng.gen::().cmp(&rng.gen::()) + } else { + a_failure_rate + .partial_cmp(&b_failure_rate) + .unwrap_or(Ordering::Equal) + } + }); + log::debug!("ranking system hint: {:?}", domains); + Ok(()) + } + + fn _mean_failure_rate( + &self, + domain: impl Into, + guard: &MutexGuard<'_, RefCell>>, + ) -> Result { + let inner = guard.borrow(); + if let Some(stats) = inner.get(&domain.into()) { + stats.mean_failure_rate(self.max_duration) + } else { + Ok(0.0) + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_downvoted_at_last() { + let ranking_system = RelayRanker::new(HOUR); + + let _ = ranking_system.downvote("bing.com"); + let mut domains = vec!["example.com", "bing.com", "google.com"]; + let _ = ranking_system.reorder(&mut domains); + + assert_eq!(*domains.last().unwrap(), "bing.com"); + } + + #[test] + fn test_order_by_failure_rate() { + let ranking_system = RelayRanker::new(HOUR); + + let _ = ranking_system.downvote("bing.com"); + let _ = ranking_system.downvote("example.com"); + let _ = ranking_system.downvote("example.com"); + + let mut domains = vec!["example.com", "bing.com", "google.com"]; + let _ = ranking_system.reorder(&mut domains); + + assert_eq!(domains, vec!("google.com", "bing.com", "example.com")); + } + + #[test] + fn test_rank_healing() { + let ranking_system = RelayRanker::new(HOUR); + + let _ = ranking_system.downvote("bing.com"); + + let binding = ranking_system.domain_stats.lock().unwrap(); + let mut inner = binding.borrow_mut(); + let ds = inner.get_mut("bing.com").unwrap(); + if let Some(first) = ds.failure_times.get_mut(0) { + *first = SystemTime::checked_sub(&SystemTime::now(), HOUR * 2).unwrap(); + } + let failure_rate = ds.mean_failure_rate(HOUR).unwrap(); + assert_eq!(failure_rate, 0.0); + } +} diff --git a/src/relay/federation/router.rs b/src/relay/federation/router.rs index 893985d..4a9e55a 100644 --- a/src/relay/federation/router.rs +++ b/src/relay/federation/router.rs @@ -1,6 +1,8 @@ +use std::fmt::Debug; + use crate::{ + relay::federation::ranker::{RelayRanker, HOUR}, relay::switch::{Sink, StreamID}, - twin::RelayDomains, twin::TwinDB, types::Envelope, }; @@ -15,6 +17,7 @@ use workers::Work; pub(crate) struct Router { sink: Option, twins: D, + ranker: RelayRanker, } impl Router @@ -25,29 +28,35 @@ where Self { sink: Some(sink), twins: twins, + ranker: RelayRanker::new(HOUR), } } - async fn try_send<'a>(&self, domains: &'a RelayDomains, msg: Vec) -> Result<&'a str> { + async fn try_send<'a, S: AsRef + Debug>( + &self, + domains: &'a Vec, + msg: Vec, + ) -> Result<&'a str> { // TODO: FIX ME for _ in 0..3 { for domain in domains.iter() { let url = if cfg!(test) { - format!("http://{}/", domain) + format!("http://{}/", domain.as_ref()) } else { - format!("https://{}/", domain) + format!("https://{}/", domain.as_ref()) }; log::debug!("federation to: {}", url); let client = Client::new(); let resp = match client.post(&url).body(msg.clone()).send().await { Ok(resp) => resp, Err(err) => { - if err.is_connect() || err.is_timeout() { - std::thread::sleep(std::time::Duration::from_secs(2)); - continue; - } - log::warn!("could not send message to relay '{}': {}", domain, err); - break; + log::warn!( + "could not send message to relay '{}': {}", + domain.as_ref(), + err + ); + let _ = self.ranker.downvote(domain.as_ref()); + continue; // bail!("could not send message to relay '{}': {}", domain, err) } }; @@ -55,10 +64,10 @@ where if resp.status() != StatusCode::ACCEPTED { log::warn!( "received relay '{}' did not accept the message: {}", - domain, + domain.as_ref(), resp.status() ); - break; + continue; /* bail!( "received relay '{}' did not accept the message: {}", domain, @@ -66,7 +75,7 @@ where ); */ } - return Ok(domain); + return Ok(domain.as_ref()); } } bail!("relays '{:?}' was not reachable in time", domains); @@ -83,7 +92,9 @@ where let domains = twin .relay .ok_or_else(|| anyhow::anyhow!("relay is not set for this twin"))?; - let result = self.try_send(&domains, msg).await; + let mut sorted_doamin = domains.iter().map(|s| s.as_str()).collect::>(); + let _ = self.ranker.reorder(&mut sorted_doamin); + let result = self.try_send(&sorted_doamin, msg).await; match result { Ok(domain) => super::MESSAGE_SUCCESS.with_label_values(&[domain]).inc(), Err(ref err) => {