Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std::uv::global_loop and std::timer #2261

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6694cfe
making brson's req. cleanups in #2134 plus change printf to LOG in c++
olsonjeffery Apr 8, 2012
d4a6e9f
adding low-level uv_timer_* stuff to libuv bindings
olsonjeffery Apr 10, 2012
839a4f8
add needed fields for global libuv loop + bindings to manage from rust
olsonjeffery Apr 12, 2012
3f68911
exporting priv::weaken_task and adding some debug logging
olsonjeffery Apr 12, 2012
fa7acb1
bindings to get/set data field on uv_loop_t* and debug log cleanup
olsonjeffery Apr 13, 2012
194ea3c
rt: whitespace cleanup for existing libuv integration
olsonjeffery Apr 16, 2012
ccdca3f
tweaking rust getter/setters for libuv data to use generics
olsonjeffery Apr 16, 2012
4590f6a
end-to-end impl of global loop w/ high-level ref counting.. needs work
olsonjeffery Apr 16, 2012
f55857b
clean and trying the global loop test as two separate loop lifetimes..
olsonjeffery Apr 16, 2012
a5187a9
make weak task that runs libuv loop unsupervised
olsonjeffery Apr 17, 2012
d06b311
don't use ::malloc for initializing the global_async_handle in rust_k…
olsonjeffery Apr 17, 2012
472ed67
fix a race in global loop test; unref_handle now takes a close_cb
olsonjeffery Apr 17, 2012
d5e54ae
remove rustrt.def.in entry for no-longer-existent c++ function
olsonjeffery Apr 17, 2012
0fafaea
replace impl of globa_async_handle with one using atomic compare-and-…
olsonjeffery Apr 17, 2012
9581700
uv::hl::get_global_loop() -> uv::global_loop::get()
olsonjeffery Apr 17, 2012
053c048
adding missing binding to rustrt.def.in
olsonjeffery Apr 17, 2012
d576e5a
fix uv_timer_t size in 32bit linux and windows
olsonjeffery Apr 17, 2012
133878d
std: refactor global_loop::get.. make it reusable
olsonjeffery Apr 19, 2012
9ba95f5
std: get_monitor_task_gl() is global_loop::get() default
olsonjeffery Apr 19, 2012
d81eed0
whitespace cleanup
olsonjeffery Apr 19, 2012
9cfedcd
std: fail if exiting hl_loop has unref_handles at weaken_task exit
olsonjeffery Apr 19, 2012
455b44d
std::uv : cleanup and an isolated test for hand-rolled high_level_loops
olsonjeffery Apr 19, 2012
22c2708
std: dump old std::uv API.. move remaining tests into uv::ll
olsonjeffery Apr 19, 2012
1b4a172
std: add std::timer and timer::delayed_send and timer::sleep
olsonjeffery Apr 20, 2012
7d1e2ae
std: add timer::recv_timeout() and whitespace cleanup
olsonjeffery Apr 20, 2012
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/libcore/priv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[doc(hidden)];

export chan_from_global_ptr;
export chan_from_global_ptr, weaken_task;

import compare_and_swap = rustrt::rust_compare_and_swap_ptr;

Expand Down Expand Up @@ -32,8 +32,11 @@ unsafe fn chan_from_global_ptr<T: send>(
abort
}

log(debug,"ENTERING chan_from_global_ptr, before is_prob_zero check");
let is_probably_zero = *global == 0u;
log(debug,"after is_prob_zero check");
if is_probably_zero {
log(debug,"is probably zero...");
// There's no global channel. We must make it

let setup_po = comm::port();
Expand All @@ -51,14 +54,17 @@ unsafe fn chan_from_global_ptr<T: send>(
}
};

log(debug,"before setup recv..");
// This is the proposed global channel
let ch = comm::recv(setup_po);
// 0 is our sentinal value. It is not a valid channel
assert unsafe::reinterpret_cast(ch) != 0u;

// Install the channel
log(debug,"BEFORE COMPARE AND SWAP");
let swapped = compare_and_swap(
global, 0u, unsafe::reinterpret_cast(ch));
log(debug,#fmt("AFTER .. swapped? %?", swapped));

if swapped {
// Success!
Expand All @@ -70,6 +76,7 @@ unsafe fn chan_from_global_ptr<T: send>(
unsafe::reinterpret_cast(*global)
}
} else {
log(debug, "global != 0");
unsafe::reinterpret_cast(*global)
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/libstd/std.rc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use core(vers = "0.2");
import core::*;

export net, uv;
export c_vec, util;
export c_vec, util, timer;
export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap, ufind;
export rope, arena;
export ebml, dbg, getopts, json, rand, sha1, term, time, prettyprint;
Expand All @@ -28,12 +28,14 @@ mod net;
mod uv;
mod uv_ll;
mod uv_hl;
mod uv_global_loop;


// Utility modules

mod c_vec;
mod util;
mod timer;


// Collections
Expand Down
183 changes: 183 additions & 0 deletions src/libstd/timer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#[doc ="
Utilities that leverage libuv's `uv_timer_*` API
"];

import uv = uv;
export delayed_send, sleep, recv_timeout;

#[doc = "
Wait for timeout period then send provided value over a channel

This call returns immediately. Useful as the building block for a number
of higher-level timer functions.

Is not guaranteed to wait for exactly the specified time, but will wait
for *at least* that period of time.

# Arguments

* msecs - a timeout period, in milliseconds, to wait
* ch - a channel of type T to send a `val` on
* val - a value of type T to send over the provided `ch`
"]
fn delayed_send<T: send>(msecs: uint, ch: comm::chan<T>, val: T) {
task::spawn() {||
unsafe {
let timer_done_po = comm::port::<()>();
let timer_done_ch = comm::chan(timer_done_po);
let timer_done_ch_ptr = ptr::addr_of(timer_done_ch);
let timer = uv::ll::timer_t();
let timer_ptr = ptr::addr_of(timer);
let hl_loop = uv::global_loop::get();
uv::hl::interact(hl_loop) {|loop_ptr|
uv::hl::ref(hl_loop, timer_ptr);
let init_result = uv::ll::timer_init(loop_ptr, timer_ptr);
if (init_result == 0i32) {
let start_result = uv::ll::timer_start(
timer_ptr, delayed_send_cb, msecs, 0u);
if (start_result == 0i32) {
uv::ll::set_data_for_uv_handle(
timer_ptr,
timer_done_ch_ptr as *libc::c_void);
}
else {
let error_msg = uv::ll::get_last_err_info(loop_ptr);
fail "timer::delayed_send() start failed: "+error_msg;
}
}
else {
let error_msg = uv::ll::get_last_err_info(loop_ptr);
fail "timer::delayed_send() init failed: "+error_msg;
}
};
// delayed_send_cb has been processed by libuv
comm::recv(timer_done_po);
// notify the caller immediately
comm::send(ch, copy(val));
// then clean up our handle
uv::hl::unref_and_close(hl_loop, timer_ptr,
delayed_send_close_cb);
// uv_close for this timer has been processed
comm::recv(timer_done_po);
}
};
}

#[doc = "
Blocks the current task for (at least) the specified time period.

Is not guaranteed to sleep for exactly the specified time, but will sleep
for *at least* that period of time.

# Arguments

* msecs - an amount of time, in milliseconds, for the current task to block
"]
fn sleep(msecs: uint) {
let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po);
delayed_send(msecs, exit_ch, ());
comm::recv(exit_po);
}

#[doc = "
Receive on a port for (up to) a specified time, then return an `option<T>`

This call will block to receive on the provided port for up to the specified
timeout. Depending on whether the provided port receives in that time period,
`recv_timeout` will return an `option<T>` representing the result.

# Arguments

* msecs - an mount of time, in milliseconds, to wait to receive
* wait_port - a `comm::port<T>` to receive on

# Returns

An `option<T>` representing the outcome of the call. If the call `recv`'d on
the provided port in the allotted timeout period, then the result will be a
`some(T)`. If not, then `none` will be returned.
"]
fn recv_timeout<T: send>(msecs: uint, wait_po: comm::port<T>) -> option<T> {
let timeout_po = comm::port::<()>();
let timeout_ch = comm::chan(timeout_po);
delayed_send(msecs, timeout_ch, ());
either::either(
{|left_val|
log(debug, #fmt("recv_time .. left_val %?",
left_val));
none
}, {|right_val|
some(right_val)
}, comm::select2(timeout_po, wait_po)
)
}

// INTERNAL API
crust fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
status: libc::c_int) unsafe {
log(debug, #fmt("delayed_send_cb handle %? status %?", handle, status));
let timer_done_ch =
*(uv::ll::get_data_for_uv_handle(handle) as *comm::chan<()>);
let stop_result = uv::ll::timer_stop(handle);
if (stop_result == 0i32) {
comm::send(timer_done_ch, ());
}
else {
let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
let error_msg = uv::ll::get_last_err_info(loop_ptr);
fail "timer::sleep() init failed: "+error_msg;
}
}

crust fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) unsafe {
log(debug, #fmt("delayed_send_close_cb handle %?", handle));
let timer_done_ch =
*(uv::ll::get_data_for_uv_handle(handle) as *comm::chan<()>);
comm::send(timer_done_ch, ());
}

#[cfg(test)]
mod test {
#[test]
fn test_timer_simple_sleep_test() {
sleep(1u);
}

#[test]
fn test_timer_recv_timeout_before_time_passes() {
let expected = rand::rng().gen_str(16u);
let test_po = comm::port::<str>();
let test_ch = comm::chan(test_po);

task::spawn() {||
delayed_send(1u, test_ch, expected);
};

let actual = alt recv_timeout(1000u, test_po) {
some(val) { val }
_ { fail "test_timer_recv_timeout_before_time_passes:"+
" didn't receive result before timeout"; }
};
assert actual == expected;
}

#[test]
fn test_timer_recv_timeout_after_time_passes() {
let expected = rand::rng().gen_str(16u);
let fail_msg = rand::rng().gen_str(16u);
let test_po = comm::port::<str>();
let test_ch = comm::chan(test_po);

task::spawn() {||
delayed_send(1000u, test_ch, expected);
};

let actual = alt recv_timeout(1u, test_po) {
none { fail_msg }
_ { fail "test_timer_recv_timeout_before_time_passes:"+
" didn't receive result before timeout"; }
};
assert actual == fail_msg;
}
}
Loading