diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs
index fc45848b9d..e970899241 100644
--- a/src/scheduler/controller.rs
+++ b/src/scheduler/controller.rs
@@ -3,19 +3,20 @@
 //! MMTk has many GC threads. There are many GC worker threads and one GC controller thread.
 //! The GC controller thread responds to GC requests and coordinates the workers to perform GC.
 
-use std::sync::mpsc::Receiver;
-use std::sync::Arc;
+use std::sync::{Arc, Condvar, Mutex};
 
 use crate::plan::gc_requester::GCRequester;
 use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection};
-use crate::scheduler::{CoordinatorMessage, GCWork};
 use crate::util::VMWorkerThread;
 use crate::vm::VMBinding;
 use crate::MMTK;
-use atomic::Ordering;
+
+use self::channel::{Event, Receiver};
 
 use super::{CoordinatorWork, GCWorkScheduler, GCWorker};
 
+pub(crate) mod channel;
+
 /// The thread local struct for the GC controller, the counterpart of `GCWorker`.
 pub struct GCController<VM: VMBinding> {
     /// The reference to the MMTk instance.
@@ -24,18 +25,18 @@ pub struct GCController<VM: VMBinding> {
     requester: Arc<GCRequester<VM>>,
     /// The reference to the scheduler.
     scheduler: Arc<GCWorkScheduler<VM>>,
-    /// The receiving end of the channel to get controller/coordinator message from workers.
-    receiver: Receiver<CoordinatorMessage<VM>>,
+    /// Receive coordinator work packets and notifications from GC workers through this.
+    receiver: Receiver<VM>,
     /// The `GCWorker` is used to execute packets. The controller is also a `GCWorker`.
     coordinator_worker: GCWorker<VM>,
 }
 
 impl<VM: VMBinding> GCController<VM> {
-    pub fn new(
+    pub(crate) fn new(
        mmtk: &'static MMTK<VM>,
        requester: Arc<GCRequester<VM>>,
        scheduler: Arc<GCWorkScheduler<VM>>,
-       receiver: Receiver<CoordinatorMessage<VM>>,
+       receiver: Receiver<VM>,
        coordinator_worker: GCWorker<VM>,
     ) -> Box<GCController<VM>> {
         Box::new(Self {
@@ -62,21 +63,58 @@ impl<VM: VMBinding> GCController<VM> {
         }
     }
 
-    /// Process a message. Return true if the GC is finished.
-    fn process_message(&mut self, message: CoordinatorMessage<VM>) -> bool {
+    /// Find more work for workers to do. Return true if more work is available.
+    fn find_more_work_for_workers(&mut self) -> bool {
+        if self.scheduler.worker_group.has_designated_work() {
+            return true;
+        }
+
+        // See if any bucket has a sentinel.
+        if self.scheduler.schedule_sentinels() {
+            return true;
+        }
+
+        // Try to open new buckets.
+        if self.scheduler.update_buckets() {
+            return true;
+        }
+
+        // If all of the above failed, it means GC has finished.
+        false
+    }
+
+    /// Reset the "all workers parked" state and resume workers.
+    fn reset_and_resume_workers(&mut self) {
+        self.receiver.reset_all_workers_parked();
+        self.scheduler.worker_monitor.notify_work_available(true);
+        debug!("Workers resumed");
+    }
+
+    /// Handle the "all workers have parked" event. Return true if GC is finished.
+    fn on_all_workers_parked(&mut self) -> bool {
+        assert!(self.scheduler.all_activated_buckets_are_empty());
+
+        let new_work_available = self.find_more_work_for_workers();
+
+        if new_work_available {
+            self.reset_and_resume_workers();
+            // If there is more work to do, GC has not finished.
+            return false;
+        }
+
+        assert!(self.scheduler.all_buckets_empty());
+
+        true
+    }
+
+    /// Process an event. Return true if the GC is finished.
+    fn process_event(&mut self, message: Event<VM>) -> bool {
         match message {
-            CoordinatorMessage::Work(mut work) => {
+            Event::Work(mut work) => {
                 self.execute_coordinator_work(work.as_mut(), true);
                 false
             }
-            CoordinatorMessage::Finish => {
-                // Quit only if all the buckets are empty.
-                // For concurrent GCs, the coordinator thread may receive this message when
-                // some buckets are still not empty. Under such case, the coordinator
-                // should ignore the message.
- let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); - self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty() - } + Event::AllParked => self.on_all_workers_parked(), } } @@ -84,7 +122,7 @@ impl GCController { pub fn do_gc_until_completion(&mut self) { let gc_start = std::time::Instant::now(); // Schedule collection. - self.initiate_coordinator_work(&mut ScheduleCollection, true); + self.execute_coordinator_work(&mut ScheduleCollection, true); // Tell GC trigger that GC started - this happens after ScheduleCollection so we // will know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix) @@ -95,30 +133,24 @@ impl GCController { .policy .on_gc_start(self.mmtk); - // Drain the message queue and execute coordinator work. + // React to worker-generated events until finished. loop { - let message = self.receiver.recv().unwrap(); - let finished = self.process_message(message); + let event = self.receiver.poll_event(); + let finished = self.process_event(event); if finished { break; } } + + // All GC workers must have parked by now. + debug_assert!(self.scheduler.worker_monitor.debug_is_group_sleeping()); debug_assert!(!self.scheduler.worker_group.has_designated_work()); - // Sometimes multiple finish messages will be sent. Skip them. - for message in self.receiver.try_iter() { - match message { - CoordinatorMessage::Work(_) => unreachable!(), - CoordinatorMessage::Finish => {} - } - } - { - // Note: GC workers may spuriously wake up, examining the states of work buckets and - // trying to open them. Use lock to ensure workers do not wake up when we deactivate - // buckets. - let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); - self.scheduler.deactivate_all(); - } + // Deactivate all work buckets to prepare for the next GC. + // NOTE: There is no need to hold any lock. + // All GC workers are doing "group sleeping" now, + // so they will not wake up while we deactivate buckets. + self.scheduler.deactivate_all(); // Tell GC trigger that GC ended - this happens before EndOfGC where we resume mutators. self.mmtk.plan.base().gc_trigger.policy.on_gc_end(self.mmtk); @@ -132,50 +164,11 @@ impl GCController { elapsed: gc_start.elapsed(), }; - // Note: We cannot use `initiate_coordinator_work` here. If we increment the - // `pending_coordinator_packets` counter when a worker spuriously wakes up, it may try to - // open new buckets and result in an assertion error. - // See: https://github.com/mmtk/mmtk-core/issues/770 - // - // The `pending_coordinator_packets` counter and the `initiate_coordinator_work` function - // were introduced to prevent any buckets from being opened while `ScheduleCollection` or - // `StopMutators` is being executed. (See the doc comment of `initiate_coordinator_work`.) - // `EndOfGC` doesn't add any new work packets, therefore it does not need this layer of - // synchronization. - // - // FIXME: We should redesign the synchronization mechanism to properly address the opening - // condition of buckets. See: https://github.com/mmtk/mmtk-core/issues/774 - end_of_gc.do_work_with_stat(&mut self.coordinator_worker, self.mmtk); + self.execute_coordinator_work(&mut end_of_gc, false); self.scheduler.debug_assert_all_buckets_deactivated(); } - /// The controller uses this method to start executing a coordinator work immediately. - /// - /// Note: GC workers will start executing work packets as soon as individual work packets - /// are added. 
If the coordinator work (such as `ScheduleCollection`) adds multiple work - /// packets into different buckets, workers may open subsequent buckets while the coordinator - /// work still has packets to be added to prior buckets. For this reason, we use the - /// `pending_coordinator_packets` to prevent the workers from opening any work buckets while - /// this coordinator work is being executed. - /// - /// # Arguments - /// - /// - `work`: The work to execute. - /// - `notify_workers`: Notify one worker after the work is finished. Useful for proceeding - /// to the next work bucket stage. - fn initiate_coordinator_work( - &mut self, - work: &mut dyn CoordinatorWork, - notify_workers: bool, - ) { - self.scheduler - .pending_coordinator_packets - .fetch_add(1, Ordering::SeqCst); - - self.execute_coordinator_work(work, notify_workers) - } - fn execute_coordinator_work( &mut self, work: &mut dyn CoordinatorWork, @@ -183,16 +176,8 @@ impl GCController { ) { work.do_work_with_stat(&mut self.coordinator_worker, self.mmtk); - self.scheduler - .pending_coordinator_packets - .fetch_sub(1, Ordering::SeqCst); - if notify_workers { - // When a coordinator work finishes, there is a chance that all GC workers parked, and - // no work packets are added to any open buckets. We need to wake up one GC worker so - // that it can open more work buckets. - let _guard = self.scheduler.worker_monitor.0.lock().unwrap(); - self.scheduler.worker_monitor.1.notify_one(); + self.reset_and_resume_workers(); }; } } diff --git a/src/scheduler/controller/channel.rs b/src/scheduler/controller/channel.rs new file mode 100644 index 0000000000..548f2c6e38 --- /dev/null +++ b/src/scheduler/controller/channel.rs @@ -0,0 +1,114 @@ +use std::collections::VecDeque; + +use super::*; + +/// A one-way channel for workers to send coordinator packets and notifications to the controller. +struct Channel { + sync: Mutex>, + cond: Condvar, +} + +/// The synchronized parts of `Channel`. +struct ChannelSync { + /// Pending coordinator work packets. + coordinator_packets: VecDeque>>, + /// Whether all workers have parked. + /// + /// NOTE: This field is set to `true` by the last parked worker. + /// It is used to notify the coordinator about the event that all workers have parked. + /// To resume workers from "group sleeping", use `WorkerMonitor::notify_work_available`. + all_workers_parked: bool, +} + +/// Each worker holds an instance of this. +/// +/// It wraps a channel, and only allows workers to access it in expected ways. +pub struct Sender { + chan: Arc>, +} + +impl Clone for Sender { + fn clone(&self) -> Self { + Self { + chan: self.chan.clone(), + } + } +} + +impl Sender { + /// Send a coordinator work packet to the coordinator. + pub fn add_coordinator_work(&self, work: Box>) { + let mut sync = self.chan.sync.lock().unwrap(); + sync.coordinator_packets.push_back(work); + debug!("A worker has sent a coordinator work packet."); + self.chan.cond.notify_one(); + } + + /// Notify the coordinator that all workers have parked. + pub fn notify_all_workers_parked(&self) { + let mut sync = self.chan.sync.lock().unwrap(); + sync.all_workers_parked = true; + debug!("Notified the coordinator that all workers have parked."); + self.chan.cond.notify_one(); + } +} + +/// The coordinator holds an instance of this. +/// +/// It wraps a channel, and only allows the coordinator to access it in expected ways. +pub struct Receiver { + chan: Arc>, +} + +impl Receiver { + /// Get an event. 
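+    ///
+    /// This method blocks until a worker sends a coordinator work packet or notifies the
+    /// coordinator that all workers have parked. Pending coordinator packets are always
+    /// returned before `Event::AllParked`, so the coordinator never observes "all parked"
+    /// while coordinator packets are still queued.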
+ pub(super) fn poll_event(&self) -> Event { + let mut sync = self.chan.sync.lock().unwrap(); + loop { + // Make sure the coordinator always sees packets before seeing "all parked". + if let Some(work) = sync.coordinator_packets.pop_front() { + debug!("Received a coordinator packet."); + return Event::Work(work); + } + + if sync.all_workers_parked { + debug!("Observed all workers parked."); + return Event::AllParked; + } + + sync = self.chan.cond.wait(sync).unwrap(); + } + } + + /// Reset the "all workers have parked" flag. + pub fn reset_all_workers_parked(&self) { + let mut sync = self.chan.sync.lock().unwrap(); + sync.all_workers_parked = false; + debug!("The all_workers_parked state is reset."); + } +} + +/// This type represents the events the `Receiver` observes. +pub(crate) enum Event { + /// Send a work-packet to the coordinator thread. + Work(Box>), + /// Notify the coordinator thread that all GC tasks are finished. + /// When sending this message, all the work buckets should be + /// empty, and all the workers should be parked. + AllParked, +} + +/// Create a Sender-Receiver pair. +pub(crate) fn make_channel() -> (Sender, Receiver) { + let chan = Arc::new(Channel { + sync: Mutex::new(ChannelSync { + coordinator_packets: Default::default(), + all_workers_parked: false, + }), + cond: Default::default(), + }); + + let sender = Sender { chan: chan.clone() }; + let receiver = Receiver { chan }; + (sender, receiver) +} diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index 6fe7ddc823..dee41d00fb 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -177,8 +177,9 @@ impl GCWork for StopMutators { // If the VM requires that only the coordinator thread can stop the world, // we delegate the work to the coordinator. if ::VMCollection::COORDINATOR_ONLY_STW && !worker.is_coordinator() { - mmtk.scheduler - .add_coordinator_work(StopMutators::::new(), worker); + worker + .sender + .add_coordinator_work(Box::new(StopMutators::::new())); return; } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index fd793ffc25..cad3c9137c 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -4,7 +4,6 @@ pub mod affinity; #[allow(clippy::module_inception)] mod scheduler; -pub(crate) use scheduler::CoordinatorMessage; pub(crate) use scheduler::GCWorkScheduler; mod stat; diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index b6bbe45884..8dbf8ba390 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -1,6 +1,6 @@ use super::stat::SchedulerStat; use super::work_bucket::*; -use super::worker::{GCWorker, GCWorkerShared, ParkingGuard, ThreadId, WorkerGroup}; +use super::worker::{GCWorker, GCWorkerShared, ThreadId, WorkerGroup, WorkerMonitor}; use super::*; use crate::mmtk::MMTK; use crate::util::opaque_pointer::*; @@ -11,30 +11,17 @@ use crossbeam::deque::{self, Steal}; use enum_map::Enum; use enum_map::{enum_map, EnumMap}; use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, Condvar, Mutex}; - -pub enum CoordinatorMessage { - /// Send a work-packet to the coordinator thread/ - Work(Box>), - /// Notify the coordinator thread that all GC tasks are finished. - /// When sending this message, all the work buckets should be - /// empty, and all the workers should be parked. 
- Finish, -} +use std::sync::Arc; pub struct GCWorkScheduler { /// Work buckets pub work_buckets: EnumMap>, /// Workers - pub worker_group: Arc>, + pub(crate) worker_group: Arc>, /// The shared part of the GC worker object of the controller thread coordinator_worker_shared: Arc>, /// Condition Variable for worker synchronization - pub worker_monitor: Arc<(Mutex<()>, Condvar)>, - /// Counter for pending coordinator messages. - pub(super) pending_coordinator_packets: AtomicUsize, + pub(crate) worker_monitor: Arc, /// How to assign the affinity of each GC thread. Specified by the user. affinity: AffinityKind, } @@ -46,27 +33,27 @@ unsafe impl Sync for GCWorkScheduler {} impl GCWorkScheduler { pub fn new(num_workers: usize, affinity: AffinityKind) -> Arc { - let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default(); + let worker_monitor: Arc = Arc::new(WorkerMonitor::new(num_workers)); let worker_group = WorkerGroup::new(num_workers); // Create work buckets for workers. let mut work_buckets = enum_map! { - WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::VMRefClosure => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::VMRefForwarding => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), - WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone(), worker_group.clone()), + WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()), + WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::SoftRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::WeakRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::FinalRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::PhantomRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::VMRefClosure => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::SecondRoots => WorkBucket::new(false, worker_monitor.clone()), + 
WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::FinalizableForwarding => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::VMRefForwarding => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()), + WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()), }; // Set the open condition of each bucket. @@ -97,7 +84,6 @@ impl GCWorkScheduler { worker_group, coordinator_worker_shared, worker_monitor, - pending_coordinator_packets: AtomicUsize::new(0), affinity, }) } @@ -109,7 +95,7 @@ impl GCWorkScheduler { /// Create GC threads, including the controller thread and all workers. pub fn spawn_gc_threads(self: &Arc, mmtk: &'static MMTK, tls: VMThread) { // Create the communication channel. - let (sender, receiver) = channel::>(); + let (sender, receiver) = controller::channel::make_channel(); // Spawn the controller thread. let coordinator_worker = GCWorker::new( @@ -239,10 +225,6 @@ impl GCWorkScheduler { } fn are_buckets_drained(&self, buckets: &[WorkBucketStage]) -> bool { - debug_assert!( - self.pending_coordinator_packets.load(Ordering::SeqCst) == 0, - "GCWorker attempted to open buckets when there are pending coordinator work packets" - ); buckets.iter().all(|&b| self.work_buckets[b].is_drained()) } @@ -251,7 +233,7 @@ impl GCWorkScheduler { } /// Schedule "sentinel" work packets for all activated buckets. - fn schedule_sentinels(&self) -> bool { + pub(crate) fn schedule_sentinels(&self) -> bool { let mut new_packets = false; for (id, work_bucket) in self.work_buckets.iter() { if work_bucket.is_activated() && work_bucket.maybe_schedule_sentinel() { @@ -268,7 +250,7 @@ impl GCWorkScheduler { /// No workers will be waked up by this function. The caller is responsible for that. /// /// Return true if there're any non-empty buckets updated. - fn update_buckets(&self) -> bool { + pub(crate) fn update_buckets(&self) -> bool { let mut buckets_updated = false; let mut new_packets = false; for i in 0..WorkBucketStage::LENGTH { @@ -324,17 +306,8 @@ impl GCWorkScheduler { } } - pub fn add_coordinator_work(&self, work: impl CoordinatorWork, worker: &GCWorker) { - self.pending_coordinator_packets - .fetch_add(1, Ordering::SeqCst); - worker - .sender - .send(CoordinatorMessage::Work(Box::new(work))) - .unwrap(); - } - /// Check if all the work buckets are empty - fn all_activated_buckets_are_empty(&self) -> bool { + pub(crate) fn all_activated_buckets_are_empty(&self) -> bool { for bucket in self.work_buckets.values() { if bucket.is_activated() && !bucket.is_drained() { return false; @@ -404,59 +377,48 @@ impl GCWorkScheduler { fn poll_slow(&self, worker: &GCWorker) -> Box> { // Note: The lock is released during `wait` in the loop. - let mut guard = self.worker_monitor.0.lock().unwrap(); - 'polling_loop: loop { + let mut sync = self.worker_monitor.sync.lock().unwrap(); + loop { // Retry polling if let Some(work) = self.poll_schedulable_work(worker) { return work; } - // Prepare to park this worker - let parking_guard = ParkingGuard::new(self.worker_group.as_ref()); - // If all workers are parked, try activate new buckets - if parking_guard.all_parked() { - // If there're any designated work, resume the workers and process them - if self.worker_group.has_designated_work() { - assert!( - worker.shared.designated_work.is_empty(), - "The last parked worker has designated work." 
- ); - self.worker_monitor.1.notify_all(); - // The current worker is going to wait, because the designated work is not for it. - } else if self.pending_coordinator_packets.load(Ordering::SeqCst) == 0 { - // See if any bucket has a sentinel. - if self.schedule_sentinels() { - // We're not going to sleep since new work packets are just scheduled. - break 'polling_loop; - } - // Try to open new buckets. - if self.update_buckets() { - // We're not going to sleep since a new bucket is just open. - break 'polling_loop; - } - debug_assert!(!self.worker_group.has_designated_work()); - // The current pause is finished if we can't open more buckets. - worker.sender.send(CoordinatorMessage::Finish).unwrap(); - } - // Otherwise, if there is still pending coordinator work, the last parked - // worker will wait on the monitor, too. The coordinator will notify a - // worker (maybe not the current one) once it finishes executing all - // coordinator work packets. + + // Park this worker + let all_parked = sync.inc_parked_workers(); + + if all_parked { + // If all workers are parked, enter "group sleeping" and notify controller. + sync.group_sleep = true; + debug!("Entered group-sleeping state"); + worker.sender.notify_all_workers_parked(); + } else { + // Otherwise wait until notified. + // Note: The condition for this `cond.wait` is "more work is available". + // If this worker spuriously wakes up, then in the next loop iteration, the + // `poll_schedulable_work` invocation above will fail, and the worker will reach + // here and wait again. + sync = self.worker_monitor.cond.wait(sync).unwrap(); } - // Wait - guard = self.worker_monitor.1.wait(guard).unwrap(); - // The worker is unparked here where `parking_guard` goes out of scope. - } - // We guarantee that we can at least fetch one packet when we reach here. - let work = self.poll_schedulable_work(worker).unwrap(); - // Optimize for the case that a newly opened bucket only has one packet. - // We only notify_all if there're more than one packets available. - if !self.all_activated_buckets_are_empty() { - // Have more jobs in this buckets. Notify other workers. - self.worker_monitor.1.notify_all(); + // Keep waiting if we have entered "group sleeping" state. + // The coordinator will let the worker leave the "group sleeping" state + // once the coordinator finished its work. + // + // Note: `wait_while` checks `sync.group_sleep` before actually starting to wait. + // This is expected because the controller may run so fast that it opened new buckets + // and unset `sync.group_sleep` before we even reached here. If that happens, waiting + // blindly will result in all workers sleeping forever. So we should always check + // `sync.group_sleep` before waiting. + sync = self + .worker_monitor + .cond + .wait_while(sync, |sync| sync.group_sleep) + .unwrap(); + + // Unpark this worker. + sync.dec_parked_workers(); } - // Return this packet and execute it. - work } pub fn enable_stat(&self) { @@ -483,8 +445,15 @@ impl GCWorkScheduler { mmtk.plan.base().gc_requester.clear_request(); let first_stw_bucket = &self.work_buckets[WorkBucketStage::first_stw_stage()]; debug_assert!(!first_stw_bucket.is_activated()); + // Note: This is the only place where a non-coordinator thread opens a bucket. + // If the `StopMutators` is executed by the coordinator thread, it will open + // the `Prepare` bucket and let workers start executing packets while the coordinator + // can still add more work packets to `Prepare`. 
+        // However, since `Prepare` is the first STW bucket and only the coordinator can open
+        // any subsequent buckets, workers cannot execute work packets out of order. This is
+        // not generally true if we are not opening the first STW bucket. In the future, we
+        // should redesign the opening condition of work buckets to make the synchronization
+        // more robust.
         first_stw_bucket.activate();
-        let _guard = self.worker_monitor.0.lock().unwrap();
-        self.worker_monitor.1.notify_all();
+        self.worker_monitor.notify_work_available(true);
     }
 }
diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs
index 9b3c18dfeb..e2a2fb6288 100644
--- a/src/scheduler/work_bucket.rs
+++ b/src/scheduler/work_bucket.rs
@@ -1,10 +1,10 @@
-use super::worker::WorkerGroup;
+use super::worker::WorkerMonitor;
 use super::*;
 use crate::vm::VMBinding;
 use crossbeam::deque::{Injector, Steal, Worker};
 use enum_map::Enum;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Condvar, Mutex};
+use std::sync::{Arc, Mutex};
 
 struct BucketQueue<VM: VMBinding> {
     queue: Injector<Box<dyn GCWork<VM>>>,
 }
@@ -45,7 +45,7 @@ pub struct WorkBucket<VM: VMBinding> {
     active: AtomicBool,
     queue: BucketQueue<VM>,
     prioritized_queue: Option<BucketQueue<VM>>,
-    monitor: Arc<(Mutex<()>, Condvar)>,
+    monitor: Arc<WorkerMonitor>,
     can_open: Option>,
     /// After this bucket is activated and all pending work packets (including the packets in this
     /// bucket) are drained, this work packet, if exists, will be added to this bucket. When this
@@ -59,15 +59,10 @@ pub struct WorkBucket<VM: VMBinding> {
     /// recursively, such as ephemerons and Java-style SoftReference and finalizers. Sentinels
     /// can be used repeatedly to discover and process more such objects.
     sentinel: Mutex<Option<Box<dyn GCWork<VM>>>>,
-    group: Arc<WorkerGroup<VM>>,
 }
 
 impl<VM: VMBinding> WorkBucket<VM> {
-    pub fn new(
-        active: bool,
-        monitor: Arc<(Mutex<()>, Condvar)>,
-        group: Arc<WorkerGroup<VM>>,
-    ) -> Self {
+    pub(crate) fn new(active: bool, monitor: Arc<WorkerMonitor>) -> Self {
         Self {
             active: AtomicBool::new(active),
             queue: BucketQueue::new(),
@@ -75,7 +70,6 @@ impl<VM: VMBinding> WorkBucket<VM> {
             monitor,
             can_open: None,
             sentinel: Mutex::new(None),
-            group,
         }
     }
 
@@ -85,10 +79,7 @@ impl<VM: VMBinding> WorkBucket<VM> {
             return;
         }
         // Notify one if there're any parked workers.
-        if self.group.parked_workers() > 0 {
-            let _guard = self.monitor.0.lock().unwrap();
-            self.monitor.1.notify_one()
-        }
+        self.monitor.notify_work_available(false);
     }
 
     pub fn notify_all_workers(&self) {
@@ -97,10 +88,7 @@ impl<VM: VMBinding> WorkBucket<VM> {
             return;
         }
         // Notify all if there're any parked workers.
-        if self.group.parked_workers() > 0 {
-            let _guard = self.monitor.0.lock().unwrap();
-            self.monitor.1.notify_all()
-        }
+        self.monitor.notify_work_available(true);
     }
 
     pub fn is_activated(&self) -> bool {
diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs
index 2a0f3611dd..1b2c91fa71 100644
--- a/src/scheduler/worker.rs
+++ b/src/scheduler/worker.rs
@@ -9,9 +9,8 @@ use atomic::Atomic;
 use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
 use crossbeam::deque::{self, Stealer};
 use crossbeam::queue::ArrayQueue;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::Sender;
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::Ordering;
+use std::sync::{Arc, Condvar, Mutex};
 
 /// Represents the ID of a GC worker thread.
 pub type ThreadId = usize;
@@ -51,6 +50,85 @@ impl<VM: VMBinding> GCWorkerShared<VM> {
     }
 }
 
+/// Used to synchronize mutually exclusive operations between workers and the controller,
+/// and also to wake up workers when more work packets become available.
+/// NOTE: All fields are public in order to support the complex control structure
+/// in `GCWorkScheduler::poll_slow`.
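+///
+/// The mutex guards the parked-worker count and the `group_sleep` flag, while the condition
+/// variable both delivers "work available" notifications and keeps workers in the
+/// "group sleeping" state until the coordinator clears `group_sleep`.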
+pub(crate) struct WorkerMonitor {
+    pub sync: Mutex<WorkerMonitorSync>,
+    pub cond: Condvar,
+}
+
+/// The synchronized part of `WorkerMonitor`.
+pub(crate) struct WorkerMonitorSync {
+    /// The total number of workers.
+    worker_count: usize,
+    /// Number of parked workers.
+    parked_workers: usize,
+    /// This flag is set to true when all workers have parked,
+    /// and cleared if a work packet is added to an open bucket, or a new bucket is opened.
+    /// No workers can unpark while this flag is set.
+    ///
+    /// Note that testing this flag is *not* equivalent to testing `parked_workers == worker_count`.
+    /// This field is used to *receive notification* of the event that more work becomes available.
+    pub group_sleep: bool,
+}
+
+impl WorkerMonitor {
+    pub fn new(worker_count: usize) -> Self {
+        Self {
+            sync: Mutex::new(WorkerMonitorSync {
+                worker_count,
+                parked_workers: 0,
+                group_sleep: false,
+            }),
+            cond: Default::default(),
+        }
+    }
+
+    /// Wake up workers when more work packets are made available.
+    /// This function will get workers out of the "group sleeping" state.
+    pub fn notify_work_available(&self, all: bool) {
+        let mut sync = self.sync.lock().unwrap();
+        sync.group_sleep = false;
+        if all {
+            self.cond.notify_all();
+        } else {
+            self.cond.notify_one();
+        }
+    }
+
+    /// Test if workers are in the group sleeping state. Used for debugging.
+    pub fn debug_is_group_sleeping(&self) -> bool {
+        let sync = self.sync.lock().unwrap();
+        sync.group_sleep
+    }
+}
+
+impl WorkerMonitorSync {
+    /// Increase the parked-workers counter.
+    /// Called before a worker is parked.
+    ///
+    /// Return true if all the workers are parked.
+    pub fn inc_parked_workers(&mut self) -> bool {
+        let old = self.parked_workers;
+        debug_assert!(old < self.worker_count);
+        let new = old + 1;
+        self.parked_workers = new;
+        new == self.worker_count
+    }
+
+    /// Decrease the parked-workers counter.
+    /// Called after a worker is resumed from the parked state.
+    pub fn dec_parked_workers(&mut self) {
+        let old = self.parked_workers;
+        debug_assert!(old <= self.worker_count);
+        debug_assert!(old > 0);
+        let new = old - 1;
+        self.parked_workers = new;
+    }
+}
+
 /// A GC worker. This part is privately owned by a worker thread.
 /// The GC controller also has an embedded `GCWorker` because it may also execute work packets.
 pub struct GCWorker<VM: VMBinding> {
@@ -64,7 +142,7 @@ pub struct GCWorker<VM: VMBinding> {
     /// The copy context, used to implement copying GC.
     copy: GCWorkerCopyContext<VM>,
     /// The sending end of the channel to send message to the controller thread.
-    pub sender: Sender<CoordinatorMessage<VM>>,
+    pub(crate) sender: controller::channel::Sender<VM>,
     /// The reference to the MMTk instance.
     pub mmtk: &'static MMTK<VM>,
     /// True if this struct is the embedded GCWorker of the controller thread.
@@ -94,12 +172,12 @@ impl<VM: VMBinding> GCWorkerShared<VM> {
 }
 
 impl<VM: VMBinding> GCWorker<VM> {
-    pub fn new(
+    pub(crate) fn new(
         mmtk: &'static MMTK<VM>,
         ordinal: ThreadId,
         scheduler: Arc<GCWorkScheduler<VM>>,
         is_coordinator: bool,
-        sender: Sender<CoordinatorMessage<VM>>,
+        sender: controller::channel::Sender<VM>,
         shared: Arc<GCWorkerShared<VM>>,
         local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
     ) -> Self {
@@ -194,10 +272,9 @@ impl<VM: VMBinding> GCWorker<VM> {
 }
 
 /// A worker group to manage all the GC workers (except the coordinator worker).
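+/// Parked-worker accounting is handled by `WorkerMonitorSync`, so this struct only keeps the
+/// shared worker data and the local work queues of workers that have not been spawned yet.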
-pub struct WorkerGroup { +pub(crate) struct WorkerGroup { /// Shared worker data pub workers_shared: Vec>>, - parked_workers: AtomicUsize, unspawned_local_work_queues: Mutex>>>>, } @@ -218,7 +295,6 @@ impl WorkerGroup { Arc::new(Self { workers_shared, - parked_workers: Default::default(), unspawned_local_work_queues: Mutex::new(unspawned_local_work_queues), }) } @@ -227,7 +303,7 @@ impl WorkerGroup { pub fn spawn( &self, mmtk: &'static MMTK, - sender: Sender>, + sender: controller::channel::Sender, tls: VMThread, ) { let mut unspawned_local_work_queues = self.unspawned_local_work_queues.lock().unwrap(); @@ -252,33 +328,6 @@ impl WorkerGroup { self.workers_shared.len() } - /// Increase the packed-workers counter. - /// Called before a worker is parked. - /// - /// Return true if all the workers are parked. - pub fn inc_parked_workers(&self) -> bool { - let old = self.parked_workers.fetch_add(1, Ordering::SeqCst); - debug_assert!(old < self.worker_count()); - old + 1 == self.worker_count() - } - - /// Decrease the packed-workers counter. - /// Called after a worker is resumed from the parked state. - pub fn dec_parked_workers(&self) { - let old = self.parked_workers.fetch_sub(1, Ordering::SeqCst); - debug_assert!(old <= self.worker_count()); - } - - /// Get the number of parked workers in the group - pub fn parked_workers(&self) -> usize { - self.parked_workers.load(Ordering::SeqCst) - } - - /// Check if all the workers are packed - pub fn all_parked(&self) -> bool { - self.parked_workers() == self.worker_count() - } - /// Return true if there're any pending designated work pub fn has_designated_work(&self) -> bool { self.workers_shared @@ -286,29 +335,3 @@ impl WorkerGroup { .any(|w| !w.designated_work.is_empty()) } } - -/// This ensures the worker always decrements the parked worker count on all control flow paths. -pub(crate) struct ParkingGuard<'a, VM: VMBinding> { - worker_group: &'a WorkerGroup, - all_parked: bool, -} - -impl<'a, VM: VMBinding> ParkingGuard<'a, VM> { - pub fn new(worker_group: &'a WorkerGroup) -> Self { - let all_parked = worker_group.inc_parked_workers(); - ParkingGuard { - worker_group, - all_parked, - } - } - - pub fn all_parked(&self) -> bool { - self.all_parked - } -} - -impl<'a, VM: VMBinding> Drop for ParkingGuard<'a, VM> { - fn drop(&mut self) { - self.worker_group.dec_parked_workers(); - } -}
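
The controller/worker handshake introduced above boils down to a mutex-guarded queue plus an "all parked" flag, with queued packets always observed before the flag. Below is a minimal, self-contained sketch of that pattern; the `DemoChannel`/`DemoEvent` names and the single worker thread are illustrative only and are not part of mmtk-core.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Pending "packets" plus an all-parked flag, guarded by one mutex (cf. `ChannelSync`).
struct DemoChannel {
    sync: Mutex<(VecDeque<String>, bool)>,
    cond: Condvar,
}

enum DemoEvent {
    Work(String),
    AllParked,
}

impl DemoChannel {
    fn new() -> Self {
        Self {
            sync: Mutex::new((VecDeque::new(), false)),
            cond: Condvar::new(),
        }
    }

    /// Worker side: queue a packet for the coordinator (cf. `Sender::add_coordinator_work`).
    fn add_work(&self, packet: String) {
        let mut sync = self.sync.lock().unwrap();
        sync.0.push_back(packet);
        self.cond.notify_one();
    }

    /// Worker side: report that every worker has parked (cf. `Sender::notify_all_workers_parked`).
    fn notify_all_parked(&self) {
        let mut sync = self.sync.lock().unwrap();
        sync.1 = true;
        self.cond.notify_one();
    }

    /// Coordinator side: block until an event arrives; queued packets take priority over
    /// the all-parked flag (cf. `Receiver::poll_event`).
    fn poll_event(&self) -> DemoEvent {
        let mut sync = self.sync.lock().unwrap();
        loop {
            if let Some(packet) = sync.0.pop_front() {
                return DemoEvent::Work(packet);
            }
            if sync.1 {
                return DemoEvent::AllParked;
            }
            sync = self.cond.wait(sync).unwrap();
        }
    }
}

fn main() {
    let chan = Arc::new(DemoChannel::new());
    let worker = {
        let chan = chan.clone();
        thread::spawn(move || {
            chan.add_work("StopMutators".to_string());
            chan.notify_all_parked();
        })
    };
    loop {
        match chan.poll_event() {
            DemoEvent::Work(w) => println!("coordinator packet: {}", w),
            DemoEvent::AllParked => {
                println!("all workers parked");
                break;
            }
        }
    }
    worker.join().unwrap();
}
```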