Let the coordinator thread open buckets (#782)
`Condvar::wait` is allowed to spuriously wake up at any time even
without another thread notifying it.  Currently, work buckets are opened
and closed under the assumption that all GC workers have parked.  Due to
spurious wake-ups, any GC worker can wake up at any time, and break the
assumption.

This PR makes the following changes:

1.  Only the coordinator can open more buckets (with the exception of
    `Prepare`).

2.  When all workers have parked, they notify the coordinator that all
    workers have parked, and wait for the coordinator to open buckets.

    -   Because of this, workers no longer report the "GC finished"
        event to the coordinator.  Now it is the coordinator that
        determines whether GC has finished.

3.  When all workers have parked, a boolean flag
    `WorkerMonitor::group_sleep` is set.  No GC workers can unpark until
    this flag is explicitly cleared by the coordinator.

    -   This flag makes the GC workers robust against spurious wake-ups.
        Even if any worker spuriously wakes up, it will find the
        `group_sleep` flag still set, and will wait again (see the
        sketch after this list).

    -   Concretely, the coordinator clears the flag when it adds more
        work packets to already-open buckets, or opens new buckets.  If
        no more work is available, GC finishes.
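
The sketch below shows how a "group sleep" flag makes a `Condvar` wait
loop robust against spurious wake-ups.  It borrows the names
`WorkerMonitor`, `group_sleep` and `notify_work_available` from this
PR, but the fields and signatures are simplified guesses for
illustration; the actual worker-side code is not shown in this
excerpt.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// A simplified model of the group-sleep mechanism.
struct WorkerMonitor {
    sync: Mutex<MonitorSync>,
    cond: Condvar,
}

struct MonitorSync {
    /// Set when all workers have parked; cleared only by the coordinator.
    group_sleep: bool,
}

impl WorkerMonitor {
    /// Called by a parked worker.  If `Condvar::wait` wakes up spuriously,
    /// the worker finds `group_sleep` still set and goes back to sleep.
    fn wait_until_resumed(&self) {
        let mut sync = self.sync.lock().unwrap();
        while sync.group_sleep {
            sync = self.cond.wait(sync).unwrap();
        }
    }

    /// Called by the coordinator after adding packets to open buckets or
    /// opening new buckets.
    fn notify_work_available(&self, all: bool) {
        let mut sync = self.sync.lock().unwrap();
        sync.group_sleep = false;
        if all {
            self.cond.notify_all();
        } else {
            self.cond.notify_one();
        }
    }
}

fn main() {
    let monitor = Arc::new(WorkerMonitor {
        sync: Mutex::new(MonitorSync { group_sleep: true }),
        cond: Condvar::new(),
    });

    let m = Arc::clone(&monitor);
    let worker = thread::spawn(move || {
        m.wait_until_resumed();
        println!("worker resumed");
    });

    // The coordinator clears the flag and wakes every sleeping worker.
    monitor.notify_work_available(true);
    worker.join().unwrap();
}
```

The essential part is the `while` loop: a spuriously woken worker
re-checks the flag under the lock and waits again unless the
coordinator has explicitly cleared it.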

To implement those changes, some data structures are modified.

1.  The worker-to-coordinator channel is changed from `mpsc::Channel` to
    a custom `scheduler::controller::channel::Channel`.

    -   It is not a strict FIFO channel.  The coordinator always
        executes all pending coordinator work packets before handling
        the "all workers have parked" condition (see the usage sketch
        after this list).

2.  Introduced a `WorkerMonitor` struct as the anything-to-workers
    notification channel.

    -   It encapsulates the existing `Mutex` and `Condvar` so that the
        coordinator or other workers can notify workers.

    -   It includes the `parked_workers` counter.  `WorkerGroup` is no
        longer responsible for recording parked workers.

    -   It includes the `group_sleep` boolean flag.

3.  Removed the `pending_coordinator_packets` counter.
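
As a self-contained illustration of the new protocol (not part of this
patch; the generic `CoordinatorWork` packets are replaced by plain
strings), the toy program below shows the priority described above:
the coordinator drains all pending coordinator packets before it
reacts to the "all workers have parked" notification.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Stand-in for the real generic coordinator work packets.
type WorkPacket = String;

enum Event {
    Work(WorkPacket),
    AllParked,
}

struct Channel {
    sync: Mutex<ChannelSync>,
    cond: Condvar,
}

struct ChannelSync {
    coordinator_packets: VecDeque<WorkPacket>,
    all_workers_parked: bool,
}

fn main() {
    let chan = Arc::new(Channel {
        sync: Mutex::new(ChannelSync {
            coordinator_packets: VecDeque::new(),
            all_workers_parked: false,
        }),
        cond: Condvar::new(),
    });

    // A "worker" sends one coordinator packet, then reports "all parked".
    let worker_chan = Arc::clone(&chan);
    let worker = thread::spawn(move || {
        {
            let mut sync = worker_chan.sync.lock().unwrap();
            sync.coordinator_packets.push_back("StopMutators".to_string());
            worker_chan.cond.notify_one();
        }
        {
            let mut sync = worker_chan.sync.lock().unwrap();
            sync.all_workers_parked = true;
            worker_chan.cond.notify_one();
        }
    });

    // The "coordinator" always sees pending packets before "all parked".
    loop {
        let event = {
            let mut sync = chan.sync.lock().unwrap();
            loop {
                if let Some(work) = sync.coordinator_packets.pop_front() {
                    break Event::Work(work);
                }
                if sync.all_workers_parked {
                    break Event::AllParked;
                }
                sync = chan.cond.wait(sync).unwrap();
            }
        };
        match event {
            Event::Work(work) => println!("coordinator executes {work}"),
            Event::AllParked => {
                println!("all parked; open more buckets or finish GC");
                break;
            }
        }
    }

    worker.join().unwrap();
}
```

Even when a packet and the "all parked" flag are pending at the same
time, the inner loop always pops the packet first, matching the
behaviour of `Receiver::poll_event` in `channel.rs` below.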

Fixes: #778
wks authored Apr 12, 2023
1 parent f8469b0 commit 8dd1b73
Showing 7 changed files with 349 additions and 270 deletions.
157 changes: 71 additions & 86 deletions src/scheduler/controller.rs
@@ -3,19 +3,20 @@
//! MMTk has many GC threads. There are many GC worker threads and one GC controller thread.
//! The GC controller thread responds to GC requests and coordinates the workers to perform GC.

use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::sync::{Arc, Condvar, Mutex};

use crate::plan::gc_requester::GCRequester;
use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection};
use crate::scheduler::{CoordinatorMessage, GCWork};
use crate::util::VMWorkerThread;
use crate::vm::VMBinding;
use crate::MMTK;
use atomic::Ordering;

use self::channel::{Event, Receiver};

use super::{CoordinatorWork, GCWorkScheduler, GCWorker};

pub(crate) mod channel;

/// The thread local struct for the GC controller, the counterpart of `GCWorker`.
pub struct GCController<VM: VMBinding> {
/// The reference to the MMTk instance.
@@ -24,18 +25,18 @@ pub struct GCController<VM: VMBinding> {
requester: Arc<GCRequester<VM>>,
/// The reference to the scheduler.
scheduler: Arc<GCWorkScheduler<VM>>,
/// The receiving end of the channel to get controller/coordinator message from workers.
receiver: Receiver<CoordinatorMessage<VM>>,
/// Receive coordinator work packets and notifications from GC workers through this.
receiver: Receiver<VM>,
/// The `GCWorker` is used to execute packets. The controller is also a `GCWorker`.
coordinator_worker: GCWorker<VM>,
}

impl<VM: VMBinding> GCController<VM> {
pub fn new(
pub(crate) fn new(
mmtk: &'static MMTK<VM>,
requester: Arc<GCRequester<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
receiver: Receiver<CoordinatorMessage<VM>>,
receiver: Receiver<VM>,
coordinator_worker: GCWorker<VM>,
) -> Box<GCController<VM>> {
Box::new(Self {
@@ -62,29 +63,66 @@ impl<VM: VMBinding> GCController<VM> {
}
}

/// Process a message. Return true if the GC is finished.
fn process_message(&mut self, message: CoordinatorMessage<VM>) -> bool {
/// Find more work for workers to do. Return true if more work is available.
fn find_more_work_for_workers(&mut self) -> bool {
if self.scheduler.worker_group.has_designated_work() {
return true;
}

// See if any bucket has a sentinel.
if self.scheduler.schedule_sentinels() {
return true;
}

// Try to open new buckets.
if self.scheduler.update_buckets() {
return true;
}

// If all of the above failed, it means GC has finished.
false
}

/// Reset the "all workers parked" state and resume workers.
fn reset_and_resume_workers(&mut self) {
self.receiver.reset_all_workers_parked();
self.scheduler.worker_monitor.notify_work_available(true);
debug!("Workers resumed");
}

/// Handle the "all workers have parked" event. Return true if GC is finished.
fn on_all_workers_parked(&mut self) -> bool {
assert!(self.scheduler.all_activated_buckets_are_empty());

let new_work_available = self.find_more_work_for_workers();

if new_work_available {
self.reset_and_resume_workers();
// If there is more work to do, GC has not finished.
return false;
}

assert!(self.scheduler.all_buckets_empty());

true
}

/// Process an event. Return true if the GC is finished.
fn process_event(&mut self, message: Event<VM>) -> bool {
match message {
CoordinatorMessage::Work(mut work) => {
Event::Work(mut work) => {
self.execute_coordinator_work(work.as_mut(), true);
false
}
CoordinatorMessage::Finish => {
// Quit only if all the buckets are empty.
// For concurrent GCs, the coordinator thread may receive this message when
// some buckets are still not empty. Under such case, the coordinator
// should ignore the message.
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty()
}
Event::AllParked => self.on_all_workers_parked(),
}
}

/// Coordinate workers to perform GC in response to a GC request.
pub fn do_gc_until_completion(&mut self) {
let gc_start = std::time::Instant::now();
// Schedule collection.
self.initiate_coordinator_work(&mut ScheduleCollection, true);
self.execute_coordinator_work(&mut ScheduleCollection, true);

// Tell GC trigger that GC started - this happens after ScheduleCollection so we
// will know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix)
@@ -95,30 +133,24 @@ impl<VM: VMBinding> GCController<VM> {
.policy
.on_gc_start(self.mmtk);

// Drain the message queue and execute coordinator work.
// React to worker-generated events until finished.
loop {
let message = self.receiver.recv().unwrap();
let finished = self.process_message(message);
let event = self.receiver.poll_event();
let finished = self.process_event(event);
if finished {
break;
}
}

// All GC workers must have parked by now.
debug_assert!(self.scheduler.worker_monitor.debug_is_group_sleeping());
debug_assert!(!self.scheduler.worker_group.has_designated_work());
// Sometimes multiple finish messages will be sent. Skip them.
for message in self.receiver.try_iter() {
match message {
CoordinatorMessage::Work(_) => unreachable!(),
CoordinatorMessage::Finish => {}
}
}

{
// Note: GC workers may spuriously wake up, examining the states of work buckets and
// trying to open them. Use lock to ensure workers do not wake up when we deactivate
// buckets.
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
self.scheduler.deactivate_all();
}
// Deactivate all work buckets to prepare for the next GC.
// NOTE: There is no need to hold any lock.
// All GC workers are doing "group sleeping" now,
// so they will not wake up while we deactivate buckets.
self.scheduler.deactivate_all();

// Tell GC trigger that GC ended - this happens before EndOfGC where we resume mutators.
self.mmtk.plan.base().gc_trigger.policy.on_gc_end(self.mmtk);
@@ -132,67 +164,20 @@ impl<VM: VMBinding> GCController<VM> {
elapsed: gc_start.elapsed(),
};

// Note: We cannot use `initiate_coordinator_work` here. If we increment the
// `pending_coordinator_packets` counter when a worker spuriously wakes up, it may try to
// open new buckets and result in an assertion error.
// See: https://github.com/mmtk/mmtk-core/issues/770
//
// The `pending_coordinator_packets` counter and the `initiate_coordinator_work` function
// were introduced to prevent any buckets from being opened while `ScheduleCollection` or
// `StopMutators` is being executed. (See the doc comment of `initiate_coordinator_work`.)
// `EndOfGC` doesn't add any new work packets, therefore it does not need this layer of
// synchronization.
//
// FIXME: We should redesign the synchronization mechanism to properly address the opening
// condition of buckets. See: https://github.com/mmtk/mmtk-core/issues/774
end_of_gc.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);
self.execute_coordinator_work(&mut end_of_gc, false);

self.scheduler.debug_assert_all_buckets_deactivated();
}

/// The controller uses this method to start executing a coordinator work immediately.
///
/// Note: GC workers will start executing work packets as soon as individual work packets
/// are added. If the coordinator work (such as `ScheduleCollection`) adds multiple work
/// packets into different buckets, workers may open subsequent buckets while the coordinator
/// work still has packets to be added to prior buckets. For this reason, we use the
/// `pending_coordinator_packets` to prevent the workers from opening any work buckets while
/// this coordinator work is being executed.
///
/// # Arguments
///
/// - `work`: The work to execute.
/// - `notify_workers`: Notify one worker after the work is finished. Useful for proceeding
/// to the next work bucket stage.
fn initiate_coordinator_work(
&mut self,
work: &mut dyn CoordinatorWork<VM>,
notify_workers: bool,
) {
self.scheduler
.pending_coordinator_packets
.fetch_add(1, Ordering::SeqCst);

self.execute_coordinator_work(work, notify_workers)
}

fn execute_coordinator_work(
&mut self,
work: &mut dyn CoordinatorWork<VM>,
notify_workers: bool,
) {
work.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

self.scheduler
.pending_coordinator_packets
.fetch_sub(1, Ordering::SeqCst);

if notify_workers {
// When a coordinator work finishes, there is a chance that all GC workers parked, and
// no work packets are added to any open buckets. We need to wake up one GC worker so
// that it can open more work buckets.
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
self.scheduler.worker_monitor.1.notify_one();
self.reset_and_resume_workers();
};
}
}
114 changes: 114 additions & 0 deletions src/scheduler/controller/channel.rs
@@ -0,0 +1,114 @@
use std::collections::VecDeque;

use super::*;

/// A one-way channel for workers to send coordinator packets and notifications to the controller.
struct Channel<VM: VMBinding> {
sync: Mutex<ChannelSync<VM>>,
cond: Condvar,
}

/// The synchronized parts of `Channel`.
struct ChannelSync<VM: VMBinding> {
/// Pending coordinator work packets.
coordinator_packets: VecDeque<Box<dyn CoordinatorWork<VM>>>,
/// Whether all workers have parked.
///
/// NOTE: This field is set to `true` by the last parked worker.
/// It is used to notify the coordinator about the event that all workers have parked.
/// To resume workers from "group sleeping", use `WorkerMonitor::notify_work_available`.
all_workers_parked: bool,
}

/// Each worker holds an instance of this.
///
/// It wraps a channel, and only allows workers to access it in expected ways.
pub struct Sender<VM: VMBinding> {
chan: Arc<Channel<VM>>,
}

impl<VM: VMBinding> Clone for Sender<VM> {
fn clone(&self) -> Self {
Self {
chan: self.chan.clone(),
}
}
}

impl<VM: VMBinding> Sender<VM> {
/// Send a coordinator work packet to the coordinator.
pub fn add_coordinator_work(&self, work: Box<dyn CoordinatorWork<VM>>) {
let mut sync = self.chan.sync.lock().unwrap();
sync.coordinator_packets.push_back(work);
debug!("A worker has sent a coordinator work packet.");
self.chan.cond.notify_one();
}

/// Notify the coordinator that all workers have parked.
pub fn notify_all_workers_parked(&self) {
let mut sync = self.chan.sync.lock().unwrap();
sync.all_workers_parked = true;
debug!("Notified the coordinator that all workers have parked.");
self.chan.cond.notify_one();
}
}

/// The coordinator holds an instance of this.
///
/// It wraps a channel, and only allows the coordinator to access it in expected ways.
pub struct Receiver<VM: VMBinding> {
chan: Arc<Channel<VM>>,
}

impl<VM: VMBinding> Receiver<VM> {
/// Get an event.
pub(super) fn poll_event(&self) -> Event<VM> {
let mut sync = self.chan.sync.lock().unwrap();
loop {
// Make sure the coordinator always sees packets before seeing "all parked".
if let Some(work) = sync.coordinator_packets.pop_front() {
debug!("Received a coordinator packet.");
return Event::Work(work);
}

if sync.all_workers_parked {
debug!("Observed all workers parked.");
return Event::AllParked;
}

sync = self.chan.cond.wait(sync).unwrap();
}
}

/// Reset the "all workers have parked" flag.
pub fn reset_all_workers_parked(&self) {
let mut sync = self.chan.sync.lock().unwrap();
sync.all_workers_parked = false;
debug!("The all_workers_parked state is reset.");
}
}

/// This type represents the events the `Receiver` observes.
pub(crate) enum Event<VM: VMBinding> {
/// A worker has sent a work packet to be executed by the coordinator.
Work(Box<dyn CoordinatorWork<VM>>),
/// All workers have parked.  When this event is observed, all open
/// buckets are drained and all workers are group-sleeping; the
/// coordinator decides whether to open more buckets or finish GC.
AllParked,
}

/// Create a Sender-Receiver pair.
pub(crate) fn make_channel<VM: VMBinding>() -> (Sender<VM>, Receiver<VM>) {
let chan = Arc::new(Channel {
sync: Mutex::new(ChannelSync {
coordinator_packets: Default::default(),
all_workers_parked: false,
}),
cond: Default::default(),
});

let sender = Sender { chan: chan.clone() };
let receiver = Receiver { chan };
(sender, receiver)
}
5 changes: 3 additions & 2 deletions src/scheduler/gc_work.rs
@@ -177,8 +177,9 @@ impl<E: ProcessEdgesWork> GCWork<E::VM> for StopMutators<E> {
// If the VM requires that only the coordinator thread can stop the world,
// we delegate the work to the coordinator.
if <E::VM as VMBinding>::VMCollection::COORDINATOR_ONLY_STW && !worker.is_coordinator() {
mmtk.scheduler
.add_coordinator_work(StopMutators::<E>::new(), worker);
worker
.sender
.add_coordinator_work(Box::new(StopMutators::<E>::new()));
return;
}

1 change: 0 additions & 1 deletion src/scheduler/mod.rs
@@ -4,7 +4,6 @@ pub mod affinity;

#[allow(clippy::module_inception)]
mod scheduler;
pub(crate) use scheduler::CoordinatorMessage;
pub(crate) use scheduler::GCWorkScheduler;

mod stat;