Skip to content

Commit

Permalink
refactor TrafficStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
ssrlive committed Jun 5, 2024
1 parent 5088234 commit e8c8920
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 45 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
overtls-android-libs.zip
overtls-android-libs/
overtls.xcframework/
overtls-ffi.h
examples/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ futures-util = { version = "0.3", default-features = false, features = [
] }
http = "1"
httparse = "1"
lazy_static = "1"
log = { version = "0.4", features = ["std"] }
moka = { version = "0.12", default-features = false, features = ["future"] }
reqwest = { version = "0.12", default-features = false, features = [
Expand Down Expand Up @@ -66,4 +67,3 @@ daemonize = "0.5"
[target.'cfg(target_os="android")'.dependencies]
android_logger = "0.13"
jni = { version = "0.21", default-features = false }
lazy_static = "1"
1 change: 1 addition & 0 deletions cbindgen.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ include = [
"over_tls_client_run_with_ssr_url",
"over_tls_client_stop",
"overtls_set_log_callback",
"overtls_set_traffic_status_callback",
"overtls_generate_url",
"overtls_free_string",
]
Expand Down
53 changes: 17 additions & 36 deletions src/android.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#![cfg(target_os = "android")]

use crate::traffic_status::{overtls_set_traffic_status_callback, TrafficStatus};
use crate::{ArgVerbosity, Error, Result};
use jni::{
objects::{GlobalRef, JClass, JObject, JString, JValue},
signature::{Primitive, ReturnType},
sys::jint,
JNIEnv, JavaVM,
};
use std::sync::RwLock;
use std::os::raw::c_void;

static EXITING_FLAG: std::sync::Mutex<Option<crate::CancellationToken>> = std::sync::Mutex::new(None);

Expand Down Expand Up @@ -53,8 +54,10 @@ pub unsafe extern "C" fn Java_com_github_shadowsocks_bg_OverTlsWrapper_runClient
.replace(env.new_global_ref(vpn_service)?);

if let Ok(stat_path) = get_java_string(&mut env, &stat_path) {
let mut stat = STAT_PATH.write().map_err(|e| Error::from(e.to_string()))?;
*stat = stat_path;
let mut stat = STAT_PATH.lock().map_err(|e| Error::from(e.to_string()))?;
*stat = Some(stat_path);

overtls_set_traffic_status_callback(1, Some(send_traffic_stat), std::ptr::null_mut());
}
let config_path = get_java_string(&mut env, &config_path)?.to_owned();
set_panic_handler();
Expand Down Expand Up @@ -135,47 +138,25 @@ fn remove_panic_handler() {
let _ = std::panic::take_hook();
}

#[repr(C)]
#[derive(Debug, Default, Copy, Clone)]
struct TrafficStatus {
tx: u64,
rx: u64,
}
static STAT_PATH: std::sync::Mutex<Option<String>> = std::sync::Mutex::new(None);

lazy_static::lazy_static! {
static ref TRAFFIC_STATUS: RwLock<TrafficStatus> = RwLock::new(TrafficStatus::default());
static ref STAT_PATH: RwLock<String> = RwLock::new(String::new());
static ref TIME_STAMP: RwLock<std::time::Instant> = RwLock::new(std::time::Instant::now());
}

pub(crate) fn traffic_status_update(delta_tx: usize, delta_rx: usize) -> Result<()> {
{
let mut traffic_status = TRAFFIC_STATUS.write().map_err(|e| Error::from(e.to_string()))?;
traffic_status.tx += delta_tx as u64;
traffic_status.rx += delta_rx as u64;
}
let old_time = { *TIME_STAMP.read().map_err(|e| Error::from(e.to_string()))? };
if std::time::Instant::now().duration_since(old_time).as_secs() >= 1 {
send_traffic_stat()?;
let mut time_stamp = TIME_STAMP.write().map_err(|e| Error::from(e.to_string()))?;
*time_stamp = std::time::Instant::now();
unsafe extern "C" fn send_traffic_stat(traffic_status: *const TrafficStatus, _ctx: *mut c_void) {
let traffic_status = *traffic_status;
if let Err(e) = _send_traffic_stat(&traffic_status) {
log::error!("failed to send traffic stat, error={:?}", e);
}
Ok(())
}

fn send_traffic_stat() -> Result<()> {
fn _send_traffic_stat(traffic_status: &TrafficStatus) -> Result<()> {
use std::io::{Read, Write};
let stat_path = { (*STAT_PATH.read().map_err(|e| Error::from(e.to_string()))?).clone() };
if stat_path.is_empty() {
return Ok(());
}
let stat_path = {
let stat_path = STAT_PATH.lock().map_err(|e| Error::from(e.to_string()))?;
stat_path.clone().ok_or_else(|| Error::from("stat path is not initialized"))?
};
let mut stream = std::os::unix::net::UnixStream::connect(&stat_path)?;
stream.set_write_timeout(Some(std::time::Duration::new(1, 0)))?;
stream.set_read_timeout(Some(std::time::Duration::new(1, 0)))?;
let buf = {
let traffic_status = TRAFFIC_STATUS.read().map_err(|e| Error::from(e.to_string()))?;
unsafe { std::mem::transmute::<TrafficStatus, [u8; std::mem::size_of::<TrafficStatus>()]>(*traffic_status) }
};
let buf = unsafe { std::mem::transmute::<TrafficStatus, [u8; std::mem::size_of::<TrafficStatus>()]>(*traffic_status) };
stream.write_all(&buf)?;

let mut response = String::new();
Expand Down
6 changes: 2 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ async fn client_traffic_loop<T: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + A
ws_stream.send(Message::Binary(buf.to_vec())).await?;
log::trace!("{} -> {} length {}", peer_addr, target_addr, buf.len());

#[cfg(target_os = "android")]
if let Err(e) = crate::android::traffic_status_update(len, 0) {
if let Err(e) = crate::traffic_status::traffic_status_update(len, 0) {
log::error!("{}", e);
}

Expand All @@ -191,8 +190,7 @@ async fn client_traffic_loop<T: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + A
result = ws_stream.next() => {
let msg = result.ok_or("message not exist")??;

#[cfg(target_os = "android")]
if let Err(e) = crate::android::traffic_status_update(0, msg.len()) {
if let Err(e) = crate::traffic_status::traffic_status_update(0, msg.len()) {
log::error!("{}", e);
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) mod server;
pub(crate) mod tcp_stream;
pub(crate) mod tls;
pub(crate) mod traffic_audit;
pub(crate) mod traffic_status;
pub(crate) mod udprelay;
pub(crate) mod webapi;
pub(crate) mod weirduri;
Expand All @@ -26,6 +27,7 @@ pub use error::{BoxError, Error, Result};
pub use server::run_server;
use socks5_impl::protocol::{Address, StreamOperation};
pub use tokio_util::sync::CancellationToken;
pub use traffic_status::{overtls_set_traffic_status_callback, TrafficStatus};

#[cfg(target_os = "windows")]
pub(crate) const STREAM_BUFFER_SIZE: usize = 1024 * 32;
Expand Down
79 changes: 79 additions & 0 deletions src/traffic_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::error::{Error, Result};
use std::sync::RwLock;
use std::{os::raw::c_void, sync::Mutex};

/// # Safety
///
/// set traffic status callback.
#[no_mangle]
pub unsafe extern "C" fn overtls_set_traffic_status_callback(
send_interval_secs: u32,
callback: Option<unsafe extern "C" fn(*const TrafficStatus, *mut c_void)>,
ctx: *mut c_void,
) {
if let Ok(mut cb) = TRAFFIC_STATUS_CALLBACK.lock() {
*cb = Some(TrafficStatusCallback(callback, ctx));
} else {
log::error!("set traffic status callback failed");
}
if send_interval_secs > 0 {
SEND_INTERVAL_SECS.store(send_interval_secs as u64, std::sync::atomic::Ordering::Relaxed);
}
}

#[repr(C)]
#[derive(Debug, Default, Copy, Clone, serde::Serialize, serde::Deserialize)]
pub struct TrafficStatus {
pub tx: u64,
pub rx: u64,
}

#[derive(Clone)]
struct TrafficStatusCallback(Option<unsafe extern "C" fn(*const TrafficStatus, *mut c_void)>, *mut c_void);

impl TrafficStatusCallback {
unsafe fn call(self, info: &TrafficStatus) {
if let Some(cb) = self.0 {
cb(info, self.1);
}
}
}

unsafe impl Send for TrafficStatusCallback {}
unsafe impl Sync for TrafficStatusCallback {}

static TRAFFIC_STATUS_CALLBACK: Mutex<Option<TrafficStatusCallback>> = Mutex::new(None);
static SEND_INTERVAL_SECS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);

lazy_static::lazy_static! {
static ref TRAFFIC_STATUS: RwLock<TrafficStatus> = RwLock::new(TrafficStatus::default());
static ref TIME_STAMP: RwLock<std::time::Instant> = RwLock::new(std::time::Instant::now());
}

pub(crate) fn traffic_status_update(delta_tx: usize, delta_rx: usize) -> Result<()> {
{
let mut traffic_status = TRAFFIC_STATUS.write().map_err(|e| Error::from(e.to_string()))?;
traffic_status.tx += delta_tx as u64;
traffic_status.rx += delta_rx as u64;
}
let old_time = { *TIME_STAMP.read().map_err(|e| Error::from(e.to_string()))? };
let interval_secs = SEND_INTERVAL_SECS.load(std::sync::atomic::Ordering::Relaxed);
if std::time::Instant::now().duration_since(old_time).as_secs() >= interval_secs {
send_traffic_stat()?;
{
let mut time_stamp = TIME_STAMP.write().map_err(|e| Error::from(e.to_string()))?;
*time_stamp = std::time::Instant::now();
}
}
Ok(())
}

fn send_traffic_stat() -> Result<()> {
if let Ok(cb) = TRAFFIC_STATUS_CALLBACK.lock() {
if let Some(cb) = cb.clone() {
let traffic_status = { *TRAFFIC_STATUS.read().map_err(|e| Error::from(e.to_string()))? };
unsafe { cb.call(&traffic_status) };
}
}
Ok(())
}
6 changes: 2 additions & 4 deletions src/udprelay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ async fn _run_udp_loop<S: AsyncRead + AsyncWrite + Unpin>(
src_addr.write_to_buf(&mut buf);
buf.put_slice(&pkt);

#[cfg(target_os = "android")]
if let Err(e) = crate::android::traffic_status_update(buf.len(), 0) {
if let Err(e) = crate::traffic_status::traffic_status_update(buf.len(), 0) {
log::error!("{}", e);
}

Expand All @@ -208,8 +207,7 @@ async fn _run_udp_loop<S: AsyncRead + AsyncWrite + Unpin>(
},
msg = ws_stream.next() => {
let len = msg.as_ref().map(|m| m.as_ref().map(|m| m.len()).unwrap_or(0)).unwrap_or(0);
#[cfg(target_os = "android")]
if let Err(e) = crate::android::traffic_status_update(0, len) {
if let Err(e) = crate::traffic_status::traffic_status_update(0, len) {
log::error!("{}", e);
}

Expand Down

0 comments on commit e8c8920

Please sign in to comment.