diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index 4c7a3046..d56bee9e 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -84,9 +84,8 @@ impl Point for Payload { } #[derive(Debug)] +#[allow(clippy::large_enum_variant)] pub enum Event { - /// App name and handle for brige to send actions to the app - RegisterActionRoute(String, ActionRouter), /// Data sent by the app Data(Payload), /// Sometime apps can choose to directly send action response instead @@ -153,6 +152,42 @@ impl Bridge { } } + pub fn register_action_route(&mut self, route: ActionRoute) -> Receiver { + let (actions_tx, actions_rx) = bounded(1); + self.insert_route(route, actions_tx.clone()); + + actions_rx + } + + pub fn register_action_routes, V: IntoIterator>( + &mut self, + routes: V, + ) -> Option> { + let routes: Vec = routes.into_iter().map(|n| n.into()).collect(); + if routes.is_empty() { + return None; + } + + let (actions_tx, actions_rx) = bounded(1); + for route in routes { + self.insert_route(route, actions_tx.clone()); + } + + Some(actions_rx) + } + + fn insert_route( + &mut self, + ActionRoute { name, timeout }: ActionRoute, + actions_tx: Sender, + ) { + let duration = Duration::from_secs(timeout); + let action_router = ActionRouter { actions_tx, duration }; + if self.action_routes.insert(name.clone(), action_router).is_some() { + panic!("Action Route clash: {name}"); + } + } + pub fn tx(&mut self) -> BridgeTx { BridgeTx { events_tx: self.bridge_tx.clone(), shutdown_handle: self.ctrl_tx.clone() } } @@ -212,9 +247,6 @@ impl Bridge { event = self.bridge_rx.recv_async() => { let event = event?; match event { - Event::RegisterActionRoute(name, tx) => { - self.action_routes.insert(name, tx); - } Event::Data(v) => { self.streams.forward(v).await; } @@ -358,6 +390,11 @@ impl Bridge { let mut fwd_action = inflight_action.action.clone(); fwd_action.name = fwd_name.to_owned(); + debug!( + "Redirecting action {}: {} ~> {}", + fwd_action.action_id, inflight_action.action.name, fwd_action.name + ); + if let Err(e) = self.try_route_action(fwd_action.clone()) { error!("Failed to route action to app. Error = {:?}", e); self.forward_action_error(fwd_action, e).await; @@ -451,41 +488,6 @@ pub struct BridgeTx { } impl BridgeTx { - pub async fn register_action_route(&self, route: ActionRoute) -> Receiver { - let (actions_tx, actions_rx) = bounded(1); - let ActionRoute { name, timeout } = route; - let duration = Duration::from_secs(timeout); - let action_router = ActionRouter { actions_tx, duration }; - let event = Event::RegisterActionRoute(name, action_router); - - // Bridge should always be up and hence unwrap is ok - self.events_tx.send_async(event).await.unwrap(); - actions_rx - } - - pub async fn register_action_routes, V: IntoIterator>( - &self, - routes: V, - ) -> Option> { - let routes: Vec = routes.into_iter().map(|n| n.into()).collect(); - if routes.is_empty() { - return None; - } - - let (actions_tx, actions_rx) = bounded(1); - - for route in routes { - let ActionRoute { name, timeout } = route; - let duration = Duration::from_secs(timeout); - let action_router = ActionRouter { actions_tx: actions_tx.clone(), duration }; - let event = Event::RegisterActionRoute(name, action_router); - // Bridge should always be up and hence unwrap is ok - self.events_tx.send_async(event).await.unwrap(); - } - - Some(actions_rx) - } - pub async fn send_payload(&self, payload: Payload) { let event = Event::Data(payload); self.events_tx.send_async(event).await.unwrap() @@ -537,21 +539,21 @@ mod tests { } } - fn start_bridge(config: Arc) -> (BridgeTx, Sender, Receiver>) { + fn create_bridge(config: Arc) -> (Bridge, Sender, Receiver>) { let (package_tx, package_rx) = bounded(10); let (metrics_tx, _) = bounded(10); let (actions_tx, actions_rx) = bounded(10); let (shutdown_handle, _) = bounded(1); + let bridge = Bridge::new(config, package_tx, metrics_tx, actions_rx, shutdown_handle); - let mut bridge = Bridge::new(config, package_tx, metrics_tx, actions_rx, shutdown_handle); - let bridge_tx = bridge.tx(); + (bridge, actions_tx, package_rx) + } + fn spawn_bridge(mut bridge: Bridge) { std::thread::spawn(move || { let rt = Runtime::new().unwrap(); rt.block_on(async { bridge.start().await.unwrap() }); }); - - (bridge_tx, actions_tx, package_rx) } struct Responses { @@ -575,12 +577,14 @@ mod tests { let tmpdir = tempdir::TempDir::new("bridge").unwrap(); std::env::set_current_dir(&tmpdir).unwrap(); let config = Arc::new(default_config()); - let (bridge_tx, actions_tx, package_rx) = start_bridge(config); + let (mut bridge, actions_tx, package_rx) = create_bridge(config); let route_1 = ActionRoute { name: "route_1".to_string(), timeout: 10 }; - let route_1_rx = bridge_tx.register_action_route(route_1).await; + let route_1_rx = bridge.register_action_route(route_1); let route_2 = ActionRoute { name: "route_2".to_string(), timeout: 30 }; - let route_2_rx = bridge_tx.register_action_route(route_2).await; + let route_2_rx = bridge.register_action_route(route_2); + + spawn_bridge(bridge); std::thread::spawn(move || { let rt = Runtime::new().unwrap(); @@ -653,10 +657,12 @@ mod tests { let tmpdir = tempdir::TempDir::new("bridge").unwrap(); std::env::set_current_dir(&tmpdir).unwrap(); let config = Arc::new(default_config()); - let (bridge_tx, actions_tx, package_rx) = start_bridge(config); + let (mut bridge, actions_tx, package_rx) = create_bridge(config); let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; - let action_rx = bridge_tx.register_action_route(test_route).await; + let action_rx = bridge.register_action_route(test_route); + + spawn_bridge(bridge); std::thread::spawn(move || loop { let action = action_rx.recv().unwrap(); @@ -699,10 +705,13 @@ mod tests { let tmpdir = tempdir::TempDir::new("bridge").unwrap(); std::env::set_current_dir(&tmpdir).unwrap(); let config = Arc::new(default_config()); - let (bridge_tx, actions_tx, package_rx) = start_bridge(config); + let (mut bridge, actions_tx, package_rx) = create_bridge(config); let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; - let action_rx = bridge_tx.register_action_route(test_route).await; + let action_rx = bridge.register_action_route(test_route); + let bridge_tx = bridge.tx(); + + spawn_bridge(bridge); std::thread::spawn(move || loop { let action = action_rx.recv().unwrap(); @@ -741,31 +750,36 @@ mod tests { std::env::set_current_dir(&tmpdir).unwrap(); let mut config = default_config(); config.action_redirections.insert("test".to_string(), "redirect".to_string()); - let (bridge_tx, actions_tx, package_rx) = start_bridge(Arc::new(config)); - let bridge_tx_clone = bridge_tx.clone(); + let (mut bridge, actions_tx, package_rx) = create_bridge(Arc::new(config)); + let bridge_tx_1 = bridge.tx(); + let bridge_tx_2 = bridge.tx(); + + let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; + let action_rx_1 = bridge.register_action_route(test_route); + + let redirect_route = ActionRoute { name: "redirect".to_string(), timeout: 30 }; + let action_rx_2 = bridge.register_action_route(redirect_route); + + spawn_bridge(bridge); std::thread::spawn(move || { let rt = Runtime::new().unwrap(); - let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; - let action_rx = rt.block_on(bridge_tx.register_action_route(test_route)); - let action = action_rx.recv().unwrap(); + let action = action_rx_1.recv().unwrap(); assert_eq!(action.action_id, "1".to_owned()); std::thread::sleep(Duration::from_secs(1)); let response = ActionResponse::progress("1", "Tested", 100); - rt.block_on(bridge_tx.send_action_response(response)); + rt.block_on(bridge_tx_1.send_action_response(response)); }); std::thread::spawn(move || { let rt = Runtime::new().unwrap(); - let test_route = ActionRoute { name: "redirect".to_string(), timeout: 30 }; - let action_rx = rt.block_on(bridge_tx_clone.register_action_route(test_route)); - let action = action_rx.recv().unwrap(); + let action = action_rx_2.recv().unwrap(); assert_eq!(action.action_id, "1".to_owned()); let response = ActionResponse::progress("1", "Redirected", 0); - rt.block_on(bridge_tx_clone.send_action_response(response)); + rt.block_on(bridge_tx_2.send_action_response(response)); std::thread::sleep(Duration::from_secs(1)); let response = ActionResponse::success("1"); - rt.block_on(bridge_tx_clone.send_action_response(response)); + rt.block_on(bridge_tx_2.send_action_response(response)); }); std::thread::sleep(Duration::from_secs(1)); @@ -800,33 +814,38 @@ mod tests { let tmpdir = tempdir::TempDir::new("bridge").unwrap(); std::env::set_current_dir(&tmpdir).unwrap(); let config = default_config(); - let (bridge_tx, actions_tx, package_rx) = start_bridge(Arc::new(config)); - let bridge_tx_clone = bridge_tx.clone(); + let (mut bridge, actions_tx, package_rx) = create_bridge(Arc::new(config)); + let bridge_tx_1 = bridge.tx(); + let bridge_tx_2 = bridge.tx(); + + let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 }; + let action_rx_1 = bridge.register_action_route(tunshell_route); + + let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; + let action_rx_2 = bridge.register_action_route(test_route); + + spawn_bridge(bridge); std::thread::spawn(move || { let rt = Runtime::new().unwrap(); - let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 }; - let action_rx = rt.block_on(bridge_tx.register_action_route(tunshell_route)); - let action = action_rx.recv().unwrap(); + let action = action_rx_1.recv().unwrap(); assert_eq!(action.action_id, "1"); let response = ActionResponse::progress(&action.action_id, "Launched", 0); - rt.block_on(bridge_tx.send_action_response(response)); + rt.block_on(bridge_tx_1.send_action_response(response)); std::thread::sleep(Duration::from_secs(3)); let response = ActionResponse::success(&action.action_id); - rt.block_on(bridge_tx.send_action_response(response)); + rt.block_on(bridge_tx_1.send_action_response(response)); }); std::thread::spawn(move || { let rt = Runtime::new().unwrap(); - let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; - let action_rx = rt.block_on(bridge_tx_clone.register_action_route(test_route)); - let action = action_rx.recv().unwrap(); + let action = action_rx_2.recv().unwrap(); assert_eq!(action.action_id, "2"); let response = ActionResponse::progress(&action.action_id, "Running", 0); - rt.block_on(bridge_tx_clone.send_action_response(response)); + rt.block_on(bridge_tx_2.send_action_response(response)); std::thread::sleep(Duration::from_secs(1)); let response = ActionResponse::success(&action.action_id); - rt.block_on(bridge_tx_clone.send_action_response(response)); + rt.block_on(bridge_tx_2.send_action_response(response)); }); std::thread::sleep(Duration::from_secs(1)); @@ -881,33 +900,38 @@ mod tests { let tmpdir = tempdir::TempDir::new("bridge").unwrap(); std::env::set_current_dir(&tmpdir).unwrap(); let config = default_config(); - let (bridge_tx, actions_tx, package_rx) = start_bridge(Arc::new(config)); - let bridge_tx_clone = bridge_tx.clone(); + let (mut bridge, actions_tx, package_rx) = create_bridge(Arc::new(config)); + let bridge_tx_1 = bridge.tx(); + let bridge_tx_2 = bridge.tx(); + + let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; + let action_rx_1 = bridge.register_action_route(test_route); + + let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 }; + let action_rx = bridge.register_action_route(tunshell_route); + + spawn_bridge(bridge); std::thread::spawn(move || { let rt = Runtime::new().unwrap(); - let test_route = ActionRoute { name: "test".to_string(), timeout: 30 }; - let action_rx = rt.block_on(bridge_tx_clone.register_action_route(test_route)); - let action = action_rx.recv().unwrap(); + let action = action_rx_1.recv().unwrap(); assert_eq!(action.action_id, "1"); let response = ActionResponse::progress(&action.action_id, "Running", 0); - rt.block_on(bridge_tx_clone.send_action_response(response)); + rt.block_on(bridge_tx_1.send_action_response(response)); std::thread::sleep(Duration::from_secs(3)); let response = ActionResponse::success(&action.action_id); - rt.block_on(bridge_tx_clone.send_action_response(response)); + rt.block_on(bridge_tx_1.send_action_response(response)); }); std::thread::spawn(move || { let rt = Runtime::new().unwrap(); - let test_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 }; - let action_rx = rt.block_on(bridge_tx.register_action_route(test_route)); let action = action_rx.recv().unwrap(); assert_eq!(action.action_id, "2"); let response = ActionResponse::progress(&action.action_id, "Launched", 0); - rt.block_on(bridge_tx.send_action_response(response)); + rt.block_on(bridge_tx_2.send_action_response(response)); std::thread::sleep(Duration::from_secs(1)); let response = ActionResponse::success(&action.action_id); - rt.block_on(bridge_tx.send_action_response(response)); + rt.block_on(bridge_tx_2.send_action_response(response)); }); std::thread::sleep(Duration::from_secs(1)); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 7ce0877d..000950c8 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -48,6 +48,7 @@ //! [`action_redirections`]: Config#structfield.action_redirections use bytes::BytesMut; +use flume::Receiver; use futures_util::StreamExt; use log::{error, info, warn}; use reqwest::{Certificate, Client, ClientBuilder, Identity, Response}; @@ -91,6 +92,7 @@ pub enum Error { /// to the connected bridge application. pub struct FileDownloader { config: DownloaderConfig, + actions_rx: Receiver, action_id: String, bridge_tx: BridgeTx, client: Client, @@ -100,7 +102,11 @@ pub struct FileDownloader { impl FileDownloader { /// Creates a handler for download actions within uplink and uses HTTP to download files. - pub fn new(config: Arc, bridge_tx: BridgeTx) -> Result { + pub fn new( + config: Arc, + actions_rx: Receiver, + bridge_tx: BridgeTx, + ) -> Result { // Authenticate with TLS certs from config let client_builder = ClientBuilder::new(); let client = match &config.authentication { @@ -125,6 +131,7 @@ impl FileDownloader { Ok(Self { config: config.downloader.clone(), + actions_rx, timeouts, client, bridge_tx, @@ -137,16 +144,10 @@ impl FileDownloader { /// back to bridge for further processing, e.g. OTA update installation. #[tokio::main(flavor = "current_thread")] pub async fn start(mut self) { - let routes = &self.config.actions; - let download_rx = match self.bridge_tx.register_action_routes(routes).await { - Some(r) => r, - _ => return, - }; - info!("Downloader thread is ready to receive download actions"); loop { self.sequence = 0; - let action = match download_rx.recv_async().await { + let action = match self.actions_rx.recv_async().await { Ok(a) => a, Err(e) => { error!("Downloader thread had to stop: {e}"); @@ -383,8 +384,9 @@ mod test { let (shutdown_handle, _) = bounded(1); let bridge_tx = BridgeTx { events_tx, shutdown_handle }; - // Create channels to forward and push action_status on - let downloader = FileDownloader::new(Arc::new(config), bridge_tx).unwrap(); + // Create channels to forward and push actions on + let (download_tx, download_rx) = bounded(1); + let downloader = FileDownloader::new(Arc::new(config), download_rx, bridge_tx).unwrap(); // Start FileDownloader in separate thread std::thread::spawn(|| downloader.start()); @@ -405,11 +407,6 @@ mod test { payload: json!(download_update).to_string(), }; - let download_tx = match events_rx.recv().unwrap() { - Event::RegisterActionRoute(_, download_tx) => download_tx, - e => unreachable!("Unexpected event: {e:#?}"), - }; - std::thread::sleep(Duration::from_millis(10)); // Send action to FileDownloader with Sender @@ -454,12 +451,13 @@ mod test { path: format!("{}/download", DOWNLOAD_DIR), }; let config = config(downloader_cfg.clone()); - let (events_tx, events_rx) = flume::bounded(3); + let (events_tx, _) = flume::bounded(3); let (shutdown_handle, _) = bounded(1); let bridge_tx = BridgeTx { events_tx, shutdown_handle }; - // Create channels to forward and push action_status on - let downloader = FileDownloader::new(Arc::new(config), bridge_tx).unwrap(); + // Create channels to forward and push actions on + let (download_tx, download_rx) = bounded(1); + let downloader = FileDownloader::new(Arc::new(config), download_rx, bridge_tx).unwrap(); // Start FileDownloader in separate thread std::thread::spawn(|| downloader.start()); @@ -480,11 +478,6 @@ mod test { payload: json!(download_update).to_string(), }; - let download_tx = match events_rx.recv().unwrap() { - Event::RegisterActionRoute(_, download_tx) => download_tx, - e => unreachable!("Unexpected event: {e:#?}"), - }; - std::thread::sleep(Duration::from_millis(10)); // Send action to FileDownloader with Sender diff --git a/uplink/src/collector/installer.rs b/uplink/src/collector/installer.rs index 007bf87d..b9ff027f 100644 --- a/uplink/src/collector/installer.rs +++ b/uplink/src/collector/installer.rs @@ -1,5 +1,6 @@ use std::{fs::File, path::PathBuf}; +use flume::Receiver; use log::{debug, error, warn}; use tar::Archive; use tokio::process::Command; @@ -22,22 +23,18 @@ pub enum Error { pub struct OTAInstaller { config: InstallerConfig, + actions_rx: Receiver, bridge_tx: BridgeTx, } impl OTAInstaller { - pub fn new(config: InstallerConfig, bridge_tx: BridgeTx) -> Self { - Self { config, bridge_tx } + pub fn new(config: InstallerConfig, actions_rx: Receiver, bridge_tx: BridgeTx) -> Self { + Self { config, actions_rx, bridge_tx } } #[tokio::main] pub async fn start(&self) { - let actions_rx = match self.bridge_tx.register_action_routes(&self.config.actions).await { - Some(r) => r, - _ => return, - }; - - while let Ok(action) = actions_rx.recv_async().await { + while let Ok(action) = self.actions_rx.recv_async().await { if let Err(e) = self.extractor(&action) { error!("Error extracting tarball: {e}"); self.forward_action_error(action, e).await; diff --git a/uplink/src/collector/journalctl.rs b/uplink/src/collector/journalctl.rs index c934576e..58a81a05 100644 --- a/uplink/src/collector/journalctl.rs +++ b/uplink/src/collector/journalctl.rs @@ -1,3 +1,4 @@ +use flume::Receiver; use serde::{Deserialize, Serialize}; use std::io::BufRead; @@ -5,7 +6,8 @@ use std::process::{Command, Stdio}; use std::sync::{Arc, Mutex}; use std::{io::BufReader, time::Duration}; -use crate::{base::bridge::BridgeTx, ActionResponse, ActionRoute, Payload}; +use crate::Action; +use crate::{base::bridge::BridgeTx, ActionResponse, Payload}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -100,7 +102,9 @@ impl LogEntry { } pub struct JournalCtl { + config: JournalCtlConfig, kill_switch: Arc>, + actions_rx: Receiver, bridge: BridgeTx, } @@ -112,28 +116,20 @@ impl Drop for JournalCtl { } impl JournalCtl { - pub fn new(bridge: BridgeTx) -> Self { + pub fn new(config: JournalCtlConfig, actions_rx: Receiver, bridge: BridgeTx) -> Self { let kill_switch = Arc::new(Mutex::new(true)); - Self { kill_switch, bridge } + Self { config, kill_switch, actions_rx, bridge } } /// Starts a journalctl instance on a linux system which reports to the logs stream for a given device+project id, /// that logcat instance is killed when this object is dropped. On any other system, it's a noop. #[tokio::main(flavor = "current_thread")] - pub async fn start(mut self, config: JournalCtlConfig) -> Result<(), Error> { - self.spawn_logger(config).await; - - let log_rx = self - .bridge - .register_action_route(ActionRoute { - name: "journalctl_config".to_string(), - timeout: 10, - }) - .await; + pub async fn start(mut self) -> Result<(), Error> { + self.spawn_logger(self.config.clone()).await; loop { - let action = log_rx.recv()?; + let action = self.actions_rx.recv()?; let mut config = serde_json::from_str::(action.payload.as_str())?; config.tags.retain(|tag| !tag.is_empty()); config.units.retain(|unit| !unit.is_empty()); diff --git a/uplink/src/collector/logcat.rs b/uplink/src/collector/logcat.rs index 5e3bd934..4c5ce808 100644 --- a/uplink/src/collector/logcat.rs +++ b/uplink/src/collector/logcat.rs @@ -1,3 +1,4 @@ +use flume::Receiver; use serde::{Deserialize, Serialize}; use std::io::{BufRead, BufReader}; @@ -6,7 +7,8 @@ use std::process::{Command, Stdio}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use crate::{base::clock, ActionResponse, ActionRoute, BridgeTx, Payload}; +use crate::base::{bridge::BridgeTx, clock}; +use crate::{Action, ActionResponse, Payload}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -97,8 +99,8 @@ pub struct LogEntry { } lazy_static::lazy_static! { - pub static ref LOGCAT_RE: regex::Regex = regex::Regex::new(r#"^(\S+ \S+) (\w)/([^(\s]*).+?:\s*(.*)$"#).unwrap(); - pub static ref LOGCAT_TIME_RE: regex::Regex = regex::Regex::new(r#"^(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)\.(\d+)$"#).unwrap(); + pub static ref LOGCAT_RE: regex::Regex = regex::Regex::new(r"^(\S+ \S+) (\w)/([^(\s]*).+?:\s*(.*)$").unwrap(); + pub static ref LOGCAT_TIME_RE: regex::Regex = regex::Regex::new(r"^(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)\.(\d+)$").unwrap(); } pub fn parse_logcat_time(s: &str) -> Option { @@ -150,7 +152,9 @@ impl LogEntry { } pub struct Logcat { + config: LogcatConfig, kill_switch: Arc>, + actions_rx: Receiver, bridge: BridgeTx, } @@ -162,25 +166,20 @@ impl Drop for Logcat { } impl Logcat { - pub fn new(bridge: BridgeTx) -> Self { + pub fn new(config: LogcatConfig, actions_rx: Receiver, bridge: BridgeTx) -> Self { let kill_switch = Arc::new(Mutex::new(true)); - Self { kill_switch, bridge } + Self { config, kill_switch, actions_rx, bridge } } /// On an android system, starts a logcat instance that reports to the logs stream for a given device+project id, /// that logcat instance is killed when this object is dropped. On any other system, it's a noop. #[tokio::main(flavor = "current_thread")] - pub async fn start(mut self, config: LogcatConfig) -> Result<(), Error> { - self.spawn_logger(config).await; - - let log_rx = self - .bridge - .register_action_route(ActionRoute { name: "logcat_config".to_string(), timeout: 10 }) - .await; + pub async fn start(mut self) -> Result<(), Error> { + self.spawn_logger(self.config.clone()).await; loop { - let action = log_rx.recv()?; + let action = self.actions_rx.recv()?; let mut config = serde_json::from_str::(action.payload.as_str())?; config.tags.retain(|tag| !tag.is_empty()); diff --git a/uplink/src/collector/process.rs b/uplink/src/collector/process.rs index ba75bcd4..69bf668e 100644 --- a/uplink/src/collector/process.rs +++ b/uplink/src/collector/process.rs @@ -1,4 +1,4 @@ -use flume::{RecvError, SendError}; +use flume::{Receiver, RecvError, SendError}; use log::{debug, error, info}; use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; @@ -6,8 +6,7 @@ use tokio::process::{Child, Command}; use tokio::{pin, select, time}; use crate::base::bridge::BridgeTx; -use crate::base::ActionRoute; -use crate::{ActionResponse, Package}; +use crate::{Action, ActionResponse, Package}; use std::io; use std::process::Stdio; @@ -34,13 +33,15 @@ pub enum Error { /// is in progress. /// It sends result and errors to the broker over collector_tx pub struct ProcessHandler { - // to receive actions and send responses back to bridge + // to receive actions + actions_rx: Receiver, + // to send responses back to bridge bridge_tx: BridgeTx, } impl ProcessHandler { - pub fn new(bridge_tx: BridgeTx) -> Self { - Self { bridge_tx } + pub fn new(actions_rx: Receiver, bridge_tx: BridgeTx) -> Self { + Self { actions_rx, bridge_tx } } /// Run a process of specified command @@ -85,14 +86,10 @@ impl ProcessHandler { Ok(()) } - pub async fn start(mut self, processes: Vec) -> Result<(), Error> { - let action_rx = match self.bridge_tx.register_action_routes(processes).await { - Some(r) => r, - _ => return Ok(()), - }; - + #[tokio::main(flavor = "current_thread")] + pub async fn start(mut self) -> Result<(), Error> { loop { - let action = action_rx.recv_async().await?; + let action = self.actions_rx.recv_async().await?; let command = String::from("tools/") + &action.name; // Spawn the action and capture its stdout diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index 5412eb79..f65b997b 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -1,4 +1,4 @@ -use flume::{RecvError, SendError}; +use flume::{Receiver, RecvError, SendError}; use log::{debug, error, info, warn}; use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; @@ -8,7 +8,7 @@ use tokio::time::timeout; use super::downloader::DownloadFile; use crate::base::{bridge::BridgeTx, ActionRoute}; -use crate::{ActionResponse, Package}; +use crate::{Action, ActionResponse, Package}; use std::collections::HashMap; use std::io; @@ -35,15 +35,23 @@ pub enum Error { /// Multiple scripts can't be run in parallel. It can also send progress, result and errors to the platform by using /// the JSON formatted output over STDOUT. pub struct ScriptRunner { - // to receive actions and send responses back to bridge + // to receive actions + actions_rx: Receiver, + // to send responses back to bridge bridge_tx: BridgeTx, timeouts: HashMap, sequence: u32, } impl ScriptRunner { - pub fn new(bridge_tx: BridgeTx) -> Self { - Self { bridge_tx, timeouts: HashMap::new(), sequence: 0 } + pub fn new( + routes: Vec, + actions_rx: Receiver, + bridge_tx: BridgeTx, + ) -> Self { + let timeouts = + routes.iter().map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout))).collect(); + Self { actions_rx, bridge_tx, timeouts, sequence: 0 } } /// Spawn a child process to run the script with sh @@ -92,19 +100,12 @@ impl ScriptRunner { Ok(()) } - pub async fn start(mut self, routes: Vec) -> Result<(), Error> { - self.timeouts = - routes.iter().map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout))).collect(); - - let action_rx = match self.bridge_tx.register_action_routes(routes).await { - Some(r) => r, - _ => return Ok(()), - }; - + #[tokio::main(flavor = "current_thread")] + pub async fn start(mut self) -> Result<(), Error> { info!("Script runner is ready"); loop { - let action = action_rx.recv_async().await?; + let action = self.actions_rx.recv_async().await?; let command = match serde_json::from_str::(&action.payload) { Ok(DownloadFile { download_path: Some(download_path), .. }) => download_path, Ok(_) => { @@ -156,32 +157,21 @@ mod tests { use std::thread; use super::*; - use crate::{ - base::bridge::{ActionRouter, Event}, - Action, - }; + use crate::{base::bridge::Event, Action}; use flume::bounded; - use tokio::runtime::Runtime; #[test] fn empty_payload() { let (events_tx, events_rx) = bounded(1); let (shutdown_handle, _) = bounded(1); - let script_runner = ScriptRunner::new(BridgeTx { events_tx, shutdown_handle }); + let (actions_tx, actions_rx) = bounded(1); let routes = vec![ActionRoute { name: "test".to_string(), timeout: 100 }]; + let script_runner = + ScriptRunner::new(routes, actions_rx, BridgeTx { events_tx, shutdown_handle }); - thread::spawn(move || { - Runtime::new().unwrap().block_on(async { - script_runner.start(routes).await.unwrap(); - }) - }); + thread::spawn(move || script_runner.start().unwrap()); - let Event::RegisterActionRoute(_, ActionRouter { actions_tx, .. }) = - events_rx.recv().unwrap() - else { - unreachable!() - }; actions_tx .send(Action { action_id: "1".to_string(), @@ -203,20 +193,13 @@ mod tests { fn missing_path() { let (events_tx, events_rx) = bounded(1); let (shutdown_handle, _) = bounded(1); - let script_runner = ScriptRunner::new(BridgeTx { events_tx, shutdown_handle }); + let (actions_tx, actions_rx) = bounded(1); let routes = vec![ActionRoute { name: "test".to_string(), timeout: 100 }]; + let script_runner = + ScriptRunner::new(routes, actions_rx, BridgeTx { events_tx, shutdown_handle }); - thread::spawn(move || { - Runtime::new().unwrap().block_on(async { - script_runner.start(routes).await.unwrap(); - }) - }); + thread::spawn(move || script_runner.start().unwrap()); - let Event::RegisterActionRoute(_, ActionRouter { actions_tx, .. }) = - events_rx.recv().unwrap() - else { - unreachable!() - }; actions_tx .send(Action { action_id: "1".to_string(), diff --git a/uplink/src/collector/simulator/mod.rs b/uplink/src/collector/simulator/mod.rs index eca338a2..10d81ec0 100644 --- a/uplink/src/collector/simulator/mod.rs +++ b/uplink/src/collector/simulator/mod.rs @@ -2,7 +2,7 @@ use crate::base::bridge::{BridgeTx, Payload}; use crate::base::{clock, SimulatorConfig}; use crate::Action; use data::{Bms, DeviceData, DeviceShadow, Gps, Imu, Motor, PeripheralState}; -use flume::{bounded, Sender}; +use flume::{bounded, Receiver, Sender}; use log::{error, info}; use rand::Rng; use serde::Serialize; @@ -117,12 +117,14 @@ pub fn spawn_data_simulators(device: DeviceData, tx: Sender) { } #[tokio::main(flavor = "current_thread")] -pub async fn start(config: SimulatorConfig, bridge_tx: BridgeTx) -> Result<(), Error> { +pub async fn start( + config: SimulatorConfig, + bridge_tx: BridgeTx, + actions_rx: Option>, +) -> Result<(), Error> { let path = read_gps_path(&config.gps_paths); let device = new_device_data(path); - let actions_rx = bridge_tx.register_action_routes(&config.actions).await; - let (tx, rx) = bounded(10); spawn_data_simulators(device, tx.clone()); diff --git a/uplink/src/collector/tcpjson.rs b/uplink/src/collector/tcpjson.rs index a59f09e2..f3679520 100644 --- a/uplink/src/collector/tcpjson.rs +++ b/uplink/src/collector/tcpjson.rs @@ -43,9 +43,12 @@ pub struct TcpJson { } impl TcpJson { - pub async fn new(name: String, config: AppConfig, bridge: BridgeTx) -> TcpJson { - let actions_rx = bridge.register_action_routes(&config.actions).await; - + pub fn new( + name: String, + config: AppConfig, + actions_rx: Option>, + bridge: BridgeTx, + ) -> TcpJson { // Note: We can register `TcpJson` itself as an app to direct actions to it TcpJson { name, config, bridge, actions_rx } } diff --git a/uplink/src/collector/tunshell.rs b/uplink/src/collector/tunshell.rs index 4019ceb2..21608a28 100644 --- a/uplink/src/collector/tunshell.rs +++ b/uplink/src/collector/tunshell.rs @@ -1,12 +1,10 @@ +use flume::Receiver; use log::error; use serde::{Deserialize, Serialize}; use tokio_compat_02::FutureExt; use tunshell_client::{Client, ClientMode, Config, HostShell}; -use crate::{ - base::{bridge::BridgeTx, ActionRoute}, - Action, ActionResponse, -}; +use crate::{base::bridge::BridgeTx, Action, ActionResponse}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -27,12 +25,13 @@ pub struct Keys { #[derive(Debug, Clone)] pub struct TunshellClient { + actions_rx: Receiver, bridge: BridgeTx, } impl TunshellClient { - pub fn new(bridge: BridgeTx) -> Self { - Self { bridge } + pub fn new(actions_rx: Receiver, bridge: BridgeTx) -> Self { + Self { actions_rx, bridge } } fn config(&self, keys: Keys) -> Config { @@ -50,10 +49,7 @@ impl TunshellClient { #[tokio::main(flavor = "current_thread")] pub async fn start(self) { - let route = ActionRoute { name: "launch_shell".to_owned(), timeout: 10 }; - let actions_rx = self.bridge.register_action_route(route).await; - - while let Ok(action) = actions_rx.recv_async().await { + while let Ok(action) = self.actions_rx.recv_async().await { let session = self.clone(); //TODO(RT): Findout why this is spawned. We want to send other action's with shell? tokio::spawn(async move { diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index 7043327e..f6ef14b5 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -51,6 +51,10 @@ use base::monitor::Monitor; use collector::device_shadow::DeviceShadow; use collector::downloader::FileDownloader; use collector::installer::OTAInstaller; +#[cfg(target_os = "linux")] +use collector::journalctl::JournalCtl; +#[cfg(target_os = "android")] +use collector::logcat::Logcat; use collector::process::ProcessHandler; use collector::script_runner::ScriptRunner; use collector::systemstats::StatCollector; @@ -261,7 +265,7 @@ pub mod config { } pub use base::actions::{Action, ActionResponse}; -use base::bridge::{Bridge, BridgeTx, Package, Payload, Point, StreamMetrics}; +use base::bridge::{Bridge, Package, Payload, Point, StreamMetrics}; use base::mqtt::Mqtt; use base::serializer::{Serializer, SerializerMetrics}; pub use base::{ActionRoute, Config}; @@ -305,34 +309,19 @@ impl Uplink { }) } - pub fn spawn(&mut self) -> Result { - let config = self.config.clone(); - let mut bridge = Bridge::new( + pub fn configure_bridge(&mut self) -> Bridge { + Bridge::new( self.config.clone(), self.data_tx.clone(), self.stream_metrics_tx(), self.action_rx.clone(), self.shutdown_tx.clone(), - ); + ) + } - let bridge_tx = bridge.tx(); + pub fn spawn(&mut self, mut bridge: Bridge) -> Result<(), Error> { let (mqtt_metrics_tx, mqtt_metrics_rx) = bounded(10); - // Bridge thread to batch data and redicet actions - thread::spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .thread_name("bridge") - .enable_time() - .build() - .unwrap(); - - rt.block_on(async move { - if let Err(e) = bridge.start().await { - error!("Bridge stopped!! Error = {:?}", e); - } - }) - }); - let mut mqtt = Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx); let mqtt_client = mqtt.client(); @@ -373,60 +362,6 @@ impl Uplink { }) }); - let tunshell_client = TunshellClient::new(bridge_tx.clone()); - thread::spawn(move || tunshell_client.start()); - - let file_downloader = FileDownloader::new(config.clone(), bridge_tx.clone())?; - thread::spawn(move || file_downloader.start()); - - let device_shadow = DeviceShadow::new(config.device_shadow.clone(), bridge_tx.clone()); - thread::spawn(move || device_shadow.start()); - - if let Some(config) = &config.ota_installer { - let ota_installer = OTAInstaller::new(config.clone(), bridge_tx.clone()); - thread::spawn(move || ota_installer.start()); - } - - #[cfg(target_os = "linux")] - if let Some(config) = &config.logging { - let logger = collector::journalctl::JournalCtl::new(bridge_tx.clone()); - let config = config.clone(); - thread::spawn(move || logger.start(config)); - } - - #[cfg(target_os = "android")] - if let Some(config) = &config.logging { - let logger = collector::logcat::Logcat::new(bridge_tx.clone()); - let config = config.clone(); - thread::spawn(move || logger.start(config)); - } - - if config.system_stats.enabled { - let stat_collector = StatCollector::new(config.clone(), bridge_tx.clone()); - thread::spawn(move || stat_collector.start()); - } - - let process_handler = ProcessHandler::new(bridge_tx.clone()); - let processes = config.processes.clone(); - thread::spawn(move || process_handler.start(processes)); - - let script_runner = ScriptRunner::new(bridge_tx.clone()); - let routes: Vec = config.script_runner.clone(); - thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .thread_name("script_runner") - .enable_io() - .enable_time() - .build() - .unwrap(); - - rt.block_on(async move { - if let Err(e) = script_runner.start(routes).await { - error!("Monitor stopped!! Error = {:?}", e); - } - }) - }); - let monitor = Monitor::new( self.config.clone(), mqtt_client, @@ -450,7 +385,97 @@ impl Uplink { }) }); - Ok(bridge_tx) + // Bridge thread to batch data and redicet actions + thread::spawn(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .thread_name("bridge") + .enable_time() + .build() + .unwrap(); + + rt.block_on(async move { + if let Err(e) = bridge.start().await { + error!("Bridge stopped!! Error = {:?}", e); + } + }) + }); + + Ok(()) + } + + pub fn spawn_builtins(&mut self, bridge: &mut Bridge) -> Result<(), Error> { + let bridge_tx = bridge.tx(); + + let route = ActionRoute { name: "launch_shell".to_owned(), timeout: 10 }; + let actions_rx = bridge.register_action_route(route); + let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone()); + thread::spawn(move || tunshell_client.start()); + + if let Some(actions_rx) = bridge.register_action_routes(&self.config.downloader.actions) { + let file_downloader = + FileDownloader::new(self.config.clone(), actions_rx, bridge_tx.clone())?; + thread::spawn(move || file_downloader.start()); + } + + let device_shadow = DeviceShadow::new(self.config.device_shadow.clone(), bridge_tx.clone()); + thread::spawn(move || device_shadow.start()); + + if let Some(config) = self.config.ota_installer.clone() { + if let Some(actions_rx) = bridge.register_action_routes(&config.actions) { + let ota_installer = OTAInstaller::new(config, actions_rx, bridge_tx.clone()); + thread::spawn(move || ota_installer.start()); + } + } + + #[cfg(target_os = "linux")] + if let Some(config) = self.config.logging.clone() { + let route = ActionRoute { name: "journalctl_config".to_string(), timeout: 10 }; + let actions_rx = bridge.register_action_route(route); + let logger = JournalCtl::new(config.clone(), actions_rx, bridge_tx.clone()); + thread::spawn(move || { + if let Err(e) = logger.start() { + error!("Logger stopped!! Error = {:?}", e); + } + }); + } + + #[cfg(target_os = "android")] + if let Some(config) = self.config.logging.clone() { + let route = ActionRoute { name: "journalctl_config".to_string(), timeout: 10 }; + let actions_rx = bridge.register_action_route(route); + let logger = Logcat::new(config.clone(), actions_rx, bridge_tx.clone()); + thread::spawn(move || { + if let Err(e) = logger.start() { + error!("Logger stopped!! Error = {:?}", e); + } + }); + } + + if self.config.system_stats.enabled { + let stat_collector = StatCollector::new(self.config.clone(), bridge_tx.clone()); + thread::spawn(move || stat_collector.start()); + }; + + if let Some(actions_rx) = bridge.register_action_routes(&self.config.processes) { + let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone()); + thread::spawn(move || { + if let Err(e) = process_handler.start() { + error!("Process handler stopped!! Error = {:?}", e); + } + }); + } + + if let Some(actions_rx) = bridge.register_action_routes(&self.config.script_runner) { + let script_runner = + ScriptRunner::new(self.config.script_runner.clone(), actions_rx, bridge_tx.clone()); + thread::spawn(move || { + if let Err(e) = script_runner.start() { + error!("Script runner stopped!! Error = {:?}", e); + } + }); + } + + Ok(()) } pub fn bridge_action_rx(&self) -> Receiver { diff --git a/uplink/src/main.rs b/uplink/src/main.rs index a9386efb..823a6e1b 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -118,19 +118,33 @@ fn main() -> Result<(), Error> { banner(&commandline, &config); let mut uplink = Uplink::new(config.clone())?; - let bridge = uplink.spawn()?; + let mut bridge = uplink.configure_bridge(); + uplink.spawn_builtins(&mut bridge)?; + + let bridge_tx = bridge.tx(); + + let mut tcpapps = vec![]; + for (app, cfg) in config.tcpapps.clone() { + let actions_rx = bridge.register_action_routes(&cfg.actions); + tcpapps.push(TcpJson::new(app, cfg, actions_rx, bridge.tx())); + } + + let simulator_actions = + config.simulator.as_ref().and_then(|cfg| bridge.register_action_routes(&cfg.actions)); + + uplink.spawn(bridge)?; if let Some(config) = config.simulator.clone() { - let bridge = bridge.clone(); + let bridge_tx = bridge_tx.clone(); thread::spawn(move || { - simulator::start(config, bridge).unwrap(); + simulator::start(config, bridge_tx, simulator_actions).unwrap(); }); } if config.console.enabled { let port = config.console.port; - let bridge_handle = bridge.clone(); - thread::spawn(move || console::start(port, reload_handle, bridge_handle)); + let bridge_tx = bridge_tx.clone(); + thread::spawn(move || console::start(port, reload_handle, bridge_tx)); } let rt = tokio::runtime::Builder::new_current_thread() @@ -141,10 +155,9 @@ fn main() -> Result<(), Error> { .unwrap(); rt.block_on(async { - for (app, cfg) in config.tcpapps.iter() { - let tcpjson = TcpJson::new(app.to_owned(), cfg.clone(), bridge.clone()).await; + for app in tcpapps { tokio::task::spawn(async move { - if let Err(e) = tcpjson.start().await { + if let Err(e) = app.start().await { error!("App failed. Error = {:?}", e); } }); @@ -160,7 +173,7 @@ fn main() -> Result<(), Error> { // Handle a shutdown signal from POSIX while let Some(signal) = signals.next().await { match signal { - SIGTERM | SIGINT | SIGQUIT => bridge.trigger_shutdown().await, + SIGTERM | SIGINT | SIGQUIT => bridge_tx.trigger_shutdown().await, s => error!("Couldn't handle signal: {s}"), } }