diff --git a/leaf-bin/src/main.rs b/leaf-bin/src/main.rs index b19914f61..16efb19c2 100644 --- a/leaf-bin/src/main.rs +++ b/leaf-bin/src/main.rs @@ -93,8 +93,23 @@ fn main() { .enable_all() .build() .unwrap(); - rt.block_on(leaf::util::test_outbound(&tag, &config)); - exit(0); + match rt.block_on(leaf::util::test_outbound(&tag, &config, None)) { + Err(e) => { + println!("test outbound failed: {}", e); + exit(1); + } + Ok((tcp_res, udp_res)) => { + match tcp_res { + Ok(duration) => println!("TCP ok in {}ms", duration.as_millis()), + Err(e) => println!("TCP failed: {}", e), + } + match udp_res { + Ok(duration) => println!("UDP ok in {}ms", duration.as_millis()), + Err(e) => println!("UDP failed: {}", e), + } + exit(0); + } + } } if let Err(e) = leaf::util::run_with_options( diff --git a/leaf/src/util.rs b/leaf/src/util.rs index 54a15f3a8..7487a3058 100644 --- a/leaf/src/util.rs +++ b/leaf/src/util.rs @@ -3,7 +3,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use anyhow::Result; +use anyhow::{anyhow, Result}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::RwLock; use tokio::time::timeout; @@ -69,142 +69,90 @@ pub fn run_with_options( } async fn test_tcp_outbound( - sess: &Session, dns_client: SyncDnsClient, - handler: &AnyOutboundHandler, -) { + handler: AnyOutboundHandler, +) -> Result { + let sess = Session { + destination: SocksAddr::Domain("www.google.com".to_string(), 80), + ..Default::default() + }; let start = tokio::time::Instant::now(); - match crate::proxy::connect_tcp_outbound(sess, dns_client, handler).await { - Ok(stream) => match TcpOutboundHandler::handle(handler.as_ref(), &sess, stream).await { - Ok(mut stream) => { - if let Err(e) = stream.write_all(b"HEAD / HTTP/1.1\r\n\r\n").await { - println!("write to outbound {} failed: {}", &handler.tag(), e); - return; - } - let mut buf = vec![0u8; 30]; - match stream.read_exact(&mut buf).await { - Ok(_) => { - let elapsed = tokio::time::Instant::now().duration_since(start); - println!( - "received response from outbound {} in {}ms", - &handler.tag(), - elapsed.as_millis() - ); - println!("truncated response:\n{}", String::from_utf8_lossy(&buf)) - } - Err(e) => { - println!("read from outbound {} failed: {}", &handler.tag(), e); - } - } - } - Err(e) => { - println!("dispatch to outbound {} failed: {}", &handler.tag(), e); - } - }, - Err(e) => { - println!("dispatch to outbound {} failed: {}", &handler.tag(), e); - } + let stream = crate::proxy::connect_tcp_outbound(&sess, dns_client, &handler).await?; + let mut stream = TcpOutboundHandler::handle(handler.as_ref(), &sess, stream).await?; + stream.write_all(b"HEAD / HTTP/1.1\r\n\r\n").await?; + let mut buf = Vec::new(); + let n = stream.read_buf(&mut buf).await?; + if n == 0 { + Err(anyhow!("EOF")) + } else { + Ok(tokio::time::Instant::now().duration_since(start)) } } async fn test_udp_outbound( - sess: &Session, dns_client: SyncDnsClient, - handler: &AnyOutboundHandler, -) { + handler: AnyOutboundHandler, +) -> Result { use rand::{rngs::StdRng, Rng, SeedableRng}; use trust_dns_proto::{ op::{header::MessageType, op_code::OpCode, query::Query, Message}, rr::{record_type::RecordType, Name}, }; - let start = tokio::time::Instant::now(); - match crate::proxy::connect_udp_outbound(sess, dns_client, handler).await { - Ok(transport) => { - match UdpOutboundHandler::handle(handler.as_ref(), sess, transport).await { - Ok(socket) => { - let addr = - SocksAddr::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 53)); - let mut msg = Message::new(); - let name = Name::from_str("www.google.com.").unwrap(); - let query = Query::query(name, RecordType::A); - msg.add_query(query); - let mut rng = StdRng::from_entropy(); - let id: u16 = rng.gen(); - msg.set_id(id); - msg.set_op_code(OpCode::Query); - msg.set_message_type(MessageType::Query); - msg.set_recursion_desired(true); - let msg_buf = msg.to_vec().unwrap(); - let (mut recv, mut send) = socket.split(); - if let Err(e) = send.send_to(&msg_buf, &addr).await { - println!("send message to {} failed: {}", &handler.tag(), e); - } - let mut buf = [0u8; 1500]; - match recv.recv_from(&mut buf).await { - Ok(_) => { - let elapsed = tokio::time::Instant::now().duration_since(start); - println!( - "received response from outbound {} in {}ms", - &handler.tag(), - elapsed.as_millis() - ); - } - Err(e) => { - println!("receive from outbound {} failed: {}", &handler.tag(), e); - } - } - } - Err(e) => { - println!("dispatch to outbound {} failed: {}", &handler.tag(), e); - } - } - } - Err(e) => { - println!("dispatch to outbound {} failed: {}", &handler.tag(), e); - } - } -} - -pub async fn test_outbound(tag: &str, config: &Config) { - let dns_client = Arc::new(RwLock::new(DnsClient::new(&config.dns).unwrap())); - let outbound_manager = OutboundManager::new(&config.outbounds, dns_client.clone()).unwrap(); - let handler = if let Some(v) = outbound_manager.get(tag) { - v - } else { - println!("outbound {} not found", tag); - return; - }; - println!("testing outbound {}", &handler.tag()); - - println!(); - - println!("testing TCP..."); + let addr = SocksAddr::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 53)); let sess = Session { - destination: SocksAddr::Domain("www.google.com".to_string(), 80), + destination: addr.clone(), ..Default::default() }; - if let Err(e) = timeout( - Duration::from_secs(4), - test_tcp_outbound(&sess, dns_client.clone(), &handler), - ) - .await - { - println!("test outbound {} failed: {}", &handler.tag(), e); - } - - println!(); + let start = tokio::time::Instant::now(); + let dgram = crate::proxy::connect_udp_outbound(&sess, dns_client, &handler).await?; + let dgram = UdpOutboundHandler::handle(handler.as_ref(), &sess, dgram).await?; + let mut msg = Message::new(); + let name = Name::from_str("www.google.com.")?; + let query = Query::query(name, RecordType::A); + msg.add_query(query); + let mut rng = StdRng::from_entropy(); + let id: u16 = rng.gen(); + msg.set_id(id); + msg.set_op_code(OpCode::Query); + msg.set_message_type(MessageType::Query); + msg.set_recursion_desired(true); + let msg_buf = msg.to_vec()?; + let (mut recv, mut send) = dgram.split(); + send.send_to(&msg_buf, &addr).await?; + let mut buf = [0u8; 1500]; + let _ = recv.recv_from(&mut buf).await?; + Ok(tokio::time::Instant::now().duration_since(start)) +} - println!("testing UDP..."); - let sess = Session { - destination: SocksAddr::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 53)), - ..Default::default() - }; - if let Err(e) = timeout( - Duration::from_secs(4), - test_udp_outbound(&sess, dns_client.clone(), &handler), +pub async fn test_outbound( + tag: &str, + config: &Config, + to: Option, +) -> Result<(Result, Result)> { + let to = to.unwrap_or(Duration::from_secs(4)); + let dns_client = Arc::new(RwLock::new(DnsClient::new(&config.dns)?)); + let outbound_manager = OutboundManager::new(&config.outbounds, dns_client.clone())?; + let handler = outbound_manager + .get(tag) + .ok_or_else(|| anyhow!("outbound {} not found", tag))?; + let (tcp_res, udp_res) = futures::future::join( + timeout(to, test_tcp_outbound(dns_client.clone(), handler.clone())), + timeout(to, test_udp_outbound(dns_client, handler)), ) - .await - { - println!("test outbound {} failed: {}", &handler.tag(), e); - } + .await; + let tcp_res = match tcp_res.map_err(|e| e.into()) { + Err(e) => Err(e), + Ok(res) => match res { + Err(e) => Err(e), + Ok(duration) => Ok(duration), + }, + }; + let udp_res = match udp_res.map_err(|e| e.into()) { + Err(e) => Err(e), + Ok(res) => match res { + Err(e) => Err(e), + Ok(duration) => Ok(duration), + }, + }; + Ok((tcp_res, udp_res)) }