Skip to content

Commit

Permalink
feat: allow stoppage of downloader to free up network (#341)
Browse files Browse the repository at this point in the history
* feat: STOP_DOWNLOAD enables arbitrary downloader stoppage

* feat: disable downloader with console

* fix: use put

* fix: continuously check

* refactor: don't use `AtomicBool`

* fix: set bool in console

* doc: remove comment
  • Loading branch information
Devdutt Shenoi authored May 14, 2024
1 parent a410693 commit 959550a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 17 deletions.
36 changes: 29 additions & 7 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identit
use rsa::sha2::{Digest, Sha256};
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::time::{timeout_at, Instant};
use tokio::time::{sleep, timeout_at, Instant};

use std::fs::{metadata, read, remove_dir_all, remove_file, write, File};
use std::io;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[cfg(unix)]
use std::{
Expand Down Expand Up @@ -108,6 +108,7 @@ pub struct FileDownloader {
bridge_tx: BridgeTx,
client: Client,
shutdown_rx: Receiver<DownloaderShutdown>,
disabled: Arc<Mutex<bool>>,
}

impl FileDownloader {
Expand All @@ -117,6 +118,7 @@ impl FileDownloader {
actions_rx: Receiver<Action>,
bridge_tx: BridgeTx,
shutdown_rx: Receiver<DownloaderShutdown>,
disabled: Arc<Mutex<bool>>,
) -> Result<Self, Error> {
// Authenticate with TLS certs from config
let client_builder = ClientBuilder::new();
Expand All @@ -140,6 +142,7 @@ impl FileDownloader {
bridge_tx,
action_id: String::default(),
shutdown_rx,
disabled,
})
}

Expand Down Expand Up @@ -259,7 +262,14 @@ impl FileDownloader {
};

// Download and store to disk by streaming as chunks
while let Some(item) = stream.next().await {
loop {
// Checks if downloader is disabled by user or not
if *self.disabled.lock().unwrap() {
// async to ensure download can be cancelled during sleep
sleep(Duration::from_secs(1)).await;
continue;
}
let Some(item) = stream.next().await else { break };
let chunk = match item {
Ok(c) => c,
// Retry non-status errors
Expand Down Expand Up @@ -605,8 +615,14 @@ mod test {
// Create channels to forward and push actions on
let (download_tx, download_rx) = bounded(1);
let (_, ctrl_rx) = bounded(1);
let downloader =
FileDownloader::new(Arc::new(config), download_rx, bridge_tx, ctrl_rx).unwrap();
let downloader = FileDownloader::new(
Arc::new(config),
download_rx,
bridge_tx,
ctrl_rx,
Arc::new(Mutex::new(false)),
)
.unwrap();

// Start FileDownloader in separate thread
std::thread::spawn(|| downloader.start());
Expand Down Expand Up @@ -666,8 +682,14 @@ mod test {
// Create channels to forward and push action_status on
let (download_tx, download_rx) = bounded(1);
let (_, ctrl_rx) = bounded(1);
let downloader =
FileDownloader::new(Arc::new(config), download_rx, bridge_tx, ctrl_rx).unwrap();
let downloader = FileDownloader::new(
Arc::new(config),
download_rx,
bridge_tx,
ctrl_rx,
Arc::new(Mutex::new(false)),
)
.unwrap();

// Start FileDownloader in separate thread
std::thread::spawn(|| downloader.start());
Expand Down
46 changes: 43 additions & 3 deletions uplink/src/console.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Router};
use std::sync::{Arc, Mutex};

use axum::{
extract::State,
http::StatusCode,
response::IntoResponse,
routing::{post, put},
Router,
};
use log::info;
use uplink::base::CtrlTx;

Expand All @@ -8,16 +16,24 @@ use crate::ReloadHandle;
struct StateHandle {
reload_handle: ReloadHandle,
ctrl_tx: CtrlTx,
downloader_disable: Arc<Mutex<bool>>,
}

#[tokio::main]
pub async fn start(port: u16, reload_handle: ReloadHandle, ctrl_tx: CtrlTx) {
pub async fn start(
port: u16,
reload_handle: ReloadHandle,
ctrl_tx: CtrlTx,
downloader_disable: Arc<Mutex<bool>>,
) {
let address = format!("0.0.0.0:{port}");
info!("Starting uplink console server: {address}");
let state = StateHandle { reload_handle, ctrl_tx };
let state = StateHandle { reload_handle, ctrl_tx, downloader_disable };
let app = Router::new()
.route("/logs", post(reload_loglevel))
.route("/shutdown", post(shutdown))
.route("/disable_downloader", put(disable_downloader))
.route("/enable_downloader", put(enable_downloader))
.with_state(state);

axum::Server::bind(&address.parse().unwrap()).serve(app.into_make_service()).await.unwrap();
Expand All @@ -38,3 +54,27 @@ async fn shutdown(State(state): State<StateHandle>) -> impl IntoResponse {

StatusCode::OK
}

// Stops downloader from downloading even if it was already stopped
async fn disable_downloader(State(state): State<StateHandle>) -> impl IntoResponse {
info!("Downloader stopped");
let mut is_disabled = state.downloader_disable.lock().unwrap();
if *is_disabled {
StatusCode::ACCEPTED
} else {
*is_disabled = true;
StatusCode::OK
}
}

// Start downloader back up even if it was already not stopped
async fn enable_downloader(State(state): State<StateHandle>) -> impl IntoResponse {
info!("Downloader started");
let mut is_disabled = state.downloader_disable.lock().unwrap();
if *state.downloader_disable.lock().unwrap() {
*is_disabled = false;
StatusCode::OK
} else {
StatusCode::ACCEPTED
}
}
17 changes: 13 additions & 4 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
//!```
//! [`port`]: base::AppConfig#structfield.port
//! [`name`]: Action#structfield.name
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

Expand Down Expand Up @@ -129,7 +129,11 @@ impl Uplink {
)
}

pub fn spawn(&mut self, mut bridge: Bridge) -> Result<CtrlTx, Error> {
pub fn spawn(
&mut self,
mut bridge: Bridge,
downloader_disable: Arc<Mutex<bool>>,
) -> Result<CtrlTx, Error> {
let (mqtt_metrics_tx, mqtt_metrics_rx) = bounded(10);
let (ctrl_actions_lane, ctrl_data_lane) = bridge.ctrl_tx();

Expand All @@ -151,8 +155,13 @@ impl Uplink {
// Downloader thread if configured
if !self.config.downloader.actions.is_empty() {
let actions_rx = bridge.register_action_routes(&self.config.downloader.actions)?;
let file_downloader =
FileDownloader::new(self.config.clone(), actions_rx, bridge.bridge_tx(), ctrl_rx)?;
let file_downloader = FileDownloader::new(
self.config.clone(),
actions_rx,
bridge.bridge_tx(),
ctrl_rx,
downloader_disable,
)?;
spawn_named_thread("File Downloader", || file_downloader.start());
}

Expand Down
9 changes: 6 additions & 3 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod console;

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::Error;
Expand Down Expand Up @@ -307,7 +307,8 @@ fn main() -> Result<(), Error> {
_ => None,
};

let ctrl_tx = uplink.spawn(bridge)?;
let downloader_disable = Arc::new(Mutex::new(false));
let ctrl_tx = uplink.spawn(bridge, downloader_disable.clone())?;

if let Some(config) = config.simulator.clone() {
spawn_named_thread("Simulator", || {
Expand All @@ -318,7 +319,9 @@ fn main() -> Result<(), Error> {
if config.console.enabled {
let port = config.console.port;
let ctrl_tx = ctrl_tx.clone();
spawn_named_thread("Uplink Console", move || console::start(port, reload_handle, ctrl_tx));
spawn_named_thread("Uplink Console", move || {
console::start(port, reload_handle, ctrl_tx, downloader_disable)
});
}

let rt = tokio::runtime::Builder::new_current_thread()
Expand Down

0 comments on commit 959550a

Please sign in to comment.