diff --git a/src/libcore/core.rc b/src/libcore/core.rc index 7b12bae746716..026f49cac45b1 100644 --- a/src/libcore/core.rc +++ b/src/libcore/core.rc @@ -243,7 +243,8 @@ pub mod unicode; #[path = "num/cmath.rs"] pub mod cmath; pub mod stackwalk; - +#[path = "rt/mod.rs"] +pub mod rt; // A curious inner-module that's not exported that contains the binding // 'core' so that macro-expanded references to core::error and such diff --git a/src/libcore/option.rs b/src/libcore/option.rs index e0393fdf5e35a..6a38eff0343bd 100644 --- a/src/libcore/option.rs +++ b/src/libcore/option.rs @@ -130,6 +130,27 @@ pub pure fn get_ref(opt: &r/Option) -> &r/T { } } +pub pure fn get_mut_ref(opt: &r/mut Option) -> &r/mut T { + /*! + Gets a mutable reference to the value inside an option. + + # Failure + + Fails if the value equals `None` + + # Safety note + + In general, because this function may fail, its use is discouraged + (calling `get` on `None` is akin to dereferencing a null pointer). + Instead, prefer to use pattern matching and handle the `None` + case explicitly. + */ + match *opt { + Some(ref mut x) => x, + None => fail!(~"option::get_mut_ref none") + } +} + #[inline(always)] pub pure fn map(opt: &r/Option, f: &fn(x: &r/T) -> U) -> Option { //! Maps a `some` value by reference from one type to another @@ -364,6 +385,23 @@ pub impl Option { #[inline(always)] pure fn get_ref(&self) -> &self/T { get_ref(self) } + /** + Gets a mutable reference to the value inside an option. + + # Failure + + Fails if the value equals `None` + + # Safety note + + In general, because this function may fail, its use is discouraged + (calling `get` on `None` is akin to dereferencing a null pointer). + Instead, prefer to use pattern matching and handle the `None` + case explicitly. + */ + #[inline(always)] + pure fn get_mut_ref(&mut self) -> &self/mut T { get_mut_ref(self) } + /** * Gets the value out of an option without copying. * diff --git a/src/libcore/rt/context.rs b/src/libcore/rt/context.rs new file mode 100644 index 0000000000000..de96a7d17932b --- /dev/null +++ b/src/libcore/rt/context.rs @@ -0,0 +1,156 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use super::stack::StackSegment; +use libc::c_void; +use cast::{transmute, transmute_mut_unsafe, + transmute_region, transmute_mut_region}; + +// XXX: Registers is boxed so that it is 16-byte aligned, for storing +// SSE regs. It would be marginally better not to do this. In C++ we +// use an attribute on a struct. +pub struct Context(~Registers); + +pub impl Context { + static fn empty() -> Context { + Context(new_regs()) + } + + /// Create a new context that will resume execution by running ~fn() + /// # Safety Note + /// The `start` closure must remain valid for the life of the Task + static fn new(start: &~fn(), stack: &mut StackSegment) -> Context { + + // The C-ABI function that is the task entry point + extern fn task_start_wrapper(f: &~fn()) { (*f)() } + + let fp: *c_void = task_start_wrapper as *c_void; + let argp: *c_void = unsafe { transmute::<&~fn(), *c_void>(&*start) }; + let sp: *uint = stack.end(); + let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) }; + + // Save and then immediately load the current context, + // which we will then modify to call the given function when restored + let mut regs = new_regs(); + unsafe { + swap_registers(transmute_mut_region(&mut *regs), + transmute_region(&*regs)) + }; + + initialize_call_frame(&mut *regs, fp, argp, sp); + + return Context(regs); + } + + static fn swap(out_context: &mut Context, in_context: &Context) { + let out_regs: &mut Registers = match out_context { + &Context(~ref mut r) => r + }; + let in_regs: &Registers = match in_context { + &Context(~ref r) => r + }; + + unsafe { swap_registers(out_regs, in_regs) }; + } +} + +extern { + fn swap_registers(out_regs: *mut Registers, in_regs: *Registers); +} + +// Definitions of these registers are in rt/arch/x86_64/regs.h +#[cfg(target_arch = "x86_64")] +type Registers = [uint * 22]; + +#[cfg(target_arch = "x86_64")] +fn new_regs() -> ~Registers { ~[0, .. 22] } + +#[cfg(target_arch = "x86_64")] +fn initialize_call_frame(regs: &mut Registers, + fptr: *c_void, arg: *c_void, sp: *mut uint) { + + // Redefinitions from regs.h + const RUSTRT_ARG0: uint = 3; + const RUSTRT_RSP: uint = 1; + const RUSTRT_IP: uint = 8; + const RUSTRT_RBP: uint = 2; + + let sp = align_down(sp); + let sp = mut_offset(sp, -1); + + // The final return address. 0 indicates the bottom of the stack + unsafe { *sp = 0; } + + rtdebug!("creating call frame"); + rtdebug!("fptr %x", fptr as uint); + rtdebug!("arg %x", arg as uint); + rtdebug!("sp %x", sp as uint); + + regs[RUSTRT_ARG0] = arg as uint; + regs[RUSTRT_RSP] = sp as uint; + regs[RUSTRT_IP] = fptr as uint; + + // Last base pointer on the stack should be 0 + regs[RUSTRT_RBP] = 0; +} + +#[cfg(target_arch = "x86")] +struct Registers { + eax: u32, ebx: u32, ecx: u32, edx: u32, + ebp: u32, esi: u32, edi: u32, esp: u32, + cs: u16, ds: u16, ss: u16, es: u16, fs: u16, gs: u16, + eflags: u32, eip: u32 +} + +#[cfg(target_arch = "x86")] +fn new_regs() -> ~Registers { + ~Registers { + eax: 0, ebx: 0, ecx: 0, edx: 0, + ebp: 0, esi: 0, edi: 0, esp: 0, + cs: 0, ds: 0, ss: 0, es: 0, fs: 0, gs: 0, + eflags: 0, eip: 0 + } +} + +#[cfg(target_arch = "x86")] +fn initialize_call_frame(regs: &mut Registers, + fptr: *c_void, arg: *c_void, sp: *mut uint) { + + let sp = align_down(sp); + let sp = mut_offset(sp, -4); // XXX: -4 words? Needs this be done at all? + + unsafe { *sp = arg as uint; } + let sp = mut_offset(sp, -1); + unsafe { *sp = 0; } // The final return address + + regs.esp = sp as u32; + regs.eip = fptr as u32; + + // Last base pointer on the stack is 0 + regs.ebp = 0; +} + +fn align_down(sp: *mut uint) -> *mut uint { + unsafe { + let sp = transmute::<*mut uint, uint>(sp); + let sp = sp & !(16 - 1); + transmute::(sp) + } +} + +// XXX: ptr::offset is positive ints only +#[inline(always)] +pub pure fn mut_offset(ptr: *mut T, count: int) -> *mut T { + use core::sys::size_of; + unsafe { + (ptr as int + count * (size_of::() as int)) as *mut T + } +} + diff --git a/src/libcore/rt/io.rs b/src/libcore/rt/io.rs new file mode 100644 index 0000000000000..3a94c01e0a419 --- /dev/null +++ b/src/libcore/rt/io.rs @@ -0,0 +1,45 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use option::*; +use result::*; + +// XXX: ~object doesn't work currently so these are some placeholder +// types to use instead +pub type EventLoopObject = super::uvio::UvEventLoop; +pub type IoFactoryObject = super::uvio::UvIoFactory; +pub type StreamObject = super::uvio::UvStream; +pub type TcpListenerObject = super::uvio::UvTcpListener; + +pub trait EventLoop { + fn run(&mut self); + fn callback(&mut self, ~fn()); + /// The asynchronous I/O services. Not all event loops may provide one + fn io(&mut self) -> Option<&self/mut IoFactoryObject>; +} + +pub trait IoFactory { + fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>; + fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>; +} + +pub trait TcpListener { + fn listen(&mut self) -> Option<~StreamObject>; +} + +pub trait Stream { + fn read(&mut self, buf: &mut [u8]) -> Result; + fn write(&mut self, buf: &[u8]) -> Result<(), ()>; +} + +pub enum IpAddr { + Ipv4(u8, u8, u8, u8, u16), + Ipv6 +} diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs new file mode 100644 index 0000000000000..772690c8dcdc6 --- /dev/null +++ b/src/libcore/rt/mod.rs @@ -0,0 +1,51 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// XXX: Missing some implementation for other architectures +#[cfg(target_os = "linux")]; +#[cfg(target_os = "mac")]; +#[cfg(target_os = "win32")]; + +// Some basic logging +macro_rules! rtdebug ( + ($( $arg:expr),+) => ( { + dumb_println(fmt!( $($arg),+ )); + + fn dumb_println(s: &str) { + use str::as_c_str; + use libc::c_char; + + extern { + fn printf(s: *c_char); + } + + do as_c_str(s.to_str() + "\n") |s| { + unsafe { printf(s); } + } + } + + } ) +) + +// An alternate version with no output, for turning off logging +macro_rules! rtdebug_ ( + ($( $arg:expr),+) => ( $(let _ = $arg)*; ) +) + +mod sched; +mod io; +mod uvio; +mod uv; +// FIXME #5248: The import in `sched` doesn't resolve unless this is pub! +pub mod thread_local_storage; +mod work_queue; +mod stack; +mod context; +mod thread; diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs new file mode 100644 index 0000000000000..8f315452e5e5c --- /dev/null +++ b/src/libcore/rt/sched.rs @@ -0,0 +1,564 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use option::*; +use sys; +use cast::transmute; +use libc::c_void; +use ptr::mut_null; + +use super::work_queue::WorkQueue; +use super::stack::{StackPool, StackSegment}; +use super::io::{EventLoop, EventLoopObject}; +use super::context::Context; +use tls = super::thread_local_storage; + +#[cfg(test)] use super::uvio::UvEventLoop; +#[cfg(test)] use unstable::run_in_bare_thread; +#[cfg(test)] use int; + +/// The Scheduler is responsible for coordinating execution of Tasks +/// on a single thread. When the scheduler is running it is owned by +/// thread local storage and the running task is owned by the +/// scheduler. +pub struct Scheduler { + task_queue: WorkQueue<~Task>, + stack_pool: StackPool, + /// The event loop used to drive the scheduler and perform I/O + event_loop: ~EventLoopObject, + /// The scheduler's saved context. + /// Always valid when a task is executing, otherwise not + priv saved_context: Context, + /// The currently executing task + priv current_task: Option<~Task>, + /// A queue of jobs to perform immediately upon return from task + /// context to scheduler context. + /// XXX: This probably should be a single cleanup action and it + /// should run after a context switch, not on return from the + /// scheduler + priv cleanup_jobs: ~[CleanupJob] +} + +// XXX: Some hacks to put a &fn in Scheduler without borrowck +// complaining +type UnsafeTaskReceiver = sys::Closure; +trait HackAroundBorrowCk { + static fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Task); +} +impl HackAroundBorrowCk for UnsafeTaskReceiver { + static fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { + unsafe { transmute(f) } + } + fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { + unsafe { transmute(self) } + } +} + +enum CleanupJob { + RescheduleTask(~Task), + RecycleTask(~Task), + GiveTask(~Task, UnsafeTaskReceiver) +} + +pub impl Scheduler { + + static fn new(event_loop: ~EventLoopObject) -> Scheduler { + Scheduler { + event_loop: event_loop, + task_queue: WorkQueue::new(), + stack_pool: StackPool::new(), + saved_context: Context::empty(), + current_task: None, + cleanup_jobs: ~[] + } + } + + // XXX: This may eventually need to be refactored so that + // the scheduler itself doesn't have to call event_loop.run. + // That will be important for embedding the runtime into external + // event loops. + fn run(~self) -> ~Scheduler { + fail_unless!(!self.in_task_context()); + + // Give ownership of the scheduler (self) to the thread + do self.install |scheduler| { + fn run_scheduler_once() { + do Scheduler::local |scheduler| { + if scheduler.resume_task_from_queue() { + // Ok, a task ran. Nice! We'll do it again later + scheduler.event_loop.callback(run_scheduler_once); + } + } + } + + scheduler.event_loop.callback(run_scheduler_once); + scheduler.event_loop.run(); + } + } + + fn install(~self, f: &fn(&mut Scheduler)) -> ~Scheduler { + let mut tlsched = ThreadLocalScheduler::new(); + tlsched.put_scheduler(self); + { + let sched = tlsched.get_scheduler(); + f(sched); + } + return tlsched.take_scheduler(); + } + + static fn local(f: &fn(&mut Scheduler)) { + let mut tlsched = ThreadLocalScheduler::new(); + f(tlsched.get_scheduler()); + } + + // * Scheduler-context operations + + fn resume_task_from_queue(&mut self) -> bool { + fail_unless!(!self.in_task_context()); + + let mut self = self; + match self.task_queue.pop_front() { + Some(task) => { + self.resume_task_immediately(task); + return true; + } + None => { + rtdebug!("no tasks in queue"); + return false; + } + } + } + + fn resume_task_immediately(&mut self, task: ~Task) { + fail_unless!(!self.in_task_context()); + + rtdebug!("scheduling a task"); + + // Store the task in the scheduler so it can be grabbed later + self.current_task = Some(task); + self.swap_in_task(); + // The running task should have passed ownership elsewhere + fail_unless!(self.current_task.is_none()); + + // Running tasks may have asked us to do some cleanup + self.run_cleanup_jobs(); + } + + + // * Task-context operations + + /// Called by a running task to end execution, after which it will + /// be recycled by the scheduler for reuse in a new task. + fn terminate_current_task(&mut self) { + fail_unless!(self.in_task_context()); + + rtdebug!("ending running task"); + + let dead_task = self.current_task.swap_unwrap(); + self.enqueue_cleanup_job(RecycleTask(dead_task)); + let dead_task = self.task_from_last_cleanup_job(); + self.swap_out_task(dead_task); + } + + /// Block a running task, context switch to the scheduler, then pass the + /// blocked task to a closure. + /// + /// # Safety note + /// + /// The closure here is a *stack* closure that lives in the + /// running task. It gets transmuted to the scheduler's lifetime + /// and called while the task is blocked. + fn block_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) { + fail_unless!(self.in_task_context()); + + rtdebug!("blocking task"); + + let blocked_task = self.current_task.swap_unwrap(); + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f) + }; + let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region); + self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); + let blocked_task = self.task_from_last_cleanup_job(); + + self.swap_out_task(blocked_task); + } + + /// Switch directly to another task, without going through the scheduler. + /// You would want to think hard about doing this, e.g. if there are + /// pending I/O events it would be a bad idea. + fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) { + fail_unless!(self.in_task_context()); + + rtdebug!("switching tasks"); + + let old_running_task = self.current_task.swap_unwrap(); + self.enqueue_cleanup_job(RescheduleTask(old_running_task)); + let old_running_task = self.task_from_last_cleanup_job(); + + self.current_task = Some(next_task); + self.swap_in_task_from_running_task(old_running_task); + } + + + // * Context switching + + // NB: When switching to a task callers are expected to first set + // self.running_task. When switching away from a task likewise move + // out of the self.running_task + + priv fn swap_in_task(&mut self) { + // Take pointers to both the task and scheduler's saved registers. + let running_task: &~Task = self.current_task.get_ref(); + let task_context = &running_task.saved_context; + let scheduler_context = &mut self.saved_context; + + // Context switch to the task, restoring it's registers + // and saving the scheduler's + Context::swap(scheduler_context, task_context); + } + + priv fn swap_out_task(&mut self, running_task: &mut Task) { + let task_context = &mut running_task.saved_context; + let scheduler_context = &self.saved_context; + Context::swap(task_context, scheduler_context); + } + + priv fn swap_in_task_from_running_task(&mut self, + running_task: &mut Task) { + let running_task_context = &mut running_task.saved_context; + let next_context = &self.current_task.get_ref().saved_context; + Context::swap(running_task_context, next_context); + } + + + // * Other stuff + + fn in_task_context(&self) -> bool { self.current_task.is_some() } + + fn enqueue_cleanup_job(&mut self, job: CleanupJob) { + self.cleanup_jobs.unshift(job); + } + + fn run_cleanup_jobs(&mut self) { + fail_unless!(!self.in_task_context()); + rtdebug!("running cleanup jobs"); + + while !self.cleanup_jobs.is_empty() { + match self.cleanup_jobs.pop() { + RescheduleTask(task) => { + // NB: Pushing to the *front* of the queue + self.task_queue.push_front(task); + } + RecycleTask(task) => task.recycle(&mut self.stack_pool), + GiveTask(task, f) => (f.to_fn())(self, task) + } + } + } + + // XXX: Hack. This should return &self/mut but I don't know how to + // make the borrowcheck happy + fn task_from_last_cleanup_job(&mut self) -> &mut Task { + fail_unless!(!self.cleanup_jobs.is_empty()); + let last_job: &self/mut CleanupJob = &mut self.cleanup_jobs[0]; + let last_task: &self/Task = match last_job { + &RescheduleTask(~ref task) => task, + &RecycleTask(~ref task) => task, + &GiveTask(~ref task, _) => task, + }; + // XXX: Pattern matching mutable pointers above doesn't work + // because borrowck thinks the three patterns are conflicting + // borrows + return unsafe { transmute::<&Task, &mut Task>(last_task) }; + } +} + +const TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack + +pub struct Task { + /// The task entry point, saved here for later destruction + priv start: ~~fn(), + /// The segment of stack on which the task is currently running or, + /// if the task is blocked, on which the task will resume execution + priv current_stack_segment: StackSegment, + /// These are always valid when the task is not running, unless + /// the task is dead + priv saved_context: Context, +} + +impl Task { + static fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task { + // XXX: Putting main into a ~ so it's a thin pointer and can + // be passed to the spawn function. Another unfortunate + // allocation + let start = ~Task::build_start_wrapper(start); + let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE); + // NB: Context holds a pointer to that ~fn + let initial_context = Context::new(&*start, &mut stack); + return Task { + start: start, + current_stack_segment: stack, + saved_context: initial_context, + }; + } + + static priv fn build_start_wrapper(start: ~fn()) -> ~fn() { + // XXX: The old code didn't have this extra allocation + let wrapper: ~fn() = || { + start(); + + let mut sched = ThreadLocalScheduler::new(); + let sched = sched.get_scheduler(); + sched.terminate_current_task(); + }; + return wrapper; + } + + /// Destroy the task and try to reuse its components + fn recycle(~self, stack_pool: &mut StackPool) { + match self { + ~Task {current_stack_segment, _} => { + stack_pool.give_segment(current_stack_segment); + } + } + } +} + +// NB: This is a type so we can use make use of the &self region. +struct ThreadLocalScheduler(tls::Key); + +impl ThreadLocalScheduler { + static fn new() -> ThreadLocalScheduler { + unsafe { + // NB: This assumes that the TLS key has been created prior. + // Currently done in rust_start. + let key: *mut c_void = rust_get_sched_tls_key(); + let key: &mut tls::Key = transmute(key); + ThreadLocalScheduler(*key) + } + } + + fn put_scheduler(&mut self, scheduler: ~Scheduler) { + unsafe { + let key = match self { &ThreadLocalScheduler(key) => key }; + let value: *mut c_void = + transmute::<~Scheduler, *mut c_void>(scheduler); + tls::set(key, value); + } + } + + fn get_scheduler(&mut self) -> &self/mut Scheduler { + unsafe { + let key = match self { &ThreadLocalScheduler(key) => key }; + let mut value: *mut c_void = tls::get(key); + fail_unless!(value.is_not_null()); + { + let value_ptr = &mut value; + let sched: &mut ~Scheduler = + transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr); + let sched: &mut Scheduler = &mut **sched; + return sched; + } + } + } + + fn take_scheduler(&mut self) -> ~Scheduler { + unsafe { + let key = match self { &ThreadLocalScheduler(key) => key }; + let value: *mut c_void = tls::get(key); + fail_unless!(value.is_not_null()); + let sched = transmute(value); + tls::set(key, mut_null()); + return sched; + } + } +} + +extern { + fn rust_get_sched_tls_key() -> *mut c_void; +} + +#[test] +fn thread_local_scheduler_smoke_test() { + let scheduler = ~UvEventLoop::new_scheduler(); + let mut tls_scheduler = ThreadLocalScheduler::new(); + tls_scheduler.put_scheduler(scheduler); + { + let _scheduler = tls_scheduler.get_scheduler(); + } + let _scheduler = tls_scheduler.take_scheduler(); +} + +#[test] +fn thread_local_scheduler_two_instances() { + let scheduler = ~UvEventLoop::new_scheduler(); + let mut tls_scheduler = ThreadLocalScheduler::new(); + tls_scheduler.put_scheduler(scheduler); + { + + let _scheduler = tls_scheduler.get_scheduler(); + } + { + let scheduler = tls_scheduler.take_scheduler(); + tls_scheduler.put_scheduler(scheduler); + } + + let mut tls_scheduler = ThreadLocalScheduler::new(); + { + let _scheduler = tls_scheduler.get_scheduler(); + } + let _scheduler = tls_scheduler.take_scheduler(); +} + +#[test] +fn test_simple_scheduling() { + do run_in_bare_thread { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + + let mut sched = ~UvEventLoop::new_scheduler(); + let task = ~do Task::new(&mut sched.stack_pool) { + unsafe { *task_ran_ptr = true; } + }; + sched.task_queue.push_back(task); + sched.run(); + fail_unless!(task_ran); + } +} + +#[test] +fn test_several_tasks() { + do run_in_bare_thread { + let total = 10; + let mut task_count = 0; + let task_count_ptr: *mut int = &mut task_count; + + let mut sched = ~UvEventLoop::new_scheduler(); + for int::range(0, total) |_| { + let task = ~do Task::new(&mut sched.stack_pool) { + unsafe { *task_count_ptr = *task_count_ptr + 1; } + }; + sched.task_queue.push_back(task); + } + sched.run(); + fail_unless!(task_count == total); + } +} + +#[test] +fn test_swap_tasks() { + do run_in_bare_thread { + let mut count = 0; + let count_ptr: *mut int = &mut count; + + let mut sched = ~UvEventLoop::new_scheduler(); + let task1 = ~do Task::new(&mut sched.stack_pool) { + unsafe { *count_ptr = *count_ptr + 1; } + do Scheduler::local |sched| { + let task2 = ~do Task::new(&mut sched.stack_pool) { + unsafe { *count_ptr = *count_ptr + 1; } + }; + // Context switch directly to the new task + sched.resume_task_from_running_task_direct(task2); + } + unsafe { *count_ptr = *count_ptr + 1; } + }; + sched.task_queue.push_back(task1); + sched.run(); + fail_unless!(count == 3); + } +} + +#[bench] #[test] #[ignore(reason = "long test")] +fn test_run_a_lot_of_tasks_queued() { + do run_in_bare_thread { + const MAX: int = 1000000; + let mut count = 0; + let count_ptr: *mut int = &mut count; + + let mut sched = ~UvEventLoop::new_scheduler(); + + let start_task = ~do Task::new(&mut sched.stack_pool) { + run_task(count_ptr); + }; + sched.task_queue.push_back(start_task); + sched.run(); + + fail_unless!(count == MAX); + + fn run_task(count_ptr: *mut int) { + do Scheduler::local |sched| { + let task = ~do Task::new(&mut sched.stack_pool) { + unsafe { + *count_ptr = *count_ptr + 1; + if *count_ptr != MAX { + run_task(count_ptr); + } + } + }; + sched.task_queue.push_back(task); + } + }; + } +} + +#[bench] #[test] #[ignore(reason = "too much stack allocation")] +fn test_run_a_lot_of_tasks_direct() { + do run_in_bare_thread { + const MAX: int = 100000; + let mut count = 0; + let count_ptr: *mut int = &mut count; + + let mut sched = ~UvEventLoop::new_scheduler(); + + let start_task = ~do Task::new(&mut sched.stack_pool) { + run_task(count_ptr); + }; + sched.task_queue.push_back(start_task); + sched.run(); + + fail_unless!(count == MAX); + + fn run_task(count_ptr: *mut int) { + do Scheduler::local |sched| { + let task = ~do Task::new(&mut sched.stack_pool) { + unsafe { + *count_ptr = *count_ptr + 1; + if *count_ptr != MAX { + run_task(count_ptr); + } + } + }; + // Context switch directly to the new task + sched.resume_task_from_running_task_direct(task); + } + }; + } +} + +#[test] +fn test_block_task() { + do run_in_bare_thread { + let mut sched = ~UvEventLoop::new_scheduler(); + let task = ~do Task::new(&mut sched.stack_pool) { + do Scheduler::local |sched| { + fail_unless!(sched.in_task_context()); + do sched.block_running_task_and_then() |sched, task| { + fail_unless!(!sched.in_task_context()); + sched.task_queue.push_back(task); + } + } + }; + sched.task_queue.push_back(task); + sched.run(); + } +} diff --git a/src/libcore/rt/stack.rs b/src/libcore/rt/stack.rs new file mode 100644 index 0000000000000..02c47218ed83f --- /dev/null +++ b/src/libcore/rt/stack.rs @@ -0,0 +1,49 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use vec; + +pub struct StackSegment { + buf: ~[u8] +} + +pub impl StackSegment { + static fn new(size: uint) -> StackSegment { + // Crate a block of uninitialized values + let mut stack = vec::with_capacity(size); + unsafe { + vec::raw::set_len(&mut stack, size); + } + + StackSegment { + buf: stack + } + } + + fn end(&self) -> *uint { + unsafe { + vec::raw::to_ptr(self.buf).offset(self.buf.len()) as *uint + } + } +} + +pub struct StackPool(()); + +impl StackPool { + + static fn new() -> StackPool { StackPool(()) } + + fn take_segment(&self, min_size: uint) -> StackSegment { + StackSegment::new(min_size) + } + + fn give_segment(&self, _stack: StackSegment) { + } +} diff --git a/src/libcore/rt/thread.rs b/src/libcore/rt/thread.rs new file mode 100644 index 0000000000000..cd46127451281 --- /dev/null +++ b/src/libcore/rt/thread.rs @@ -0,0 +1,44 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc; +use ops::Drop; + +#[allow(non_camel_case_types)] // runtime type +type raw_thread = libc::c_void; + +struct Thread { + main: ~fn(), + raw_thread: *raw_thread +} + +impl Thread { + static fn start(main: ~fn()) -> Thread { + fn substart(main: &fn()) -> *raw_thread { + unsafe { rust_raw_thread_start(main) } + } + let raw = substart(main); + Thread { + main: main, + raw_thread: raw + } + } +} + +impl Drop for Thread { + fn finalize(&self) { + unsafe { rust_raw_thread_join_delete(self.raw_thread) } + } +} + +extern { + pub unsafe fn rust_raw_thread_start(f: &fn()) -> *raw_thread; + pub unsafe fn rust_raw_thread_join_delete(thread: *raw_thread); +} diff --git a/src/libcore/rt/thread_local_storage.rs b/src/libcore/rt/thread_local_storage.rs new file mode 100644 index 0000000000000..58b5a54438606 --- /dev/null +++ b/src/libcore/rt/thread_local_storage.rs @@ -0,0 +1,91 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc::{c_void}; +#[cfg(unix)] +use libc::{c_uint, c_int}; +#[cfg(unix)] +use ptr::null; +#[cfg(windows)] +use libc::types::os::arch::extra::{DWORD, LPVOID, BOOL}; + +#[cfg(unix)] +pub type Key = pthread_key_t; + +#[cfg(unix)] +pub unsafe fn create(key: &mut Key) { + unsafe { fail_unless!(0 == pthread_key_create(key, null())); } +} + +#[cfg(unix)] +pub unsafe fn set(key: Key, value: *mut c_void) { + unsafe { fail_unless!(0 == pthread_setspecific(key, value)); } +} + +#[cfg(unix)] +pub unsafe fn get(key: Key) -> *mut c_void { + unsafe { pthread_getspecific(key) } +} + +#[cfg(unix)] +#[allow(non_camel_case_types)] // foreign type +type pthread_key_t = c_uint; + +#[cfg(unix)] +extern { + fn pthread_key_create(key: *mut pthread_key_t, dtor: *u8) -> c_int; + fn pthread_setspecific(key: pthread_key_t, value: *mut c_void) -> c_int; + fn pthread_getspecific(key: pthread_key_t) -> *mut c_void; +} + +#[cfg(windows)] +pub type Key = DWORD; + +#[cfg(windows)] +pub unsafe fn create(key: &mut Key) { + const TLS_OUT_OF_INDEXES: DWORD = 0xFFFFFFFF; + *key = unsafe { TlsAlloc() }; + fail_unless!(*key != TLS_OUT_OF_INDEXES); +} + +#[cfg(windows)] +pub unsafe fn set(key: Key, value: *mut c_void) { + unsafe { fail_unless!(0 != TlsSetValue(key, value)) } +} + +#[cfg(windows)] +pub unsafe fn get(key: Key) -> *mut c_void { + TlsGetValue(key) +} + +#[cfg(windows)] +#[abi = "stdcall"] +extern { + fn TlsAlloc() -> DWORD; + fn TlsSetValue(dwTlsIndex: DWORD, lpTlsvalue: LPVOID) -> BOOL; + fn TlsGetValue(dwTlsIndex: DWORD) -> LPVOID; +} + +#[test] +fn tls_smoke_test() { + use cast::transmute; + unsafe { + let mut key = 0; + let value = ~20; + create(&mut key); + set(key, transmute(value)); + let value: ~int = transmute(get(key)); + fail_unless!(value == ~20); + let value = ~30; + set(key, transmute(value)); + let value: ~int = transmute(get(key)); + fail_unless!(value == ~30); + } +} diff --git a/src/libcore/rt/uv.rs b/src/libcore/rt/uv.rs new file mode 100644 index 0000000000000..c947e4dde4c15 --- /dev/null +++ b/src/libcore/rt/uv.rs @@ -0,0 +1,919 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + +Bindings to libuv. + +UV types consist of the event loop (Loop), Watchers, Requests and +Callbacks. + +Watchers and Requests encapsulate pointers to uv *handles*, which have +subtyping relationships with each other. This subtyping is reflected +in the bindings with explicit or implicit coercions. For example, an +upcast from TcpWatcher to StreamWatcher is done with +`tcp_watcher.as_stream()`. In other cases a callback on a specific +type of watcher will be passed a watcher of a supertype. + +Currently all use of Request types (connect/write requests) are +encapsulated in the bindings and don't need to be dealt with by the +caller. + +# Safety note + +Due to the complex lifecycle of uv handles, as well as compiler bugs, +this module is not memory safe and requires explicit memory management, +via `close` and `delete` methods. + +*/ + +use option::*; +use str::raw::from_c_str; +use to_str::ToStr; +use vec; +use ptr; +use libc::{c_void, c_int, size_t, malloc, free, ssize_t}; +use cast::{transmute, transmute_mut_region}; +use ptr::null; +use sys::size_of; +use unstable::uvll; +use super::io::{IpAddr, Ipv4, Ipv6}; + +#[cfg(test)] use unstable::run_in_bare_thread; +#[cfg(test)] use super::thread::Thread; +#[cfg(test)] use cell::Cell; + +fn ip4_to_uv_ip4(addr: IpAddr) -> uvll::sockaddr_in { + match addr { + Ipv4(a, b, c, d, p) => { + unsafe { + uvll::ip4_addr(fmt!("%u.%u.%u.%u", + a as uint, + b as uint, + c as uint, + d as uint), p as int) + } + } + Ipv6 => fail!() + } +} + +/// A trait for callbacks to implement. Provides a little extra type safety +/// for generic, unsafe interop functions like `set_watcher_callback`. +trait Callback { } + +type NullCallback = ~fn(); +impl Callback for NullCallback { } + +/// A type that wraps a native handle +trait NativeHandle { + static pub fn from_native_handle(T) -> Self; + pub fn native_handle(&self) -> T; +} + +/// XXX: Loop(*handle) is buggy with destructors. Normal structs +/// with dtors may not be destructured, but tuple structs can, +/// but the results are not correct. +pub struct Loop { + handle: *uvll::uv_loop_t +} + +pub impl Loop { + static fn new() -> Loop { + let handle = unsafe { uvll::loop_new() }; + fail_unless!(handle.is_not_null()); + NativeHandle::from_native_handle(handle) + } + + fn run(&mut self) { + unsafe { uvll::run(self.native_handle()) }; + } + + fn close(&mut self) { + unsafe { uvll::loop_delete(self.native_handle()) }; + } +} + +impl NativeHandle<*uvll::uv_loop_t> for Loop { + static fn from_native_handle(handle: *uvll::uv_loop_t) -> Loop { + Loop { handle: handle } + } + fn native_handle(&self) -> *uvll::uv_loop_t { + self.handle + } +} + +/// The trait implemented by uv 'watchers' (handles). Watchers are +/// non-owning wrappers around the uv handles and are not completely +/// safe - there may be multiple instances for a single underlying +/// handle. Watchers are generally created, then `start`ed, `stop`ed +/// and `close`ed, but due to their complex life cycle may not be +/// entirely memory safe if used in unanticipated patterns. +trait Watcher { + fn event_loop(&self) -> Loop; +} + +pub struct IdleWatcher(*uvll::uv_idle_t); + +impl Watcher for IdleWatcher { + fn event_loop(&self) -> Loop { + loop_from_watcher(self) + } +} + +type IdleCallback = ~fn(IdleWatcher, Option); +impl Callback for IdleCallback { } + +pub impl IdleWatcher { + static fn new(loop_: &mut Loop) -> IdleWatcher { + unsafe { + let handle = uvll::idle_new(); + fail_unless!(handle.is_not_null()); + fail_unless!(0 == uvll::idle_init(loop_.native_handle(), handle)); + uvll::set_data_for_uv_handle(handle, null::<()>()); + NativeHandle::from_native_handle(handle) + } + } + + fn start(&mut self, cb: IdleCallback) { + + set_watcher_callback(self, cb); + unsafe { + fail_unless!(0 == uvll::idle_start(self.native_handle(), idle_cb)) + }; + + extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { + let idle_watcher: IdleWatcher = + NativeHandle::from_native_handle(handle); + let cb: &IdleCallback = + borrow_callback_from_watcher(&idle_watcher); + let status = status_to_maybe_uv_error(handle, status); + (*cb)(idle_watcher, status); + } + } + + fn stop(&mut self) { + unsafe { fail_unless!(0 == uvll::idle_stop(self.native_handle())); } + } + + fn close(self) { + unsafe { uvll::close(self.native_handle(), close_cb) }; + + extern fn close_cb(handle: *uvll::uv_idle_t) { + let mut idle_watcher = NativeHandle::from_native_handle(handle); + drop_watcher_callback::(&mut idle_watcher); + unsafe { uvll::idle_delete(handle) }; + } + } +} + +impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { + static fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher { + IdleWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_idle_t { + match self { &IdleWatcher(ptr) => ptr } + } +} + +// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t +// and uv_file_t +pub struct StreamWatcher(*uvll::uv_stream_t); + +impl Watcher for StreamWatcher { + fn event_loop(&self) -> Loop { + loop_from_watcher(self) + } +} + +type ReadCallback = ~fn(StreamWatcher, int, Buf, Option); +impl Callback for ReadCallback { } + +// XXX: The uv alloc callback also has a *uv_handle_t arg +pub type AllocCallback = ~fn(uint) -> Buf; +impl Callback for AllocCallback { } + +pub impl StreamWatcher { + + fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) { + // XXX: Borrowchk problems + let data = get_watcher_data(unsafe { transmute_mut_region(self) }); + data.alloc_cb = Some(alloc); + data.read_cb = Some(cb); + + let handle = self.native_handle(); + unsafe { uvll::read_start(handle, alloc_cb, read_cb); } + + extern fn alloc_cb(stream: *uvll::uv_stream_t, + suggested_size: size_t) -> Buf { + let mut stream_watcher: StreamWatcher = + NativeHandle::from_native_handle(stream); + let data = get_watcher_data(&mut stream_watcher); + let alloc_cb = data.alloc_cb.get_ref(); + return (*alloc_cb)(suggested_size as uint); + } + + extern fn read_cb(stream: *uvll::uv_stream_t, + nread: ssize_t, ++buf: Buf) { + rtdebug!("buf addr: %x", buf.base as uint); + rtdebug!("buf len: %d", buf.len as int); + let mut stream_watcher: StreamWatcher = + NativeHandle::from_native_handle(stream); + let data = get_watcher_data(&mut stream_watcher); + let cb = data.read_cb.get_ref(); + let status = status_to_maybe_uv_error(stream, nread as c_int); + (*cb)(stream_watcher, nread as int, buf, status); + } + } + + fn read_stop(&mut self) { + // It would be nice to drop the alloc and read callbacks here, + // but read_stop may be called from inside one of them and we + // would end up freeing the in-use environment + let handle = self.native_handle(); + unsafe { uvll::read_stop(handle); } + } + + // XXX: Needs to take &[u8], not ~[u8] + fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) { + // XXX: Borrowck + let data = get_watcher_data(unsafe { transmute_mut_region(self) }); + fail_unless!(data.write_cb.is_none()); + data.write_cb = Some(cb); + + let req = WriteRequest::new(); + let buf = vec_to_uv_buf(msg); + // XXX: Allocation + let bufs = ~[buf]; + unsafe { + fail_unless!(0 == uvll::write(req.native_handle(), + self.native_handle(), + &bufs, write_cb)); + } + // XXX: Freeing immediately after write. Is this ok? + let _v = vec_from_uv_buf(buf); + + extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { + let write_request: WriteRequest = + NativeHandle::from_native_handle(req); + let mut stream_watcher = write_request.stream(); + write_request.delete(); + let cb = get_watcher_data(&mut stream_watcher) + .write_cb.swap_unwrap(); + let status = status_to_maybe_uv_error( + stream_watcher.native_handle(), status); + cb(stream_watcher, status); + } + } + + fn accept(&mut self, stream: StreamWatcher) { + let self_handle = self.native_handle() as *c_void; + let stream_handle = stream.native_handle() as *c_void; + unsafe { + fail_unless!(0 == uvll::accept(self_handle, stream_handle)); + } + } + + fn close(self, cb: NullCallback) { + { + let mut self = self; + let data = get_watcher_data(&mut self); + fail_unless!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_stream_t) { + let mut stream_watcher: StreamWatcher = + NativeHandle::from_native_handle(handle); + { + let mut data = get_watcher_data(&mut stream_watcher); + data.close_cb.swap_unwrap()(); + } + drop_watcher_data(&mut stream_watcher); + unsafe { free(handle as *c_void) } + } + } +} + +impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { + static fn from_native_handle( + handle: *uvll::uv_stream_t) -> StreamWatcher { + StreamWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_stream_t { + match self { &StreamWatcher(ptr) => ptr } + } +} + +pub struct TcpWatcher(*uvll::uv_tcp_t); + +impl Watcher for TcpWatcher { + fn event_loop(&self) -> Loop { + loop_from_watcher(self) + } +} + +type ConnectionCallback = ~fn(StreamWatcher, Option); +impl Callback for ConnectionCallback { } + +pub impl TcpWatcher { + static fn new(loop_: &mut Loop) -> TcpWatcher { + unsafe { + let size = size_of::() as size_t; + let handle = malloc(size) as *uvll::uv_tcp_t; + fail_unless!(handle.is_not_null()); + fail_unless!(0 == uvll::tcp_init(loop_.native_handle(), handle)); + let mut watcher = NativeHandle::from_native_handle(handle); + install_watcher_data(&mut watcher); + return watcher; + } + } + + fn bind(&mut self, address: IpAddr) { + match address { + Ipv4(*) => { + let addr = ip4_to_uv_ip4(address); + let result = unsafe { + uvll::tcp_bind(self.native_handle(), &addr) + }; + // XXX: bind is likely to fail. need real error handling + fail_unless!(result == 0); + } + _ => fail!() + } + } + + fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) { + unsafe { + fail_unless!(get_watcher_data(self).connect_cb.is_none()); + get_watcher_data(self).connect_cb = Some(cb); + + let mut connect_watcher = ConnectRequest::new(); + let connect_handle = connect_watcher.native_handle(); + match address { + Ipv4(*) => { + let addr = ip4_to_uv_ip4(address); + rtdebug!("connect_t: %x", connect_handle as uint); + fail_unless!(0 == uvll::tcp_connect(connect_handle, + self.native_handle(), + &addr, connect_cb)); + } + _ => fail!() + } + + extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { + rtdebug!("connect_t: %x", req as uint); + let connect_request: ConnectRequest = + NativeHandle::from_native_handle(req); + let mut stream_watcher = connect_request.stream(); + connect_request.delete(); + let cb: ConnectionCallback = { + let data = get_watcher_data(&mut stream_watcher); + data.connect_cb.swap_unwrap() + }; + let status = status_to_maybe_uv_error( + stream_watcher.native_handle(), status); + cb(stream_watcher, status); + } + } + } + + fn listen(&mut self, cb: ConnectionCallback) { + // XXX: Borrowck + let data = get_watcher_data(unsafe { transmute_mut_region(self) }); + fail_unless!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); + + unsafe { + const BACKLOG: c_int = 128; // XXX should be configurable + // XXX: This can probably fail + fail_unless!(0 == uvll::listen(self.native_handle(), + BACKLOG, connection_cb)); + } + + extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { + rtdebug!("connection_cb"); + let mut stream_watcher: StreamWatcher = + NativeHandle::from_native_handle(handle); + let cb = get_watcher_data(&mut stream_watcher) + .connect_cb.swap_unwrap(); + let status = status_to_maybe_uv_error( + stream_watcher.native_handle(), status); + cb(stream_watcher, status); + } + } + + fn as_stream(&self) -> StreamWatcher { + NativeHandle::from_native_handle( + self.native_handle() as *uvll::uv_stream_t) + } +} + +impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher { + static fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher { + TcpWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_tcp_t { + match self { &TcpWatcher(ptr) => ptr } + } +} + +trait Request { } + +type ConnectCallback = ~fn(ConnectRequest, Option); +impl Callback for ConnectCallback { } + +// uv_connect_t is a subclass of uv_req_t +struct ConnectRequest(*uvll::uv_connect_t); + +impl Request for ConnectRequest { } + +impl ConnectRequest { + + static fn new() -> ConnectRequest { + let connect_handle = unsafe { + malloc(size_of::() as size_t) + }; + fail_unless!(connect_handle.is_not_null()); + let connect_handle = connect_handle as *uvll::uv_connect_t; + ConnectRequest(connect_handle) + } + + fn stream(&self) -> StreamWatcher { + unsafe { + let stream_handle = + uvll::get_stream_handle_from_connect_req( + self.native_handle()); + NativeHandle::from_native_handle(stream_handle) + } + } + + fn delete(self) { + unsafe { free(self.native_handle() as *c_void) } + } +} + +impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest { + static fn from_native_handle( + handle: *uvll:: uv_connect_t) -> ConnectRequest { + ConnectRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_connect_t { + match self { &ConnectRequest(ptr) => ptr } + } +} + +pub struct WriteRequest(*uvll::uv_write_t); + +impl Request for WriteRequest { } + +impl WriteRequest { + + static fn new() -> WriteRequest { + let write_handle = unsafe { + malloc(size_of::() as size_t) + }; + fail_unless!(write_handle.is_not_null()); + let write_handle = write_handle as *uvll::uv_write_t; + WriteRequest(write_handle) + } + + fn stream(&self) -> StreamWatcher { + unsafe { + let stream_handle = + uvll::get_stream_handle_from_write_req(self.native_handle()); + NativeHandle::from_native_handle(stream_handle) + } + } + + fn delete(self) { + unsafe { free(self.native_handle() as *c_void) } + } +} + +impl NativeHandle<*uvll::uv_write_t> for WriteRequest { + static fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest { + WriteRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_write_t { + match self { &WriteRequest(ptr) => ptr } + } +} + +// XXX: Need to define the error constants like EOF so they can be +// compared to the UvError type + +struct UvError(uvll::uv_err_t); + +impl UvError { + + pure fn name(&self) -> ~str { + unsafe { + let inner = match self { &UvError(ref a) => a }; + let name_str = uvll::err_name(inner); + fail_unless!(name_str.is_not_null()); + from_c_str(name_str) + } + } + + pure fn desc(&self) -> ~str { + unsafe { + let inner = match self { &UvError(ref a) => a }; + let desc_str = uvll::strerror(inner); + fail_unless!(desc_str.is_not_null()); + from_c_str(desc_str) + } + } +} + +impl ToStr for UvError { + pure fn to_str(&self) -> ~str { + fmt!("%s: %s", self.name(), self.desc()) + } +} + +#[test] +fn error_smoke_test() { + let err = uvll::uv_err_t { code: 1, sys_errno_: 1 }; + let err: UvError = UvError(err); + fail_unless!(err.to_str() == ~"EOF: end of file"); +} + + +/// Given a uv handle, convert a callback status to a UvError +// XXX: Follow the pattern below by parameterizing over T: Watcher, not T +fn status_to_maybe_uv_error(handle: *T, status: c_int) -> Option { + if status != -1 { + None + } else { + unsafe { + rtdebug!("handle: %x", handle as uint); + let loop_ = uvll::get_loop_for_uv_handle(handle); + rtdebug!("loop: %x", loop_ as uint); + let err = uvll::last_error(loop_); + Some(UvError(err)) + } + } +} + +/// Get the uv event loop from a Watcher +pub fn loop_from_watcher>( + watcher: &W) -> Loop { + + let handle = watcher.native_handle(); + let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) }; + NativeHandle::from_native_handle(loop_) +} + +/// Set the custom data on a handle to a callback Note: This is only +/// suitable for watchers that make just one type of callback. For +/// others use WatcherData +fn set_watcher_callback, CB: Callback>( + watcher: &mut W, cb: CB) { + + drop_watcher_callback::(watcher); + // XXX: Boxing the callback so it fits into a + // pointer. Unfortunate extra allocation + let boxed_cb = ~cb; + let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) }; + unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) }; +} + +/// Delete a callback from a handle's custom data +fn drop_watcher_callback, CB: Callback>( + watcher: &mut W) { + + unsafe { + let handle = watcher.native_handle(); + let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); + if handle_data.is_not_null() { + // Take ownership of the callback and drop it + let _cb = transmute::<*c_void, ~CB>(handle_data); + // Make sure the pointer is zeroed + uvll::set_data_for_uv_handle( + watcher.native_handle(), null::<()>()); + } + } +} + +/// Take a pointer to the callback installed as custom data +fn borrow_callback_from_watcher, + CB: Callback>(watcher: &W) -> &CB { + + unsafe { + let handle = watcher.native_handle(); + let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); + fail_unless!(handle_data.is_not_null()); + let cb = transmute::<&*c_void, &~CB>(&handle_data); + return &**cb; + } +} + +/// Take ownership of the callback installed as custom data +fn take_callback_from_watcher, CB: Callback>( + watcher: &mut W) -> CB { + + unsafe { + let handle = watcher.native_handle(); + let handle_data: *c_void = uvll::get_data_for_uv_handle(handle); + fail_unless!(handle_data.is_not_null()); + uvll::set_data_for_uv_handle(handle, null::<()>()); + let cb: ~CB = transmute::<*c_void, ~CB>(handle_data); + let cb = match cb { ~cb => cb }; + return cb; + } +} + +/// Callbacks used by StreamWatchers, set as custom data on the foreign handle +struct WatcherData { + read_cb: Option, + write_cb: Option, + connect_cb: Option, + close_cb: Option, + alloc_cb: Option +} + +fn install_watcher_data>(watcher: &mut W) { + unsafe { + let data = ~WatcherData { + read_cb: None, + write_cb: None, + connect_cb: None, + close_cb: None, + alloc_cb: None + }; + let data = transmute::<~WatcherData, *c_void>(data); + uvll::set_data_for_uv_handle(watcher.native_handle(), data); + } +} + +fn get_watcher_data>( + watcher: &r/mut W) -> &r/mut WatcherData { + + unsafe { + let data = uvll::get_data_for_uv_handle(watcher.native_handle()); + let data = transmute::<&*c_void, &mut ~WatcherData>(&data); + return &mut **data; + } +} + +fn drop_watcher_data>(watcher: &mut W) { + unsafe { + let data = uvll::get_data_for_uv_handle(watcher.native_handle()); + let _data = transmute::<*c_void, ~WatcherData>(data); + uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>()); + } +} + +#[test] +fn test_slice_to_uv_buf() { + let slice = [0, .. 20]; + let buf = slice_to_uv_buf(slice); + + fail_unless!(buf.len == 20); + + unsafe { + let base = transmute::<*u8, *mut u8>(buf.base); + (*base) = 1; + (*ptr::mut_offset(base, 1)) = 2; + } + + fail_unless!(slice[0] == 1); + fail_unless!(slice[1] == 2); +} + +/// The uv buffer type +pub type Buf = uvll::uv_buf_t; + +/// Borrow a slice to a Buf +pub fn slice_to_uv_buf(v: &[u8]) -> Buf { + let data = unsafe { vec::raw::to_ptr(v) }; + unsafe { uvll::buf_init(data, v.len()) } +} + +// XXX: Do these conversions without copying + +/// Transmute an owned vector to a Buf +fn vec_to_uv_buf(v: ~[u8]) -> Buf { + let data = unsafe { malloc(v.len() as size_t) } as *u8; + fail_unless!(data.is_not_null()); + do vec::as_imm_buf(v) |b, l| { + let data = data as *mut u8; + unsafe { ptr::copy_memory(data, b, l) } + } + let buf = unsafe { uvll::buf_init(data, v.len()) }; + return buf; +} + +/// Transmute a Buf that was once a ~[u8] back to ~[u8] +fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { + if !(buf.len == 0 && buf.base.is_null()) { + let v = unsafe { vec::from_buf(buf.base, buf.len as uint) }; + unsafe { free(buf.base as *c_void) }; + return Some(v); + } else { + // No buffer + return None; + } +} + +#[test] +fn loop_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + loop_.run(); + loop_.close(); + } +} + +#[test] +#[ignore(reason = "valgrind - loop destroyed before watcher?")] +fn idle_new_then_close() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + idle_watcher.close(); + } +} + +#[test] +fn idle_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + let mut count = 10; + let count_ptr: *mut int = &mut count; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + fail_unless!(status.is_none()); + if unsafe { *count_ptr == 10 } { + idle_watcher.stop(); + idle_watcher.close(); + } else { + unsafe { *count_ptr = *count_ptr + 1; } + } + } + loop_.run(); + loop_.close(); + fail_unless!(count == 10); + } +} + +#[test] +fn idle_start_stop_start() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + fail_unless!(status.is_none()); + idle_watcher.stop(); + do idle_watcher.start |idle_watcher, status| { + fail_unless!(status.is_none()); + let mut idle_watcher = idle_watcher; + idle_watcher.stop(); + idle_watcher.close(); + } + } + loop_.run(); + loop_.close(); + } +} + +#[test] +#[ignore(reason = "ffi struct issues")] +fn connect_close() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + // Connect to a port where nobody is listening + let addr = Ipv4(127, 0, 0, 1, 2923); + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("tcp_watcher.connect!"); + fail_unless!(status.is_some()); + fail_unless!(status.get().name() == ~"ECONNREFUSED"); + stream_watcher.close(||()); + } + loop_.run(); + loop_.close(); + } +} + +#[test] +#[ignore(reason = "need a server to connect to")] +fn connect_read() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + let addr = Ipv4(127, 0, 0, 1, 2924); + do tcp_watcher.connect(addr) |stream_watcher, status| { + let mut stream_watcher = stream_watcher; + rtdebug!("tcp_watcher.connect!"); + fail_unless!(status.is_none()); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0)) + }; + do stream_watcher.read_start(alloc) + |stream_watcher, nread, buf, status| { + + let buf = vec_from_uv_buf(buf); + rtdebug!("read cb!"); + if status.is_none() { + let bytes = buf.unwrap(); + rtdebug!("%s", bytes.slice(0, nread as uint).to_str()); + } else { + rtdebug!("status after read: %s", status.get().to_str()); + rtdebug!("closing"); + stream_watcher.close(||()); + } + } + } + loop_.run(); + loop_.close(); + } +} + +#[test] +#[ignore(reason = "ffi struct issues")] +fn listen() { + do run_in_bare_thread() { + const MAX: int = 10; + let mut loop_ = Loop::new(); + let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; + let addr = Ipv4(127, 0, 0, 1, 2925); + server_tcp_watcher.bind(addr); + let loop_ = loop_; + rtdebug!("listening"); + do server_tcp_watcher.listen |server_stream_watcher, status| { + rtdebug!("listened!"); + fail_unless!(status.is_none()); + let mut server_stream_watcher = server_stream_watcher; + let mut loop_ = loop_; + let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + server_stream_watcher.accept(client_tcp_watcher); + let count_cell = Cell(0); + let server_stream_watcher = server_stream_watcher; + rtdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0)) + }; + do client_tcp_watcher.read_start(alloc) + |stream_watcher, nread, buf, status| { + + rtdebug!("i'm reading!"); + let buf = vec_from_uv_buf(buf); + let mut count = count_cell.take(); + if status.is_none() { + rtdebug!("got %d bytes", nread); + let buf = buf.unwrap(); + for buf.view(0, nread as uint).each |byte| { + fail_unless!(*byte == count as u8); + rtdebug!("%u", *byte as uint); + count += 1; + } + } else { + fail_unless!(count == MAX); + do stream_watcher.close { + server_stream_watcher.close(||()); + } + } + count_cell.put_back(count); + } + } + + let _client_thread = do Thread::start { + rtdebug!("starting client thread"); + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("connecting"); + fail_unless!(status.is_none()); + let mut stream_watcher = stream_watcher; + let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; + do stream_watcher.write(msg) |stream_watcher, status| { + rtdebug!("writing"); + fail_unless!(status.is_none()); + stream_watcher.close(||()); + } + } + loop_.run(); + loop_.close(); + }; + + let mut loop_ = loop_; + loop_.run(); + loop_.close(); + } +} diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs new file mode 100644 index 0000000000000..f7275652e7f39 --- /dev/null +++ b/src/libcore/rt/uvio.rs @@ -0,0 +1,475 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use option::*; +use result::*; + +use super::uv::*; +use super::io::*; +use ops::Drop; +use cell::{Cell, empty_cell}; +use cast::transmute; +use super::StreamObject; +use super::sched::Scheduler; +use super::IoFactoryObject; + +#[cfg(test)] use super::sched::Task; +#[cfg(test)] use unstable::run_in_bare_thread; +#[cfg(test)] use uint; + +pub struct UvEventLoop { + uvio: UvIoFactory +} + +pub impl UvEventLoop { + static fn new() -> UvEventLoop { + UvEventLoop { + uvio: UvIoFactory(Loop::new()) + } + } + + /// A convenience constructor + static fn new_scheduler() -> Scheduler { + Scheduler::new(~UvEventLoop::new()) + } +} + +impl Drop for UvEventLoop { + fn finalize(&self) { + // XXX: Need mutable finalizer + let self = unsafe { + transmute::<&UvEventLoop, &mut UvEventLoop>(self) + }; + let mut uv_loop = self.uvio.uv_loop(); + uv_loop.close(); + } +} + +impl EventLoop for UvEventLoop { + + fn run(&mut self) { + self.uvio.uv_loop().run(); + } + + fn callback(&mut self, f: ~fn()) { + let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop()); + do idle_watcher.start |idle_watcher, status| { + fail_unless!(status.is_none()); + let mut idle_watcher = idle_watcher; + idle_watcher.stop(); + idle_watcher.close(); + f(); + } + } + + fn io(&mut self) -> Option<&self/mut IoFactoryObject> { + Some(&mut self.uvio) + } +} + +#[test] +fn test_callback_run_once() { + do run_in_bare_thread { + let mut event_loop = UvEventLoop::new(); + let mut count = 0; + let count_ptr: *mut int = &mut count; + do event_loop.callback { + unsafe { *count_ptr += 1 } + } + event_loop.run(); + fail_unless!(count == 1); + } +} + +pub struct UvIoFactory(Loop); + +pub impl UvIoFactory { + fn uv_loop(&mut self) -> &self/mut Loop { + match self { &UvIoFactory(ref mut ptr) => ptr } + } +} + +impl IoFactory for UvIoFactory { + // Connect to an address and return a new stream + // NB: This blocks the task waiting on the connection. + // It would probably be better to return a future + fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> { + // Create a cell in the task to hold the result. We will fill + // the cell before resuming the task. + let result_cell = empty_cell(); + let result_cell_ptr: *Cell> = &result_cell; + + do Scheduler::local |scheduler| { + fail_unless!(scheduler.in_task_context()); + + // Block this task and take ownership, switch to scheduler context + do scheduler.block_running_task_and_then |scheduler, task| { + + rtdebug!("connect: entered scheduler context"); + fail_unless!(!scheduler.in_task_context()); + let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); + let task_cell = Cell(task); + + // Wait for a connection + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("connect: in connect callback"); + let maybe_stream = if status.is_none() { + rtdebug!("status is none"); + Some(~UvStream(stream_watcher)) + } else { + rtdebug!("status is some"); + stream_watcher.close(||()); + None + }; + + // Store the stream in the task's stack + unsafe { (*result_cell_ptr).put_back(maybe_stream); } + + // Context switch + do Scheduler::local |scheduler| { + scheduler.resume_task_immediately(task_cell.take()); + } + } + } + } + + fail_unless!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> { + let mut watcher = TcpWatcher::new(self.uv_loop()); + watcher.bind(addr); + return Some(~UvTcpListener(watcher)); + } +} + +pub struct UvTcpListener(TcpWatcher); + +impl UvTcpListener { + fn watcher(&self) -> TcpWatcher { + match self { &UvTcpListener(w) => w } + } + + fn close(&self) { + // XXX: Need to wait until close finishes before returning + self.watcher().as_stream().close(||()); + } +} + +impl Drop for UvTcpListener { + fn finalize(&self) { + // XXX: Again, this never gets called. Use .close() instead + //self.watcher().as_stream().close(||()); + } +} + +impl TcpListener for UvTcpListener { + + fn listen(&mut self) -> Option<~StreamObject> { + rtdebug!("entering listen"); + let result_cell = empty_cell(); + let result_cell_ptr: *Cell> = &result_cell; + + let server_tcp_watcher = self.watcher(); + + do Scheduler::local |scheduler| { + fail_unless!(scheduler.in_task_context()); + + do scheduler.block_running_task_and_then |_, task| { + let task_cell = Cell(task); + let mut server_tcp_watcher = server_tcp_watcher; + do server_tcp_watcher.listen |server_stream_watcher, status| { + let maybe_stream = if status.is_none() { + let mut server_stream_watcher = server_stream_watcher; + let mut loop_ = + loop_from_watcher(&server_stream_watcher); + let mut client_tcp_watcher = + TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = + client_tcp_watcher.as_stream(); + // XXX: Need's to be surfaced in interface + server_stream_watcher.accept(client_tcp_watcher); + Some(~UvStream::new(client_tcp_watcher)) + } else { + None + }; + + unsafe { (*result_cell_ptr).put_back(maybe_stream); } + + rtdebug!("resuming task from listen"); + // Context switch + do Scheduler::local |scheduler| { + scheduler.resume_task_immediately(task_cell.take()); + } + } + } + } + + fail_unless!(!result_cell.is_empty()); + return result_cell.take(); + } +} + +pub struct UvStream(StreamWatcher); + +impl UvStream { + static fn new(watcher: StreamWatcher) -> UvStream { + UvStream(watcher) + } + + fn watcher(&self) -> StreamWatcher { + match self { &UvStream(w) => w } + } + + // XXX: finalize isn't working for ~UvStream??? + fn close(&self) { + // XXX: Need to wait until this finishes before returning + self.watcher().close(||()); + } +} + +impl Drop for UvStream { + fn finalize(&self) { + rtdebug!("closing stream"); + //self.watcher().close(||()); + } +} + +impl Stream for UvStream { + fn read(&mut self, buf: &mut [u8]) -> Result { + let result_cell = empty_cell(); + let result_cell_ptr: *Cell> = &result_cell; + + do Scheduler::local |scheduler| { + fail_unless!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.block_running_task_and_then |scheduler, task| { + rtdebug!("read: entered scheduler context"); + fail_unless!(!scheduler.in_task_context()); + let mut watcher = watcher; + let task_cell = Cell(task); + // XXX: We shouldn't reallocate these callbacks every + // call to read + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) + }; + do watcher.read_start(alloc) |watcher, nread, _buf, status| { + + // Stop reading so that no read callbacks are + // triggered before the user calls `read` again. + // XXX: Is there a performance impact to calling + // stop here? + let mut watcher = watcher; + watcher.read_stop(); + + let result = if status.is_none() { + fail_unless!(nread >= 0); + Ok(nread as uint) + } else { + Err(()) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + do Scheduler::local |scheduler| { + scheduler.resume_task_immediately(task_cell.take()); + } + } + } + } + + fail_unless!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn write(&mut self, buf: &[u8]) -> Result<(), ()> { + let result_cell = empty_cell(); + let result_cell_ptr: *Cell> = &result_cell; + do Scheduler::local |scheduler| { + fail_unless!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&[u8] = &buf; + do scheduler.block_running_task_and_then |_, task| { + let mut watcher = watcher; + let task_cell = Cell(task); + let buf = unsafe { &*buf_ptr }; + // XXX: OMGCOPIES + let buf = buf.to_vec(); + do watcher.write(buf) |_watcher, status| { + let result = if status.is_none() { + Ok(()) + } else { + Err(()) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + do Scheduler::local |scheduler| { + scheduler.resume_task_immediately(task_cell.take()); + } + } + } + } + + fail_unless!(!result_cell.is_empty()); + return result_cell.take(); + } +} + +#[test] +#[ignore(reason = "ffi struct issues")] +fn test_simple_io_no_connect() { + do run_in_bare_thread { + let mut sched = ~UvEventLoop::new_scheduler(); + let task = ~do Task::new(&mut sched.stack_pool) { + do Scheduler::local |sched| { + let io = sched.event_loop.io().unwrap(); + let addr = Ipv4(127, 0, 0, 1, 2926); + let maybe_chan = io.connect(addr); + fail_unless!(maybe_chan.is_none()); + } + }; + sched.task_queue.push_back(task); + sched.run(); + } +} + +#[test] +#[ignore(reason = "ffi struct issues")] +fn test_simple_tcp_server_and_client() { + do run_in_bare_thread { + let mut sched = ~UvEventLoop::new_scheduler(); + let addr = Ipv4(127, 0, 0, 1, 2929); + + let client_task = ~do Task::new(&mut sched.stack_pool) { + do Scheduler::local |sched| { + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); + } + }; + + let server_task = ~do Task::new(&mut sched.stack_pool) { + do Scheduler::local |sched| { + let io = sched.event_loop.io().unwrap(); + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + fail_unless!(nread == 8); + for uint::range(0, nread) |i| { + rtdebug!("%u", buf[i] as uint); + fail_unless!(buf[i] == i as u8); + } + stream.close(); + listener.close(); + } + }; + + // Start the server first so it listens before the client connects + sched.task_queue.push_back(server_task); + sched.task_queue.push_back(client_task); + sched.run(); + } +} + +#[test] #[ignore(reason = "busted")] +fn test_read_and_block() { + do run_in_bare_thread { + let mut sched = ~UvEventLoop::new_scheduler(); + let addr = Ipv4(127, 0, 0, 1, 2930); + + let client_task = ~do Task::new(&mut sched.stack_pool) { + do Scheduler::local |sched| { + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); + } + }; + + let server_task = ~do Task::new(&mut sched.stack_pool) { + do Scheduler::local |sched| { + let io = sched.event_loop.io().unwrap(); + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; + + let expected = 32; + let mut current = 0; + let mut reads = 0; + + while current < expected { + let nread = stream.read(buf).unwrap(); + for uint::range(0, nread) |i| { + let val = buf[i] as uint; + fail_unless!(val == current % 8); + current += 1; + } + reads += 1; + + do Scheduler::local |scheduler| { + // Yield to the other task in hopes that it + // will trigger a read callback while we are + // not ready for it + do scheduler.block_running_task_and_then + |scheduler, task| { + scheduler.task_queue.push_back(task); + } + } + } + + // Make sure we had multiple reads + fail_unless!(reads > 1); + + stream.close(); + listener.close(); + } + }; + + // Start the server first so it listens before the client connects + sched.task_queue.push_back(server_task); + sched.task_queue.push_back(client_task); + sched.run(); + } +} + +#[test] #[ignore(reason = "needs server")] +fn test_read_read_read() { + do run_in_bare_thread { + let mut sched = ~UvEventLoop::new_scheduler(); + let addr = Ipv4(127, 0, 0, 1, 2931); + + let client_task = ~do Task::new(&mut sched.stack_pool) { + do Scheduler::local |sched| { + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < 500000000 { + let nread = stream.read(buf).unwrap(); + rtdebug!("read %u bytes", nread as uint); + total_bytes_read += nread; + } + rtdebug_!("read %u bytes total", total_bytes_read as uint); + stream.close(); + } + }; + + sched.task_queue.push_back(client_task); + sched.run(); + } +} diff --git a/src/libcore/rt/work_queue.rs b/src/libcore/rt/work_queue.rs new file mode 100644 index 0000000000000..1be2eb26e6292 --- /dev/null +++ b/src/libcore/rt/work_queue.rs @@ -0,0 +1,47 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use option::*; + +pub struct WorkQueue { + priv queue: ~[T] +} + +pub impl WorkQueue { + static fn new() -> WorkQueue { + WorkQueue { + queue: ~[] + } + } + + fn push_back(&mut self, value: T) { + self.queue.push(value) + } + + fn pop_back(&mut self) -> Option { + if !self.queue.is_empty() { + Some(self.queue.pop()) + } else { + None + } + } + + fn push_front(&mut self, value: T) { + self.queue.unshift(value) + } + + fn pop_front(&mut self) -> Option { + if !self.queue.is_empty() { + Some(self.queue.shift()) + } else { + None + } + } +} diff --git a/src/libcore/unstable.rs b/src/libcore/unstable.rs index 4f45535d0f856..7936b18dbe20c 100644 --- a/src/libcore/unstable.rs +++ b/src/libcore/unstable.rs @@ -35,6 +35,8 @@ pub mod extfmt; #[path = "unstable/lang.rs"] #[cfg(notest)] pub mod lang; +#[path = "unstable/uvll.rs"] +pub mod uvll; mod rustrt { use unstable::{raw_thread, rust_little_lock}; @@ -61,7 +63,7 @@ for it to terminate. The executing thread has no access to a task pointer and will be using a normal large stack. */ -pub unsafe fn run_in_bare_thread(f: ~fn()) { +pub fn run_in_bare_thread(f: ~fn()) { let (port, chan) = comm::stream(); // FIXME #4525: Unfortunate that this creates an extra scheduler but it's // necessary since rust_raw_thread_join_delete is blocking @@ -80,22 +82,18 @@ pub unsafe fn run_in_bare_thread(f: ~fn()) { #[test] fn test_run_in_bare_thread() { - unsafe { - let i = 100; - do run_in_bare_thread { - fail_unless!(i == 100); - } + let i = 100; + do run_in_bare_thread { + fail_unless!(i == 100); } } #[test] fn test_run_in_bare_thread_exchange() { - unsafe { - // Does the exchange heap work without the runtime? - let i = ~100; - do run_in_bare_thread { - fail_unless!(i == ~100); - } + // Does the exchange heap work without the runtime? + let i = ~100; + do run_in_bare_thread { + fail_unless!(i == ~100); } } diff --git a/src/libstd/uv_ll.rs b/src/libcore/unstable/uvll.rs similarity index 86% rename from src/libstd/uv_ll.rs rename to src/libcore/unstable/uvll.rs index fa415e0875b3e..0aed2567a220f 100644 --- a/src/libstd/uv_ll.rs +++ b/src/libcore/unstable/uvll.rs @@ -32,14 +32,15 @@ #[allow(non_camel_case_types)]; // C types -use core::libc::size_t; -use core::libc; -use core::prelude::*; -use core::ptr::to_unsafe_ptr; -use core::ptr; -use core::str; -use core::vec; -use core::comm::{stream, Chan, SharedChan, Port}; +use libc::size_t; +use libc::c_void; +use prelude::*; +use ptr::to_unsafe_ptr; + +pub type uv_handle_t = c_void; +pub type uv_loop_t = c_void; +pub type uv_idle_t = c_void; +pub type uv_idle_cb = *u8; // libuv struct mappings pub struct uv_ip4_addr { @@ -355,7 +356,10 @@ pub struct uv_getaddrinfo_t { } pub mod uv_ll_struct_stubgen { - use uv_ll::{ + + use ptr; + + use super::{ uv_async_t, uv_connect_t, uv_getaddrinfo_t, @@ -369,15 +373,13 @@ pub mod uv_ll_struct_stubgen { #[cfg(target_os = "android")] #[cfg(target_os = "macos")] #[cfg(target_os = "freebsd")] - use uv_ll::{ + use super::{ uv_async_t_32bit_unix_riders, uv_tcp_t_32bit_unix_riders, uv_timer_t_32bit_unix_riders, uv_write_t_32bit_unix_riders, }; - use core::ptr; - pub fn gen_stub_uv_tcp_t() -> uv_tcp_t { return gen_stub_os(); #[cfg(target_os = "linux")] @@ -724,157 +726,157 @@ pub mod uv_ll_struct_stubgen { } } -pub mod rustrt { - use super::{addrinfo, sockaddr_in, sockaddr_in6, uv_async_t, uv_buf_t}; - use super::{uv_connect_t, uv_err_t, uv_getaddrinfo_t, uv_stream_t}; - use super::{uv_tcp_t, uv_timer_t, uv_write_t}; - - use core::libc; - - #[nolink] - pub extern { - // libuv public API - unsafe fn rust_uv_loop_new() -> *libc::c_void; - unsafe fn rust_uv_loop_delete(lp: *libc::c_void); - unsafe fn rust_uv_run(loop_handle: *libc::c_void); - unsafe fn rust_uv_close(handle: *libc::c_void, cb: *u8); - unsafe fn rust_uv_walk(loop_handle: *libc::c_void, cb: *u8, - arg: *libc::c_void); - unsafe fn rust_uv_async_send(handle: *uv_async_t); - unsafe fn rust_uv_async_init(loop_handle: *libc::c_void, - async_handle: *uv_async_t, - cb: *u8) -> libc::c_int; - unsafe fn rust_uv_tcp_init( - loop_handle: *libc::c_void, - handle_ptr: *uv_tcp_t) -> libc::c_int; - // FIXME ref #2604 .. ? - unsafe fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, - len: libc::size_t); - unsafe fn rust_uv_last_error(loop_handle: *libc::c_void) -> uv_err_t; - // FIXME ref #2064 - unsafe fn rust_uv_strerror(err: *uv_err_t) -> *libc::c_char; - // FIXME ref #2064 - unsafe fn rust_uv_err_name(err: *uv_err_t) -> *libc::c_char; - unsafe fn rust_uv_ip4_addr(ip: *u8, port: libc::c_int) - -> sockaddr_in; - unsafe fn rust_uv_ip6_addr(ip: *u8, port: libc::c_int) - -> sockaddr_in6; - unsafe fn rust_uv_ip4_name(src: *sockaddr_in, - dst: *u8, - size: libc::size_t) - -> libc::c_int; - unsafe fn rust_uv_ip6_name(src: *sockaddr_in6, - dst: *u8, - size: libc::size_t) - -> libc::c_int; - unsafe fn rust_uv_ip4_port(src: *sockaddr_in) -> libc::c_uint; - unsafe fn rust_uv_ip6_port(src: *sockaddr_in6) -> libc::c_uint; - // FIXME ref #2064 - unsafe fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t, - tcp_handle_ptr: *uv_tcp_t, - ++after_cb: *u8, - ++addr: *sockaddr_in) -> libc::c_int; - // FIXME ref #2064 - unsafe fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, - ++addr: *sockaddr_in) -> libc::c_int; - // FIXME ref #2064 - unsafe fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t, - tcp_handle_ptr: *uv_tcp_t, - ++after_cb: *u8, - ++addr: *sockaddr_in6) -> libc::c_int; - // FIXME ref #2064 - unsafe fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, - ++addr: *sockaddr_in6) -> libc::c_int; - unsafe fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, - ++name: *sockaddr_in) - -> libc::c_int; - unsafe fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, - ++name: *sockaddr_in6) - -> libc::c_int; - unsafe fn rust_uv_listen(stream: *libc::c_void, - backlog: libc::c_int, - cb: *u8) -> libc::c_int; - unsafe fn rust_uv_accept(server: *libc::c_void, client: *libc::c_void) - -> libc::c_int; - unsafe fn rust_uv_write(req: *libc::c_void, - stream: *libc::c_void, - ++buf_in: *uv_buf_t, - buf_cnt: libc::c_int, - cb: *u8) - -> libc::c_int; - unsafe fn rust_uv_read_start(stream: *libc::c_void, - on_alloc: *u8, - on_read: *u8) - -> libc::c_int; - unsafe fn rust_uv_read_stop(stream: *libc::c_void) -> libc::c_int; - unsafe fn rust_uv_timer_init(loop_handle: *libc::c_void, - timer_handle: *uv_timer_t) - -> libc::c_int; - unsafe fn rust_uv_timer_start( - timer_handle: *uv_timer_t, - cb: *u8, - timeout: libc::c_uint, - repeat: libc::c_uint) -> libc::c_int; - unsafe fn rust_uv_timer_stop(handle: *uv_timer_t) -> libc::c_int; - - unsafe fn rust_uv_getaddrinfo(loop_ptr: *libc::c_void, - handle: *uv_getaddrinfo_t, - cb: *u8, - node_name_ptr: *u8, - service_name_ptr: *u8, - // should probably only pass ptr::null() - hints: *addrinfo) - -> libc::c_int; - unsafe fn rust_uv_freeaddrinfo(res: *addrinfo); - - // data accessors/helpers for rust-mapped uv structs - unsafe fn rust_uv_helper_get_INADDR_NONE() -> u32; - unsafe fn rust_uv_is_ipv4_addrinfo(input: *addrinfo) -> bool; - unsafe fn rust_uv_is_ipv6_addrinfo(input: *addrinfo) -> bool; - unsafe fn rust_uv_get_next_addrinfo(input: *addrinfo) -> *addrinfo; - unsafe fn rust_uv_addrinfo_as_sockaddr_in(input: *addrinfo) - -> *sockaddr_in; - unsafe fn rust_uv_addrinfo_as_sockaddr_in6(input: *addrinfo) - -> *sockaddr_in6; - unsafe fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8; - unsafe fn rust_uv_free_base_of_buf(++buf: uv_buf_t); - unsafe fn rust_uv_get_stream_handle_from_connect_req( - connect_req: *uv_connect_t) - -> *uv_stream_t; - unsafe fn rust_uv_get_stream_handle_from_write_req( - write_req: *uv_write_t) - -> *uv_stream_t; - unsafe fn rust_uv_get_loop_for_uv_handle(handle: *libc::c_void) - -> *libc::c_void; - unsafe fn rust_uv_get_data_for_uv_loop(loop_ptr: *libc::c_void) - -> *libc::c_void; - unsafe fn rust_uv_set_data_for_uv_loop(loop_ptr: *libc::c_void, - data: *libc::c_void); - unsafe fn rust_uv_get_data_for_uv_handle(handle: *libc::c_void) - -> *libc::c_void; - unsafe fn rust_uv_set_data_for_uv_handle(handle: *libc::c_void, - data: *libc::c_void); - unsafe fn rust_uv_get_data_for_req(req: *libc::c_void) - -> *libc::c_void; - unsafe fn rust_uv_set_data_for_req(req: *libc::c_void, +#[nolink] +extern mod rustrt { + + // libuv public API + unsafe fn rust_uv_loop_new() -> *libc::c_void; + unsafe fn rust_uv_loop_delete(lp: *libc::c_void); + unsafe fn rust_uv_run(loop_handle: *libc::c_void); + unsafe fn rust_uv_close(handle: *libc::c_void, cb: *u8); + unsafe fn rust_uv_walk(loop_handle: *libc::c_void, cb: *u8, + arg: *libc::c_void); + + unsafe fn rust_uv_idle_new() -> *uv_idle_t; + unsafe fn rust_uv_idle_delete(handle: *uv_idle_t); + unsafe fn rust_uv_idle_init(loop_handle: *uv_loop_t, + handle: *uv_idle_t) -> libc::c_int; + unsafe fn rust_uv_idle_start(handle: *uv_idle_t, + cb: uv_idle_cb) -> libc::c_int; + unsafe fn rust_uv_idle_stop(handle: *uv_idle_t) -> libc::c_int; + + unsafe fn rust_uv_async_send(handle: *uv_async_t); + unsafe fn rust_uv_async_init(loop_handle: *libc::c_void, + async_handle: *uv_async_t, + cb: *u8) -> libc::c_int; + unsafe fn rust_uv_tcp_init( + loop_handle: *libc::c_void, + handle_ptr: *uv_tcp_t) -> libc::c_int; + // FIXME ref #2604 .. ? + unsafe fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, + len: libc::size_t); + unsafe fn rust_uv_last_error(loop_handle: *libc::c_void) -> uv_err_t; + // FIXME ref #2064 + unsafe fn rust_uv_strerror(err: *uv_err_t) -> *libc::c_char; + // FIXME ref #2064 + unsafe fn rust_uv_err_name(err: *uv_err_t) -> *libc::c_char; + unsafe fn rust_uv_ip4_addr(ip: *u8, port: libc::c_int) + -> sockaddr_in; + unsafe fn rust_uv_ip6_addr(ip: *u8, port: libc::c_int) + -> sockaddr_in6; + unsafe fn rust_uv_ip4_name(src: *sockaddr_in, + dst: *u8, + size: libc::size_t) + -> libc::c_int; + unsafe fn rust_uv_ip6_name(src: *sockaddr_in6, + dst: *u8, + size: libc::size_t) + -> libc::c_int; + unsafe fn rust_uv_ip4_port(src: *sockaddr_in) -> libc::c_uint; + unsafe fn rust_uv_ip6_port(src: *sockaddr_in6) -> libc::c_uint; + // FIXME ref #2064 + unsafe fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t, + tcp_handle_ptr: *uv_tcp_t, + ++after_cb: *u8, + ++addr: *sockaddr_in) -> libc::c_int; + // FIXME ref #2064 + unsafe fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, + ++addr: *sockaddr_in) -> libc::c_int; + // FIXME ref #2064 + unsafe fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t, + tcp_handle_ptr: *uv_tcp_t, + ++after_cb: *u8, + ++addr: *sockaddr_in6) -> libc::c_int; + // FIXME ref #2064 + unsafe fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, + ++addr: *sockaddr_in6) -> libc::c_int; + unsafe fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, + ++name: *sockaddr_in) -> libc::c_int; + unsafe fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, + ++name: *sockaddr_in6) ->libc::c_int; + unsafe fn rust_uv_listen(stream: *libc::c_void, + backlog: libc::c_int, + cb: *u8) -> libc::c_int; + unsafe fn rust_uv_accept(server: *libc::c_void, client: *libc::c_void) + -> libc::c_int; + unsafe fn rust_uv_write(req: *libc::c_void, + stream: *libc::c_void, + ++buf_in: *uv_buf_t, + buf_cnt: libc::c_int, + cb: *u8) + -> libc::c_int; + unsafe fn rust_uv_read_start(stream: *libc::c_void, + on_alloc: *u8, + on_read: *u8) + -> libc::c_int; + unsafe fn rust_uv_read_stop(stream: *libc::c_void) -> libc::c_int; + unsafe fn rust_uv_timer_init(loop_handle: *libc::c_void, + timer_handle: *uv_timer_t) + -> libc::c_int; + unsafe fn rust_uv_timer_start( + timer_handle: *uv_timer_t, + cb: *u8, + timeout: libc::c_uint, + repeat: libc::c_uint) -> libc::c_int; + unsafe fn rust_uv_timer_stop(handle: *uv_timer_t) -> libc::c_int; + + unsafe fn rust_uv_getaddrinfo(loop_ptr: *libc::c_void, + handle: *uv_getaddrinfo_t, + cb: *u8, + node_name_ptr: *u8, + service_name_ptr: *u8, + // should probably only pass ptr::null() + hints: *addrinfo) + -> libc::c_int; + unsafe fn rust_uv_freeaddrinfo(res: *addrinfo); + + // data accessors/helpers for rust-mapped uv structs + unsafe fn rust_uv_helper_get_INADDR_NONE() -> u32; + unsafe fn rust_uv_is_ipv4_addrinfo(input: *addrinfo) -> bool; + unsafe fn rust_uv_is_ipv6_addrinfo(input: *addrinfo) -> bool; + unsafe fn rust_uv_get_next_addrinfo(input: *addrinfo) -> *addrinfo; + unsafe fn rust_uv_addrinfo_as_sockaddr_in(input: *addrinfo) + -> *sockaddr_in; + unsafe fn rust_uv_addrinfo_as_sockaddr_in6(input: *addrinfo) + -> *sockaddr_in6; + unsafe fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8; + unsafe fn rust_uv_free_base_of_buf(++buf: uv_buf_t); + unsafe fn rust_uv_get_stream_handle_from_connect_req( + connect_req: *uv_connect_t) + -> *uv_stream_t; + unsafe fn rust_uv_get_stream_handle_from_write_req( + write_req: *uv_write_t) + -> *uv_stream_t; + unsafe fn rust_uv_get_loop_for_uv_handle(handle: *libc::c_void) + -> *libc::c_void; + unsafe fn rust_uv_get_data_for_uv_loop(loop_ptr: *libc::c_void) + -> *libc::c_void; + unsafe fn rust_uv_set_data_for_uv_loop(loop_ptr: *libc::c_void, data: *libc::c_void); - unsafe fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8; - unsafe fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> libc::size_t; - - // sizeof testing helpers - unsafe fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint; - unsafe fn rust_uv_helper_uv_connect_t_size() -> libc::c_uint; - unsafe fn rust_uv_helper_uv_buf_t_size() -> libc::c_uint; - unsafe fn rust_uv_helper_uv_write_t_size() -> libc::c_uint; - unsafe fn rust_uv_helper_uv_err_t_size() -> libc::c_uint; - unsafe fn rust_uv_helper_sockaddr_in_size() -> libc::c_uint; - unsafe fn rust_uv_helper_sockaddr_in6_size() -> libc::c_uint; - unsafe fn rust_uv_helper_uv_async_t_size() -> libc::c_uint; - unsafe fn rust_uv_helper_uv_timer_t_size() -> libc::c_uint; - unsafe fn rust_uv_helper_uv_getaddrinfo_t_size() -> libc::c_uint; - unsafe fn rust_uv_helper_addrinfo_size() -> libc::c_uint; - unsafe fn rust_uv_helper_addr_in_size() -> libc::c_uint; - } + unsafe fn rust_uv_get_data_for_uv_handle(handle: *libc::c_void) + -> *libc::c_void; + unsafe fn rust_uv_set_data_for_uv_handle(handle: *libc::c_void, + data: *libc::c_void); + unsafe fn rust_uv_get_data_for_req(req: *libc::c_void) + -> *libc::c_void; + unsafe fn rust_uv_set_data_for_req(req: *libc::c_void, + data: *libc::c_void); + unsafe fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8; + unsafe fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> libc::size_t; + + // sizeof testing helpers + unsafe fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint; + unsafe fn rust_uv_helper_uv_connect_t_size() -> libc::c_uint; + unsafe fn rust_uv_helper_uv_buf_t_size() -> libc::c_uint; + unsafe fn rust_uv_helper_uv_write_t_size() -> libc::c_uint; + unsafe fn rust_uv_helper_uv_err_t_size() -> libc::c_uint; + unsafe fn rust_uv_helper_sockaddr_in_size() -> libc::c_uint; + unsafe fn rust_uv_helper_sockaddr_in6_size() -> libc::c_uint; + unsafe fn rust_uv_helper_uv_async_t_size() -> libc::c_uint; + unsafe fn rust_uv_helper_uv_timer_t_size() -> libc::c_uint; + unsafe fn rust_uv_helper_uv_getaddrinfo_t_size() -> libc::c_uint; + unsafe fn rust_uv_helper_addrinfo_size() -> libc::c_uint; + unsafe fn rust_uv_helper_addr_in_size() -> libc::c_uint; } pub unsafe fn loop_new() -> *libc::c_void { @@ -897,6 +899,27 @@ pub unsafe fn walk(loop_handle: *libc::c_void, cb: *u8, arg: *libc::c_void) { rustrt::rust_uv_walk(loop_handle, cb, arg); } +pub unsafe fn idle_new() -> *uv_idle_t { + rustrt::rust_uv_idle_new() +} + +pub unsafe fn idle_delete(handle: *uv_idle_t) { + rustrt::rust_uv_idle_delete(handle) +} + +pub unsafe fn idle_init(loop_handle: *uv_loop_t, + handle: *uv_idle_t) -> libc::c_int { + rustrt::rust_uv_idle_init(loop_handle, handle) +} + +pub unsafe fn idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> libc::c_int { + rustrt::rust_uv_idle_start(handle, cb) +} + +pub unsafe fn idle_stop(handle: *uv_idle_t) -> libc::c_int { + rustrt::rust_uv_idle_stop(handle) +} + pub unsafe fn tcp_init(loop_handle: *libc::c_void, handle: *uv_tcp_t) -> libc::c_int { return rustrt::rust_uv_tcp_init(loop_handle, handle); @@ -1215,19 +1238,11 @@ pub unsafe fn addrinfo_as_sockaddr_in6(input: *addrinfo) -> *sockaddr_in6 { rustrt::rust_uv_addrinfo_as_sockaddr_in6(input) } -//#[cfg(test)] +#[cfg(test)] pub mod test { - use core::prelude::*; - - use uv_ll::*; - - use core::comm::{SharedChan, stream}; - use core::libc; - use core::ptr; - use core::str; - use core::sys; - use core::task; - use core::vec; + use prelude::*; + use super::*; + use comm::{SharedChan, stream, GenericChan, GenericPort}; enum tcp_read_data { tcp_read_eof, @@ -1473,7 +1488,7 @@ pub mod test { let client_data = get_data_for_uv_handle( client_stream_ptr as *libc::c_void) as *tcp_server_data; - let server_kill_msg = (*client_data).server_kill_msg; + let server_kill_msg = copy (*client_data).server_kill_msg; let write_req = (*client_data).server_write_req; if str::contains(request_str, server_kill_msg) { log(debug, ~"SERVER: client req contains kill_msg!"); @@ -1606,8 +1621,8 @@ pub mod test { fn impl_uv_tcp_server(server_ip: &str, server_port: int, - +kill_server_msg: ~str, - +server_resp_msg: ~str, + kill_server_msg: ~str, + server_resp_msg: ~str, server_chan: SharedChan<~str>, continue_chan: SharedChan) { unsafe { @@ -1725,10 +1740,12 @@ pub mod test { let (continue_port, continue_chan) = stream::(); let continue_chan = SharedChan(continue_chan); + let kill_server_msg_copy = copy kill_server_msg; + let server_resp_msg_copy = copy server_resp_msg; do task::spawn_sched(task::ManualThreads(1)) { impl_uv_tcp_server(bind_ip, port, - kill_server_msg, - server_resp_msg, + copy kill_server_msg_copy, + copy server_resp_msg_copy, server_chan.clone(), continue_chan.clone()); }; @@ -1738,9 +1755,10 @@ pub mod test { continue_port.recv(); log(debug, ~"received on continue port, set up tcp client"); + let kill_server_msg_copy = copy kill_server_msg; do task::spawn_sched(task::ManualThreads(1u)) { impl_uv_tcp_request(request_ip, port, - kill_server_msg, + kill_server_msg_copy, client_chan.clone()); }; @@ -1760,11 +1778,10 @@ pub mod test { pub mod tcp_and_server_client_test { #[cfg(target_arch="x86_64")] pub mod impl64 { - use uv_ll::test::*; #[test] pub fn test_uv_ll_tcp_server_and_request() { unsafe { - impl_uv_tcp_server_and_request(); + super::super::impl_uv_tcp_server_and_request(); } } } @@ -1772,12 +1789,11 @@ pub mod test { #[cfg(target_arch="arm")] #[cfg(target_arch="mips")] pub mod impl32 { - use uv_ll::test::*; #[test] #[ignore(cfg(target_os = "linux"))] pub fn test_uv_ll_tcp_server_and_request() { unsafe { - impl_uv_tcp_server_and_request(); + super::super::impl_uv_tcp_server_and_request(); } } } @@ -1804,7 +1820,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"uv_tcp_t", - ::uv_ll::rustrt::rust_uv_helper_uv_tcp_t_size() + super::rustrt::rust_uv_helper_uv_tcp_t_size() ); } } @@ -1813,7 +1829,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"uv_connect_t", - ::uv_ll::rustrt::rust_uv_helper_uv_connect_t_size() + super::rustrt::rust_uv_helper_uv_connect_t_size() ); } } @@ -1822,7 +1838,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"uv_buf_t", - ::uv_ll::rustrt::rust_uv_helper_uv_buf_t_size() + super::rustrt::rust_uv_helper_uv_buf_t_size() ); } } @@ -1831,7 +1847,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"uv_write_t", - ::uv_ll::rustrt::rust_uv_helper_uv_write_t_size() + super::rustrt::rust_uv_helper_uv_write_t_size() ); } } @@ -1841,7 +1857,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"sockaddr_in", - ::uv_ll::rustrt::rust_uv_helper_sockaddr_in_size() + super::rustrt::rust_uv_helper_sockaddr_in_size() ); } } @@ -1849,7 +1865,7 @@ pub mod test { fn test_uv_ll_struct_size_sockaddr_in6() { unsafe { let foreign_handle_size = - ::uv_ll::rustrt::rust_uv_helper_sockaddr_in6_size(); + super::rustrt::rust_uv_helper_sockaddr_in6_size(); let rust_handle_size = sys::size_of::(); let output = fmt!("sockaddr_in6 -- foreign: %u rust: %u", foreign_handle_size as uint, rust_handle_size); @@ -1868,7 +1884,7 @@ pub mod test { fn test_uv_ll_struct_size_addr_in() { unsafe { let foreign_handle_size = - ::uv_ll::rustrt::rust_uv_helper_addr_in_size(); + super::rustrt::rust_uv_helper_addr_in_size(); let rust_handle_size = sys::size_of::(); let output = fmt!("addr_in -- foreign: %u rust: %u", foreign_handle_size as uint, rust_handle_size); @@ -1884,7 +1900,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"uv_async_t", - ::uv_ll::rustrt::rust_uv_helper_uv_async_t_size() + super::rustrt::rust_uv_helper_uv_async_t_size() ); } } @@ -1894,7 +1910,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"uv_timer_t", - ::uv_ll::rustrt::rust_uv_helper_uv_timer_t_size() + super::rustrt::rust_uv_helper_uv_timer_t_size() ); } } @@ -1905,7 +1921,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"uv_getaddrinfo_t", - ::uv_ll::rustrt::rust_uv_helper_uv_getaddrinfo_t_size() + super::rustrt::rust_uv_helper_uv_getaddrinfo_t_size() ); } } @@ -1916,7 +1932,7 @@ pub mod test { unsafe { struct_size_check_common::( ~"addrinfo", - ::uv_ll::rustrt::rust_uv_helper_uv_timer_t_size() + super::rustrt::rust_uv_helper_uv_timer_t_size() ); } } diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index 8021162188f79..a5e689077738b 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -21,20 +21,20 @@ use core::vec; use iotask = uv::iotask::IoTask; use interact = uv::iotask::interact; -use sockaddr_in = uv::ll::sockaddr_in; -use sockaddr_in6 = uv::ll::sockaddr_in6; -use addrinfo = uv::ll::addrinfo; -use uv_getaddrinfo_t = uv::ll::uv_getaddrinfo_t; -use uv_ip4_name = uv::ll::ip4_name; -use uv_ip4_port = uv::ll::ip4_port; -use uv_ip6_name = uv::ll::ip6_name; -use uv_ip6_port = uv::ll::ip6_port; -use uv_getaddrinfo = uv::ll::getaddrinfo; -use uv_freeaddrinfo = uv::ll::freeaddrinfo; -use create_uv_getaddrinfo_t = uv::ll::getaddrinfo_t; -use set_data_for_req = uv::ll::set_data_for_req; -use get_data_for_req = uv::ll::get_data_for_req; -use ll = uv::ll; +use sockaddr_in = core::unstable::uvll::sockaddr_in; +use sockaddr_in6 = core::unstable::uvll::sockaddr_in6; +use addrinfo = core::unstable::uvll::addrinfo; +use uv_getaddrinfo_t = core::unstable::uvll::uv_getaddrinfo_t; +use uv_ip4_name = core::unstable::uvll::ip4_name; +use uv_ip4_port = core::unstable::uvll::ip4_port; +use uv_ip6_name = core::unstable::uvll::ip6_name; +use uv_ip6_port = core::unstable::uvll::ip6_port; +use uv_getaddrinfo = core::unstable::uvll::getaddrinfo; +use uv_freeaddrinfo = core::unstable::uvll::freeaddrinfo; +use create_uv_getaddrinfo_t = core::unstable::uvll::getaddrinfo_t; +use set_data_for_req = core::unstable::uvll::set_data_for_req; +use get_data_for_req = core::unstable::uvll::get_data_for_req; +use ll = core::unstable::uvll; /// An IP address pub enum IpAddr { diff --git a/src/libstd/std.rc b/src/libstd/std.rc index a3b18e50df26a..85e914a60a140 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -36,6 +36,8 @@ not required in or otherwise suitable for the core library. extern mod core(vers = "0.6"); use core::*; +pub use uv_ll = core::unstable::uvll; + // General io and system-services modules pub mod net; @@ -45,7 +47,6 @@ pub mod net_url; // libuv modules pub mod uv; -pub mod uv_ll; pub mod uv_iotask; pub mod uv_global_loop; diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 1a4f8e8990646..aaddc9b6836f3 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -33,6 +33,6 @@ * facilities. */ -pub use ll = uv_ll; +pub use ll = core::unstable::uvll; pub use iotask = uv_iotask; pub use global_loop = uv_global_loop; diff --git a/src/libuv b/src/libuv index 218ab86721eef..576ab1db8ea03 160000 --- a/src/libuv +++ b/src/libuv @@ -1 +1 @@ -Subproject commit 218ab86721eefd7b7e97fa6d9f95a80a1fa8686c +Subproject commit 576ab1db8ea03889eb7b2274654afe7c5c867230 diff --git a/src/rt/arch/i386/_context.S b/src/rt/arch/i386/_context.S index a9e329f0bf358..d8b7281e72b75 100644 --- a/src/rt/arch/i386/_context.S +++ b/src/rt/arch/i386/_context.S @@ -15,9 +15,15 @@ getcontext. The registers_t variable is in (%esp) */ +#if defined(__APPLE__) || defined(_WIN32) +#define SWAP_REGISTERS _swap_registers +#else +#define SWAP_REGISTERS swap_registers +#endif + // swap_registers(registers_t *oregs, registers_t *regs) -.globl swap_registers -swap_registers: +.globl SWAP_REGISTERS +SWAP_REGISTERS: // save the old context movl 4(%esp), %eax movl %ebx, 4(%eax) diff --git a/src/rt/arch/i386/context.cpp b/src/rt/arch/i386/context.cpp index 50a15e8d86c82..94e6f0418d07c 100644 --- a/src/rt/arch/i386/context.cpp +++ b/src/rt/arch/i386/context.cpp @@ -13,8 +13,7 @@ #include "../../rust_globals.h" extern "C" uint32_t CDECL swap_registers(registers_t *oregs, - registers_t *regs) - asm ("swap_registers"); + registers_t *regs); context::context() { diff --git a/src/rt/arch/x86_64/_context.S b/src/rt/arch/x86_64/_context.S index 7fdc6114b0a6b..1f9ae1c83c565 100644 --- a/src/rt/arch/x86_64/_context.S +++ b/src/rt/arch/x86_64/_context.S @@ -49,9 +49,15 @@ First four arguments: anyhow. */ +#if defined(__APPLE__) || defined(_WIN32) +#define SWAP_REGISTERS _swap_registers +#else +#define SWAP_REGISTERS swap_registers +#endif + // swap_registers(registers_t *oregs, registers_t *regs) -.globl swap_registers -swap_registers: +.globl SWAP_REGISTERS +SWAP_REGISTERS: // n.b. when we enter, the return address is at the top of // the stack (i.e., 0(%RSP)) and the argument is in // RUSTRT_ARG0_S. We diff --git a/src/rt/arch/x86_64/context.cpp b/src/rt/arch/x86_64/context.cpp index b7f82b574680b..6a265dff76156 100644 --- a/src/rt/arch/x86_64/context.cpp +++ b/src/rt/arch/x86_64/context.cpp @@ -13,8 +13,7 @@ #include "../../rust_globals.h" extern "C" void CDECL swap_registers(registers_t *oregs, - registers_t *regs) -asm ("swap_registers"); + registers_t *regs); context::context() { diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 803da32cbc8ac..d9ef6a52dbef6 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -21,6 +21,17 @@ void* global_crate_map = NULL; +#ifndef _WIN32 +pthread_key_t sched_key; +#else +DWORD sched_key; +#endif + +extern "C" void* +rust_get_sched_tls_key() { + return &sched_key; +} + /** The runtime entrypoint. The (C ABI) main function generated by rustc calls `rust_start`, providing the address of the Rust ABI main function, the @@ -30,6 +41,10 @@ void* global_crate_map = NULL; extern "C" CDECL int rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { +#ifndef _WIN32 + pthread_key_create(&sched_key, NULL); +#endif + // Load runtime configuration options from the environment. // FIXME #1497: Should provide a way to get these from the command // line as well. diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index a621d61cdf792..248f851e5b9f8 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -769,20 +769,20 @@ extern "C" CDECL void record_sp_limit(void *limit); class raw_thread: public rust_thread { public: - fn_env_pair *fn; + fn_env_pair fn; - raw_thread(fn_env_pair *fn) : fn(fn) { } + raw_thread(fn_env_pair fn) : fn(fn) { } virtual void run() { record_sp_limit(0); - fn->f(NULL, fn->env, NULL); + fn.f(NULL, fn.env, NULL); } }; extern "C" raw_thread* rust_raw_thread_start(fn_env_pair *fn) { assert(fn); - raw_thread *thread = new raw_thread(fn); + raw_thread *thread = new raw_thread(*fn); thread->start(); return thread; } diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index f08261c336dcd..5159434873733 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -376,16 +376,7 @@ current_kernel_malloc_alloc_cb(uv_handle_t* handle, extern "C" void rust_uv_buf_init(uv_buf_t* out_buf, char* base, size_t len) { - rust_task* task = rust_get_current_task(); - LOG(task, stdlib,"rust_uv_buf_init: base: %lu" \ - "len: %lu", - (unsigned long int)base, - (unsigned long int)len); *out_buf = uv_buf_init(base, len); - LOG(task, stdlib, "rust_uv_buf_init: after: " - "result->base: %" PRIxPTR " len: %" PRIxPTR, - (unsigned long int)(*out_buf).base, - (unsigned long int)(*out_buf).len); } extern "C" uv_loop_t* @@ -481,18 +472,11 @@ rust_uv_free_base_of_buf(uv_buf_t buf) { extern "C" struct sockaddr_in rust_uv_ip4_addr(const char* ip, int port) { - rust_task* task = rust_get_current_task(); - LOG(task, stdlib, "before creating addr_ptr.. ip %s" \ - " port %d\n", ip, port); struct sockaddr_in addr = uv_ip4_addr(ip, port); - LOG(task, stdlib, "after creating .. port: %d", addr.sin_port); return addr; } extern "C" struct sockaddr_in6 rust_uv_ip6_addr(const char* ip, int port) { - rust_task* task = rust_get_current_task(); - LOG(task, stdlib, "before creating addr_ptr.. ip %s" \ - " port %d\n", ip, port); return uv_ip6_addr(ip, port); } extern "C" int @@ -554,3 +538,28 @@ extern "C" sockaddr_in6* rust_uv_addrinfo_as_sockaddr_in6(addrinfo* input) { return (sockaddr_in6*)input->ai_addr; } + +extern "C" uv_idle_t* +rust_uv_idle_new() { + return new uv_idle_t; +} + +extern "C" void +rust_uv_idle_delete(uv_idle_t* handle) { + delete handle; +} + +extern "C" int +rust_uv_idle_init(uv_loop_t* loop, uv_idle_t* idle) { + return uv_idle_init(loop, idle); +} + +extern "C" int +rust_uv_idle_start(uv_idle_t* idle, uv_idle_cb cb) { + return uv_idle_start(idle, cb); +} + +extern "C" int +rust_uv_idle_stop(uv_idle_t* idle) { + return uv_idle_stop(idle); +} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 284f827bc753a..e27e0d5240503 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -140,6 +140,11 @@ rust_uv_current_kernel_malloc rust_uv_current_kernel_free rust_uv_getaddrinfo rust_uv_freeaddrinfo +rust_uv_idle_new +rust_uv_idle_delete +rust_uv_idle_init +rust_uv_idle_start +rust_uv_idle_stop rust_dbg_lock_create rust_dbg_lock_destroy rust_dbg_lock_lock @@ -187,3 +192,5 @@ rust_get_global_data_ptr rust_inc_kernel_live_count rust_dec_kernel_live_count rust_get_exchange_count_ptr +rust_get_sched_tls_key +swap_registers