Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple parallel queue types and the Local trait #6626

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/libcore/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ pub fn log_type<T>(level: u32, object: &T) {
}

fn newsched_log_str(msg: ~str) {
use rt::task::Task;
use rt::local::Local;

unsafe {
match rt::local_services::unsafe_try_borrow_local_services() {
match Local::try_unsafe_borrow::<Task>() {
Some(local) => {
// Use the available logger
(*local).logger.log(Left(msg));
Expand Down
12 changes: 6 additions & 6 deletions src/libcore/rt/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use cast;
use util;
use ops::Drop;
use kinds::Owned;
use rt::sched::Coroutine;
use rt::local_sched;
use rt::sched::{Scheduler, Coroutine};
use rt::local::Local;
use unstable::intrinsics::{atomic_xchg, atomic_load};
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
Expand Down Expand Up @@ -127,7 +127,7 @@ impl<T> ChanOne<T> {
task_as_state => {
// Port is blocked. Wake it up.
let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr);
}
}
Expand Down Expand Up @@ -157,7 +157,7 @@ impl<T> PortOne<T> {
// XXX: Optimize this to not require the two context switches when data is available

// Switch to the scheduler to put the ~Task into the Packet state.
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
Expand All @@ -173,7 +173,7 @@ impl<T> PortOne<T> {
STATE_ONE => {
// Channel is closed. Switch back and check the data.
let task: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
}
_ => util::unreachable()
Expand Down Expand Up @@ -239,7 +239,7 @@ impl<T> Drop for ChanOneHack<T> {
// The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr);
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/libcore/rt/io/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

use option::{Option, Some, None};
use result::{Ok, Err};
use rt::sched::local_sched::unsafe_borrow_io;
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{IoFactory,
use rt::rtio::{IoFactory, IoFactoryObject,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpStream, RtioTcpStreamObject};
use rt::local::Local;

pub struct TcpStream {
rtstream: ~RtioTcpStreamObject
Expand All @@ -32,7 +32,7 @@ impl TcpStream {
pub fn connect(addr: IpAddr) -> Option<TcpStream> {
let stream = unsafe {
rtdebug!("borrowing io to connect");
let io = unsafe_borrow_io();
let io = Local::unsafe_borrow::<IoFactoryObject>();
rtdebug!("about to connect");
(*io).tcp_connect(addr)
};
Expand Down Expand Up @@ -88,7 +88,10 @@ pub struct TcpListener {

impl TcpListener {
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
let listener = unsafe { (*unsafe_borrow_io()).tcp_bind(addr) };
let listener = unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
(*io).tcp_bind(addr)
};
match listener {
Ok(l) => {
Some(TcpListener {
Expand Down
118 changes: 118 additions & 0 deletions src/libcore/rt/local.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use option::{Option, Some, None};
use rt::sched::Scheduler;
use rt::task::Task;
use rt::local_ptr;
use rt::rtio::{EventLoop, IoFactoryObject};

pub trait Local {
fn put(value: ~Self);
fn take() -> ~Self;
fn exists() -> bool;
fn borrow(f: &fn(&mut Self));
unsafe fn unsafe_borrow() -> *mut Self;
unsafe fn try_unsafe_borrow() -> Option<*mut Self>;
}

impl Local for Scheduler {
fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
fn exists() -> bool { local_ptr::exists() }
fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } }
unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") }
}

impl Local for Task {
fn put(value: ~Task) { abort!("unimpl") }
fn take() -> ~Task { abort!("unimpl") }
fn exists() -> bool { abort!("unimpl") }
fn borrow(f: &fn(&mut Task)) {
do Local::borrow::<Scheduler> |sched| {
match sched.current_task {
Some(~ref mut task) => {
f(&mut *task.task)
}
None => {
abort!("no scheduler")
}
}
}
}
unsafe fn unsafe_borrow() -> *mut Task {
match (*Local::unsafe_borrow::<Scheduler>()).current_task {
Some(~ref mut task) => {
let s: *mut Task = &mut *task.task;
return s;
}
None => {
// Don't fail. Infinite recursion
abort!("no scheduler")
}
}
}
unsafe fn try_unsafe_borrow() -> Option<*mut Task> {
if Local::exists::<Scheduler>() {
Some(Local::unsafe_borrow())
} else {
None
}
}
}

// XXX: This formulation won't work once ~IoFactoryObject is a real trait pointer
impl Local for IoFactoryObject {
fn put(value: ~IoFactoryObject) { abort!("unimpl") }
fn take() -> ~IoFactoryObject { abort!("unimpl") }
fn exists() -> bool { abort!("unimpl") }
fn borrow(f: &fn(&mut IoFactoryObject)) { abort!("unimpl") }
unsafe fn unsafe_borrow() -> *mut IoFactoryObject {
let sched = Local::unsafe_borrow::<Scheduler>();
let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
return io;
}
unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { abort!("unimpl") }
}

#[cfg(test)]
mod test {
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use super::*;

#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}

#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}

#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
}
let _scheduler: ~Scheduler = Local::take();
}
}
Loading