Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

small enhancements to Task #13963

Merged
merged 3 commits into from
Nov 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Make.inc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ endif

JULIAGC := MARKSWEEP
JULIACODEGEN := LLVM
USE_COPY_STACKS := 1

# flag for disabling assertions
ifeq ($(FORCE_ASSERTIONS), 1)
Expand Down
1 change: 0 additions & 1 deletion base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@

# type Task
# parent::Task
# last::Task
# storage::Any
# consumers
# started::Bool
Expand Down
13 changes: 2 additions & 11 deletions base/docs/helpdb.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6646,10 +6646,8 @@ of the argument:
* `Task`: Wait for a `Task` to finish, returning its result value. If the task fails with an exception, the exception is propagated (re-thrown in the task that called `wait`).
* `RawFD`: Wait for changes on a file descriptor (see `poll_fd` for keyword arguments and return code)
If no argument is passed, the task blocks for an undefined period. If the task's
state is set to `:waiting`, it can only be restarted by an explicit call to
`schedule` or `yieldto`. If the task's state is `:runnable`, it might be
restarted unpredictably.
If no argument is passed, the task blocks for an undefined period.
A task can only be restarted by an explicit call to `schedule` or `yieldto`.
Often `wait` is called within a `while` loop to ensure a waited-for condition
is met before proceeding.
Expand Down Expand Up @@ -10406,13 +10404,6 @@ Returns `true` if the value of the sign of `x` is negative, otherwise `false`.
"""
signbit

doc"""
istaskstarted(task) -> Bool
Tell whether a task has started executing.
"""
istaskstarted

doc"""
clamp(x, lo, hi)
Expand Down
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,6 @@ export
Condition,
consume,
current_task,
istaskstarted,
istaskdone,
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes an exported function without a deprecation. Is there now no way to tell that a task has started?

Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C should be able to still compute this if needed

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we should have this function call such C code, and then put a proper deprecation warning on it if we're actually going to remove it. I don't see why though. Is there a good reason to remove it?

Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In various intermediate versions of the PR, it wasn't possible, so I eliminated it. Later on, I didn't see an obvious need to reimplement it. Plus, the only usage of it in Base (in serialize) appears to be a race condition.

Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restored in 07ef29f

lock,
notify,
Expand Down
4 changes: 2 additions & 2 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,13 @@ end

function serialize(s::SerializationState, t::Task)
serialize_cycle(s, t) && return
if istaskstarted(t) && !istaskdone(t)
if !istaskdone(t)
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the old version of this code, it was possible to make a task and then serialize it right away (a slightly strange thing to do, but it could fall out of reasonable code quasi-accidentally). Is that no longer possible?

Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I just don't see how it could safely fall out of reasonable code. This condition introduces behavior whereby you can create copies of a Task, but only until it starts running. However, since the next few lines call serialize (and thus yield), the Task could start running while this function is going leading to inconsistent serialization results. (I realize the same could be said for all of the serialize methods, since we don't buffer / deepcopy the whole tree first, but perhaps such is life)

I think there are ways of avoiding this, depending on the desired behavior. A stable result is guaranteed once the task is finished, however, making that state the only state that is definitely safe to serialize.

error("cannot serialize a running Task")
end
writetag(s.io, TASK_TAG)
serialize(s, t.code)
serialize(s, t.storage)
serialize(s, t.state == :queued || t.state == :waiting ? (:runnable) : t.state)
serialize(s, t.state == :queued || t.state == :runnable ? (:runnable) : t.state)
serialize(s, t.result)
serialize(s, t.exception)
end
Expand Down
1 change: 0 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,6 @@ function uv_write(s::LibuvStream, p::Ptr, n::UInt)
end
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
return Int(n)
end
Expand Down
95 changes: 38 additions & 57 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ end

current_task() = ccall(:jl_get_current_task, Any, ())::Task
istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed))
istaskstarted(t::Task) = isdefined(t, :last)

yieldto(t::Task, x::ANY = nothing) = ccall(:jl_switchto, Any, (Any, Any), t, x)

Expand Down Expand Up @@ -119,61 +118,52 @@ suppress_excp_printing(t::Task) = isa(t.storage, ObjectIdDict) ? get(get_task_tl
function task_done_hook(t::Task)
err = (t.state == :failed)
result = t.result
nexttask = t.last
handled = true
handled = false
if err
t.backtrace = catch_backtrace()
end

q = t.consumers
t.consumers = nothing

if isa(t.donenotify, Condition) && !isempty(t.donenotify.waitq)
handled = true
notify(t.donenotify, result, error=err)
end

#### un-optimized version
#isa(q,Condition) && notify(q, result, error=err)
if isa(q,Task)
handled = true
nexttask = q
nexttask.state = :runnable
if err
nexttask.exception = result
end
yieldto(nexttask, result) # this terminates the task
elseif isa(q,Condition) && !isempty(q.waitq)
handled = true
notify(q, result, error=err)
else
handled = false
end

t.consumers = nothing

if isa(t.donenotify,Condition)
handled |= !isempty(t.donenotify.waitq)
notify(t.donenotify, result, error=err)
end

if nexttask.state == :runnable
if err
nexttask.exception = result
if err && !handled
if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
active_repl_backend.backend_task.state == :runnable && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result)
end
yieldto(nexttask, result)
else
if err && !handled
if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
active_repl_backend.backend_task.state == :waiting && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result)
end
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(:red, STDERR) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
end
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(:red, STDERR) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
end
end
end
# if a finished task accidentally gets into the queue, wait()
# could return. in that case just take the next task off the queue.
while true
wait()
end
end
wait()
end


Expand Down Expand Up @@ -201,13 +191,9 @@ function produce(v)
wait()
end

t.state = :runnable
t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable"))
if empty
if isempty(Workqueue)
yieldto(t, v)
else
schedule_and_wait(t, v)
end
schedule_and_wait(t, v)
while true
# wait until there are more consumers
q = ct.consumers
Expand Down Expand Up @@ -254,9 +240,8 @@ function consume(P::Task, values...)
end
push!(P.consumers.waitq, ct)
end
ct.state = :waiting

schedule_and_wait(P)
P.state == :runnable ? schedule_and_wait(P) : wait() # don't attempt to queue it twice
end

start(t::Task) = nothing
Expand All @@ -279,16 +264,12 @@ end
function wait(c::Condition)
ct = current_task()

ct.state = :waiting
push!(c.waitq, ct)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
if ct.state == :waiting
ct.state = :runnable
end
rethrow()
end
end
Expand Down Expand Up @@ -318,7 +299,8 @@ notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)
global const Workqueue = Any[]

function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
t.state == :runnable || error("schedule: Task not runnable")
ccall(:uv_stop, Void, (Ptr{Void},), eventloop())
push!(Workqueue, t)
t.state = :queued
t
Expand All @@ -338,16 +320,13 @@ end

# fast version of schedule(t,v);wait()
function schedule_and_wait(t, v=nothing)
t.state == :runnable || error("schedule: Task not runnable")
if isempty(Workqueue)
if t.state == :runnable
return yieldto(t, v)
end
return yieldto(t, v)
else
if t.state == :runnable
t.result = v
push!(Workqueue, t)
t.state = :queued
end
t.result = v
push!(Workqueue, t)
t.state = :queued
end
wait()
end
Expand All @@ -365,10 +344,12 @@ function wait()
end
else
t = shift!(Workqueue)
t.state == :queued || throw(AssertionError("shift!(Workqueue).state == :queued"))
arg = t.result
t.result = nothing
t.state = :runnable
result = yieldto(t, arg)
current_task().state == :runnable || throw(AssertionError("current_task().state == :runnable"))
process_events(false)
# return when we come out of the queue
return result
Expand Down
3 changes: 0 additions & 3 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ BUILDDIR := .
include $(JULIAHOME)/deps/Versions.make
include $(JULIAHOME)/Make.inc

ifeq ($(USE_COPY_STACKS),1)
JCFLAGS += -DCOPY_STACKS
endif
override CFLAGS += $(JCFLAGS)
override CXXFLAGS += $(JCXXFLAGS)
override CPPFLAGS += $(JCPPFLAGS)
Expand Down
2 changes: 1 addition & 1 deletion src/Windows.mk
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ LIB = $(LIB);C:\Program Files\llvm\lib\Debug
LIB = $(LIB);C:\Program Files\llvm\lib\Release
!endif

CFLAGS = $(CFLAGS) -DCOPY_STACKS -D_CRT_SECURE_NO_WARNINGS
CFLAGS = $(CFLAGS) -D_CRT_SECURE_NO_WARNINGS
CFLAGS = $(CFLAGS) -DJL_SYSTEM_IMAGE_PATH=\"../lib/julia/sys.ji\" -DLIBRARY_EXPORTS

LIBWINDOWS = \
Expand Down
33 changes: 17 additions & 16 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1590,31 +1590,32 @@ NOINLINE static int gc_mark_module(jl_module_t *m, int d)

static void gc_mark_task_stack(jl_task_t *ta, int d)
{
if (ta->stkbuf != NULL || ta == jl_current_task) {
if (ta->stkbuf != NULL) {
gc_setmark_buf(ta->stkbuf, gc_bits(jl_astaggedvalue(ta)));
}
#ifdef COPY_STACKS
int stkbuf = (ta->stkbuf != (void*)(intptr_t)-1 && ta->stkbuf != NULL);
// FIXME - we need to mark stacks on other threads
int curtask = (ta == *jl_all_task_states[0].pcurrent_task);
if (stkbuf) {
#ifndef COPY_STACKS
if (ta != jl_root_task) // stkbuf isn't owned by julia for the root task
#endif
gc_setmark_buf(ta->stkbuf, gc_bits(jl_astaggedvalue(ta)));
}
if (curtask) {
gc_mark_stack((jl_value_t*)ta, *jl_all_pgcstacks[0], 0, d);
}
else if (stkbuf) {
ptrint_t offset;
if (ta == *jl_all_task_states[ta->tid].pcurrent_task) {
offset = 0;
// FIXME - do we need to mark stacks on other threads?
gc_mark_stack((jl_value_t*)ta, *jl_all_pgcstacks[ta->tid], offset, d);
}
else {
offset = (char *)ta->stkbuf - ((char *)jl_stackbase - ta->ssize);
gc_mark_stack((jl_value_t*)ta, ta->gcstack, offset, d);
}
#ifdef COPY_STACKS
offset = (char *)ta->stkbuf - ((char *)jl_stackbase - ta->ssize);
#else
gc_mark_stack((jl_value_t*)ta, ta->gcstack, 0, d);
offset = 0;
#endif
gc_mark_stack((jl_value_t*)ta, ta->gcstack, offset, d);
}
}

NOINLINE static void gc_mark_task(jl_task_t *ta, int d)
{
if (ta->parent) gc_push_root(ta->parent, d);
if (ta->last) gc_push_root(ta->last, d);
gc_push_root(ta->tls, d);
gc_push_root(ta->consumers, d);
gc_push_root(ta->donenotify, d);
Expand Down
4 changes: 0 additions & 4 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,6 @@ typedef struct _jl_handler_t {
typedef struct _jl_task_t {
JL_DATA_TYPE
struct _jl_task_t *parent;
struct _jl_task_t *last;
jl_value_t *tls;
jl_sym_t *state;
jl_value_t *consumers;
Expand All @@ -1452,9 +1451,6 @@ typedef struct _jl_task_t {
jl_value_t *backtrace;
jl_function_t *start;
jl_jmp_buf ctx;
#ifndef COPY_STACKS
void *stack;
#endif
size_t bufsz;
void *stkbuf;
size_t ssize;
Expand Down
4 changes: 2 additions & 2 deletions src/signals-unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ static int is_addr_on_stack(void *addr)
return ((char*)addr > (char*)jl_stack_lo-3000000 &&
(char*)addr < (char*)jl_stack_hi);
#else
return ((char*)addr > (char*)jl_current_task->stack-8192 &&
(char*)addr < (char*)jl_current_task->stack+jl_current_task->ssize);
return ((char*)addr > (char*)jl_current_task->stkbuf &&
(char*)addr < (char*)jl_current_task->stkbuf + jl_current_task->ssize);
#endif
}

Expand Down
Loading