diff --git a/src/lib.rs b/src/lib.rs index 778ec80..bd84057 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ //! //! While a normal thread pool is only able to execute actions as soon as //! possible, a scheduled thread pool can execute actions after a specific -//! delay, or excecute actions periodically. +//! delay, or execute actions periodically. #![warn(missing_docs)] use parking_lot::{Condvar, Mutex}; @@ -73,6 +73,7 @@ impl Eq for Job {} struct InnerPool { queue: BinaryHeap, shutdown: bool, + on_drop_behavior: OnPoolDropBehavior, } struct SharedPool { @@ -98,10 +99,99 @@ impl SharedPool { } } +/// Options for what the behavior should be in regards to pending scheduled +/// executions when the pool is dropped. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum OnPoolDropBehavior { + /// Any pending scheduled executions will be run, but periodic actions will + /// not be rescheduled once these have completed. + /// + /// This is the default behavior. + CompletePendingScheduled, + + /// Don't run any pending scheduled executions. + DiscardPendingScheduled, +} + +/// A builder for a scheduled thread pool. +pub struct ScheduledThreadPoolBuilder<'a> { + thread_name_prefix: Option<&'a str>, + on_drop_behavior: OnPoolDropBehavior, +} + +impl<'a> ScheduledThreadPoolBuilder<'a> { + /// Constructs a new `ScheduledThreadPoolBuilder`. + /// + /// Parameters are initialized with their default values. The number of + /// threads to be used in the pool is set when you call + /// `build_with_num_threads`. + pub fn new() -> ScheduledThreadPoolBuilder<'a> { + ScheduledThreadPoolBuilder::default() + } + + /// Sets the prefix to be used when naming threads created to be part of the + /// pool. + /// + /// The substring `{}` in the name will be replaced with an integer + /// identifier of the thread. + /// + /// Defaults to `None`. + pub fn thread_name_prefix( + mut self, + thread_name_prefix: Option<&'a str>, + ) -> ScheduledThreadPoolBuilder<'a> { + self.thread_name_prefix = thread_name_prefix; + self + } + + /// Sets the behavior for what to do with pending scheduled executions when + /// the pool is dropped. + /// + /// Defaults to [OnPoolDropBehavior::CompletePendingScheduled]. + pub fn on_drop_behavior( + mut self, + on_drop_behavior: OnPoolDropBehavior, + ) -> ScheduledThreadPoolBuilder<'a> { + self.on_drop_behavior = on_drop_behavior; + self + } + + /// Consumes the builder, returning a new pool. + /// + /// # Panics + /// + /// Panics if `num_threads` is 0. + pub fn build_with_num_threads(self, num_threads: usize) -> ScheduledThreadPool { + assert!(num_threads > 0, "num_threads must be positive"); + ScheduledThreadPool::new_inner(self.thread_name_prefix, num_threads, self.on_drop_behavior) + } +} + +impl<'a> std::fmt::Debug for ScheduledThreadPoolBuilder<'a> { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct("ScheduledThreadPoolBuilder") + .field("thread_name_prefix", &self.thread_name_prefix) + .field("on_drop_behavior", &self.on_drop_behavior) + .finish() + } +} + +impl<'a> Default for ScheduledThreadPoolBuilder<'a> { + fn default() -> ScheduledThreadPoolBuilder<'a> { + ScheduledThreadPoolBuilder { + thread_name_prefix: None, + on_drop_behavior: OnPoolDropBehavior::CompletePendingScheduled, + } + } +} + /// A pool of threads which can run tasks at specific time intervals. /// -/// When the pool drops, all pending scheduled executions will be run, but -/// periodic actions will not be rescheduled after that. +/// By default, when the pool drops, all pending scheduled executions will be +/// run, but periodic actions will not be rescheduled after that. +/// +/// If you want different behavior on drop then you can specify it using +/// [OnPoolDropBehavior]. pub struct ScheduledThreadPool { shared: Arc, } @@ -120,7 +210,11 @@ impl ScheduledThreadPool { /// /// Panics if `num_threads` is 0. pub fn new(num_threads: usize) -> ScheduledThreadPool { - ScheduledThreadPool::new_inner(None, num_threads) + ScheduledThreadPool::new_inner( + None, + num_threads, + OnPoolDropBehavior::CompletePendingScheduled, + ) } /// Creates a new thread pool with the specified number of threads which @@ -133,15 +227,29 @@ impl ScheduledThreadPool { /// /// Panics if `num_threads` is 0. pub fn with_name(thread_name: &str, num_threads: usize) -> ScheduledThreadPool { - ScheduledThreadPool::new_inner(Some(thread_name), num_threads) + ScheduledThreadPool::new_inner( + Some(thread_name), + num_threads, + OnPoolDropBehavior::CompletePendingScheduled, + ) } - fn new_inner(thread_name: Option<&str>, num_threads: usize) -> ScheduledThreadPool { + /// Returns a builder type to configure a new pool. + pub fn builder<'a>() -> ScheduledThreadPoolBuilder<'a> { + ScheduledThreadPoolBuilder::new() + } + + fn new_inner( + thread_name: Option<&str>, + num_threads: usize, + on_drop_behavior: OnPoolDropBehavior, + ) -> ScheduledThreadPool { assert!(num_threads > 0, "num_threads must be positive"); let inner = InnerPool { queue: BinaryHeap::new(), shutdown: false, + on_drop_behavior, }; let shared = SharedPool { @@ -226,13 +334,9 @@ impl ScheduledThreadPool { /// # Panics /// /// If the closure panics, it will not be run again. - pub fn execute_at_dynamic_rate( - &self, - initial_delay: Duration, - f: F, - ) -> JobHandle - where - F: FnMut() -> Option + Send + 'static + pub fn execute_at_dynamic_rate(&self, initial_delay: Duration, f: F) -> JobHandle + where + F: FnMut() -> Option + Send + 'static, { let canceled = Arc::new(AtomicBool::new(false)); let job = Job { @@ -286,13 +390,9 @@ impl ScheduledThreadPool { /// # Panics /// /// If the closure panics, it will not be run again. - pub fn execute_with_dynamic_delay( - &self, - initial_delay: Duration, - f: F, - ) -> JobHandle - where - F: FnMut() -> Option + Send + 'static + pub fn execute_with_dynamic_delay(&self, initial_delay: Duration, f: F) -> JobHandle + where + F: FnMut() -> Option + Send + 'static, { let canceled = Arc::new(AtomicBool::new(false)); let job = Job { @@ -340,6 +440,13 @@ impl Worker { let need = match inner.queue.peek() { None if inner.shutdown => return None, None => Need::Wait, + Some(_) + if inner.shutdown + && inner.on_drop_behavior + == OnPoolDropBehavior::DiscardPendingScheduled => + { + return None + } Some(e) if e.time <= now => break, Some(e) => Need::WaitTimeout(e.time - now), }; @@ -406,11 +513,11 @@ impl Worker { #[cfg(test)] mod test { - use std::sync::mpsc::channel; + use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Barrier}; use std::time::Duration; - use super::ScheduledThreadPool; + use super::{OnPoolDropBehavior, ScheduledThreadPool}; const TEST_TASKS: usize = 4; @@ -429,12 +536,33 @@ mod test { assert_eq!(rx.iter().take(TEST_TASKS).sum::(), TEST_TASKS); } + #[test] + fn test_works_with_builder() { + let pool = ScheduledThreadPool::builder().build_with_num_threads(TEST_TASKS); + + let (tx, rx) = channel(); + for _ in 0..TEST_TASKS { + let tx = tx.clone(); + pool.execute(move || { + tx.send(1usize).unwrap(); + }); + } + + assert_eq!(rx.iter().take(TEST_TASKS).sum::(), TEST_TASKS); + } + #[test] #[should_panic(expected = "num_threads must be positive")] fn test_zero_tasks_panic() { ScheduledThreadPool::new(0); } + #[test] + #[should_panic(expected = "num_threads must be positive")] + fn test_num_threads_zero_panics_with_builder() { + ScheduledThreadPool::builder().build_with_num_threads(0); + } + #[test] fn test_recovery_from_subtask_panic() { let pool = ScheduledThreadPool::new(TEST_TASKS); @@ -493,90 +621,124 @@ mod test { } #[test] - fn test_fixed_delay_jobs_stop_after_drop() { - let pool = Arc::new(ScheduledThreadPool::new(TEST_TASKS)); + fn test_jobs_do_not_complete_after_drop_if_behavior_is_discard() { + let pool = ScheduledThreadPool::builder() + .on_drop_behavior(OnPoolDropBehavior::DiscardPendingScheduled) + .build_with_num_threads(TEST_TASKS); let (tx, rx) = channel(); - let (tx2, rx2) = channel(); - let mut pool2 = Some(pool.clone()); - let mut i = 0i32; - pool.execute_at_fixed_rate( - Duration::from_millis(500), - Duration::from_millis(500), - move || { - i += 1; - tx.send(i).unwrap(); - rx2.recv().unwrap(); - if i == 2 { - drop(pool2.take().unwrap()); - } - }, - ); + let tx1 = tx.clone(); + pool.execute_after(Duration::from_secs(1), move || tx1.send(1usize).unwrap()); + pool.execute_after(Duration::from_millis(500), move || tx.send(2usize).unwrap()); + drop(pool); - assert_eq!(Ok(1), rx.recv()); - tx2.send(()).unwrap(); - assert_eq!(Ok(2), rx.recv()); - tx2.send(()).unwrap(); assert!(rx.recv().is_err()); } #[test] - fn test_dynamic_rate_jobs_stop_after_drop() { - let pool = Arc::new(ScheduledThreadPool::new(TEST_TASKS)); + fn test_jobs_do_not_complete_after_drop_if_behavior_is_discard_using_builder() { + let pool = ScheduledThreadPool::builder() + .on_drop_behavior(OnPoolDropBehavior::DiscardPendingScheduled) + .build_with_num_threads(TEST_TASKS); let (tx, rx) = channel(); - let (tx2, rx2) = channel(); - let mut pool2 = Some(pool.clone()); - let mut i = 0i32; - pool.execute_with_dynamic_delay( - Duration::from_millis(500), - move || { - i += 1; - tx.send(i).unwrap(); - rx2.recv().unwrap(); - if i == 2 { - drop(pool2.take().unwrap()); - } - Some(Duration::from_millis(500)) - }, - ); + let tx1 = tx.clone(); + pool.execute_after(Duration::from_secs(1), move || tx1.send(1usize).unwrap()); + pool.execute_after(Duration::from_millis(500), move || tx.send(2usize).unwrap()); + drop(pool); - assert_eq!(Ok(1), rx.recv()); - tx2.send(()).unwrap(); - assert_eq!(Ok(2), rx.recv()); - tx2.send(()).unwrap(); assert!(rx.recv().is_err()); } + #[test] + fn test_fixed_rate_jobs_stop_after_drop() { + test_jobs_stop_after_drop( + |pool: &Arc, tx: Sender, rx2: Receiver<()>| { + let mut pool2 = Some(pool.clone()); + let mut i = 0i32; + pool.execute_at_fixed_rate( + Duration::from_millis(500), + Duration::from_millis(500), + move || { + i += 1; + tx.send(i).unwrap(); + rx2.recv().unwrap(); + if i == 2 { + drop(pool2.take().unwrap()); + } + }, + ); + }, + ); + } + #[test] fn test_dynamic_delay_jobs_stop_after_drop() { - let pool = Arc::new(ScheduledThreadPool::new(TEST_TASKS)); - let (tx, rx) = channel(); - let (tx2, rx2) = channel(); + test_jobs_stop_after_drop( + |pool: &Arc, tx: Sender, rx2: Receiver<()>| { + let mut pool2 = Some(pool.clone()); + let mut i = 0i32; + pool.execute_with_dynamic_delay(Duration::from_millis(500), move || { + i += 1; + tx.send(i).unwrap(); + rx2.recv().unwrap(); + if i == 2 { + drop(pool2.take().unwrap()); + } + Some(Duration::from_millis(500)) + }); + }, + ); + } - let mut pool2 = Some(pool.clone()); - let mut i = 0i32; - pool.execute_at_dynamic_rate( - Duration::from_millis(500), - move || { - i += 1; - tx.send(i).unwrap(); - rx2.recv().unwrap(); - if i == 2 { - drop(pool2.take().unwrap()); - } - Some(Duration::from_millis(500)) + #[test] + fn test_dynamic_rate_jobs_stop_after_drop() { + test_jobs_stop_after_drop( + |pool: &Arc, tx: Sender, rx2: Receiver<()>| { + let mut pool2 = Some(pool.clone()); + let mut i = 0i32; + pool.execute_at_dynamic_rate(Duration::from_millis(500), move || { + i += 1; + tx.send(i).unwrap(); + rx2.recv().unwrap(); + if i == 2 { + drop(pool2.take().unwrap()); + } + Some(Duration::from_millis(500)) + }); }, ); - drop(pool); + } - assert_eq!(Ok(1), rx.recv()); - tx2.send(()).unwrap(); - assert_eq!(Ok(2), rx.recv()); - tx2.send(()).unwrap(); - assert!(rx.recv().is_err()); + fn test_jobs_stop_after_drop(mut execute_fn: F) + where + F: FnMut(&Arc, Sender, Receiver<()>) -> (), + { + use super::OnPoolDropBehavior::*; + for drop_behavior in [CompletePendingScheduled, DiscardPendingScheduled] { + let pool = Arc::new( + ScheduledThreadPool::builder() + .on_drop_behavior(drop_behavior) + .build_with_num_threads(TEST_TASKS), + ); + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + + // Run the provided function that executes something on the pool + execute_fn(&pool, tx, rx2); + + // Immediately drop the reference to the pool we have here after the + // job has been scheduled + drop(pool); + + assert_eq!(Ok(1), rx.recv()); + tx2.send(()).unwrap(); + assert_eq!(Ok(2), rx.recv()); + tx2.send(()).unwrap(); + assert!(rx.recv().is_err()); + } } #[test]