diff --git a/src/collections/binary_heap.rs b/src/collections/binary_heap.rs
index c8f1014c1..fa903121a 100644
--- a/src/collections/binary_heap.rs
+++ b/src/collections/binary_heap.rs
@@ -56,3 +56,65 @@ delegate_indexed_iterator! {
 }
 
 // `BinaryHeap` doesn't have a mutable `Iterator`
+
+/// Draining parallel iterator that moves out of a binary heap,
+/// but keeps the total capacity.
+#[derive(Debug)]
+pub struct Drain<'a, T: Ord + Send> {
+    heap: &'a mut BinaryHeap<T>,
+}
+
+impl<'a, T: Ord + Send> ParallelDrainFull for &'a mut BinaryHeap<T> {
+    type Iter = Drain<'a, T>;
+    type Item = T;
+
+    fn par_drain(self) -> Self::Iter {
+        Drain { heap: self }
+    }
+}
+
+impl<'a, T: Ord + Send> ParallelIterator for Drain<'a, T> {
+    type Item = T;
+
+    fn drive_unindexed<C>(self, consumer: C) -> C::Result
+    where
+        C: UnindexedConsumer<Self::Item>,
+    {
+        bridge(self, consumer)
+    }
+
+    fn opt_len(&self) -> Option<usize> {
+        Some(self.len())
+    }
+}
+
+impl<'a, T: Ord + Send> IndexedParallelIterator for Drain<'a, T> {
+    fn drive<C>(self, consumer: C) -> C::Result
+    where
+        C: Consumer<Self::Item>,
+    {
+        bridge(self, consumer)
+    }
+
+    fn len(&self) -> usize {
+        self.heap.len()
+    }
+
+    fn with_producer<CB>(self, callback: CB) -> CB::Output
+    where
+        CB: ProducerCallback<Self::Item>,
+    {
+        super::DrainGuard::new(self.heap)
+            .par_drain(..)
+            .with_producer(callback)
+    }
+}
+
+impl<'a, T: Ord + Send> Drop for Drain<'a, T> {
+    fn drop(&mut self) {
+        if !self.heap.is_empty() {
+            // We must not have produced, so just call a normal drain to remove the items.
+            self.heap.drain();
+        }
+    }
+}
diff --git a/src/collections/hash_map.rs b/src/collections/hash_map.rs
index e9adbc12e..b657851d8 100644
--- a/src/collections/hash_map.rs
+++ b/src/collections/hash_map.rs
@@ -4,6 +4,7 @@
 
 use std::collections::HashMap;
 use std::hash::{BuildHasher, Hash};
+use std::marker::PhantomData;
 
 use crate::iter::plumbing::*;
 use crate::iter::*;
@@ -65,3 +66,31 @@ delegate_iterator! {
     IterMut<'a, K, V> => (&'a K, &'a mut V),
     impl<'a, K: Hash + Eq + Sync + 'a, V: Send + 'a>
 }
+
+/// Draining parallel iterator that moves out of a hash map,
+/// but keeps the total capacity.
+#[derive(Debug)]
+pub struct Drain<'a, K: Hash + Eq + Send, V: Send> {
+    inner: vec::IntoIter<(K, V)>,
+    marker: PhantomData<&'a mut HashMap<K, V>>,
+}
+
+impl<'a, K: Hash + Eq + Send, V: Send, S: BuildHasher> ParallelDrainFull
+    for &'a mut HashMap<K, V, S>
+{
+    type Iter = Drain<'a, K, V>;
+    type Item = (K, V);
+
+    fn par_drain(self) -> Self::Iter {
+        let vec: Vec<_> = self.drain().collect();
+        Drain {
+            inner: vec.into_par_iter(),
+            marker: PhantomData,
+        }
+    }
+}
+
+delegate_iterator! {
+    Drain<'_, K, V> => (K, V),
+    impl<K: Hash + Eq + Send, V: Send>
+}
diff --git a/src/collections/hash_set.rs b/src/collections/hash_set.rs
index bf43f89a3..b6ee1c110 100644
--- a/src/collections/hash_set.rs
+++ b/src/collections/hash_set.rs
@@ -4,6 +4,7 @@
 
 use std::collections::HashSet;
 use std::hash::{BuildHasher, Hash};
+use std::marker::PhantomData;
 
 use crate::iter::plumbing::*;
 use crate::iter::*;
@@ -51,3 +52,29 @@ delegate_iterator! {
 }
 
 // `HashSet` doesn't have a mutable `Iterator`
+
+/// Draining parallel iterator that moves out of a hash set,
+/// but keeps the total capacity.
+#[derive(Debug)]
+pub struct Drain<'a, T: Hash + Eq + Send> {
+    inner: vec::IntoIter<T>,
+    marker: PhantomData<&'a mut HashSet<T>>,
+}
+
+impl<'a, T: Hash + Eq + Send, S: BuildHasher> ParallelDrainFull for &'a mut HashSet<T, S> {
+    type Iter = Drain<'a, T>;
+    type Item = T;
+
+    fn par_drain(self) -> Self::Iter {
+        let vec: Vec<_> = self.drain().collect();
+        Drain {
+            inner: vec.into_par_iter(),
+            marker: PhantomData,
+        }
+    }
+}
+
+delegate_iterator! {
+    Drain<'_, T> => T,
+    impl<T: Hash + Eq + Send>
+}
diff --git a/src/collections/mod.rs b/src/collections/mod.rs
index d7e4d201d..d9b7988da 100644
--- a/src/collections/mod.rs
+++ b/src/collections/mod.rs
@@ -28,3 +28,57 @@ pub mod hash_map;
 pub mod hash_set;
 pub mod linked_list;
 pub mod vec_deque;
+
+use self::drain_guard::DrainGuard;
+
+mod drain_guard {
+    use crate::iter::ParallelDrainRange;
+    use std::mem;
+    use std::ops::RangeBounds;
+
+    /// A proxy for draining a collection by converting to a `Vec` and back.
+    ///
+    /// This is used for draining `BinaryHeap` and `VecDeque`, which both have
+    /// zero-allocation conversions to/from `Vec`, though not zero-cost:
+    /// - `BinaryHeap` will heapify from `Vec`, but at least that will be empty.
+    /// - `VecDeque` has to shift items to offset 0 when converting to `Vec`.
+    #[allow(missing_debug_implementations)]
+    pub(super) struct DrainGuard<'a, T, C: From<Vec<T>>> {
+        collection: &'a mut C,
+        vec: Vec<T>,
+    }
+
+    impl<'a, T, C> DrainGuard<'a, T, C>
+    where
+        C: Default + From<Vec<T>>,
+        Vec<T>: From<C>,
+    {
+        pub(super) fn new(collection: &'a mut C) -> Self {
+            Self {
+                // Temporarily steal the inner `Vec` so we can drain in place.
+                vec: Vec::from(mem::replace(collection, C::default())),
+                collection,
+            }
+        }
+    }
+
+    impl<'a, T, C: From<Vec<T>>> Drop for DrainGuard<'a, T, C> {
+        fn drop(&mut self) {
+            // Restore the collection from the `Vec` with its original capacity.
+            *self.collection = C::from(mem::replace(&mut self.vec, Vec::new()));
+        }
+    }
+
+    impl<'a, T, C> ParallelDrainRange<usize> for &'a mut DrainGuard<'_, T, C>
+    where
+        T: Send,
+        C: From<Vec<T>>,
+    {
+        type Iter = crate::vec::Drain<'a, T>;
+        type Item = T;
+
+        fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
+            self.vec.par_drain(range)
+        }
+    }
+}
diff --git a/src/collections/vec_deque.rs b/src/collections/vec_deque.rs
index 54494c471..f87ce6b18 100644
--- a/src/collections/vec_deque.rs
+++ b/src/collections/vec_deque.rs
@@ -3,9 +3,11 @@
 //! unless you have need to name one of the iterator types.
 
 use std::collections::VecDeque;
+use std::ops::{Range, RangeBounds};
 
 use crate::iter::plumbing::*;
 use crate::iter::*;
+use crate::math::simplify_range;
 use crate::slice;
 use crate::vec;
 
@@ -16,9 +18,15 @@ pub struct IntoIter<T: Send> {
     inner: vec::IntoIter<T>,
 }
 
-into_par_vec! {
-    VecDeque<T> => IntoIter<T>,
-    impl<T: Send>
-}
+impl<T: Send> IntoParallelIterator for VecDeque<T> {
+    type Item = T;
+    type Iter = IntoIter<T>;
+
+    fn into_par_iter(self) -> Self::Iter {
+        // NOTE: requires data movement if the deque doesn't start at offset 0.
+        let inner = Vec::from(self).into_par_iter();
+        IntoIter { inner }
+    }
+}
 
 delegate_indexed_iterator! {
@@ -79,3 +87,73 @@ delegate_indexed_iterator! {
     IterMut<'a, T> => &'a mut T,
     impl<'a, T: Send + 'a>
 }
+
+/// Draining parallel iterator that moves a range out of a double-ended queue,
+/// but keeps the total capacity.
+#[derive(Debug)]
+pub struct Drain<'a, T: Send> {
+    deque: &'a mut VecDeque<T>,
+    range: Range<usize>,
+    orig_len: usize,
+}
+
+impl<'a, T: Send> ParallelDrainRange<usize> for &'a mut VecDeque<T> {
+    type Iter = Drain<'a, T>;
+    type Item = T;
+
+    fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
+        Drain {
+            orig_len: self.len(),
+            range: simplify_range(range, self.len()),
+            deque: self,
+        }
+    }
+}
+
+impl<'a, T: Send> ParallelIterator for Drain<'a, T> {
+    type Item = T;
+
+    fn drive_unindexed<C>(self, consumer: C) -> C::Result
+    where
+        C: UnindexedConsumer<Self::Item>,
+    {
+        bridge(self, consumer)
+    }
+
+    fn opt_len(&self) -> Option<usize> {
+        Some(self.len())
+    }
+}
+
+impl<'a, T: Send> IndexedParallelIterator for Drain<'a, T> {
+    fn drive<C>(self, consumer: C) -> C::Result
+    where
+        C: Consumer<Self::Item>,
+    {
+        bridge(self, consumer)
+    }
+
+    fn len(&self) -> usize {
+        self.range.len()
+    }
+
+    fn with_producer<CB>(self, callback: CB) -> CB::Output
+    where
+        CB: ProducerCallback<Self::Item>,
+    {
+        // NOTE: requires data movement if the deque doesn't start at offset 0.
+        super::DrainGuard::new(self.deque)
+            .par_drain(self.range.clone())
+            .with_producer(callback)
+    }
+}
+
+impl<'a, T: Send> Drop for Drain<'a, T> {
+    fn drop(&mut self) {
+        if self.deque.len() != self.orig_len - self.range.len() {
+            // We must not have produced, so just call a normal drain to remove the items.
+            assert_eq!(self.deque.len(), self.orig_len);
+            self.deque.drain(self.range.clone());
+        }
+    }
+}
diff --git a/src/iter/mod.rs b/src/iter/mod.rs
index ff4c5dd82..eabc775da 100644
--- a/src/iter/mod.rs
+++ b/src/iter/mod.rs
@@ -84,7 +84,7 @@
 use self::private::Try;
 pub use either::Either;
 use std::cmp::{self, Ordering};
 use std::iter::{Product, Sum};
-use std::ops::Fn;
+use std::ops::{Fn, RangeBounds};
 
 pub mod plumbing;
@@ -2845,6 +2845,124 @@ where
     I: IntoParallelIterator<Item = T>;
 }
 
+/// `ParallelDrainFull` creates a parallel iterator that moves all items
+/// from a collection while retaining the original capacity.
+///
+/// Types which are indexable typically implement [`ParallelDrainRange`]
+/// instead, where you can drain fully with `par_drain(..)`.
+///
+/// [`ParallelDrainRange`]: trait.ParallelDrainRange.html
+pub trait ParallelDrainFull {
+    /// The draining parallel iterator type that will be created.
+    type Iter: ParallelIterator<Item = Self::Item>;
+
+    /// The type of item that the parallel iterator will produce.
+    /// This is usually the same as `IntoParallelIterator::Item`.
+    type Item: Send;
+
+    /// Returns a draining parallel iterator over an entire collection.
+    ///
+    /// When the iterator is dropped, all items are removed, even if the
+    /// iterator was not fully consumed. If the iterator is leaked, for example
+    /// using `std::mem::forget`, it is unspecified how many items are removed.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use rayon::prelude::*;
+    /// use std::collections::{BinaryHeap, HashSet};
+    ///
+    /// let squares: HashSet<i32> = (0..10).map(|x| x * x).collect();
+    ///
+    /// let mut heap: BinaryHeap<_> = squares.iter().copied().collect();
+    /// assert_eq!(
+    ///     // heaps are drained in arbitrary order
+    ///     heap.par_drain()
+    ///         .inspect(|x| assert!(squares.contains(x)))
+    ///         .count(),
+    ///     squares.len(),
+    /// );
+    /// assert!(heap.is_empty());
+    /// assert!(heap.capacity() >= squares.len());
+    /// ```
+    fn par_drain(self) -> Self::Iter;
+}
+
+/// `ParallelDrainRange` creates a parallel iterator that moves a range of items
+/// from a collection while retaining the original capacity.
+///
+/// Types which are not indexable may implement [`ParallelDrainFull`] instead.
+///
+/// [`ParallelDrainFull`]: trait.ParallelDrainFull.html
+pub trait ParallelDrainRange<Idx = usize> {
+    /// The draining parallel iterator type that will be created.
+    type Iter: ParallelIterator<Item = Self::Item>;
+
+    /// The type of item that the parallel iterator will produce.
+    /// This is usually the same as `IntoParallelIterator::Item`.
+    type Item: Send;
+
+    /// Returns a draining parallel iterator over a range of the collection.
+    ///
+    /// When the iterator is dropped, all items in the range are removed, even
+    /// if the iterator was not fully consumed. If the iterator is leaked, for
+    /// example using `std::mem::forget`, it is unspecified how many items are
+    /// removed.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use rayon::prelude::*;
+    ///
+    /// let squares: Vec<i32> = (0..10).map(|x| x * x).collect();
+    ///
+    /// println!("RangeFull");
+    /// let mut vec = squares.clone();
+    /// assert!(vec.par_drain(..)
+    ///            .eq(squares.par_iter().copied()));
+    /// assert!(vec.is_empty());
+    /// assert!(vec.capacity() >= squares.len());
+    ///
+    /// println!("RangeFrom");
+    /// let mut vec = squares.clone();
+    /// assert!(vec.par_drain(5..)
+    ///            .eq(squares[5..].par_iter().copied()));
+    /// assert_eq!(&vec[..], &squares[..5]);
+    /// assert!(vec.capacity() >= squares.len());
+    ///
+    /// println!("RangeTo");
+    /// let mut vec = squares.clone();
+    /// assert!(vec.par_drain(..5)
+    ///            .eq(squares[..5].par_iter().copied()));
+    /// assert_eq!(&vec[..], &squares[5..]);
+    /// assert!(vec.capacity() >= squares.len());
+    ///
+    /// println!("RangeToInclusive");
+    /// let mut vec = squares.clone();
+    /// assert!(vec.par_drain(..=5)
+    ///            .eq(squares[..=5].par_iter().copied()));
+    /// assert_eq!(&vec[..], &squares[6..]);
+    /// assert!(vec.capacity() >= squares.len());
+    ///
+    /// println!("Range");
+    /// let mut vec = squares.clone();
+    /// assert!(vec.par_drain(3..7)
+    ///            .eq(squares[3..7].par_iter().copied()));
+    /// assert_eq!(&vec[..3], &squares[..3]);
+    /// assert_eq!(&vec[3..], &squares[7..]);
+    /// assert!(vec.capacity() >= squares.len());
+    ///
+    /// println!("RangeInclusive");
+    /// let mut vec = squares.clone();
+    /// assert!(vec.par_drain(3..=7)
+    ///            .eq(squares[3..=7].par_iter().copied()));
+    /// assert_eq!(&vec[..3], &squares[..3]);
+    /// assert_eq!(&vec[3..], &squares[8..]);
+    /// assert!(vec.capacity() >= squares.len());
+    /// ```
+    fn par_drain<R: RangeBounds<Idx>>(self, range: R) -> Self::Iter;
+}
+
 /// We hide the `Try` trait in a private module, as it's only meant to be a
 /// stable clone of the standard library's `Try` trait, as yet unstable.
 mod private {
diff --git a/src/lib.rs b/src/lib.rs
index 224aa1b2c..76432ed24 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -100,6 +100,7 @@ pub mod range_inclusive;
 pub mod result;
 pub mod slice;
 pub mod str;
+pub mod string;
 pub mod vec;
 
 mod math;
diff --git a/src/math.rs b/src/math.rs
index 4968063fc..9de588965 100644
--- a/src/math.rs
+++ b/src/math.rs
@@ -1,3 +1,5 @@
+use std::ops::{Bound, Range, RangeBounds};
+
 /// Divide `n` by `divisor`, and round up to the nearest integer
 /// if not evenly divisable.
 #[inline]
@@ -10,6 +12,30 @@ pub(super) fn div_round_up(n: usize, divisor: usize) -> usize {
     }
 }
 
+/// Normalize arbitrary `RangeBounds` to a `Range`
+pub(super) fn simplify_range(range: impl RangeBounds<usize>, len: usize) -> Range<usize> {
+    let start = match range.start_bound() {
+        Bound::Unbounded => 0,
+        Bound::Included(&i) if i <= len => i,
+        Bound::Excluded(&i) if i < len => i + 1,
+        bound => panic!("range start {:?} should be <= length {}", bound, len),
+    };
+    let end = match range.end_bound() {
+        Bound::Unbounded => len,
+        Bound::Excluded(&i) if i <= len => i,
+        Bound::Included(&i) if i < len => i + 1,
+        bound => panic!("range end {:?} should be <= length {}", bound, len),
+    };
+    if start > end {
+        panic!(
+            "range start {:?} should be <= range end {:?}",
+            range.start_bound(),
+            range.end_bound()
+        );
+    }
+    start..end
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/prelude.rs b/src/prelude.rs
index 348547514..6eaca06c1 100644
--- a/src/prelude.rs
+++ b/src/prelude.rs
@@ -8,6 +8,8 @@ pub use crate::iter::IntoParallelIterator;
 pub use crate::iter::IntoParallelRefIterator;
 pub use crate::iter::IntoParallelRefMutIterator;
 pub use crate::iter::ParallelBridge;
+pub use crate::iter::ParallelDrainFull;
+pub use crate::iter::ParallelDrainRange;
 pub use crate::iter::ParallelExtend;
 pub use crate::iter::ParallelIterator;
 pub use crate::slice::ParallelSlice;
diff --git a/src/string.rs b/src/string.rs
new file mode 100644
index 000000000..91e69f9a3
--- /dev/null
+++ b/src/string.rs
@@ -0,0 +1,48 @@
+//! This module contains the parallel iterator types for owned strings
+//! (`String`). You will rarely need to interact with it directly
+//! unless you have need to name one of the iterator types.
+
+use crate::iter::plumbing::*;
+use crate::math::simplify_range;
+use crate::prelude::*;
+use std::ops::{Range, RangeBounds};
+
+impl<'a> ParallelDrainRange<usize> for &'a mut String {
+    type Iter = Drain<'a>;
+    type Item = char;
+
+    fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
+        Drain {
+            range: simplify_range(range, self.len()),
+            string: self,
+        }
+    }
+}
+
+/// Draining parallel iterator that moves a range of characters out of a string,
+/// but keeps the total capacity.
+#[derive(Debug)]
+pub struct Drain<'a> {
+    string: &'a mut String,
+    range: Range<usize>,
+}
+
+impl<'a> ParallelIterator for Drain<'a> {
+    type Item = char;
+
+    fn drive_unindexed<C>(self, consumer: C) -> C::Result
+    where
+        C: UnindexedConsumer<Self::Item>,
+    {
+        self.string[self.range.clone()]
+            .par_chars()
+            .drive_unindexed(consumer)
+    }
+}
+
+impl<'a> Drop for Drain<'a> {
+    fn drop(&mut self) {
+        // Remove the drained range.
+        self.string.drain(self.range.clone());
+    }
+}
diff --git a/src/vec.rs b/src/vec.rs
index bfbf9395a..686673bc3 100644
--- a/src/vec.rs
+++ b/src/vec.rs
@@ -7,6 +7,12 @@
 
 use crate::iter::plumbing::*;
 use crate::iter::*;
+use crate::math::simplify_range;
+use std::iter;
+use std::mem;
+use std::ops::{Range, RangeBounds};
+use std::ptr;
+use std::slice;
 
 /// Parallel iterator that moves out of a vector.
 #[derive(Debug, Clone)]
@@ -54,35 +60,123 @@ impl<T: Send> IndexedParallelIterator for IntoIter<T> {
     where
         CB: ProducerCallback<Self::Item>,
     {
-        // The producer will move or drop each item from its slice, effectively taking ownership of
-        // them. When we're done, the vector only needs to free its buffer.
+        // Drain every item, and then the vector only needs to free its buffer.
+        self.vec.par_drain(..).with_producer(callback)
+    }
+}
+
+impl<'data, T: Send> ParallelDrainRange<usize> for &'data mut Vec<T> {
+    type Iter = Drain<'data, T>;
+    type Item = T;
+
+    fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
+        Drain {
+            orig_len: self.len(),
+            range: simplify_range(range, self.len()),
+            vec: self,
+        }
+    }
+}
+
+/// Draining parallel iterator that moves a range out of a vector, but keeps the total capacity.
+#[derive(Debug)]
+pub struct Drain<'data, T: Send> {
+    vec: &'data mut Vec<T>,
+    range: Range<usize>,
+    orig_len: usize,
+}
+
+impl<'data, T: Send> ParallelIterator for Drain<'data, T> {
+    type Item = T;
+
+    fn drive_unindexed<C>(self, consumer: C) -> C::Result
+    where
+        C: UnindexedConsumer<Self::Item>,
+    {
+        bridge(self, consumer)
+    }
+
+    fn opt_len(&self) -> Option<usize> {
+        Some(self.len())
+    }
+}
+
+impl<'data, T: Send> IndexedParallelIterator for Drain<'data, T> {
+    fn drive<C>(self, consumer: C) -> C::Result
+    where
+        C: Consumer<Self::Item>,
+    {
+        bridge(self, consumer)
+    }
+
+    fn len(&self) -> usize {
+        self.range.len()
+    }
+
+    fn with_producer<CB>(self, callback: CB) -> CB::Output
+    where
+        CB: ProducerCallback<Self::Item>,
+    {
         unsafe {
-            // Make the vector forget about the actual items.
-            let len = self.vec.len();
-            self.vec.set_len(0);
+            // Make the vector forget about the drained items, and temporarily the tail too.
+            let start = self.range.start;
+            self.vec.set_len(start);
 
-            // Get a correct borrow, then extend it to the original length.
-            let mut slice = self.vec.as_mut_slice();
-            slice = std::slice::from_raw_parts_mut(slice.as_mut_ptr(), len);
+            // Get a correct borrow lifetime, then extend it to the original length.
+            let mut slice = &mut self.vec[start..];
+            slice = slice::from_raw_parts_mut(slice.as_mut_ptr(), self.range.len());
 
-            callback.callback(VecProducer { slice })
+            // The producer will move or drop each item from the drained range.
+            callback.callback(DrainProducer::new(slice))
+        }
+    }
+}
+
+impl<'data, T: Send> Drop for Drain<'data, T> {
+    fn drop(&mut self) {
+        if self.range.len() > 0 {
+            let Range { start, end } = self.range;
+            if self.vec.len() != start {
+                // We must not have produced, so just call a normal drain to remove the items.
+                assert_eq!(self.vec.len(), self.orig_len);
+                self.vec.drain(start..end);
+            } else if end < self.orig_len {
+                // The producer was responsible for consuming the drained items.
+                // Move the tail items to their new place, then set the length to include them.
+                unsafe {
+                    let ptr = self.vec.as_mut_ptr().add(start);
+                    let tail_ptr = self.vec.as_ptr().add(end);
+                    let tail_len = self.orig_len - end;
+                    ptr::copy(tail_ptr, ptr, tail_len);
+                    self.vec.set_len(start + tail_len);
+                }
+            }
         }
     }
 }
 
 /// ////////////////////////////////////////////////////////////////////////
 
-struct VecProducer<'data, T: Send> {
+pub(crate) struct DrainProducer<'data, T: Send> {
     slice: &'data mut [T],
 }
 
-impl<'data, T: 'data + Send> Producer for VecProducer<'data, T> {
+impl<'data, T: 'data + Send> DrainProducer<'data, T> {
+    /// Creates a draining producer, which *moves* items from the slice.
+    ///
+    /// Unsafe because `!Copy` data must not be read after the borrow is released.
+    pub(crate) unsafe fn new(slice: &'data mut [T]) -> Self {
+        DrainProducer { slice }
+    }
+}
+
+impl<'data, T: 'data + Send> Producer for DrainProducer<'data, T> {
     type Item = T;
     type IntoIter = SliceDrain<'data, T>;
 
     fn into_iter(mut self) -> Self::IntoIter {
         // replace the slice so we don't drop it twice
-        let slice = std::mem::replace(&mut self.slice, &mut []);
+        let slice = mem::replace(&mut self.slice, &mut []);
         SliceDrain {
             iter: slice.iter_mut(),
         }
@@ -90,25 +184,24 @@ impl<'data, T: 'data + Send> Producer for VecProducer<'data, T> {
 
     fn split_at(mut self, index: usize) -> (Self, Self) {
         // replace the slice so we don't drop it twice
-        let slice = std::mem::replace(&mut self.slice, &mut []);
+        let slice = mem::replace(&mut self.slice, &mut []);
         let (left, right) = slice.split_at_mut(index);
-        (VecProducer { slice: left }, VecProducer { slice: right })
+        unsafe { (DrainProducer::new(left), DrainProducer::new(right)) }
     }
 }
 
-impl<'data, T: 'data + Send> Drop for VecProducer<'data, T> {
+impl<'data, T: 'data + Send> Drop for DrainProducer<'data, T> {
     fn drop(&mut self) {
-        SliceDrain {
-            iter: self.slice.iter_mut(),
-        };
+        // use `Drop for [T]`
+        unsafe { ptr::drop_in_place(self.slice) };
     }
 }
 
 /// ////////////////////////////////////////////////////////////////////////
 
 // like std::vec::Drain, without updating a source Vec
-struct SliceDrain<'data, T> {
-    iter: std::slice::IterMut<'data, T>,
+pub(crate) struct SliceDrain<'data, T> {
+    iter: slice::IterMut<'data, T>,
 }
 
 impl<'data, T: 'data> Iterator for SliceDrain<'data, T> {
@@ -116,19 +209,22 @@ impl<'data, T: 'data> Iterator for SliceDrain<'data, T> {
 
     fn next(&mut self) -> Option<T> {
         let ptr = self.iter.next()?;
-        Some(unsafe { std::ptr::read(ptr) })
+        Some(unsafe { ptr::read(ptr) })
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
-        let len = self.len();
-        (len, Some(len))
+        self.iter.size_hint()
+    }
+
+    fn count(self) -> usize {
+        self.iter.len()
     }
 }
 
 impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> {
     fn next_back(&mut self) -> Option<T> {
         let ptr = self.iter.next_back()?;
-        Some(unsafe { std::ptr::read(ptr) })
+        Some(unsafe { ptr::read(ptr) })
     }
 }
 
@@ -138,12 +234,12 @@ impl<'data, T: 'data> ExactSizeIterator for SliceDrain<'data, T> {
     }
 }
 
+impl<'data, T: 'data> iter::FusedIterator for SliceDrain<'data, T> {}
+
 impl<'data, T: 'data> Drop for SliceDrain<'data, T> {
     fn drop(&mut self) {
-        for ptr in &mut self.iter {
-            unsafe {
-                std::ptr::drop_in_place(ptr);
-            }
-        }
+        // extract the iterator so we can use `Drop for [T]`
+        let iter = mem::replace(&mut self.iter, [].iter_mut());
+        unsafe { ptr::drop_in_place(iter.into_slice()) };
     }
 }
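
For reviewers, here is a minimal usage sketch of the two new traits (not part of the diff). It assumes a build of rayon that includes this change and uses only the API introduced above: `ParallelDrainRange::par_drain(range)` on `&mut Vec<T>` and `ParallelDrainFull::par_drain()` on `&mut HashSet<T>`.

```rust
use rayon::prelude::*;
use std::collections::HashSet;

fn main() {
    // Drain a sub-range of a vector in parallel; the tail shifts down and
    // the vector keeps its capacity.
    let mut v: Vec<i32> = (0..100).collect();
    let cap = v.capacity();
    let drained: i32 = v.par_drain(10..20).sum();
    assert_eq!(drained, 145); // 10 + 11 + ... + 19
    assert_eq!(v.len(), 90);
    assert!(v.capacity() >= cap);

    // Drain a hash set completely; the allocation is retained for reuse.
    let mut set: HashSet<i32> = (0..10).collect();
    let total: i32 = set.par_drain().sum();
    assert_eq!(total, 45);
    assert!(set.is_empty());
}
```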
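The `Drop` implementations above are what make an unconsumed drain well behaved: if the iterator is never driven, the requested range is still removed when it is dropped. A small sketch of that guarantee, again assuming this change is applied:

```rust
use rayon::prelude::*;

fn main() {
    let mut v: Vec<i32> = (0..10).collect();

    // Create the draining iterator but drop it without driving it:
    // `Drain::drop` still removes the requested range from the vector.
    drop(v.par_drain(2..5));

    assert_eq!(v, [0, 1, 5, 6, 7, 8, 9]);
    assert!(v.capacity() >= 10);
}
```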