-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
16 changed files
with
917 additions
and
166 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
//! Utilities for running a cargo command like `cargo check` or `cargo test` in a separate thread and | ||
//! parse its stdout/stderr. | ||
|
||
use std::{ | ||
ffi::OsString, | ||
fmt, io, | ||
path::PathBuf, | ||
process::{ChildStderr, ChildStdout, Command, Stdio}, | ||
}; | ||
|
||
use command_group::{CommandGroup, GroupChild}; | ||
use crossbeam_channel::{unbounded, Receiver, Sender}; | ||
use stdx::process::streaming_output; | ||
|
||
/// Cargo output is structured as a one JSON per line. This trait abstracts parsing one line of | ||
/// cargo output into a Rust data type. | ||
pub(crate) trait ParseFromLine: Sized + Send + 'static { | ||
fn from_line(line: &str, error: &mut String) -> Option<Self>; | ||
fn from_eof() -> Option<Self>; | ||
} | ||
|
||
struct CargoActor<T> { | ||
sender: Sender<T>, | ||
stdout: ChildStdout, | ||
stderr: ChildStderr, | ||
} | ||
|
||
impl<T: ParseFromLine> CargoActor<T> { | ||
fn new(sender: Sender<T>, stdout: ChildStdout, stderr: ChildStderr) -> Self { | ||
CargoActor { sender, stdout, stderr } | ||
} | ||
|
||
fn run(self) -> io::Result<(bool, String)> { | ||
// We manually read a line at a time, instead of using serde's | ||
// stream deserializers, because the deserializer cannot recover | ||
// from an error, resulting in it getting stuck, because we try to | ||
// be resilient against failures. | ||
// | ||
// Because cargo only outputs one JSON object per line, we can | ||
// simply skip a line if it doesn't parse, which just ignores any | ||
// erroneous output. | ||
|
||
let mut stdout_errors = String::new(); | ||
let mut stderr_errors = String::new(); | ||
let mut read_at_least_one_stdout_message = false; | ||
let mut read_at_least_one_stderr_message = false; | ||
let process_line = |line: &str, error: &mut String| { | ||
// Try to deserialize a message from Cargo or Rustc. | ||
if let Some(t) = T::from_line(line, error) { | ||
self.sender.send(t).unwrap(); | ||
true | ||
} else { | ||
false | ||
} | ||
}; | ||
let output = streaming_output( | ||
self.stdout, | ||
self.stderr, | ||
&mut |line| { | ||
if process_line(line, &mut stdout_errors) { | ||
read_at_least_one_stdout_message = true; | ||
} | ||
}, | ||
&mut |line| { | ||
if process_line(line, &mut stderr_errors) { | ||
read_at_least_one_stderr_message = true; | ||
} | ||
}, | ||
&mut || { | ||
if let Some(t) = T::from_eof() { | ||
self.sender.send(t).unwrap(); | ||
} | ||
}, | ||
); | ||
|
||
let read_at_least_one_message = | ||
read_at_least_one_stdout_message || read_at_least_one_stderr_message; | ||
let mut error = stdout_errors; | ||
error.push_str(&stderr_errors); | ||
match output { | ||
Ok(_) => Ok((read_at_least_one_message, error)), | ||
Err(e) => Err(io::Error::new(e.kind(), format!("{e:?}: {error}"))), | ||
} | ||
} | ||
} | ||
|
||
struct JodGroupChild(GroupChild); | ||
|
||
impl Drop for JodGroupChild { | ||
fn drop(&mut self) { | ||
_ = self.0.kill(); | ||
_ = self.0.wait(); | ||
} | ||
} | ||
|
||
/// A handle to a cargo process used for fly-checking. | ||
pub(crate) struct CommandHandle<T> { | ||
/// The handle to the actual cargo process. As we cannot cancel directly from with | ||
/// a read syscall dropping and therefore terminating the process is our best option. | ||
child: JodGroupChild, | ||
thread: stdx::thread::JoinHandle<io::Result<(bool, String)>>, | ||
pub(crate) receiver: Receiver<T>, | ||
program: OsString, | ||
arguments: Vec<OsString>, | ||
current_dir: Option<PathBuf>, | ||
} | ||
|
||
impl<T> fmt::Debug for CommandHandle<T> { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.debug_struct("CommandHandle") | ||
.field("program", &self.program) | ||
.field("arguments", &self.arguments) | ||
.field("current_dir", &self.current_dir) | ||
.finish() | ||
} | ||
} | ||
|
||
impl<T: ParseFromLine> CommandHandle<T> { | ||
pub(crate) fn spawn(mut command: Command) -> std::io::Result<Self> { | ||
command.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null()); | ||
let mut child = command.group_spawn().map(JodGroupChild)?; | ||
|
||
let program = command.get_program().into(); | ||
let arguments = command.get_args().map(|arg| arg.into()).collect::<Vec<OsString>>(); | ||
let current_dir = command.get_current_dir().map(|arg| arg.to_path_buf()); | ||
|
||
let stdout = child.0.inner().stdout.take().unwrap(); | ||
let stderr = child.0.inner().stderr.take().unwrap(); | ||
|
||
let (sender, receiver) = unbounded(); | ||
let actor = CargoActor::<T>::new(sender, stdout, stderr); | ||
let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) | ||
.name("CargoHandle".to_owned()) | ||
.spawn(move || actor.run()) | ||
.expect("failed to spawn thread"); | ||
Ok(CommandHandle { program, arguments, current_dir, child, thread, receiver }) | ||
} | ||
|
||
pub(crate) fn cancel(mut self) { | ||
let _ = self.child.0.kill(); | ||
let _ = self.child.0.wait(); | ||
} | ||
|
||
pub(crate) fn join(mut self) -> io::Result<()> { | ||
let _ = self.child.0.kill(); | ||
let exit_status = self.child.0.wait()?; | ||
let (read_at_least_one_message, error) = self.thread.join()?; | ||
if read_at_least_one_message || exit_status.success() { | ||
Ok(()) | ||
} else { | ||
Err(io::Error::new(io::ErrorKind::Other, format!( | ||
"Cargo watcher failed, the command produced no valid metadata (exit code: {exit_status:?}):\n{error}" | ||
))) | ||
} | ||
} | ||
} |
Oops, something went wrong.