Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract the runtime used in runtime tests so that it's not hardcoded as Spin #2211

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions tests/runtime-tests/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::io::Read;

/// Helper for reading from a child process stream in a non-blocking way
pub struct OutputStream {
rx: std::sync::mpsc::Receiver<Result<Vec<u8>, std::io::Error>>,
buffer: Vec<u8>,
}

impl OutputStream {
pub fn new<R: Read + Send + 'static>(mut stream: R) -> Self {
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let mut buffer = vec![0; 1024];
loop {
let msg = stream.read(&mut buffer).map(|n| buffer[..n].to_vec());
if let Err(e) = tx.send(msg) {
if let Err(e) = e.0 {
eprintln!("Error reading from stream: {e}");
}
break;
}
}
});
Self {
rx,
buffer: Vec::new(),
}
}

/// Get the output of the stream so far
pub fn output(&mut self) -> &[u8] {
while let Ok(Ok(s)) = self.rx.try_recv() {
self.buffer.extend(s);
}
&self.buffer
}

/// Get the output of the stream so far
///
/// Returns None if the output is not valid utf8
pub fn output_as_str(&mut self) -> Option<&str> {
std::str::from_utf8(self.output()).ok()
}
}
246 changes: 39 additions & 207 deletions tests/runtime-tests/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
pub(crate) mod io;
mod services;
pub mod spin;

use std::{
io::Read,
path::{Path, PathBuf},
process::{Command, Stdio},
sync::OnceLock,
};

use anyhow::Context;
use services::Services;

/// A callback to create a runtime given a path to a temporary directory and a set of services
pub type RuntimeCreator = dyn Fn(&Path, &mut Services) -> anyhow::Result<Box<dyn Runtime>>;

/// Configuration for the test suite
pub struct Config {
/// The path to the Spin binary under test
pub spin_binary_path: PathBuf,
/// The runtime under test
pub create_runtime: Box<RuntimeCreator>,
/// The path to the tests directory which contains all the runtime test definitions
pub tests_path: PathBuf,
/// What to do when an individual test fails
pub on_error: OnTestError,
}

#[derive(Clone, Copy)]
#[derive(Debug, Clone, Copy)]
/// What to do on a test error
pub enum OnTestError {
Panic,
Expand Down Expand Up @@ -53,21 +56,24 @@ pub fn run_all(config: Config) -> anyhow::Result<()> {
}

/// Bootstrap and run a test at a path against the given config
pub fn bootstrap_and_run(test_path: &Path, config: &Config) -> Result<(), anyhow::Error> {
pub fn bootstrap_and_run(test_path: &Path, config: &Config) -> anyhow::Result<()> {
log::info!("Testing: {}", test_path.display());
let temp = temp_dir::TempDir::new()
.context("failed to produce a temporary directory to run the test in")?;
log::trace!("Temporary directory: {}", temp.path().display());
let mut services = services::start_services(test_path)?;
copy_manifest(test_path, &temp, &mut services)?;
let spin = Spin::start(&config.spin_binary_path, temp.path(), &mut services)?;
log::debug!("Spin started on port {}.", spin.port());
run_test(test_path, spin, config.on_error);
let runtime = &mut *(config.create_runtime)(temp.path(), &mut services)?;
run_test(runtime, test_path, config.on_error);
Ok(())
}

pub trait Runtime {
fn test(&mut self) -> anyhow::Result<TestResult>;
}

/// Run an individual test
fn run_test(test_path: &Path, mut spin: Spin, on_error: OnTestError) {
fn run_test(runtime: &mut dyn Runtime, test_path: &Path, on_error: OnTestError) {
// macro which will look at `on_error` and do the right thing
macro_rules! error {
($on_error:expr, $($arg:tt)*) => {
Expand All @@ -80,26 +86,16 @@ fn run_test(test_path: &Path, mut spin: Spin, on_error: OnTestError) {
}
};
}

let response = match make_http_request(&mut spin) {
let response = match runtime.test() {
Ok(r) => r,
Err(e) => {
error!(
on_error,
"Test failed trying to connect to http server: {e}"
);
}
};
let response = match ResponseKind::from_response(response) {
Ok(r) => r,
Err(e) => {
error!(on_error, "failed to parse response from Spin server: {e}")
error!(on_error, "failed to run test: {e}")
}
};
let error_file = test_path.join("error.txt");
match response {
ResponseKind::Ok if !error_file.exists() => log::info!("Test passed!"),
ResponseKind::Ok => {
TestResult::Pass if !error_file.exists() => log::info!("Test passed!"),
TestResult::Pass => {
let expected = match std::fs::read_to_string(&error_file) {
Ok(e) => e,
Err(e) => error!(on_error, "failed to read error.txt file: {}", e),
Expand All @@ -109,38 +105,32 @@ fn run_test(test_path: &Path, mut spin: Spin, on_error: OnTestError) {
"Test passed but should have failed with error: {expected}"
)
}
ResponseKind::Err(e) if error_file.exists() => {
TestResult::Fail(e, extra) if error_file.exists() => {
let expected = match std::fs::read_to_string(&error_file) {
Ok(e) => e,
Err(e) => error!(on_error, "failed to read error.txt file: {}", e),
Err(e) => error!(on_error, "failed to read error.txt file: {e}"),
};
if e.contains(&expected) {
log::info!("Test passed!");
} else {
error!(
on_error,
"Test errored but not in the expected way.\n\texpected: {}\n\tgot: {}\n\nSpin stderr:\n{}",
expected,
e,
spin.stderr.output_as_str().unwrap_or("<non-utf8>")

"Test errored but not in the expected way.\n\texpected: {expected}\n\tgot: {e}\n\n{extra}",
)
}
}
// An empty error message may indicate that the component panicked
ResponseKind::Err(e) if e.is_empty() => {
let e = spin
.stderr
.output_as_str()
.unwrap_or("Spin server did not return body and did not write to stderr");
error!(on_error, "Test '{}' errored: {e}", test_path.display());
TestResult::Fail(e, extra) => {
error!(
on_error,
"Test '{}' errored: {e}\n{extra}",
test_path.display()
);
}
ResponseKind::Err(e) => {
TestResult::RuntimeError(extra) => {
error!(
on_error,
"Test '{}' errored: {e}\nSpin stderr: {}",
test_path.display(),
spin.stderr.output_as_str().unwrap_or("<non-utf8>")
"Test '{}' failed fatally: {extra}",
test_path.display()
);
}
}
Expand Down Expand Up @@ -210,169 +200,11 @@ fn copy_manifest(
}

#[derive(Debug)]
enum ResponseKind {
Ok,
Err(String),
}

impl ResponseKind {
fn from_response(response: reqwest::blocking::Response) -> anyhow::Result<Self> {
if response.status() == 200 {
return Ok(Self::Ok);
}
if response.status() != 500 {
anyhow::bail!("Response was neither 200 nor 500")
}

Ok(Self::Err(response.text()?))
}
}

fn make_http_request(spin: &mut Spin) -> Result<reqwest::blocking::Response, anyhow::Error> {
if let Some(status) = spin.try_wait()? {
anyhow::bail!("Spin exited early with status code {:?}", status.code());
}
log::debug!("Connecting to HTTP server on port {}...", spin.port());
let response = reqwest::blocking::get(format!("http://127.0.0.1:{}", spin.port()))?;
log::debug!("Awaiting response from server");
if let Some(status) = spin.try_wait()? {
anyhow::bail!("Spin exited early with status code {:?}", status.code());
}
Ok(response)
}

struct Spin {
process: std::process::Child,
#[allow(dead_code)]
stdout: OutputStream,
stderr: OutputStream,
port: u16,
}

impl Spin {
fn start(
spin_binary_path: &Path,
current_dir: &Path,
services: &mut services::Services,
) -> Result<Self, anyhow::Error> {
let port = get_random_port()?;
let mut child = Command::new(spin_binary_path)
.arg("up")
.current_dir(current_dir)
.args(["--listen", &format!("127.0.0.1:{port}")])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = OutputStream::new(child.stdout.take().unwrap());
let stderr = OutputStream::new(child.stderr.take().unwrap());
log::debug!("Awaiting spin binary to start up on port {port}...");
let mut spin = Self {
process: child,
stdout,
stderr,
port,
};
for _ in 0..80 {
services.error()?;
match std::net::TcpStream::connect(format!("127.0.0.1:{port}")) {
Ok(_) => return Ok(spin),
Err(e) => {
let stderr = spin.stderr.output_as_str().unwrap_or("<non-utf8>");
log::trace!("Checking that the Spin server started returned an error: {e}");
log::trace!("Current spin stderr = '{stderr}'");
}
}
if let Some(status) = spin.try_wait()? {
anyhow::bail!(
"Spin exited early with status code {:?}\n{}{}",
status.code(),
spin.stdout.output_as_str().unwrap_or("<non-utf8>"),
spin.stderr.output_as_str().unwrap_or("<non-utf8>")
);
}

std::thread::sleep(std::time::Duration::from_millis(250));
}
anyhow::bail!(
"`spin up` did not start server or error after 20 seconds. stderr:\n\t{}",
spin.stderr.output_as_str().unwrap_or("<non-utf8>")
)
}

/// The port Spin is running on
fn port(&self) -> u16 {
self.port
}

fn try_wait(&mut self) -> std::io::Result<Option<std::process::ExitStatus>> {
self.process.try_wait()
}
}

impl Drop for Spin {
fn drop(&mut self) {
kill_process(&mut self.process);
}
}

fn kill_process(process: &mut std::process::Child) {
#[cfg(windows)]
{
let _ = process.kill();
}
#[cfg(not(windows))]
{
let pid = nix::unistd::Pid::from_raw(process.id() as i32);
let _ = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM);
}
}

/// Uses a track to ge a random unused port
fn get_random_port() -> anyhow::Result<u16> {
Ok(std::net::TcpListener::bind("localhost:0")?
.local_addr()?
.port())
}

/// Helper for reading from a child process stream in a non-blocking way
struct OutputStream {
rx: std::sync::mpsc::Receiver<Result<Vec<u8>, std::io::Error>>,
buffer: Vec<u8>,
}

impl OutputStream {
fn new<R: Read + Send + 'static>(mut stream: R) -> Self {
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let mut buffer = vec![0; 1024];
loop {
let msg = stream.read(&mut buffer).map(|n| buffer[..n].to_vec());
if let Err(e) = tx.send(msg) {
if let Err(e) = e.0 {
eprintln!("Error reading from stream: {e}");
}
break;
}
}
});
Self {
rx,
buffer: Vec::new(),
}
}

/// Get the output of the stream so far
fn output(&mut self) -> &[u8] {
while let Ok(Ok(s)) = self.rx.try_recv() {
self.buffer.extend(s);
}
&self.buffer
}

/// Get the output of the stream so far
///
/// Returns None if the output is not valid utf8
fn output_as_str(&mut self) -> Option<&str> {
std::str::from_utf8(self.output()).ok()
}
pub enum TestResult {
/// The test passed
Pass,
/// Wasm errored (the wasm error, additional error info)
Fail(String, String),
/// Wasm failed to run (additional error info)
RuntimeError(String),
}
7 changes: 5 additions & 2 deletions tests/runtime-tests/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::PathBuf;

use runtime_tests::{run_all, Config, OnTestError};
use runtime_tests::{run_all, spin::Spin, Config, OnTestError};

fn main() -> anyhow::Result<()> {
env_logger::init();
Expand All @@ -9,8 +9,11 @@ fn main() -> anyhow::Result<()> {
let tests_path = args
.next()
.unwrap_or_else(|| PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests"));

let config = Config {
spin_binary_path,
create_runtime: Box::new(move |temp, services| {
Ok(Box::new(Spin::start(&spin_binary_path, temp, services)?) as _)
}),
tests_path,
on_error: OnTestError::Log,
};
Expand Down
2 changes: 1 addition & 1 deletion tests/runtime-tests/src/services/python.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::OutputStream;
use crate::io::OutputStream;

use super::Service;
use anyhow::Context as _;
Expand Down
Loading