Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
eycorsican committed Apr 21, 2022
1 parent 0195e9a commit 499c5f2
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 124 deletions.
19 changes: 17 additions & 2 deletions leaf-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,23 @@ fn main() {
.enable_all()
.build()
.unwrap();
rt.block_on(leaf::util::test_outbound(&tag, &config));
exit(0);
match rt.block_on(leaf::util::test_outbound(&tag, &config, None)) {
Err(e) => {
println!("test outbound failed: {}", e);
exit(1);
}
Ok((tcp_res, udp_res)) => {
match tcp_res {
Ok(duration) => println!("TCP ok in {}ms", duration.as_millis()),
Err(e) => println!("TCP failed: {}", e),
}
match udp_res {
Ok(duration) => println!("UDP ok in {}ms", duration.as_millis()),
Err(e) => println!("UDP failed: {}", e),
}
exit(0);
}
}
}

if let Err(e) = leaf::util::run_with_options(
Expand Down
192 changes: 70 additions & 122 deletions leaf/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use anyhow::{anyhow, Result};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::RwLock;
use tokio::time::timeout;
Expand Down Expand Up @@ -69,142 +69,90 @@ pub fn run_with_options(
}

async fn test_tcp_outbound(
sess: &Session,
dns_client: SyncDnsClient,
handler: &AnyOutboundHandler,
) {
handler: AnyOutboundHandler,
) -> Result<Duration> {
let sess = Session {
destination: SocksAddr::Domain("www.google.com".to_string(), 80),
..Default::default()
};
let start = tokio::time::Instant::now();
match crate::proxy::connect_tcp_outbound(sess, dns_client, handler).await {
Ok(stream) => match TcpOutboundHandler::handle(handler.as_ref(), &sess, stream).await {
Ok(mut stream) => {
if let Err(e) = stream.write_all(b"HEAD / HTTP/1.1\r\n\r\n").await {
println!("write to outbound {} failed: {}", &handler.tag(), e);
return;
}
let mut buf = vec![0u8; 30];
match stream.read_exact(&mut buf).await {
Ok(_) => {
let elapsed = tokio::time::Instant::now().duration_since(start);
println!(
"received response from outbound {} in {}ms",
&handler.tag(),
elapsed.as_millis()
);
println!("truncated response:\n{}", String::from_utf8_lossy(&buf))
}
Err(e) => {
println!("read from outbound {} failed: {}", &handler.tag(), e);
}
}
}
Err(e) => {
println!("dispatch to outbound {} failed: {}", &handler.tag(), e);
}
},
Err(e) => {
println!("dispatch to outbound {} failed: {}", &handler.tag(), e);
}
let stream = crate::proxy::connect_tcp_outbound(&sess, dns_client, &handler).await?;
let mut stream = TcpOutboundHandler::handle(handler.as_ref(), &sess, stream).await?;
stream.write_all(b"HEAD / HTTP/1.1\r\n\r\n").await?;
let mut buf = Vec::new();
let n = stream.read_buf(&mut buf).await?;
if n == 0 {
Err(anyhow!("EOF"))
} else {
Ok(tokio::time::Instant::now().duration_since(start))
}
}

async fn test_udp_outbound(
sess: &Session,
dns_client: SyncDnsClient,
handler: &AnyOutboundHandler,
) {
handler: AnyOutboundHandler,
) -> Result<Duration> {
use rand::{rngs::StdRng, Rng, SeedableRng};
use trust_dns_proto::{
op::{header::MessageType, op_code::OpCode, query::Query, Message},
rr::{record_type::RecordType, Name},
};
let start = tokio::time::Instant::now();
match crate::proxy::connect_udp_outbound(sess, dns_client, handler).await {
Ok(transport) => {
match UdpOutboundHandler::handle(handler.as_ref(), sess, transport).await {
Ok(socket) => {
let addr =
SocksAddr::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 53));
let mut msg = Message::new();
let name = Name::from_str("www.google.com.").unwrap();
let query = Query::query(name, RecordType::A);
msg.add_query(query);
let mut rng = StdRng::from_entropy();
let id: u16 = rng.gen();
msg.set_id(id);
msg.set_op_code(OpCode::Query);
msg.set_message_type(MessageType::Query);
msg.set_recursion_desired(true);
let msg_buf = msg.to_vec().unwrap();
let (mut recv, mut send) = socket.split();
if let Err(e) = send.send_to(&msg_buf, &addr).await {
println!("send message to {} failed: {}", &handler.tag(), e);
}
let mut buf = [0u8; 1500];
match recv.recv_from(&mut buf).await {
Ok(_) => {
let elapsed = tokio::time::Instant::now().duration_since(start);
println!(
"received response from outbound {} in {}ms",
&handler.tag(),
elapsed.as_millis()
);
}
Err(e) => {
println!("receive from outbound {} failed: {}", &handler.tag(), e);
}
}
}
Err(e) => {
println!("dispatch to outbound {} failed: {}", &handler.tag(), e);
}
}
}
Err(e) => {
println!("dispatch to outbound {} failed: {}", &handler.tag(), e);
}
}
}

pub async fn test_outbound(tag: &str, config: &Config) {
let dns_client = Arc::new(RwLock::new(DnsClient::new(&config.dns).unwrap()));
let outbound_manager = OutboundManager::new(&config.outbounds, dns_client.clone()).unwrap();
let handler = if let Some(v) = outbound_manager.get(tag) {
v
} else {
println!("outbound {} not found", tag);
return;
};
println!("testing outbound {}", &handler.tag());

println!();

println!("testing TCP...");
let addr = SocksAddr::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 53));
let sess = Session {
destination: SocksAddr::Domain("www.google.com".to_string(), 80),
destination: addr.clone(),
..Default::default()
};
if let Err(e) = timeout(
Duration::from_secs(4),
test_tcp_outbound(&sess, dns_client.clone(), &handler),
)
.await
{
println!("test outbound {} failed: {}", &handler.tag(), e);
}

println!();
let start = tokio::time::Instant::now();
let dgram = crate::proxy::connect_udp_outbound(&sess, dns_client, &handler).await?;
let dgram = UdpOutboundHandler::handle(handler.as_ref(), &sess, dgram).await?;
let mut msg = Message::new();
let name = Name::from_str("www.google.com.")?;
let query = Query::query(name, RecordType::A);
msg.add_query(query);
let mut rng = StdRng::from_entropy();
let id: u16 = rng.gen();
msg.set_id(id);
msg.set_op_code(OpCode::Query);
msg.set_message_type(MessageType::Query);
msg.set_recursion_desired(true);
let msg_buf = msg.to_vec()?;
let (mut recv, mut send) = dgram.split();
send.send_to(&msg_buf, &addr).await?;
let mut buf = [0u8; 1500];
let _ = recv.recv_from(&mut buf).await?;
Ok(tokio::time::Instant::now().duration_since(start))
}

println!("testing UDP...");
let sess = Session {
destination: SocksAddr::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 53)),
..Default::default()
};
if let Err(e) = timeout(
Duration::from_secs(4),
test_udp_outbound(&sess, dns_client.clone(), &handler),
pub async fn test_outbound(
tag: &str,
config: &Config,
to: Option<Duration>,
) -> Result<(Result<Duration>, Result<Duration>)> {
let to = to.unwrap_or(Duration::from_secs(4));
let dns_client = Arc::new(RwLock::new(DnsClient::new(&config.dns)?));
let outbound_manager = OutboundManager::new(&config.outbounds, dns_client.clone())?;
let handler = outbound_manager
.get(tag)
.ok_or_else(|| anyhow!("outbound {} not found", tag))?;
let (tcp_res, udp_res) = futures::future::join(
timeout(to, test_tcp_outbound(dns_client.clone(), handler.clone())),
timeout(to, test_udp_outbound(dns_client, handler)),
)
.await
{
println!("test outbound {} failed: {}", &handler.tag(), e);
}
.await;
let tcp_res = match tcp_res.map_err(|e| e.into()) {
Err(e) => Err(e),
Ok(res) => match res {
Err(e) => Err(e),
Ok(duration) => Ok(duration),
},
};
let udp_res = match udp_res.map_err(|e| e.into()) {
Err(e) => Err(e),
Ok(res) => match res {
Err(e) => Err(e),
Ok(duration) => Ok(duration),
},
};
Ok((tcp_res, udp_res))
}

0 comments on commit 499c5f2

Please sign in to comment.