-
Notifications
You must be signed in to change notification settings - Fork 69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Let the coordinator thread open buckets #782
Merged
Merged
Changes from 10 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
48c32b8
WIP: new controller/worker sync mechanism
wks fba3489
Move the controller channel to a dedicated module
wks 53b8896
Remove unused items
wks e01ff66
Comments, logging and style
wks 25f6f10
Revert file renaming for easier review
wks fe7e064
Use VecDeque instead of Vec
wks 541a284
Merge branch 'master' into coord-open-buckets
wks 4bd2943
Minor style changes
wks 93bb121
Move `WorkerGroup.parked_workers` to the monitor
wks 2988898
pub(crate) to pub
wks a08d3e0
Merge remote-tracking branch 'origin/master' into coord-open-buckets
wks File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
use std::collections::VecDeque; | ||
|
||
use super::*; | ||
|
||
/// A one-way channel for workers to send coordinator packets and notifications to the controller. | ||
struct Channel<VM: VMBinding> { | ||
sync: Mutex<ChannelSync<VM>>, | ||
cond: Condvar, | ||
} | ||
|
||
/// The synchronized parts of `Channel`. | ||
struct ChannelSync<VM: VMBinding> { | ||
/// Pending coordinator work packets. | ||
coordinator_packets: VecDeque<Box<dyn CoordinatorWork<VM>>>, | ||
/// Whether all workers have parked. | ||
/// | ||
/// NOTE: This field is set to `true` by the last parked worker. | ||
/// It is used to notify the coordinator about the event that all workers have parked. | ||
/// To resume workers from "group sleeping", use `WorkerMonitor::notify_work_available`. | ||
all_workers_parked: bool, | ||
} | ||
|
||
/// Each worker holds an instance of this. | ||
/// | ||
/// It wraps a channel, and only allows workers to access it in expected ways. | ||
pub struct Sender<VM: VMBinding> { | ||
chan: Arc<Channel<VM>>, | ||
} | ||
|
||
impl<VM: VMBinding> Clone for Sender<VM> { | ||
fn clone(&self) -> Self { | ||
Self { | ||
chan: self.chan.clone(), | ||
} | ||
} | ||
} | ||
|
||
impl<VM: VMBinding> Sender<VM> { | ||
/// Send a coordinator work packet to the coordinator. | ||
pub fn add_coordinator_work(&self, work: Box<dyn CoordinatorWork<VM>>) { | ||
let mut sync = self.chan.sync.lock().unwrap(); | ||
sync.coordinator_packets.push_back(work); | ||
debug!("A worker has sent a coordinator work packet."); | ||
self.chan.cond.notify_one(); | ||
} | ||
|
||
/// Notify the coordinator that all workers have parked. | ||
pub fn notify_all_workers_parked(&self) { | ||
let mut sync = self.chan.sync.lock().unwrap(); | ||
sync.all_workers_parked = true; | ||
debug!("Notified the coordinator that all workers have parked."); | ||
self.chan.cond.notify_one(); | ||
} | ||
} | ||
|
||
/// The coordinator holds an instance of this. | ||
/// | ||
/// It wraps a channel, and only allows the coordinator to access it in expected ways. | ||
pub struct Receiver<VM: VMBinding> { | ||
chan: Arc<Channel<VM>>, | ||
} | ||
|
||
impl<VM: VMBinding> Receiver<VM> { | ||
/// Get an event. | ||
pub(super) fn poll_event(&self) -> Event<VM> { | ||
let mut sync = self.chan.sync.lock().unwrap(); | ||
loop { | ||
// Make sure the coordinator always sees packets before seeing "all parked". | ||
if let Some(work) = sync.coordinator_packets.pop_front() { | ||
debug!("Received a coordinator packet."); | ||
return Event::Work(work); | ||
} | ||
|
||
if sync.all_workers_parked { | ||
debug!("Observed all workers parked."); | ||
return Event::AllParked; | ||
} | ||
|
||
sync = self.chan.cond.wait(sync).unwrap(); | ||
} | ||
} | ||
|
||
/// Reset the "all workers have parked" flag. | ||
pub fn reset_all_workers_parked(&self) { | ||
let mut sync = self.chan.sync.lock().unwrap(); | ||
sync.all_workers_parked = false; | ||
debug!("The all_workers_parked state is reset."); | ||
} | ||
} | ||
|
||
/// This type represents the events the `Receiver` observes. | ||
pub(crate) enum Event<VM: VMBinding> { | ||
/// Send a work-packet to the coordinator thread. | ||
Work(Box<dyn CoordinatorWork<VM>>), | ||
/// Notify the coordinator thread that all GC tasks are finished. | ||
/// When sending this message, all the work buckets should be | ||
/// empty, and all the workers should be parked. | ||
AllParked, | ||
} | ||
|
||
/// Create a Sender-Receiver pair. | ||
pub(crate) fn make_channel<VM: VMBinding>() -> (Sender<VM>, Receiver<VM>) { | ||
let chan = Arc::new(Channel { | ||
sync: Mutex::new(ChannelSync { | ||
coordinator_packets: Default::default(), | ||
all_workers_parked: false, | ||
}), | ||
cond: Default::default(), | ||
}); | ||
|
||
let sender = Sender { chan: chan.clone() }; | ||
let receiver = Receiver { chan }; | ||
(sender, receiver) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this different from a standard multi-producer single-consumer channel (https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The differences are
all_workers_parked
flag twice is the same as setting it once. It avoids the previous problem of sending multipleCoordinatorMessage::Finish
messages in a row, which forced the coordinator to "consume" extraneousFinish
messages after GC.all_workers_parked
flag. In this way, the coordinator is sure that no "pending coordinator packets" exist when opening new buckets.