
Remove the coordinator (controller) thread #1062

Closed
wants to merge 14 commits
3 changes: 0 additions & 3 deletions docs/header/mmtk.h
@@ -84,9 +84,6 @@ extern void mmtk_scan_region();
 // Request MMTk to trigger a GC. Note that this may not actually trigger a GC
 extern void mmtk_handle_user_collection_request(void* tls);
 
-// Run the main loop for the GC controller thread. Does not return
-extern void mmtk_start_control_collector(void* tls, void* worker);
-
 // Run the main loop for a GC worker. Does not return
 extern void mmtk_start_worker(void* tls, void* worker);
 
37 changes: 33 additions & 4 deletions src/global_state.rs
@@ -1,5 +1,9 @@
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::Mutex;
+use std::time::Instant;
 
+use atomic::Atomic;
+use atomic_refcell::AtomicRefCell;
+use bytemuck::NoUninit;
+
 /// This stores some global states for an MMTK instance.
 /// Some MMTK components like plans and allocators may keep a reference to the struct, and can access it.
@@ -17,7 +21,7 @@ pub struct GlobalState {
     /// bindings to temporarily disable GC, at which point, we do not trigger GC even if the heap is full.
     pub(crate) trigger_gc_when_heap_is_full: AtomicBool,
     /// The current GC status.
-    pub(crate) gc_status: Mutex<GcStatus>,
+    pub(crate) gc_status: Atomic<GcStatus>,
     /// Is the current GC an emergency collection? Emergency means we may run out of memory soon, and we should
     /// attempt to collect as much as we can.
     pub(crate) emergency_collection: AtomicBool,
@@ -40,6 +44,12 @@ pub struct GlobalState {
     pub(crate) stacks_prepared: AtomicBool,
     /// A counter that keeps track of the number of bytes allocated since the last stress test
     pub(crate) allocation_bytes: AtomicUsize,
+    /// The time when the current GC started. Currently only used for logging.
+    /// Note that some (but not all) `GCTriggerPolicy` implementations do their own time tracking
+    /// independently for their own needs.
+    /// This field is only accessible in `ScheduleCollection` and `GCWorkScheduler::on_gc_finished`,
+    /// which never happen at the same time, so `AtomicRefCell` is enough.
+    pub(crate) gc_start_time: AtomicRefCell<Option<Instant>>,
     /// A counter that keeps track of the number of bytes allocated by malloc
     #[cfg(feature = "malloc_counted_size")]
     pub(crate) malloc_bytes: AtomicUsize,
@@ -203,7 +213,7 @@ impl Default for GlobalState {
         Self {
             initialized: AtomicBool::new(false),
             trigger_gc_when_heap_is_full: AtomicBool::new(true),
-            gc_status: Mutex::new(GcStatus::NotInGC),
+            gc_status: Atomic::new(GcStatus::NotInGC),
             stacks_prepared: AtomicBool::new(false),
             emergency_collection: AtomicBool::new(false),
             user_triggered_collection: AtomicBool::new(false),
@@ -214,6 +224,7 @@ impl Default for GlobalState {
             cur_collection_attempts: AtomicUsize::new(0),
             scanned_stacks: AtomicUsize::new(0),
             allocation_bytes: AtomicUsize::new(0),
+            gc_start_time: AtomicRefCell::new(None),
             #[cfg(feature = "malloc_counted_size")]
             malloc_bytes: AtomicUsize::new(0),
             #[cfg(feature = "count_live_bytes_in_gc")]
@@ -222,9 +233,27 @@ impl Default for GlobalState {
     }
 }
 
-#[derive(PartialEq)]
+/// GC status.
+///
+/// - It starts in the `NotInGC` state.
+/// - It enters `GcPrepare` when GC starts (i.e. when `ScheduleCollection` is executed).
+/// - It enters `GcProper` when all mutators have stopped.
+/// - It returns to `NotInGC` when GC finishes.
+///
+/// All states except `NotInGC` are considered "in GC".
+///
+/// The status is checked by GC workers. The last parked worker only tries to open new buckets
+/// during GC.
+///
+/// The distinction between `GcPrepare` and `GcProper` is inherited from JikesRVM. Several
+/// assertions in JikesRVM involve those two states.
+#[derive(PartialEq, Clone, Copy, NoUninit, Debug)]
+#[repr(u8)]
 pub enum GcStatus {
+    /// Not in GC.
     NotInGC,
+    /// GC has started, but not all stacks have been scanned yet.
     GcPrepare,
+    /// GC has started, and all stacks have been scanned.
     GcProper,
 }
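A note on the two new field types above. `Atomic<GcStatus>` (from the `atomic` crate) only works for types that implement `bytemuck::NoUninit`, which is why the enum picks up `Clone`, `Copy`, and `#[repr(u8)]`; and `AtomicRefCell` turns the "these borrows never overlap" argument into a runtime-checked invariant instead of a lock. Here is a minimal, self-contained sketch of both — not code from this PR, and it assumes the `atomic`, `atomic_refcell`, and `bytemuck` crates (the latter with its `derive` feature) as dependencies:

```rust
use std::time::Instant;

use atomic::{Atomic, Ordering};
use atomic_refcell::AtomicRefCell;
use bytemuck::NoUninit;

// Deriving `NoUninit` is what makes `Atomic<GcStatus>` usable: the derive only
// accepts `Copy` types with a fixed representation and no padding bytes, which
// is why the enum gains `Clone`, `Copy`, and `#[repr(u8)]` in this PR.
#[derive(PartialEq, Clone, Copy, NoUninit, Debug)]
#[repr(u8)]
enum GcStatus {
    NotInGC,
    GcPrepare,
    GcProper,
}

fn main() {
    // A lock-free status word, standing in for the old `Mutex<GcStatus>` field.
    let status: Atomic<GcStatus> = Atomic::new(GcStatus::NotInGC);
    status.store(GcStatus::GcPrepare, Ordering::SeqCst);
    assert_eq!(status.load(Ordering::SeqCst), GcStatus::GcPrepare);

    // `AtomicRefCell` provides dynamically checked borrows without a lock: it
    // panics if two conflicting borrows ever overlap, so the "these accesses
    // never happen at the same time" assumption is enforced at runtime.
    let gc_start_time: AtomicRefCell<Option<Instant>> = AtomicRefCell::new(None);
    *gc_start_time.borrow_mut() = Some(Instant::now());
    assert!(gc_start_time.borrow().is_some());
}
```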
17 changes: 1 addition & 16 deletions src/memory_manager.rs
@@ -16,7 +16,7 @@ use crate::mmtk::MMTK;
 use crate::plan::AllocationSemantics;
 use crate::plan::{Mutator, MutatorContext};
 use crate::scheduler::WorkBucketStage;
-use crate::scheduler::{GCController, GCWork, GCWorker};
+use crate::scheduler::{GCWork, GCWorker};
 use crate::util::alloc::allocators::AllocatorSelector;
 use crate::util::constants::{LOG_BYTES_IN_PAGE, MIN_OBJECT_SIZE};
 use crate::util::heap::layout::vm_layout::vm_layout;
@@ -461,21 +461,6 @@ pub fn gc_poll<VM: VMBinding>(mmtk: &MMTK<VM>, tls: VMMutatorThread) {
     }
 }
 
-/// Run the main loop for the GC controller thread. This method does not return.
-///
-/// Arguments:
-/// * `tls`: The thread that will be used as the GC controller.
-/// * `gc_controller`: The execution context of the GC controller thread.
-///   It is the `GCController` passed to `Collection::spawn_gc_thread`.
-/// * `mmtk`: A reference to an MMTk instance.
-pub fn start_control_collector<VM: VMBinding>(
-    _mmtk: &'static MMTK<VM>,
-    tls: VMWorkerThread,
-    gc_controller: &mut GCController<VM>,
-) {
-    gc_controller.run(tls);
-}
-
 /// Run the main loop of a GC worker. This method does not return.
 ///
 /// Arguments:
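With `start_control_collector` removed, a binding's `Collection::spawn_gc_thread` has only one kind of GC thread left to create. Below is a self-contained model of that shape; `GCWorker`, `GCThreadContext`, `start_worker`, and `spawn_gc_thread` are simplified stand-ins for the mmtk-core items of the same names, not their real definitions:

```rust
use std::thread;

struct GCWorker; // stand-in for mmtk-core's GCWorker<VM>

// After this PR the context enum has a single variant; before it, there was
// also a Controller(...) variant that the binding had to spawn differently.
enum GCThreadContext {
    Worker(Box<GCWorker>),
}

// Stand-in for memory_manager::start_worker: run the work-packet loop forever.
fn start_worker(worker: Box<GCWorker>) -> ! {
    let _worker = worker;
    loop {
        thread::park(); // placeholder for "fetch and execute work packets"
    }
}

// Stand-in for the binding's Collection::spawn_gc_thread implementation.
fn spawn_gc_thread(ctx: GCThreadContext) {
    match ctx {
        // One arm only: no more special case for the coordinator thread.
        GCThreadContext::Worker(w) => {
            thread::spawn(move || start_worker(w));
        }
    }
}

fn main() {
    spawn_gc_thread(GCThreadContext::Worker(Box::new(GCWorker)));
}
```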
18 changes: 10 additions & 8 deletions src/mmtk.rs
@@ -148,7 +148,7 @@ impl<VM: VMBinding> MMTK<VM> {
 
         let state = Arc::new(GlobalState::default());
 
-        let gc_requester = Arc::new(GCRequester::new());
+        let gc_requester = Arc::new(GCRequester::new(scheduler.clone()));
 
         let gc_trigger = Arc::new(GCTrigger::new(
             options.clone(),
@@ -255,15 +255,15 @@ impl<VM: VMBinding> MMTK<VM> {
         self.inside_sanity.load(Ordering::Relaxed)
     }
 
-    pub(crate) fn set_gc_status(&self, s: GcStatus) {
-        let mut gc_status = self.state.gc_status.lock().unwrap();
-        if *gc_status == GcStatus::NotInGC {
+    pub(crate) fn set_gc_status(&self, new_status: GcStatus) {
+        let old_status = self.state.gc_status.swap(new_status, Ordering::SeqCst);
+        debug_assert_ne!(old_status, new_status);
+        if old_status == GcStatus::NotInGC {
             self.state.stacks_prepared.store(false, Ordering::SeqCst);
             // FIXME stats
             self.stats.start_gc();
         }
-        *gc_status = s;
-        if *gc_status == GcStatus::NotInGC {
+        if new_status == GcStatus::NotInGC {
             // FIXME stats
             if self.stats.get_gathering_stats() {
                 self.stats.end_gc();
@@ -273,12 +273,12 @@ impl<VM: VMBinding> MMTK<VM> {
 
     /// Return true if a collection is in progress.
     pub fn gc_in_progress(&self) -> bool {
-        *self.state.gc_status.lock().unwrap() != GcStatus::NotInGC
+        self.state.gc_status.load(Ordering::SeqCst) != GcStatus::NotInGC
     }
 
     /// Return true if a collection is in progress and past the preparatory stage.
     pub fn gc_in_progress_proper(&self) -> bool {
-        *self.state.gc_status.lock().unwrap() == GcStatus::GcProper
+        self.state.gc_status.load(Ordering::SeqCst) == GcStatus::GcProper
     }
 
     /// Return true if the current GC is an emergency GC.
@@ -344,6 +344,8 @@ impl<VM: VMBinding> MMTK<VM> {
         self.state
             .internal_triggered_collection
             .store(true, Ordering::Relaxed);
+        // TODO: The current `GCRequester::request()` is probably incorrect for internally triggered GC.
+        // Consider removing functions related to "internal triggered collection".
         self.gc_requester.request();
     }
 
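The `set_gc_status` rewrite above replaces a lock/check/store sequence with a single `swap`, whose return value tells the caller which transition just happened, while readers such as `gc_in_progress` become plain loads. Here is a minimal model of that pattern, assuming the same `atomic` and `bytemuck` crates; `State` and the stats side effects are placeholders, not the real `MMTK` struct:

```rust
use atomic::{Atomic, Ordering};
use bytemuck::NoUninit;

#[derive(PartialEq, Clone, Copy, NoUninit, Debug)]
#[repr(u8)]
enum GcStatus {
    NotInGC,
    GcPrepare,
    GcProper,
}

struct State {
    gc_status: Atomic<GcStatus>,
}

impl State {
    fn set_gc_status(&self, new_status: GcStatus) {
        // One atomic step both transitions the state and observes the old one.
        let old_status = self.gc_status.swap(new_status, Ordering::SeqCst);
        // Transitions always go to a *different* state; a repeated state would
        // indicate two racing transitions, which the mutex version did not check.
        debug_assert_ne!(old_status, new_status);
        if old_status == GcStatus::NotInGC {
            // entering GC: e.g. start statistics
        }
        if new_status == GcStatus::NotInGC {
            // leaving GC: e.g. stop statistics
        }
    }

    fn gc_in_progress(&self) -> bool {
        // A plain load; mutators polling this no longer contend on a mutex.
        self.gc_status.load(Ordering::SeqCst) != GcStatus::NotInGC
    }
}

fn main() {
    let s = State { gc_status: Atomic::new(GcStatus::NotInGC) };
    s.set_gc_status(GcStatus::GcPrepare);
    assert!(s.gc_in_progress());
    s.set_gc_status(GcStatus::GcProper);
    s.set_gc_status(GcStatus::NotInGC);
    assert!(!s.gc_in_progress());
}
```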
91 changes: 63 additions & 28 deletions src/plan/gc_requester.rs
@@ -1,66 +1,101 @@
+use crate::scheduler::GCWorkScheduler;
 use crate::vm::VMBinding;
-use std::marker::PhantomData;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Condvar, Mutex};
+use std::sync::{Arc, Mutex};
 
 struct RequestSync {
-    request_count: isize,
-    last_request_count: isize,
+    /// Are the GC workers already aware that we requested a GC?
+    ///
+    /// Mutators call `GCRequester::request()` to trigger GC. It communicates with workers by
+    /// calling `GCWorkScheduler::request_schedule_collection`. Under the hood, it sets a
+    /// synchronized variable and notifies one worker. Conceptually, it is sending a message to GC
+    /// workers.
+    ///
+    /// The purpose of this variable is to prevent that message from being sent while GC
+    /// workers are still doing GC. This is mainly significant for concurrent GC. In the current
+    /// design (inherited from JikesRVM), `request_flag` is cleared when all mutators are
+    /// suspended, at which time GC is still in progress. If `GCRequester::request()` is called
+    /// after `request_flag` is cleared but before `on_gc_finished` is called, the mutator will not
+    /// send the message. But GC workers will check `request_flag` when GC finishes, and schedule
+    /// the next GC.
+    gc_scheduled: bool,
 }
 
-/// GC requester. This object allows other threads to request (trigger) GC,
-/// and the GC coordinator thread waits for GC requests using this object.
+/// This data structure lets mutators trigger GC.
 pub struct GCRequester<VM: VMBinding> {
     request_sync: Mutex<RequestSync>,
-    request_condvar: Condvar,
+    /// An atomic flag outside `RequestSync` so that mutators can check if GC has already been
+    /// requested in `poll` without acquiring the mutex.
     request_flag: AtomicBool,
-    phantom: PhantomData<VM>,
-}
-
-// Clippy says we need this...
-impl<VM: VMBinding> Default for GCRequester<VM> {
-    fn default() -> Self {
-        Self::new()
-    }
+    scheduler: Arc<GCWorkScheduler<VM>>,
 }
 
 impl<VM: VMBinding> GCRequester<VM> {
-    pub fn new() -> Self {
+    pub fn new(scheduler: Arc<GCWorkScheduler<VM>>) -> Self {
         GCRequester {
             request_sync: Mutex::new(RequestSync {
-                request_count: 0,
-                last_request_count: -1,
+                gc_scheduled: false,
             }),
-            request_condvar: Condvar::new(),
             request_flag: AtomicBool::new(false),
-            phantom: PhantomData,
+            scheduler,
         }
     }
 
+    /// Request a GC. Called by mutators when polling (during allocation) and when handling user
+    /// GC requests (e.g. `System.gc();` in Java).
     pub fn request(&self) {
+        // Note: This is the double-checked locking algorithm.
+        // The load has the `Relaxed` order instead of `Acquire` because we are not doing lazy
+        // initialization here. We are only using this flag to remove successive requests.
         if self.request_flag.load(Ordering::Relaxed) {
             return;
         }
 
         let mut guard = self.request_sync.lock().unwrap();
         if !self.request_flag.load(Ordering::Relaxed) {
             self.request_flag.store(true, Ordering::Relaxed);
-            guard.request_count += 1;
-            self.request_condvar.notify_all();
+
+            let should_schedule_gc = self.try_schedule_collection(&mut guard);
+            if should_schedule_gc {
+                self.scheduler.request_schedule_collection();
+                // Note: We do not clear `request_flag` now. It will be cleared by `clear_request`
+                // after all mutators have stopped.
+            }
         }
     }
 
+    /// Clear the "GC requested" flag so that mutators can trigger the next GC.
+    /// Called by a GC worker when all mutators have come to a stop.
     pub fn clear_request(&self) {
-        let guard = self.request_sync.lock().unwrap();
+        let _guard = self.request_sync.lock().unwrap();
         self.request_flag.store(false, Ordering::Relaxed);
-        drop(guard);
     }
 
-    pub fn wait_for_request(&self) {
+    /// Called by a GC worker when a GC has finished.
+    /// This will check the `request_flag` again and decide if we should immediately schedule the
+    /// next GC. If we should, `gc_scheduled` will be set back to `true` and this function will
+    /// return `true`.
+    pub fn on_gc_finished(&self) -> bool {
         let mut guard = self.request_sync.lock().unwrap();
-        guard.last_request_count += 1;
-        while guard.last_request_count == guard.request_count {
-            guard = self.request_condvar.wait(guard).unwrap();
-        }
+        guard.gc_scheduled = false;
+
+        self.try_schedule_collection(&mut guard)
     }
+
+    /// Decide whether we should schedule a new collection. Will transition the state of
+    /// `gc_scheduled` from `false` to `true` if we should schedule a new collection.
+    ///
+    /// Return `true` if the state transition happens.
+    fn try_schedule_collection(&self, sync: &mut RequestSync) -> bool {
+        // The time to schedule a collection is when `request_flag` is `true` but `gc_scheduled` is
+        // `false`. If `gc_scheduled` is `true` when GC is requested, we do nothing now. But when
+        // the current GC finishes, a GC worker will call `on_gc_finished` which clears the
+        // `gc_scheduled` flag, and checks the `request_flag` again to trigger the next GC.
+        if self.request_flag.load(Ordering::Relaxed) && !sync.gc_scheduled {
+            sync.gc_scheduled = true;
+            true
+        } else {
+            false
+        }
+    }
 }
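To see why the rewritten `GCRequester` needs both `request_flag` and `gc_scheduled`, here is a runnable, std-only model of the handshake. `schedule_count` is a hypothetical stand-in for sending the `request_schedule_collection` message to the workers; the rest mirrors the diff above:

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;

struct RequestSync {
    gc_scheduled: bool,
}

struct GCRequester {
    request_sync: Mutex<RequestSync>,
    request_flag: AtomicBool,
    schedule_count: AtomicUsize, // counts "messages sent to GC workers"
}

impl GCRequester {
    fn new() -> Self {
        GCRequester {
            request_sync: Mutex::new(RequestSync { gc_scheduled: false }),
            request_flag: AtomicBool::new(false),
            schedule_count: AtomicUsize::new(0),
        }
    }

    fn request(&self) {
        // Fast path of the double-checked locking: someone already requested a GC.
        if self.request_flag.load(Ordering::Relaxed) {
            return;
        }
        let mut guard = self.request_sync.lock().unwrap();
        if !self.request_flag.load(Ordering::Relaxed) {
            self.request_flag.store(true, Ordering::Relaxed);
            if self.try_schedule_collection(&mut guard) {
                // In mmtk-core this is scheduler.request_schedule_collection().
                self.schedule_count.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    fn clear_request(&self) {
        let _guard = self.request_sync.lock().unwrap();
        self.request_flag.store(false, Ordering::Relaxed);
    }

    fn on_gc_finished(&self) -> bool {
        let mut guard = self.request_sync.lock().unwrap();
        guard.gc_scheduled = false;
        self.try_schedule_collection(&mut guard)
    }

    fn try_schedule_collection(&self, sync: &mut RequestSync) -> bool {
        if self.request_flag.load(Ordering::Relaxed) && !sync.gc_scheduled {
            sync.gc_scheduled = true;
            true
        } else {
            false
        }
    }
}

fn main() {
    let r = GCRequester::new();
    r.request(); // the first request schedules a GC
    r.request(); // a duplicate request is deduplicated by request_flag
    assert_eq!(r.schedule_count.load(Ordering::Relaxed), 1);

    r.clear_request(); // workers: all mutators have stopped, GC still running
    r.request(); // a request arriving mid-GC sets the flag but cannot schedule
    assert_eq!(r.schedule_count.load(Ordering::Relaxed), 1);

    // When the GC finishes, the pending flag causes the next GC to be scheduled.
    assert!(r.on_gc_finished());
}
```

The key design point is that `request_flag` is cleared early (when mutators stop) so the next request can be recorded while the current GC is still running, while `gc_scheduled` defers acting on that request until `on_gc_finished`.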