Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a Non-Blocking Child Process Interface #211

Merged
merged 40 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
900b671
feat: initial revamped process.spawn impl
CompeyDev Jun 9, 2024
8ce4780
feat: impl readToEnd convenience method for reader
CompeyDev Jun 9, 2024
c63caca
chore: remove testing file
CompeyDev Jun 9, 2024
4b5b54e
fix: change code channel capacity should be i32 size
CompeyDev Jun 9, 2024
ce033bb
fix: rename tests to for process.spawn->process.exec
CompeyDev Jun 9, 2024
78a3d4d
chore(types): include types for new process.spawn
CompeyDev Jun 9, 2024
50b1bcb
fix: stop accepting stdio options for process.spawn
CompeyDev Jun 9, 2024
6a2f506
chore(types + tests): update types and tests for exec
CompeyDev Jun 9, 2024
d9cc71e
chore(tests): add new process.spawn tests
CompeyDev Jun 9, 2024
f0906c9
chore(tests): windows support for process.spawn stream test
CompeyDev Jun 9, 2024
48760b6
chore(tests): status test should use 0 as an exit code too
CompeyDev Jun 10, 2024
d3cda4b
chore(types): add spawn example in process docs
CompeyDev Jun 10, 2024
821c6d9
chore(types): add yieldability docs for ChildProcessReader:read
CompeyDev Jun 10, 2024
fc26d13
chore(types): clarify yieldability with exit for ChildProcessReader:read
CompeyDev Jun 10, 2024
2a4c610
feat(tests): update tests list
CompeyDev Jun 10, 2024
968a9b4
chore(tests): fix stream test for windows
CompeyDev Jun 10, 2024
70583c6
chore(tests): stream test on windows should redir stdout to stderr no…
CompeyDev Jun 10, 2024
a30380c
fix: address todo comments
CompeyDev Jun 10, 2024
daedbf9
refactor: minor formatting change
CompeyDev Jun 10, 2024
80ebab4
chore(tests): attempt to fix windows cmd yielding
CompeyDev Jun 10, 2024
cf707a3
chore(tests): fix stream test failing on windows cmd
CompeyDev Jun 11, 2024
7ed656c
chore(tests): update comment specifying command for exec
CompeyDev Jun 23, 2024
c9cbaf6
feat: return strings and null instead of buffers
CompeyDev Jun 23, 2024
ad4b8a7
fix: lint in process lib.rs
CompeyDev Jun 23, 2024
c08b738
chore: remove bstr dep and rearrange bytes dep
CompeyDev Jun 23, 2024
70088c7
revert: remove bstr dep
CompeyDev Jun 23, 2024
0c346a5
feat: rename process.spawn->process.create
CompeyDev Jun 24, 2024
aa10e88
chore(tests): replace old process.spawn usage with process.exec for s…
CompeyDev Jun 24, 2024
bd71b5e
feat: correct non blocking test for process.create
CompeyDev Jun 24, 2024
b1f9b1d
Merge branch 'main' into feature/process-stream
CompeyDev Jun 24, 2024
53402d8
feat: kill() for process.create spawning
CompeyDev Jun 25, 2024
e143a50
chore(tests): rename tests to process.create
CompeyDev Jun 25, 2024
e07d37e
chore(types): add kill function typedef for ChildProcess
CompeyDev Jun 25, 2024
e134120
feat: add tests and default exit code for kill() on ChildProcess
CompeyDev Jun 25, 2024
96ee1ba
feat: update test paths for process.create
CompeyDev Jun 25, 2024
6a713a8
chore(types): use consistent class naming for `SpawnResult`
CompeyDev Jun 29, 2024
db3893c
Merge branch 'main' into feature/process-stream
CompeyDev Jul 11, 2024
4c9ccd2
Merge branch 'main' into feature/process-stream
CompeyDev Jul 22, 2024
c29d7a9
Merge branch 'main' into feature/process-stream
CompeyDev Aug 11, 2024
02df224
Merge branch 'main' into feature/process-stream
CompeyDev Sep 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .lune/hello_lune.luau
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ end
]]

print("Sending 4 pings to google 🌏")
local result = process.spawn("ping", {
local result = process.exec("ping", {
"google.com",
"-c 4",
})
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/lune-std-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ directories = "5.0"
pin-project = "1.0"
os_str_bytes = { version = "7.0", features = ["conversions"] }

bstr = "1.9"
bytes = "1.6.0"

tokio = { version = "1", default-features = false, features = [
"io-std",
"io-util",
Expand Down
126 changes: 106 additions & 20 deletions crates/lune-std-process/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
#![allow(clippy::cargo_common_metadata)]

use std::{
cell::RefCell,
env::{
self,
consts::{ARCH, OS},
},
path::MAIN_SEPARATOR,
process::Stdio,
rc::Rc,
sync::Arc,
};

use mlua::prelude::*;

use lune_utils::TableBuilder;
use mlua_luau_scheduler::{Functions, LuaSpawnExt};
use options::ProcessSpawnOptionsStdio;
use os_str_bytes::RawOsString;
use tokio::io::AsyncWriteExt;
use stream::{ChildProcessReader, ChildProcessWriter};
use tokio::{io::AsyncWriteExt, process::Child, sync::RwLock};

mod options;
mod stream;
mod tee_writer;
mod wait_for_child;

use self::options::ProcessSpawnOptions;
use self::wait_for_child::{wait_for_child, WaitForChildResult};
use self::wait_for_child::wait_for_child;

use lune_utils::path::get_current_dir;

Expand Down Expand Up @@ -73,7 +79,8 @@ pub fn module(lua: &Lua) -> LuaResult<LuaTable> {
.with_value("cwd", cwd_str)?
.with_value("env", env_tab)?
.with_value("exit", process_exit)?
.with_async_function("spawn", process_spawn)?
.with_async_function("exec", process_exec)?
.with_function("create", process_create)?
.build_readonly()
}

Expand Down Expand Up @@ -141,11 +148,16 @@ fn process_env_iter<'lua>(
})
}

async fn process_spawn(
async fn process_exec(
lua: &Lua,
(program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions),
) -> LuaResult<LuaTable> {
let res = lua.spawn(spawn_command(program, args, options)).await?;
let res = lua
.spawn(async move {
let cmd = spawn_command_with_stdin(program, args, options.clone()).await?;
wait_for_child(cmd, options.stdio.stdout, options.stdio.stderr).await
})
.await?;

/*
NOTE: If an exit code was not given by the child process,
Expand All @@ -168,30 +180,104 @@ async fn process_spawn(
.build_readonly()
}

async fn spawn_command(
#[allow(clippy::await_holding_refcell_ref)]
fn process_create(
lua: &Lua,
(program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions),
) -> LuaResult<LuaTable> {
// We do not want the user to provide stdio options for process.create,
// so we reset the options, regardless of what the user provides us
let mut spawn_options = options.clone();
spawn_options.stdio = ProcessSpawnOptionsStdio::default();

let (code_tx, code_rx) = tokio::sync::broadcast::channel(4);
let code_rx_rc = Rc::new(RefCell::new(code_rx));

let child = spawn_command(program, args, spawn_options)?;

let child_arc = Arc::new(RwLock::new(child));

let child_arc_clone = Arc::clone(&child_arc);
let mut child_lock = tokio::task::block_in_place(|| child_arc_clone.blocking_write());

let stdin = child_lock.stdin.take().unwrap();
let stdout = child_lock.stdout.take().unwrap();
let stderr = child_lock.stderr.take().unwrap();

let child_arc_inner = Arc::clone(&child_arc);

// Spawn a background task to wait for the child to exit and send the exit code
let status_handle = tokio::spawn(async move {
let res = child_arc_inner.write().await.wait().await;

if let Ok(output) = res {
let code = output.code().unwrap_or_default();

code_tx
.send(code)
.expect("ExitCode receiver was unexpectedly dropped");
}
});

TableBuilder::new(lua)?
.with_value("stdout", ChildProcessReader(stdout))?
.with_value("stderr", ChildProcessReader(stderr))?
.with_value("stdin", ChildProcessWriter(stdin))?
.with_async_function("kill", move |_, ()| {
// First, stop the status task so the RwLock is dropped
status_handle.abort();
let child_arc_clone = Arc::clone(&child_arc);

// Then get another RwLock to write to the child process and kill it
async move { Ok(child_arc_clone.write().await.kill().await?) }
})?
.with_async_function("status", move |lua, ()| {
let code_rx_rc_clone = Rc::clone(&code_rx_rc);
async move {
// Exit code of 9 corresponds to SIGKILL, which should be the only case where
// the receiver gets suddenly dropped
let code = code_rx_rc_clone.borrow_mut().recv().await.unwrap_or(9);

TableBuilder::new(lua)?
.with_value("code", code)?
.with_value("ok", code == 0)?
.build_readonly()
}
})?
.build_readonly()
}

async fn spawn_command_with_stdin(
program: String,
args: Option<Vec<String>>,
mut options: ProcessSpawnOptions,
) -> LuaResult<WaitForChildResult> {
let stdout = options.stdio.stdout;
let stderr = options.stdio.stderr;
) -> LuaResult<Child> {
let stdin = options.stdio.stdin.take();

let mut child = options
.into_command(program, args)
.stdin(if stdin.is_some() {
Stdio::piped()
} else {
Stdio::null()
})
.stdout(stdout.as_stdio())
.stderr(stderr.as_stdio())
.spawn()?;
let mut child = spawn_command(program, args, options)?;

if let Some(stdin) = stdin {
let mut child_stdin = child.stdin.take().unwrap();
child_stdin.write_all(&stdin).await.into_lua_err()?;
}

wait_for_child(child, stdout, stderr).await
Ok(child)
}

fn spawn_command(
program: String,
args: Option<Vec<String>>,
options: ProcessSpawnOptions,
) -> LuaResult<Child> {
let stdout = options.stdio.stdout;
let stderr = options.stdio.stderr;

let child = options
.into_command(program, args)
.stdin(Stdio::piped())
.stdout(stdout.as_stdio())
.stderr(stderr.as_stdio())
.spawn()?;

Ok(child)
}
58 changes: 58 additions & 0 deletions crates/lune-std-process/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use bstr::BString;
use bytes::BytesMut;
use mlua::prelude::*;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

const CHUNK_SIZE: usize = 8;

#[derive(Debug, Clone)]
pub struct ChildProcessReader<R: AsyncRead>(pub R);
#[derive(Debug, Clone)]
pub struct ChildProcessWriter<W: AsyncWrite>(pub W);

impl<R: AsyncRead + Unpin> ChildProcessReader<R> {
pub async fn read(&mut self, chunk_size: Option<usize>) -> LuaResult<Vec<u8>> {
let mut buf = BytesMut::with_capacity(chunk_size.unwrap_or(CHUNK_SIZE));
self.0.read_buf(&mut buf).await?;

Ok(buf.to_vec())
}

pub async fn read_to_end(&mut self) -> LuaResult<Vec<u8>> {
let mut buf = vec![];
self.0.read_to_end(&mut buf).await?;

Ok(buf)
}
}

impl<R: AsyncRead + Unpin + 'static> LuaUserData for ChildProcessReader<R> {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method_mut("read", |lua, this, chunk_size: Option<usize>| async move {
let buf = this.read(chunk_size).await?;

if buf.is_empty() {
return Ok(LuaValue::Nil);
}

Ok(LuaValue::String(lua.create_string(buf)?))
});

methods.add_async_method_mut("readToEnd", |lua, this, ()| async {
Ok(lua.create_string(this.read_to_end().await?))
});
}
}

impl<W: AsyncWrite + Unpin> ChildProcessWriter<W> {
pub async fn write(&mut self, data: BString) -> LuaResult<()> {
self.0.write_all(data.as_ref()).await?;
Ok(())
}
}

impl<W: AsyncWrite + Unpin + 'static> LuaUserData for ChildProcessWriter<W> {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method_mut("write", |_, this, data| async { this.write(data).await });
}
}
17 changes: 10 additions & 7 deletions crates/lune/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,16 @@ create_tests! {
process_cwd: "process/cwd",
process_env: "process/env",
process_exit: "process/exit",
process_spawn_async: "process/spawn/async",
process_spawn_basic: "process/spawn/basic",
process_spawn_cwd: "process/spawn/cwd",
process_spawn_no_panic: "process/spawn/no_panic",
process_spawn_shell: "process/spawn/shell",
process_spawn_stdin: "process/spawn/stdin",
process_spawn_stdio: "process/spawn/stdio",
process_exec_async: "process/exec/async",
process_exec_basic: "process/exec/basic",
process_exec_cwd: "process/exec/cwd",
process_exec_no_panic: "process/exec/no_panic",
process_exec_shell: "process/exec/shell",
process_exec_stdin: "process/exec/stdin",
process_exec_stdio: "process/exec/stdio",
process_spawn_non_blocking: "process/create/non_blocking",
process_spawn_status: "process/create/status",
process_spawn_stream: "process/create/stream",
}

#[cfg(feature = "std-regex")]
Expand Down
4 changes: 2 additions & 2 deletions scripts/generate_compression_test_files.luau
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ local BIN_ZLIB = if process.os == "macos" then "/opt/homebrew/bin/pigz" else "pi

local function checkInstalled(program: string, args: { string }?)
print("Checking if", program, "is installed")
local result = process.spawn(program, args)
local result = process.exec(program, args)
if not result.ok then
stdio.ewrite(string.format("Program '%s' is not installed\n", program))
process.exit(1)
Expand All @@ -123,7 +123,7 @@ checkInstalled(BIN_ZLIB, { "--version" })
-- Run them to generate files

local function run(program: string, args: { string }): string
local result = process.spawn(program, args)
local result = process.exec(program, args)
if not result.ok then
stdio.ewrite(string.format("Command '%s' failed\n", program))
if #result.stdout > 0 then
Expand Down
2 changes: 1 addition & 1 deletion tests/datetime/formatLocalTime.luau
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ if not runLocaleTests then
return
end

local dateCmd = process.spawn("bash", { "-c", "date +\"%A, %d %B %Y\" --date='@1693068988'" }, {
local dateCmd = process.exec("bash", { "-c", "date +\"%A, %d %B %Y\" --date='@1693068988'" }, {
env = {
LC_ALL = "fr_FR.UTF-8 ",
},
Expand Down
21 changes: 21 additions & 0 deletions tests/process/create/kill.luau
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
local process = require("@lune/process")

-- Killing a child process should work as expected

local message = "Hello, world!"
local child = process.create("cat")

child.stdin:write(message)
child.kill()

assert(child.status().code == 9, "Child process should have an exit code of 9 (SIGKILL)")

assert(
child.stdout:readToEnd() == message,
"Reading from stdout of child process should work even after kill"
)

local stdinWriteOk = pcall(function()
child.stdin:write(message)
end)
assert(not stdinWriteOk, "Writing to stdin of child process should not work after kill")
13 changes: 13 additions & 0 deletions tests/process/create/non_blocking.luau
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
local process = require("@lune/process")

-- Spawning a child process should not block the thread

local childThread = coroutine.create(process.create)

local ok, err = coroutine.resume(childThread, "echo", { "hello, world" })
assert(ok, err)

assert(
coroutine.status(childThread) == "dead",
"Child process should not block the thread it is running on"
)
15 changes: 15 additions & 0 deletions tests/process/create/status.luau
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
local process = require("@lune/process")

-- The exit code of an child process should be correct

local randomExitCode = math.random(0, 255)
local isOk = randomExitCode == 0
local child = process.create("exit", { tostring(randomExitCode) }, { shell = true })
local status = child.status()

assert(
status.code == randomExitCode,
`Child process exited with wrong exit code, expected {randomExitCode}`
)

assert(status.ok == isOk, `Child status should be {if status.ok then "ok" else "not ok"}`)
18 changes: 18 additions & 0 deletions tests/process/create/stream.luau
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
local process = require("@lune/process")

-- Should be able to write and read from child process streams

local msg = "hello, world"

local catChild = process.create("cat")
catChild.stdin:write(msg)
assert(
msg == catChild.stdout:read(#msg),
"Failed to write to stdin or read from stdout of child process"
)

local echoChild = if process.os == "windows"
then process.create("/c", { "echo", msg, "1>&2" }, { shell = "cmd" })
else process.create("echo", { msg, ">>/dev/stderr" }, { shell = true })

assert(msg == echoChild.stderr:read(#msg), "Failed to read from stderr of child process")
Loading
Loading