Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: setup connections with bridge before spawn #277

Merged
merged 14 commits into from
Sep 7, 2023
196 changes: 110 additions & 86 deletions uplink/src/base/bridge/mod.rs

Large diffs are not rendered by default.

39 changes: 16 additions & 23 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
//! [`action_redirections`]: Config#structfield.action_redirections

use bytes::BytesMut;
use flume::Receiver;
use futures_util::StreamExt;
use log::{error, info, warn};
use reqwest::{Certificate, Client, ClientBuilder, Identity, Response};
Expand Down Expand Up @@ -91,6 +92,7 @@ pub enum Error {
/// to the connected bridge application.
pub struct FileDownloader {
config: DownloaderConfig,
actions_rx: Receiver<Action>,
action_id: String,
bridge_tx: BridgeTx,
client: Client,
Expand All @@ -100,7 +102,11 @@ pub struct FileDownloader {

impl FileDownloader {
/// Creates a handler for download actions within uplink and uses HTTP to download files.
pub fn new(config: Arc<Config>, bridge_tx: BridgeTx) -> Result<Self, Error> {
pub fn new(
config: Arc<Config>,
actions_rx: Receiver<Action>,
bridge_tx: BridgeTx,
) -> Result<Self, Error> {
// Authenticate with TLS certs from config
let client_builder = ClientBuilder::new();
let client = match &config.authentication {
Expand All @@ -125,6 +131,7 @@ impl FileDownloader {

Ok(Self {
config: config.downloader.clone(),
actions_rx,
timeouts,
client,
bridge_tx,
Expand All @@ -137,16 +144,10 @@ impl FileDownloader {
/// back to bridge for further processing, e.g. OTA update installation.
#[tokio::main(flavor = "current_thread")]
pub async fn start(mut self) {
let routes = &self.config.actions;
let download_rx = match self.bridge_tx.register_action_routes(routes).await {
Some(r) => r,
_ => return,
};

info!("Downloader thread is ready to receive download actions");
loop {
self.sequence = 0;
let action = match download_rx.recv_async().await {
let action = match self.actions_rx.recv_async().await {
Ok(a) => a,
Err(e) => {
error!("Downloader thread had to stop: {e}");
Expand Down Expand Up @@ -383,8 +384,9 @@ mod test {
let (shutdown_handle, _) = bounded(1);
let bridge_tx = BridgeTx { events_tx, shutdown_handle };

// Create channels to forward and push action_status on
let downloader = FileDownloader::new(Arc::new(config), bridge_tx).unwrap();
// Create channels to forward and push actions on
let (download_tx, download_rx) = bounded(1);
let downloader = FileDownloader::new(Arc::new(config), download_rx, bridge_tx).unwrap();

// Start FileDownloader in separate thread
std::thread::spawn(|| downloader.start());
Expand All @@ -405,11 +407,6 @@ mod test {
payload: json!(download_update).to_string(),
};

let download_tx = match events_rx.recv().unwrap() {
Event::RegisterActionRoute(_, download_tx) => download_tx,
e => unreachable!("Unexpected event: {e:#?}"),
};

std::thread::sleep(Duration::from_millis(10));

// Send action to FileDownloader with Sender<Action>
Expand Down Expand Up @@ -454,12 +451,13 @@ mod test {
path: format!("{}/download", DOWNLOAD_DIR),
};
let config = config(downloader_cfg.clone());
let (events_tx, events_rx) = flume::bounded(3);
let (events_tx, _) = flume::bounded(3);
let (shutdown_handle, _) = bounded(1);
let bridge_tx = BridgeTx { events_tx, shutdown_handle };

// Create channels to forward and push action_status on
let downloader = FileDownloader::new(Arc::new(config), bridge_tx).unwrap();
// Create channels to forward and push actions on
let (download_tx, download_rx) = bounded(1);
let downloader = FileDownloader::new(Arc::new(config), download_rx, bridge_tx).unwrap();

// Start FileDownloader in separate thread
std::thread::spawn(|| downloader.start());
Expand All @@ -480,11 +478,6 @@ mod test {
payload: json!(download_update).to_string(),
};

let download_tx = match events_rx.recv().unwrap() {
Event::RegisterActionRoute(_, download_tx) => download_tx,
e => unreachable!("Unexpected event: {e:#?}"),
};

std::thread::sleep(Duration::from_millis(10));

// Send action to FileDownloader with Sender<Action>
Expand Down
13 changes: 5 additions & 8 deletions uplink/src/collector/installer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{fs::File, path::PathBuf};

use flume::Receiver;
use log::{debug, error, warn};
use tar::Archive;
use tokio::process::Command;
Expand All @@ -22,22 +23,18 @@ pub enum Error {

pub struct OTAInstaller {
config: InstallerConfig,
actions_rx: Receiver<Action>,
bridge_tx: BridgeTx,
}

impl OTAInstaller {
pub fn new(config: InstallerConfig, bridge_tx: BridgeTx) -> Self {
Self { config, bridge_tx }
pub fn new(config: InstallerConfig, actions_rx: Receiver<Action>, bridge_tx: BridgeTx) -> Self {
Self { config, actions_rx, bridge_tx }
}

#[tokio::main]
pub async fn start(&self) {
let actions_rx = match self.bridge_tx.register_action_routes(&self.config.actions).await {
Some(r) => r,
_ => return,
};

while let Ok(action) = actions_rx.recv_async().await {
while let Ok(action) = self.actions_rx.recv_async().await {
if let Err(e) = self.extractor(&action) {
error!("Error extracting tarball: {e}");
self.forward_action_error(action, e).await;
Expand Down
24 changes: 10 additions & 14 deletions uplink/src/collector/journalctl.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use flume::Receiver;
use serde::{Deserialize, Serialize};

use std::io::BufRead;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::{io::BufReader, time::Duration};

use crate::{base::bridge::BridgeTx, ActionResponse, ActionRoute, Payload};
use crate::Action;
use crate::{base::bridge::BridgeTx, ActionResponse, Payload};

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -100,7 +102,9 @@ impl LogEntry {
}

pub struct JournalCtl {
config: JournalCtlConfig,
kill_switch: Arc<Mutex<bool>>,
actions_rx: Receiver<Action>,
bridge: BridgeTx,
}

Expand All @@ -112,28 +116,20 @@ impl Drop for JournalCtl {
}

impl JournalCtl {
pub fn new(bridge: BridgeTx) -> Self {
pub fn new(config: JournalCtlConfig, actions_rx: Receiver<Action>, bridge: BridgeTx) -> Self {
let kill_switch = Arc::new(Mutex::new(true));

Self { kill_switch, bridge }
Self { config, kill_switch, actions_rx, bridge }
}

/// Starts a journalctl instance on a linux system which reports to the logs stream for a given device+project id,
/// that logcat instance is killed when this object is dropped. On any other system, it's a noop.
#[tokio::main(flavor = "current_thread")]
pub async fn start(mut self, config: JournalCtlConfig) -> Result<(), Error> {
self.spawn_logger(config).await;

let log_rx = self
.bridge
.register_action_route(ActionRoute {
name: "journalctl_config".to_string(),
timeout: 10,
})
.await;
pub async fn start(mut self) -> Result<(), Error> {
self.spawn_logger(self.config.clone()).await;

loop {
let action = log_rx.recv()?;
let action = self.actions_rx.recv()?;
let mut config = serde_json::from_str::<JournalCtlConfig>(action.payload.as_str())?;
config.tags.retain(|tag| !tag.is_empty());
config.units.retain(|unit| !unit.is_empty());
Expand Down
25 changes: 12 additions & 13 deletions uplink/src/collector/logcat.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use flume::Receiver;
use serde::{Deserialize, Serialize};

use std::io::{BufRead, BufReader};
Expand All @@ -6,7 +7,8 @@ use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::{base::clock, ActionResponse, ActionRoute, BridgeTx, Payload};
use crate::base::{bridge::BridgeTx, clock};
use crate::{Action, ActionResponse, Payload};

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -97,8 +99,8 @@ pub struct LogEntry {
}

lazy_static::lazy_static! {
pub static ref LOGCAT_RE: regex::Regex = regex::Regex::new(r#"^(\S+ \S+) (\w)/([^(\s]*).+?:\s*(.*)$"#).unwrap();
pub static ref LOGCAT_TIME_RE: regex::Regex = regex::Regex::new(r#"^(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)\.(\d+)$"#).unwrap();
pub static ref LOGCAT_RE: regex::Regex = regex::Regex::new(r"^(\S+ \S+) (\w)/([^(\s]*).+?:\s*(.*)$").unwrap();
de-sh marked this conversation as resolved.
Show resolved Hide resolved
pub static ref LOGCAT_TIME_RE: regex::Regex = regex::Regex::new(r"^(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)\.(\d+)$").unwrap();
}

pub fn parse_logcat_time(s: &str) -> Option<u64> {
Expand Down Expand Up @@ -150,7 +152,9 @@ impl LogEntry {
}

pub struct Logcat {
config: LogcatConfig,
kill_switch: Arc<Mutex<bool>>,
actions_rx: Receiver<Action>,
bridge: BridgeTx,
}

Expand All @@ -162,25 +166,20 @@ impl Drop for Logcat {
}

impl Logcat {
pub fn new(bridge: BridgeTx) -> Self {
pub fn new(config: LogcatConfig, actions_rx: Receiver<Action>, bridge: BridgeTx) -> Self {
let kill_switch = Arc::new(Mutex::new(true));

Self { kill_switch, bridge }
Self { config, kill_switch, actions_rx, bridge }
}

/// On an android system, starts a logcat instance that reports to the logs stream for a given device+project id,
/// that logcat instance is killed when this object is dropped. On any other system, it's a noop.
#[tokio::main(flavor = "current_thread")]
pub async fn start(mut self, config: LogcatConfig) -> Result<(), Error> {
self.spawn_logger(config).await;

let log_rx = self
.bridge
.register_action_route(ActionRoute { name: "logcat_config".to_string(), timeout: 10 })
.await;
pub async fn start(mut self) -> Result<(), Error> {
self.spawn_logger(self.config.clone()).await;

loop {
let action = log_rx.recv()?;
let action = self.actions_rx.recv()?;
let mut config = serde_json::from_str::<LogcatConfig>(action.payload.as_str())?;
config.tags.retain(|tag| !tag.is_empty());

Expand Down
22 changes: 9 additions & 13 deletions uplink/src/collector/process.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use flume::{RecvError, SendError};
use flume::{Receiver, RecvError, SendError};
use log::{debug, error, info};
use thiserror::Error;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::{pin, select, time};

use crate::base::bridge::BridgeTx;
use crate::base::ActionRoute;
use crate::{ActionResponse, Package};
use crate::{Action, ActionResponse, Package};

use std::io;
use std::process::Stdio;
Expand All @@ -34,13 +33,15 @@ pub enum Error {
/// is in progress.
/// It sends result and errors to the broker over collector_tx
pub struct ProcessHandler {
// to receive actions and send responses back to bridge
// to receive actions
actions_rx: Receiver<Action>,
// to send responses back to bridge
bridge_tx: BridgeTx,
}

impl ProcessHandler {
pub fn new(bridge_tx: BridgeTx) -> Self {
Self { bridge_tx }
pub fn new(actions_rx: Receiver<Action>, bridge_tx: BridgeTx) -> Self {
Self { actions_rx, bridge_tx }
}

/// Run a process of specified command
Expand Down Expand Up @@ -85,14 +86,9 @@ impl ProcessHandler {
Ok(())
}

pub async fn start(mut self, processes: Vec<ActionRoute>) -> Result<(), Error> {
let action_rx = match self.bridge_tx.register_action_routes(processes).await {
Some(r) => r,
_ => return Ok(()),
};

pub async fn start(mut self) -> Result<(), Error> {
loop {
let action = action_rx.recv_async().await?;
let action = self.actions_rx.recv_async().await?;
let command = String::from("tools/") + &action.name;

// Spawn the action and capture its stdout
Expand Down
Loading