Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: timeout process as per config #287

Merged
merged 3 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions uplink/src/collector/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use log::{debug, error, info};
use thiserror::Error;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::{pin, select, time};
use tokio::select;
use tokio::time::timeout;

use crate::base::bridge::BridgeTx;
use crate::{Action, ActionResponse, Package};
use crate::{Action, ActionResponse, ActionRoute, Package};

use std::collections::HashMap;
use std::io;
use std::process::Stdio;
use std::time::Duration;
Expand Down Expand Up @@ -37,20 +39,26 @@ pub struct ProcessHandler {
actions_rx: Receiver<Action>,
// to send responses back to bridge
bridge_tx: BridgeTx,
// to timeout actions as per action route configuration
timeouts: HashMap<String, Duration>,
}

impl ProcessHandler {
pub fn new(actions_rx: Receiver<Action>, bridge_tx: BridgeTx) -> Self {
Self { actions_rx, bridge_tx }
pub fn new(
actions_rx: Receiver<Action>,
bridge_tx: BridgeTx,
action_routes: &[ActionRoute],
) -> Self {
let timeouts = action_routes
.iter()
.map(|ActionRoute { name, timeout }| (name.to_owned(), Duration::from_secs(*timeout)))
.collect();

Self { actions_rx, bridge_tx, timeouts }
}

/// Run a process of specified command
pub async fn run(
&mut self,
id: String,
command: String,
payload: String,
) -> Result<Child, Error> {
pub async fn run(&mut self, id: &str, command: &str, payload: &str) -> Result<Child, Error> {
let mut cmd = Command::new(command);
cmd.arg(id).arg(payload).kill_on_drop(true).stdout(Stdio::piped());

Expand All @@ -64,9 +72,6 @@ impl ProcessHandler {
let stdout = child.stdout.take().ok_or(Error::NoStdout)?;
let mut stdout = BufReader::new(stdout).lines();

let timeout = time::sleep(Duration::from_secs(10));
pin!(timeout);

loop {
select! {
Ok(Some(line)) = stdout.next_line() => {
Expand All @@ -78,23 +83,28 @@ impl ProcessHandler {
debug!("Action status: {:?}", status);
self.bridge_tx.send_action_response(status).await;
}
status = child.wait() => info!("Action done!! Status = {:?}", status),
_ = &mut timeout => break
status = child.wait() => {
info!("Action done!! Status = {:?}", status);
return Ok(());
},
}
}

Ok(())
}

#[tokio::main(flavor = "current_thread")]
pub async fn start(mut self) -> Result<(), Error> {
loop {
let action = self.actions_rx.recv_async().await?;
let command = String::from("tools/") + &action.name;
let duration = self.timeouts.get(&action.name).unwrap().to_owned();

// Spawn the action and capture its stdout
let child = self.run(action.action_id, command, action.payload).await?;
self.spawn_and_capture_stdout(child).await?;
// Spawn the action and capture its stdout, ignore timeouts
let child = self.run(&action.action_id, &command, &action.payload).await?;
if let Ok(o) = timeout(duration, self.spawn_and_capture_stdout(child)).await {
o?;
} else {
error!("Process timedout: {command}; action_id = {}", action.action_id);
}
}
}
}
3 changes: 2 additions & 1 deletion uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,8 @@ impl Uplink {
};

if let Some(actions_rx) = bridge.register_action_routes(&self.config.processes) {
let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone());
let process_handler =
ProcessHandler::new(actions_rx, bridge_tx.clone(), &self.config.processes);
thread::spawn(move || {
if let Err(e) = process_handler.start() {
error!("Process handler stopped!! Error = {:?}", e);
Expand Down
Loading