From 09884d81f4038dfef999f64888663efa72f7072a Mon Sep 17 00:00:00 2001 From: "Felix L." <50841330+Felix-El@users.noreply.github.com> Date: Thu, 15 Aug 2024 20:50:09 +0200 Subject: [PATCH] Avoid memory leaks This commit replaces the `threadpool` crate with a handcrafted solution based on scoped threads. This leaves `valgrind` much happier than before. We also lose some dependency baggage. --- CHANGELOG.md | 3 +++ Cargo.toml | 1 - src/lib.rs | 75 ++++++++++++++++++++++++++++++++++------------------ 3 files changed, 52 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4631ed..b0e8d91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Replace dependency on threadpool crate with a custom solution built on the + standard library only, and only using scoped threads + -> fixes memory leaks observed when running under valgrind ## [0.7.3] - 2024-05-10 - Default to single-threaded tests for WebAssembly (thanks @alexcrichton) in [#41](https://github.com/LukasKalbertodt/libtest-mimic/pull/41) diff --git a/Cargo.toml b/Cargo.toml index 02d2561..18492ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ exclude = [".github"] [dependencies] clap = { version = "4.0.8", features = ["derive"] } -threadpool = "1.8.1" escape8259 = "0.5.2" anstream = "0.6.14" anstyle = "1.0.7" diff --git a/src/lib.rs b/src/lib.rs index be1e65c..955a615 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,13 +71,19 @@ #![forbid(unsafe_code)] -use std::{borrow::Cow, fmt, process::{self, ExitCode}, sync::mpsc, time::Instant}; +use std::{ + borrow::Cow, + fmt, + process::{self, ExitCode}, + sync::{mpsc, Mutex}, + thread, + time::Instant, +}; mod args; mod printer; use printer::Printer; -use threadpool::ThreadPool; pub use crate::args::{Arguments, ColorSetting, FormatSetting}; @@ -481,7 +487,14 @@ pub fn run(args: &Arguments, mut tests: Vec) -> Conclusion { // Execute all tests. let test_mode = !args.bench; - if platform_defaults_to_one_thread() || args.test_threads == Some(1) { + + let num_threads = platform_defaults_to_one_thread() + .then_some(1) + .or(args.test_threads) + .or_else(|| std::thread::available_parallelism().ok().map(Into::into)) + .unwrap_or(1); + + if num_threads == 1 { // Run test sequentially in main thread for test in tests { // Print `test foo ...`, run the test, then print the outcome in @@ -496,35 +509,45 @@ pub fn run(args: &Arguments, mut tests: Vec) -> Conclusion { } } else { // Run test in thread pool. - let pool = match args.test_threads { - Some(num_threads) => ThreadPool::new(num_threads), - None => ThreadPool::default() - }; let (sender, receiver) = mpsc::channel(); let num_tests = tests.len(); - for test in tests { - if args.is_ignored(&test) { - sender.send((Outcome::Ignored, test.info)).unwrap(); - } else { - let sender = sender.clone(); - pool.execute(move || { - // It's fine to ignore the result of sending. If the - // receiver has hung up, everything will wind down soon - // anyway. - let outcome = run_single(test.runner, test_mode); - let _ = sender.send((outcome, test.info)); + // TODO: this should use a mpmc channel, once that's stabilized in std. + let iter = Mutex::new(tests.into_iter()); + thread::scope(|scope| { + // Start worker threads + for _ in 0..num_threads { + scope.spawn(|| { + // Get next test to process from the iterator. We have the + // extra `let` binding as otherwise, the mutex would be + // locked for the whole `if` body. + let next_trial = iter.lock().unwrap().next(); + if let Some(trial) = next_trial { + let payload = if args.is_ignored(&trial) { + (Outcome::Ignored, trial.info) + } else { + let outcome = run_single(trial.runner, test_mode); + (outcome, trial.info) + }; + + // It's fine to ignore the result of sending. If the + // receiver has hung up, everything will wind down soon + // anyway. + let _ = sender.send(payload); + } }); } - } - for (outcome, test_info) in receiver.iter().take(num_tests) { - // In multithreaded mode, we do only print the start of the line - // after the test ran, as otherwise it would lead to terribly - // interleaved output. - printer.print_test(&test_info); - handle_outcome(outcome, test_info, &mut printer); - } + // Print results of tests that already dinished + for (outcome, test_info) in receiver.iter().take(num_tests) { + // In multithreaded mode, we do only print the start of the line + // after the test ran, as otherwise it would lead to terribly + // interleaved output. + printer.print_test(&test_info); + handle_outcome(outcome, test_info, &mut printer); + } + }); + } // Print failures if there were any, and the final summary.