Skip to content

Commit

Permalink
reactor: use AtomicTask::register to reduce unnecessary task clones (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar authored and carllerche committed Feb 18, 2019
1 parent 27a42b9 commit d1d72dc
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions tokio-reactor/src/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ struct Inner {
token: usize,
}

#[derive(PartialEq)]
enum Notify {
Yes,
No,
}

/// Tasks waiting on readiness notifications.
#[derive(Debug)]
struct Node {
Expand Down Expand Up @@ -276,7 +282,7 @@ impl Registration {
///
/// This function will panic if called from outside of a task context.
pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
self.poll_ready(Direction::Read, true, || task::current())
self.poll_ready(Direction::Read, Notify::Yes)
.map(|v| match v {
Some(v) => Async::Ready(v),
_ => Async::NotReady,
Expand All @@ -291,7 +297,7 @@ impl Registration {
///
/// [`poll_read_ready`]: #method.poll_read_ready
pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Read, false, || panic!())
self.poll_ready(Direction::Read, Notify::No)

}

Expand Down Expand Up @@ -328,7 +334,7 @@ impl Registration {
///
/// This function will panic if called from outside of a task context.
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
self.poll_ready(Direction::Write, true, || task::current())
self.poll_ready(Direction::Write, Notify::Yes)
.map(|v| match v {
Some(v) => Async::Ready(v),
_ => Async::NotReady,
Expand All @@ -343,12 +349,11 @@ impl Registration {
///
/// [`poll_write_ready`]: #method.poll_write_ready
pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Write, false, || unreachable!())
self.poll_ready(Direction::Write, Notify::No)
}

fn poll_ready<F>(&self, direction: Direction, notify: bool, task: F)
fn poll_ready(&self, direction: Direction, notify: Notify)
-> io::Result<Option<mio::Ready>>
where F: Fn() -> Task
{
let mut state = self.state.load(SeqCst);

Expand All @@ -363,17 +368,17 @@ impl Registration {
}
READY => {
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
return inner.poll_ready(direction, notify, task);
return inner.poll_ready(direction, notify);
}
LOCKED => {
if !notify {
if let Notify::No = notify {
// Skip the notification tracking junk.
return Ok(None);
}

let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node;

let task = task();
let task = task::current();

// Get the node
let mut n = node.take().unwrap_or_else(|| {
Expand Down Expand Up @@ -473,9 +478,8 @@ impl Inner {
inner.deregister_source(io)
}

fn poll_ready<F>(&self, direction: Direction, notify: bool, task: F)
fn poll_ready(&self, direction: Direction, notify: Notify)
-> io::Result<Option<mio::Ready>>
where F: FnOnce() -> Task
{
if self.token == ERROR {
return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor"));
Expand Down Expand Up @@ -503,13 +507,12 @@ impl Inner {
let mut ready = mask & mio::Ready::from_usize(
sched.readiness.fetch_and(!mask_no_hup, SeqCst));

if ready.is_empty() && notify {
if ready.is_empty() && notify == Notify::Yes {
debug!("scheduling {:?} for: {}", direction, self.token);
let task = task();
// Update the task info
match direction {
Direction::Read => sched.reader.register_task(task),
Direction::Write => sched.writer.register_task(task),
Direction::Read => sched.reader.register(),
Direction::Write => sched.writer.register(),
}

// Try again
Expand Down

0 comments on commit d1d72dc

Please sign in to comment.