Skip to content

Commit

Permalink
fix multiple ip ping
Browse files Browse the repository at this point in the history
  • Loading branch information
lazytiger committed Jan 12, 2024
1 parent 0dd7e4e commit fbe4065
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 46 deletions.
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ httparse = "1.8"
async_smoltcp = { path = "async_smoltcp" }
tokio-rustls = "0.25"
rustls-pki-types = "1.1"
futures = "0.3"

[dev-dependencies]
env_logger = "0.10"
Expand Down
116 changes: 70 additions & 46 deletions src/aproxy/profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ use std::{
};

use bytes::{Buf, BufMut, BytesMut};
use dns_lookup::lookup_host;
use itertools::Itertools;
use rand::random;
use ringbuf::{HeapRb, Rb};
use rustls_pki_types::ServerName;
use surge_ping::{Client, ConfigBuilder, PingIdentifier, PingSequence, ICMP};
use surge_ping::{Client, ConfigBuilder, PingIdentifier, PingSequence, Pinger, ICMP};
use tokio::{
io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf},
net::TcpStream,
Expand Down Expand Up @@ -193,65 +192,90 @@ async fn do_check(
}
}

async fn ping_server(mut pinger: Pinger) -> (u128, u128, IpAddr) {
pinger.timeout(Duration::from_millis(999));
let mut avg_cost = 0;
let mut received = 0;
let mut tick = tokio::time::interval(Duration::from_secs(1));
for i in 0..100u128 {
tick.tick().await;
if let Ok((_, cost)) = pinger.ping(PingSequence(i as u16), &[]).await {
avg_cost = ((avg_cost * received) + cost.as_millis()) / (received + 1);
received += 1;
}
}
(avg_cost, received, pinger.host)
}

async fn check_server(host: String, timeout: u64, ip_timeout: u64) {
let config = ConfigBuilder::default().kind(ICMP::V4).build();
let mut client = Client::new(&config).unwrap();
let mut interval = tokio::time::interval(Duration::from_secs(timeout));
let size = (ip_timeout / timeout + 1) as usize;
let mut rb = HeapRb::new(size);
let mut all_rb = HashMap::new();
loop {
interval.tick().await;
let ip = if let Ok(Some(ip)) =
lookup_host(host.as_str()).map(|data| data.iter().find(|ip| ip.is_ipv4()).cloned())
{
ip
} else {
let mut tasks = vec![];
if let Ok(addrs) = tokio::net::lookup_host((host.clone(), 443)).await {
for addr in addrs {
if addr.is_ipv4() {
let pinger = client.pinger(addr.ip(), PingIdentifier(random())).await;
let t = ping_server(pinger);
tasks.push(t);
}
}
}
if tasks.is_empty() {
continue;
};
let mut pinger = client.pinger(ip, PingIdentifier(random())).await;
pinger.timeout(Duration::from_millis(999));
let mut avg_cost = 0;
let mut received = 0;
let mut tick = tokio::time::interval(Duration::from_secs(1));
for i in 0..100u128 {
tick.tick().await;
if let Ok((_, cost)) = pinger.ping(PingSequence(i as u16), &[]).await {
avg_cost = ((avg_cost * received) + cost.as_millis()) / (received + 1);
received += 1;
}

let mut reset_client = false;
let mut total_avg_lost = 0;
let mut total_avg_ping = 0;
let task_count = tasks.len();
for (avg_cost, received, ip) in futures::future::join_all(tasks).await {
let rb = all_rb.entry(ip).or_insert_with(|| HeapRb::new(size));
if received != 100 {
reset_client = true;
}

log::error!(
"current proxy server status, ip:{} ping:{}, lost:{}",
ip,
avg_cost,
100 - received
);

rb.push_overwrite(Condition {
lost: (100 - received) as u8,
ping: avg_cost as u16,
});
let mut total_ping = 0;
let mut total_lost = 0;
for cond in rb.iter() {
total_ping += cond.ping as usize;
total_lost += cond.lost as usize;
}
let avg_ping = total_ping / rb.len();
let avg_lost = total_lost / rb.len();
log::error!(
"average proxy server status, ip:{} ping:{}, lost:{}",
ip,
avg_ping,
avg_lost,
);
total_avg_ping += avg_ping;
total_avg_lost += avg_lost;
}
if received != 100 {

if reset_client {
// recreate a client if any error occur, otherwise pinger would stuck for the error, maybe some error in surge
client = Client::new(&config).unwrap();
}
log::error!(
"current proxy server status, ip:{} ping:{}, lost:{}",
ip,
avg_cost,
100 - received
);
rb.push_overwrite(Condition {
lost: (100 - received) as u8,
ping: avg_cost as u16,
});
let mut total_ping = 0;
let mut total_lost = 0;
for cond in rb.iter() {
total_ping += cond.ping as usize;
total_lost += cond.lost as usize;
}
let avg_ping = total_ping / rb.len();
let avg_lost = total_lost / rb.len();
log::error!(
"average proxy server status, ip:{} ping:{}, lost:{}",
ip,
avg_ping,
avg_lost,
);

if let Err(err) = CONDITION.write().map(|mut cond| {
cond.lost = avg_lost as u8;
cond.ping = avg_ping as u16;
cond.lost = (total_avg_lost / task_count) as u8;
cond.ping = (total_avg_ping / task_count) as u16;
}) {
log::error!("write on condition failed:{}", err);
}
Expand Down

0 comments on commit fbe4065

Please sign in to comment.