Skip to content

Commit

Permalink
feat: adding relay ranking feature
Browse files Browse the repository at this point in the history
  • Loading branch information
sameh-farouk committed Sep 12, 2023
1 parent f0a91ec commit 852e74e
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ rand_core = "0.6"

# for limiters
lru = "0.9.0"
rand = "0.8.5"

[build-dependencies]
git-version = "0.3.5"
Expand Down
6 changes: 5 additions & 1 deletion src/peer/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ async fn retainer<S: Signer>(
let (ws, _) = match tokio_tungstenite::connect_async(&u).await {
Ok(v) => v,
Err(err) => {
log::error!("failed to establish connection: {:#}", err);
log::error!(
"failed to establish connection to {:?} : {:#}",
u.domain(),
err
);
tokio::time::sleep(Duration::from_secs(2)).await;
log::info!("retrying connection");
continue;
Expand Down
1 change: 1 addition & 0 deletions src/relay/federation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use bb8_redis::{
use prometheus::{IntCounterVec, Opts, Registry};
use workers::WorkerPool;

pub mod ranker;
mod router;

lazy_static::lazy_static! {
Expand Down
170 changes: 170 additions & 0 deletions src/relay/federation/ranker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, SystemTime};

use anyhow::Result;
use rand::Rng;

/// One hour (3600 seconds) as a [`Duration`].
pub const HOUR: Duration = Duration::from_secs(3600);
/// Failure history recorded for a single relay.
#[derive(Debug, Clone)]
struct RelayStats {
    /// Timestamps of the recorded failures.
    ///
    /// `add_failure` only ever appends `SystemTime::now()`, so entries are
    /// normally ordered oldest first; `failures_last` relies on that order.
    pub failure_times: Vec<SystemTime>,
}

impl RelayStats {
    /// Creates an empty history with no recorded failures.
    fn new() -> RelayStats {
        RelayStats {
            failure_times: Vec::new(),
        }
    }

    /// Drops every failure timestamp whose age is `retain` or more.
    ///
    /// A timestamp whose age cannot be computed (`SystemTime::elapsed`
    /// returned an error) is dropped too, and a warning is logged for it.
    fn _clean(&mut self, retain: Duration) {
        let before = self.failure_times.len();
        self.failure_times.retain(|t| match t.elapsed() {
            Ok(age) => age < retain,
            Err(_) => {
                // Only warn when the clock error actually happened; the entry
                // is treated as expired so it cannot linger forever.
                log::warn!("the system experience system time error, ranker may malfunction.");
                false
            }
        });
        log::trace!("cleaning {:?} entires", before - self.failure_times.len());
    }

    /// Records a failure happening now, then prunes entries older than `retain`.
    fn add_failure(&mut self, retain: Duration) {
        self.failure_times.push(SystemTime::now());
        self._clean(retain);
    }

    /// Counts the failures recorded within the last `period`.
    ///
    /// Walks the timestamps from the front, skipping the ones that are
    /// `period` old or older, and stops at the first one that is younger.
    /// Everything from that point on is counted as recent.
    ///
    /// # Errors
    ///
    /// Returns an error if the age of a visited timestamp cannot be computed.
    fn failures_last(&self, period: Duration) -> Result<usize> {
        let mut expired = 0;
        for failure_time in &self.failure_times {
            let age = failure_time
                .elapsed()
                .map_err(|e| anyhow::anyhow!(e.to_string()))?;
            if age < period {
                break;
            }
            expired += 1;
        }
        Ok(self.failure_times.len() - expired)
    }

    /// Returns the number of failures per hour observed over the last `period`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `failures_last`.
    fn mean_failure_rate(&self, period: Duration) -> Result<f64> {
        let failures = self.failures_last(period)?;

        // Short-circuit so that a zero-length period never yields 0.0 / 0.0.
        if failures == 0 {
            return Ok(0.0);
        }
        Ok(failures as f64 / (period.as_secs_f64() / 3600.0))
    }
}

/// Ranks relay domains by how often they failed recently.
///
/// The statistics live behind an `Arc`, so cloning a ranker yields another
/// handle to the same shared failure history.
#[derive(Debug, Clone)]
pub struct RelayRanker {
    /// Failure history per domain, keyed by domain name.
    domain_stats: Arc<Mutex<RefCell<HashMap<String, RelayStats>>>>,
    /// Window used both for retaining failures and for computing failure rates.
    max_duration: Duration,
}

impl RelayRanker {
    /// Creates a ranker that remembers failures for at most `retain`.
    pub fn new(retain: Duration) -> RelayRanker {
        RelayRanker {
            domain_stats: Arc::new(Mutex::new(RefCell::new(HashMap::new()))),
            max_duration: retain,
        }
    }

    /// Records one failure for `domain`, creating its history on first use.
    ///
    /// # Errors
    ///
    /// Returns an error if the statistics lock is poisoned.
    pub fn downvote(&self, domain: impl Into<String>) -> Result<()> {
        let guard = self
            .domain_stats
            .lock()
            .map_err(|_| anyhow::anyhow!("failed to acquire lock"))?;
        guard
            .borrow_mut()
            .entry(domain.into())
            .or_insert_with(RelayStats::new)
            .add_failure(self.max_duration);
        Ok(())
    }

    /// Sorts `domains` in place by ascending failure rate.
    ///
    /// Domains with the same failure rate are placed in random relative order.
    /// A domain whose rate cannot be computed is treated as having rate `0.0`.
    ///
    /// # Errors
    ///
    /// Returns an error if the statistics lock is poisoned; `domains` is left
    /// untouched in that case.
    pub fn reorder(&self, domains: &mut Vec<&str>) -> Result<()> {
        let guard = self
            .domain_stats
            .lock()
            .map_err(|_| anyhow::anyhow!("failed to acquire lock"))?;

        // Compute each rate once and draw one random tie-break key per domain.
        // Sorting on (rate, key) is a consistent total order, unlike a
        // comparator that returns a fresh random answer on every call.
        let mut rng = rand::thread_rng();
        let mut ranked: Vec<(f64, u64, &str)> = domains
            .iter()
            .map(|&domain| {
                let rate = self._mean_failure_rate(domain, &guard).unwrap_or_default();
                (rate, rng.gen::<u64>(), domain)
            })
            .collect();

        ranked.sort_by(|a, b| {
            // Rates are never NaN here, so `partial_cmp` always succeeds; the
            // fallback only exists to avoid a panic path.
            a.0.partial_cmp(&b.0)
                .unwrap_or(Ordering::Equal)
                .then_with(|| a.1.cmp(&b.1))
        });

        for (slot, (_, _, domain)) in domains.iter_mut().zip(ranked) {
            *slot = domain;
        }

        log::debug!("ranking system hint: {:?}", domains);
        Ok(())
    }

    /// Looks up the failure rate of `domain` using an already-held lock guard.
    ///
    /// Returns `0.0` for a domain that has no recorded history.
    fn _mean_failure_rate(
        &self,
        domain: impl Into<String>,
        guard: &MutexGuard<'_, RefCell<HashMap<String, RelayStats>>>,
    ) -> Result<f64> {
        let inner = guard.borrow();
        match inner.get(&domain.into()) {
            Some(stats) => stats.mean_failure_rate(self.max_duration),
            None => Ok(0.0),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A relay with a recorded failure ranks behind relays that have none.
    #[test]
    fn test_downvoted_at_last() {
        let ranker = RelayRanker::new(HOUR);
        let _ = ranker.downvote("bing.com");

        let mut candidates = vec!["example.com", "bing.com", "google.com"];
        let _ = ranker.reorder(&mut candidates);

        assert_eq!(candidates.last().copied(), Some("bing.com"));
    }

    /// Relays are ordered from the lowest to the highest failure rate.
    #[test]
    fn test_order_by_failure_rate() {
        let ranker = RelayRanker::new(HOUR);
        for domain in ["bing.com", "example.com", "example.com"].iter() {
            let _ = ranker.downvote(*domain);
        }

        let mut candidates = vec!["example.com", "bing.com", "google.com"];
        let _ = ranker.reorder(&mut candidates);

        let expected = vec!["google.com", "bing.com", "example.com"];
        assert_eq!(candidates, expected);
    }

    /// A failure older than the observation window no longer affects the rate.
    #[test]
    fn test_rank_healing() {
        let ranker = RelayRanker::new(HOUR);
        let _ = ranker.downvote("bing.com");

        let guard = ranker.domain_stats.lock().unwrap();
        let mut stats_by_domain = guard.borrow_mut();
        let stats = stats_by_domain.get_mut("bing.com").unwrap();

        // Backdate the only recorded failure to two hours ago.
        if let Some(oldest) = stats.failure_times.first_mut() {
            *oldest = SystemTime::now().checked_sub(HOUR * 2).unwrap();
        }

        assert_eq!(stats.mean_failure_rate(HOUR).unwrap(), 0.0);
    }
}
39 changes: 25 additions & 14 deletions src/relay/federation/router.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::fmt::Debug;

use crate::{
relay::federation::ranker::{RelayRanker, HOUR},
relay::switch::{Sink, StreamID},
twin::RelayDomains,
twin::TwinDB,
types::Envelope,
};
Expand All @@ -15,6 +17,7 @@ use workers::Work;
pub(crate) struct Router<D: TwinDB> {
sink: Option<Sink>,
twins: D,
ranker: RelayRanker,
}

impl<D> Router<D>
Expand All @@ -25,48 +28,54 @@ where
Self {
sink: Some(sink),
twins: twins,

Check warning on line 30 in src/relay/federation/router.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

redundant field names in struct initialization
ranker: RelayRanker::new(HOUR),
}
}

async fn try_send<'a>(&self, domains: &'a RelayDomains, msg: Vec<u8>) -> Result<&'a str> {
async fn try_send<'a, S: AsRef<str> + Debug>(
&self,
domains: &'a Vec<S>,
msg: Vec<u8>,
) -> Result<&'a str> {
// TODO: FIX ME
for _ in 0..3 {
for domain in domains.iter() {
let url = if cfg!(test) {
format!("http://{}/", domain)
format!("http://{}/", domain.as_ref())
} else {
format!("https://{}/", domain)
format!("https://{}/", domain.as_ref())
};
log::debug!("federation to: {}", url);
let client = Client::new();
let resp = match client.post(&url).body(msg.clone()).send().await {
Ok(resp) => resp,
Err(err) => {
if err.is_connect() || err.is_timeout() {
std::thread::sleep(std::time::Duration::from_secs(2));
continue;
}
log::warn!("could not send message to relay '{}': {}", domain, err);
break;
log::warn!(
"could not send message to relay '{}': {}",
domain.as_ref(),
err
);
let _ = self.ranker.downvote(domain.as_ref());
continue;
// bail!("could not send message to relay '{}': {}", domain, err)
}
};

if resp.status() != StatusCode::ACCEPTED {
log::warn!(
"received relay '{}' did not accept the message: {}",
domain,
domain.as_ref(),
resp.status()
);
break;
continue;
/* bail!(
"received relay '{}' did not accept the message: {}",
domain,
resp.status()
); */
}

return Ok(domain);
return Ok(domain.as_ref());
}
}
bail!("relays '{:?}' was not reachable in time", domains);
Expand All @@ -83,7 +92,9 @@ where
let domains = twin
.relay
.ok_or_else(|| anyhow::anyhow!("relay is not set for this twin"))?;
let result = self.try_send(&domains, msg).await;
let mut sorted_doamin = domains.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
let _ = self.ranker.reorder(&mut sorted_doamin);
let result = self.try_send(&sorted_doamin, msg).await;
match result {
Ok(domain) => super::MESSAGE_SUCCESS.with_label_values(&[domain]).inc(),
Err(ref err) => {
Expand Down

0 comments on commit 852e74e

Please sign in to comment.