Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

native: Fix a race in select() #13512

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/libnative/io/timer_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
let t = active.remove(i).unwrap();
ack.send(t);
}
_ => break
Err(..) => break
}
}

Expand Down
29 changes: 20 additions & 9 deletions src/libnative/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,30 @@ impl rt::Runtime for Ops {
Err(task) => { cast::forget(task.wake()); }
}
} else {
let mut iter = task.make_selectable(times);
let iter = task.make_selectable(times);
let guard = (*me).lock.lock();
(*me).awoken = false;
let success = iter.all(|task| {
match f(task) {
Ok(()) => true,
Err(task) => {
cast::forget(task.wake());
false

// Apply the given closure to all of the "selectable tasks",
// bailing on the first one that produces an error. Note that
// care must be taken such that when an error is occurred, we
// may not own the task, so we may still have to wait for the
// task to become available. In other words, if task.wake()
// returns `None`, then someone else has ownership and we must
// wait for their signal.
match iter.map(f).filter_map(|a| a.err()).next() {
None => {}
Some(task) => {
match task.wake() {
Some(task) => {
cast::forget(task);
(*me).awoken = true;
}
None => {}
}
}
});
while success && !(*me).awoken {
}
while !(*me).awoken {
guard.wait();
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/libstd/io/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn reset_helper(w: ~Writer:Send,
{
let mut t = Local::borrow(None::<Task>);
// Be sure to flush any pending output from the writer
match f(t.get(), w) {
match f(&mut *t, w) {
Some(mut w) => {
drop(t);
// FIXME: is failing right here?
Expand Down Expand Up @@ -230,9 +230,7 @@ fn with_task_stdout(f: |&mut Writer| -> IoResult<()> ) {
// To protect against this, we do a little dance in which we
// temporarily take the task, swap the handles, put the task in TLS,
// and only then drop the previous handle.
let mut t = Local::borrow(None::<Task>);
let prev = replace(&mut t.get().stdout, my_stdout);
drop(t);
let prev = replace(&mut Local::borrow(None::<Task>).stdout, my_stdout);
drop(prev);
ret
}
Expand Down
3 changes: 1 addition & 2 deletions src/libstd/rt/local_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ pub unsafe fn local_free(ptr: *u8) {
}

pub fn live_allocs() -> *mut Box {
let mut task = Local::borrow(None::<Task>);
task.get().heap.live_allocs
Local::borrow(None::<Task>).heap.live_allocs
}

#[cfg(test)]
Expand Down
18 changes: 10 additions & 8 deletions src/libstd/rt/local_ptr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#![allow(dead_code)]

use cast;
use ops::Drop;
use ops::{Drop, Deref, DerefMut};
use ptr::RawPtr;

#[cfg(windows)] // mingw-w32 doesn't like thread_local things
Expand Down Expand Up @@ -48,13 +48,15 @@ impl<T> Drop for Borrowed<T> {
}
}

impl<T> Borrowed<T> {
pub fn get<'a>(&'a mut self) -> &'a mut T {
unsafe {
let val_ptr: &mut ~T = cast::transmute(&mut self.val);
let val_ptr: &'a mut T = *val_ptr;
val_ptr
}
impl<T> Deref<T> for Borrowed<T> {
fn deref<'a>(&'a self) -> &'a T {
unsafe { &*(self.val as *T) }
}
}

impl<T> DerefMut<T> for Borrowed<T> {
fn deref_mut<'a>(&'a mut self) -> &'a mut T {
unsafe { &mut *(self.val as *mut T) }
}
}

Expand Down
10 changes: 4 additions & 6 deletions src/libstd/rt/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ impl Task {
#[allow(unused_must_use)]
fn close_outputs() {
let mut task = Local::borrow(None::<Task>);
let stderr = task.get().stderr.take();
let stdout = task.get().stdout.take();
let stderr = task.stderr.take();
let stdout = task.stdout.take();
drop(task);
match stdout { Some(mut w) => { w.flush(); }, None => {} }
match stderr { Some(mut w) => { w.flush(); }, None => {} }
Expand Down Expand Up @@ -159,8 +159,7 @@ impl Task {
// be intertwined, and miraculously work for now...
let mut task = Local::borrow(None::<Task>);
let storage_map = {
let task = task.get();
let LocalStorage(ref mut optmap) = task.storage;
let &LocalStorage(ref mut optmap) = &mut task.storage;
optmap.take()
};
drop(task);
Expand Down Expand Up @@ -332,8 +331,7 @@ impl BlockedTask {
}

/// Converts one blocked task handle to a list of many handles to the same.
pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks>
{
pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
let arc = match self {
Owned(task) => {
let flag = unsafe { AtomicUint::new(cast::transmute(task)) };
Expand Down
9 changes: 3 additions & 6 deletions src/libstd/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ pub fn try<T:Send>(f: proc():Send -> T) -> Result<T, ~Any:Send> {
pub fn with_task_name<U>(blk: |Option<&str>| -> U) -> U {
use rt::task::Task;

let mut task = Local::borrow(None::<Task>);
match task.get().name {
let task = Local::borrow(None::<Task>);
match task.name {
Some(ref name) => blk(Some(name.as_slice())),
None => blk(None)
}
Expand All @@ -276,11 +276,8 @@ pub fn deschedule() {

pub fn failing() -> bool {
//! True if the running task has failed

use rt::task::Task;

let mut local = Local::borrow(None::<Task>);
local.get().unwinder.unwinding()
Local::borrow(None::<Task>).unwinder.unwinding()
}

// The following 8 tests test the following 2^3 combinations:
Expand Down
56 changes: 56 additions & 0 deletions src/test/run-pass/issue-13494.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

// This test may not always fail, but it can be flaky if the race it used to
// expose is still present.

extern crate green;
extern crate rustuv;
extern crate native;

#[start]
fn start(argc: int, argv: **u8) -> int {
green::start(argc, argv, rustuv::event_loop, main)
}

fn helper(rx: Receiver<Sender<()>>) {
for tx in rx.iter() {
let _ = tx.send_opt(());
}
}

fn test() {
let (tx, rx) = channel();
spawn(proc() { helper(rx) });
let (snd, rcv) = channel();
for _ in range(1, 100000) {
snd.send(1);
let (tx2, rx2) = channel();
tx.send(tx2);
select! {
() = rx2.recv() => (),
_ = rcv.recv() => ()
}
}
}

fn main() {
let (tx, rx) = channel();
spawn(proc() {
tx.send(test());
});
rx.recv();

let (tx, rx) = channel();
native::task::spawn(proc() {
tx.send(test());
});
rx.recv();
}