diff --git a/Cargo.lock b/Cargo.lock index 4fd2a90..5fd25d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -534,6 +534,21 @@ dependencies = [ "libc", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -541,6 +556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -549,12 +565,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.46", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -573,8 +611,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -1601,6 +1644,7 @@ dependencies = [ "dns-lookup", "env_logger", "fern", + "futures", "hex", "httparse", "ipset", diff --git a/Cargo.toml b/Cargo.toml index 886ab6f..669a484 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ httparse = "1.8" async_smoltcp = { path = "async_smoltcp" } tokio-rustls = "0.25" rustls-pki-types = "1.1" +futures = "0.3" [dev-dependencies] env_logger = "0.10" diff --git a/src/aproxy/profiler.rs b/src/aproxy/profiler.rs index 2208d93..6f34bb1 100644 --- a/src/aproxy/profiler.rs +++ b/src/aproxy/profiler.rs @@ -6,12 +6,11 @@ use std::{ }; use bytes::{Buf, BufMut, BytesMut}; -use dns_lookup::lookup_host; use itertools::Itertools; use rand::random; use ringbuf::{HeapRb, Rb}; use rustls_pki_types::ServerName; -use surge_ping::{Client, ConfigBuilder, PingIdentifier, PingSequence, ICMP}; +use surge_ping::{Client, ConfigBuilder, PingIdentifier, PingSequence, Pinger, ICMP}; use tokio::{ io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf}, net::TcpStream, @@ -193,65 +192,90 @@ async fn do_check( } } +async fn ping_server(mut pinger: Pinger) -> (u128, u128, IpAddr) { + pinger.timeout(Duration::from_millis(999)); + let mut avg_cost = 0; + let mut received = 0; + let mut tick = tokio::time::interval(Duration::from_secs(1)); + for i in 0..100u128 { + tick.tick().await; + if let Ok((_, cost)) = pinger.ping(PingSequence(i as u16), &[]).await { + avg_cost = ((avg_cost * received) + cost.as_millis()) / (received + 1); + received += 1; + } + } + (avg_cost, received, pinger.host) +} + async fn check_server(host: String, timeout: u64, ip_timeout: u64) { let config = ConfigBuilder::default().kind(ICMP::V4).build(); let mut client = Client::new(&config).unwrap(); let mut interval = tokio::time::interval(Duration::from_secs(timeout)); let size = (ip_timeout / timeout + 1) as usize; - let mut rb = HeapRb::new(size); + let mut all_rb = HashMap::new(); loop { interval.tick().await; - let ip = if let Ok(Some(ip)) = - lookup_host(host.as_str()).map(|data| data.iter().find(|ip| ip.is_ipv4()).cloned()) - { - ip - } else { + let mut tasks = vec![]; + if let Ok(addrs) = tokio::net::lookup_host((host.clone(), 443)).await { + for addr in addrs { + if addr.is_ipv4() { + let pinger = client.pinger(addr.ip(), PingIdentifier(random())).await; + let t = ping_server(pinger); + tasks.push(t); + } + } + } + if tasks.is_empty() { continue; - }; - let mut pinger = client.pinger(ip, PingIdentifier(random())).await; - pinger.timeout(Duration::from_millis(999)); - let mut avg_cost = 0; - let mut received = 0; - let mut tick = tokio::time::interval(Duration::from_secs(1)); - for i in 0..100u128 { - tick.tick().await; - if let Ok((_, cost)) = pinger.ping(PingSequence(i as u16), &[]).await { - avg_cost = ((avg_cost * received) + cost.as_millis()) / (received + 1); - received += 1; + } + + let mut reset_client = false; + let mut total_avg_lost = 0; + let mut total_avg_ping = 0; + let task_count = tasks.len(); + for (avg_cost, received, ip) in futures::future::join_all(tasks).await { + let rb = all_rb.entry(ip).or_insert_with(|| HeapRb::new(size)); + if received != 100 { + reset_client = true; + } + + log::error!( + "current proxy server status, ip:{} ping:{}, lost:{}", + ip, + avg_cost, + 100 - received + ); + + rb.push_overwrite(Condition { + lost: (100 - received) as u8, + ping: avg_cost as u16, + }); + let mut total_ping = 0; + let mut total_lost = 0; + for cond in rb.iter() { + total_ping += cond.ping as usize; + total_lost += cond.lost as usize; } + let avg_ping = total_ping / rb.len(); + let avg_lost = total_lost / rb.len(); + log::error!( + "average proxy server status, ip:{} ping:{}, lost:{}", + ip, + avg_ping, + avg_lost, + ); + total_avg_ping += avg_ping; + total_avg_lost += avg_lost; } - if received != 100 { + + if reset_client { // recreate a client if any error occur, otherwise pinger would stuck for the error, maybe some error in surge client = Client::new(&config).unwrap(); } - log::error!( - "current proxy server status, ip:{} ping:{}, lost:{}", - ip, - avg_cost, - 100 - received - ); - rb.push_overwrite(Condition { - lost: (100 - received) as u8, - ping: avg_cost as u16, - }); - let mut total_ping = 0; - let mut total_lost = 0; - for cond in rb.iter() { - total_ping += cond.ping as usize; - total_lost += cond.lost as usize; - } - let avg_ping = total_ping / rb.len(); - let avg_lost = total_lost / rb.len(); - log::error!( - "average proxy server status, ip:{} ping:{}, lost:{}", - ip, - avg_ping, - avg_lost, - ); if let Err(err) = CONDITION.write().map(|mut cond| { - cond.lost = avg_lost as u8; - cond.ping = avg_ping as u16; + cond.lost = (total_avg_lost / task_count) as u8; + cond.ping = (total_avg_ping / task_count) as u16; }) { log::error!("write on condition failed:{}", err); }