Let the coordinator thread open buckets #782

Merged: 11 commits, Apr 12, 2023
157 changes: 71 additions & 86 deletions src/scheduler/controller.rs
@@ -3,19 +3,20 @@
//! MMTk has multiple GC threads: many GC worker threads and one GC controller thread.
//! The GC controller thread responds to GC requests and coordinates the workers to perform GC.

use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::sync::{Arc, Condvar, Mutex};

use crate::plan::gc_requester::GCRequester;
use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection};
use crate::scheduler::{CoordinatorMessage, GCWork};
use crate::util::VMWorkerThread;
use crate::vm::VMBinding;
use crate::MMTK;
use atomic::Ordering;

use self::channel::{Event, Receiver};

use super::{CoordinatorWork, GCWorkScheduler, GCWorker};

pub(crate) mod channel;

/// The thread local struct for the GC controller, the counterpart of `GCWorker`.
pub struct GCController<VM: VMBinding> {
/// The reference to the MMTk instance.
@@ -24,18 +25,18 @@ pub struct GCController<VM> {
requester: Arc<GCRequester<VM>>,
/// The reference to the scheduler.
scheduler: Arc<GCWorkScheduler<VM>>,
/// The receiving end of the channel to get controller/coordinator message from workers.
receiver: Receiver<CoordinatorMessage<VM>>,
/// Receive coordinator work packets and notifications from GC workers through this.
receiver: Receiver<VM>,
/// The `GCWorker` is used to execute packets. The controller is also a `GCWorker`.
coordinator_worker: GCWorker<VM>,
}

impl<VM: VMBinding> GCController<VM> {
pub fn new(
pub(crate) fn new(
mmtk: &'static MMTK<VM>,
requester: Arc<GCRequester<VM>>,
scheduler: Arc<GCWorkScheduler<VM>>,
receiver: Receiver<CoordinatorMessage<VM>>,
receiver: Receiver<VM>,
coordinator_worker: GCWorker<VM>,
) -> Box<GCController<VM>> {
Box::new(Self {
@@ -62,29 +63,66 @@ impl<VM: VMBinding> GCController<VM> {
}
}

/// Process a message. Return true if the GC is finished.
fn process_message(&mut self, message: CoordinatorMessage<VM>) -> bool {
/// Find more work for workers to do. Return true if more work is available.
fn find_more_work_for_workers(&mut self) -> bool {
if self.scheduler.worker_group.has_designated_work() {
return true;
}

// See if any bucket has a sentinel.
if self.scheduler.schedule_sentinels() {
return true;
}

// Try to open new buckets.
if self.scheduler.update_buckets() {
return true;
}

// If all of the above failed, it means GC has finished.
false
}

/// Reset the "all workers parked" state and resume workers.
fn reset_and_resume_workers(&mut self) {
self.receiver.reset_all_workers_parked();
self.scheduler.worker_monitor.notify_work_available(true);
debug!("Workers resumed");
}

/// Handle the "all workers have parked" event. Return true if GC is finished.
fn on_all_workers_parked(&mut self) -> bool {
assert!(self.scheduler.all_activated_buckets_are_empty());

let new_work_available = self.find_more_work_for_workers();

if new_work_available {
self.reset_and_resume_workers();
// If there is more work to do, GC has not finished.
return false;
}

assert!(self.scheduler.all_buckets_empty());

true
}

/// Process an event. Return true if the GC is finished.
fn process_event(&mut self, message: Event<VM>) -> bool {
match message {
CoordinatorMessage::Work(mut work) => {
Event::Work(mut work) => {
self.execute_coordinator_work(work.as_mut(), true);
false
}
CoordinatorMessage::Finish => {
// Quit only if all the buckets are empty.
// For concurrent GCs, the coordinator thread may receive this message when
// some buckets are still not empty. Under such case, the coordinator
// should ignore the message.
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty()
}
Event::AllParked => self.on_all_workers_parked(),
}
}

/// Coordinate workers to perform GC in response to a GC request.
pub fn do_gc_until_completion(&mut self) {
let gc_start = std::time::Instant::now();
// Schedule collection.
self.initiate_coordinator_work(&mut ScheduleCollection, true);
self.execute_coordinator_work(&mut ScheduleCollection, true);

// Tell GC trigger that GC started - this happens after ScheduleCollection so we
// will know what kind of GC this is (e.g. nursery vs mature in gen copy, defrag vs fast in Immix)
@@ -95,30 +133,24 @@ impl<VM: VMBinding> GCController<VM> {
.policy
.on_gc_start(self.mmtk);

// Drain the message queue and execute coordinator work.
// React to worker-generated events until finished.
loop {
let message = self.receiver.recv().unwrap();
let finished = self.process_message(message);
let event = self.receiver.poll_event();
let finished = self.process_event(event);
if finished {
break;
}
}

// All GC workers must have parked by now.
debug_assert!(self.scheduler.worker_monitor.debug_is_group_sleeping());
debug_assert!(!self.scheduler.worker_group.has_designated_work());
// Sometimes multiple finish messages will be sent. Skip them.
for message in self.receiver.try_iter() {
match message {
CoordinatorMessage::Work(_) => unreachable!(),
CoordinatorMessage::Finish => {}
}
}

{
// Note: GC workers may spuriously wake up, examining the states of work buckets and
// trying to open them. Use lock to ensure workers do not wake up when we deactivate
// buckets.
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
self.scheduler.deactivate_all();
}
// Deactivate all work buckets to prepare for the next GC.
// NOTE: There is no need to hold any lock.
// All GC workers are doing "group sleeping" now,
// so they will not wake up while we deactivate buckets.
self.scheduler.deactivate_all();

// Tell GC trigger that GC ended - this happens before EndOfGC where we resume mutators.
self.mmtk.plan.base().gc_trigger.policy.on_gc_end(self.mmtk);
@@ -132,67 +164,20 @@ impl<VM: VMBinding> GCController<VM> {
elapsed: gc_start.elapsed(),
};

// Note: We cannot use `initiate_coordinator_work` here. If we increment the
// `pending_coordinator_packets` counter when a worker spuriously wakes up, it may try to
// open new buckets and result in an assertion error.
// See: https://github.com/mmtk/mmtk-core/issues/770
//
// The `pending_coordinator_packets` counter and the `initiate_coordinator_work` function
// were introduced to prevent any buckets from being opened while `ScheduleCollection` or
// `StopMutators` is being executed. (See the doc comment of `initiate_coordinator_work`.)
// `EndOfGC` doesn't add any new work packets, therefore it does not need this layer of
// synchronization.
//
// FIXME: We should redesign the synchronization mechanism to properly address the opening
// condition of buckets. See: https://github.com/mmtk/mmtk-core/issues/774
end_of_gc.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);
self.execute_coordinator_work(&mut end_of_gc, false);

self.scheduler.debug_assert_all_buckets_deactivated();
}

/// The controller uses this method to start executing a coordinator work immediately.
///
/// Note: GC workers will start executing work packets as soon as individual work packets
/// are added. If the coordinator work (such as `ScheduleCollection`) adds multiple work
/// packets into different buckets, workers may open subsequent buckets while the coordinator
/// work still has packets to be added to prior buckets. For this reason, we use the
/// `pending_coordinator_packets` to prevent the workers from opening any work buckets while
/// this coordinator work is being executed.
///
/// # Arguments
///
/// - `work`: The work to execute.
/// - `notify_workers`: Notify one worker after the work is finished. Useful for proceeding
/// to the next work bucket stage.
fn initiate_coordinator_work(
&mut self,
work: &mut dyn CoordinatorWork<VM>,
notify_workers: bool,
) {
self.scheduler
.pending_coordinator_packets
.fetch_add(1, Ordering::SeqCst);

self.execute_coordinator_work(work, notify_workers)
}

fn execute_coordinator_work(
&mut self,
work: &mut dyn CoordinatorWork<VM>,
notify_workers: bool,
) {
work.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

self.scheduler
.pending_coordinator_packets
.fetch_sub(1, Ordering::SeqCst);

if notify_workers {
// When a coordinator work finishes, there is a chance that all GC workers parked, and
// no work packets are added to any open buckets. We need to wake up one GC worker so
// that it can open more work buckets.
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
self.scheduler.worker_monitor.1.notify_one();
self.reset_and_resume_workers();
};
}
}
114 changes: 114 additions & 0 deletions src/scheduler/controller/channel.rs
@@ -0,0 +1,114 @@
use std::collections::VecDeque;

use super::*;

/// A one-way channel for workers to send coordinator packets and notifications to the controller.
struct Channel<VM: VMBinding> {
Member: How is this different from a standard multi-producer single-consumer channel (https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html)?

Collaborator Author: The differences are

  1. The message "all workers have parked" is idempotent, i.e. setting the all_workers_parked flag twice is the same as setting it once. It avoids the previous problem of sending multiple CoordinatorMessage::Finish messages in a row, which forced the coordinator to "consume" extraneous Finish messages after GC.
  2. The coordinator always processes all pending coordinator packets before processing the all_workers_parked flag. In this way, the coordinator is sure that no "pending coordinator packets" exist when opening new buckets.

sync: Mutex<ChannelSync<VM>>,
cond: Condvar,
}

/// The synchronized parts of `Channel`.
struct ChannelSync<VM: VMBinding> {
/// Pending coordinator work packets.
coordinator_packets: VecDeque<Box<dyn CoordinatorWork<VM>>>,
/// Whether all workers have parked.
///
/// NOTE: This field is set to `true` by the last parked worker.
/// It is used to notify the coordinator about the event that all workers have parked.
/// To resume workers from "group sleeping", use `WorkerMonitor::notify_work_available`.
all_workers_parked: bool,
}

/// Each worker holds an instance of this.
///
/// It wraps a channel, and only allows workers to access it in expected ways.
pub struct Sender<VM: VMBinding> {
chan: Arc<Channel<VM>>,
}

impl<VM: VMBinding> Clone for Sender<VM> {
fn clone(&self) -> Self {
Self {
chan: self.chan.clone(),
}
}
}

impl<VM: VMBinding> Sender<VM> {
/// Send a coordinator work packet to the coordinator.
pub fn add_coordinator_work(&self, work: Box<dyn CoordinatorWork<VM>>) {
let mut sync = self.chan.sync.lock().unwrap();
sync.coordinator_packets.push_back(work);
debug!("A worker has sent a coordinator work packet.");
self.chan.cond.notify_one();
}

/// Notify the coordinator that all workers have parked.
pub fn notify_all_workers_parked(&self) {
let mut sync = self.chan.sync.lock().unwrap();
sync.all_workers_parked = true;
debug!("Notified the coordinator that all workers have parked.");
self.chan.cond.notify_one();
}
}

/// The coordinator holds an instance of this.
///
/// It wraps a channel, and only allows the coordinator to access it in expected ways.
pub struct Receiver<VM: VMBinding> {
chan: Arc<Channel<VM>>,
}

impl<VM: VMBinding> Receiver<VM> {
/// Get an event.
pub(super) fn poll_event(&self) -> Event<VM> {
let mut sync = self.chan.sync.lock().unwrap();
loop {
// Make sure the coordinator always sees packets before seeing "all parked".
if let Some(work) = sync.coordinator_packets.pop_front() {
debug!("Received a coordinator packet.");
return Event::Work(work);
}

if sync.all_workers_parked {
debug!("Observed all workers parked.");
return Event::AllParked;
}

sync = self.chan.cond.wait(sync).unwrap();
}
}

/// Reset the "all workers have parked" flag.
pub fn reset_all_workers_parked(&self) {
let mut sync = self.chan.sync.lock().unwrap();
sync.all_workers_parked = false;
debug!("The all_workers_parked state is reset.");
}
}

/// This type represents the events the `Receiver` observes.
pub(crate) enum Event<VM: VMBinding> {
/// A work packet to be executed by the coordinator thread.
Work(Box<dyn CoordinatorWork<VM>>),
/// All GC workers have parked. When this event is observed,
/// all workers are parked and all activated work buckets are empty.
AllParked,
}

/// Create a Sender-Receiver pair.
pub(crate) fn make_channel<VM: VMBinding>() -> (Sender<VM>, Receiver<VM>) {
let chan = Arc::new(Channel {
sync: Mutex::new(ChannelSync {
coordinator_packets: Default::default(),
all_workers_parked: false,
}),
cond: Default::default(),
});

let sender = Sender { chan: chan.clone() };
let receiver = Receiver { chan };
(sender, receiver)
}
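
For illustration only, here is a minimal, self-contained sketch of the pattern the review reply above describes: a Mutex/Condvar channel in which the coordinator always drains pending work packets before observing the idempotent "all workers parked" flag. The `VMBinding` generics, the real `CoordinatorWork` trait, and the worker/scheduler machinery are stripped out; the `Packet` alias, the "StopMutators" string, and the `main` driver are hypothetical stand-ins, not part of the PR.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Stand-in for the PR's boxed `CoordinatorWork` packets.
type Packet = String;

struct ChannelSync {
    packets: VecDeque<Packet>,
    all_workers_parked: bool,
}

struct Channel {
    sync: Mutex<ChannelSync>,
    cond: Condvar,
}

enum Event {
    Work(Packet),
    AllParked,
}

fn main() {
    let chan = Arc::new(Channel {
        sync: Mutex::new(ChannelSync {
            packets: VecDeque::new(),
            all_workers_parked: false,
        }),
        cond: Condvar::new(),
    });

    // Worker side: push a packet, then report "all parked".
    let worker_chan = Arc::clone(&chan);
    let worker = thread::spawn(move || {
        {
            let mut sync = worker_chan.sync.lock().unwrap();
            sync.packets.push_back("StopMutators".to_string());
            worker_chan.cond.notify_one();
        }
        {
            let mut sync = worker_chan.sync.lock().unwrap();
            // Idempotent: setting the flag twice is the same as setting it once,
            // so there are no extraneous "Finish" messages to drain afterwards.
            sync.all_workers_parked = true;
            sync.all_workers_parked = true;
            worker_chan.cond.notify_one();
        }
    });

    // Coordinator side: always drain pending packets before reacting to the flag.
    loop {
        let event = {
            let mut sync = chan.sync.lock().unwrap();
            loop {
                if let Some(p) = sync.packets.pop_front() {
                    break Event::Work(p);
                }
                if sync.all_workers_parked {
                    break Event::AllParked;
                }
                sync = chan.cond.wait(sync).unwrap();
            }
        };
        match event {
            Event::Work(p) => println!("coordinator executes packet: {p}"),
            Event::AllParked => {
                println!("all workers parked; open more buckets or finish GC");
                break;
            }
        }
    }

    worker.join().unwrap();
}
```

In the actual PR, `Sender::add_coordinator_work` and `Sender::notify_all_workers_parked` play the worker-side role, and `Receiver::poll_event` is the coordinator-side inner loop.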
5 changes: 3 additions & 2 deletions src/scheduler/gc_work.rs
@@ -177,8 +177,9 @@ impl<E: ProcessEdgesWork> GCWork<E::VM> for StopMutators<E> {
// If the VM requires that only the coordinator thread can stop the world,
// we delegate the work to the coordinator.
if <E::VM as VMBinding>::VMCollection::COORDINATOR_ONLY_STW && !worker.is_coordinator() {
mmtk.scheduler
.add_coordinator_work(StopMutators::<E>::new(), worker);
worker
.sender
.add_coordinator_work(Box::new(StopMutators::<E>::new()));
return;
}

1 change: 0 additions & 1 deletion src/scheduler/mod.rs
@@ -4,7 +4,6 @@ pub mod affinity;

#[allow(clippy::module_inception)]
mod scheduler;
pub(crate) use scheduler::CoordinatorMessage;
pub(crate) use scheduler::GCWorkScheduler;

mod stat;