Skip to content

Commit

Permalink
Merge pull request #13963 from JuliaLang/jn/task_enhancements
Browse files Browse the repository at this point in the history
small enhancements to Task
  • Loading branch information
vtjnash committed Nov 16, 2015
2 parents 7812d49 + b238ac2 commit ab9389c
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 177 deletions.
1 change: 0 additions & 1 deletion Make.inc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ endif

JULIAGC := MARKSWEEP
JULIACODEGEN := LLVM
USE_COPY_STACKS := 1

# flag for disabling assertions
ifeq ($(FORCE_ASSERTIONS), 1)
Expand Down
1 change: 0 additions & 1 deletion base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@

# type Task
# parent::Task
# last::Task
# storage::Any
# consumers
# started::Bool
Expand Down
13 changes: 2 additions & 11 deletions base/docs/helpdb.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6646,10 +6646,8 @@ of the argument:
* `Task`: Wait for a `Task` to finish, returning its result value. If the task fails with an exception, the exception is propagated (re-thrown in the task that called `wait`).
* `RawFD`: Wait for changes on a file descriptor (see `poll_fd` for keyword arguments and return code)
If no argument is passed, the task blocks for an undefined period. If the task's
state is set to `:waiting`, it can only be restarted by an explicit call to
`schedule` or `yieldto`. If the task's state is `:runnable`, it might be
restarted unpredictably.
If no argument is passed, the task blocks for an undefined period.
A task can only be restarted by an explicit call to `schedule` or `yieldto`.
Often `wait` is called within a `while` loop to ensure a waited-for condition
is met before proceeding.
Expand Down Expand Up @@ -10406,13 +10404,6 @@ Returns `true` if the value of the sign of `x` is negative, otherwise `false`.
"""
signbit

doc"""
istaskstarted(task) -> Bool
Tell whether a task has started executing.
"""
istaskstarted

doc"""
clamp(x, lo, hi)
Expand Down
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,6 @@ export
Condition,
consume,
current_task,
istaskstarted,
istaskdone,
lock,
notify,
Expand Down
4 changes: 2 additions & 2 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,13 @@ end

function serialize(s::SerializationState, t::Task)
serialize_cycle(s, t) && return
if istaskstarted(t) && !istaskdone(t)
if !istaskdone(t)
error("cannot serialize a running Task")
end
writetag(s.io, TASK_TAG)
serialize(s, t.code)
serialize(s, t.storage)
serialize(s, t.state == :queued || t.state == :waiting ? (:runnable) : t.state)
serialize(s, t.state == :queued || t.state == :runnable ? (:runnable) : t.state)
serialize(s, t.result)
serialize(s, t.exception)
end
Expand Down
1 change: 0 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,6 @@ function uv_write(s::LibuvStream, p::Ptr, n::UInt)
end
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
return Int(n)
end
Expand Down
95 changes: 38 additions & 57 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ end

current_task() = ccall(:jl_get_current_task, Any, ())::Task
istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed))
istaskstarted(t::Task) = isdefined(t, :last)

yieldto(t::Task, x::ANY = nothing) = ccall(:jl_switchto, Any, (Any, Any), t, x)

Expand Down Expand Up @@ -119,61 +118,52 @@ suppress_excp_printing(t::Task) = isa(t.storage, ObjectIdDict) ? get(get_task_tl
function task_done_hook(t::Task)
err = (t.state == :failed)
result = t.result
nexttask = t.last
handled = true
handled = false
if err
t.backtrace = catch_backtrace()
end

q = t.consumers
t.consumers = nothing

if isa(t.donenotify, Condition) && !isempty(t.donenotify.waitq)
handled = true
notify(t.donenotify, result, error=err)
end

#### un-optimized version
#isa(q,Condition) && notify(q, result, error=err)
if isa(q,Task)
handled = true
nexttask = q
nexttask.state = :runnable
if err
nexttask.exception = result
end
yieldto(nexttask, result) # this terminates the task
elseif isa(q,Condition) && !isempty(q.waitq)
handled = true
notify(q, result, error=err)
else
handled = false
end

t.consumers = nothing

if isa(t.donenotify,Condition)
handled |= !isempty(t.donenotify.waitq)
notify(t.donenotify, result, error=err)
end

if nexttask.state == :runnable
if err
nexttask.exception = result
if err && !handled
if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
active_repl_backend.backend_task.state == :runnable && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result)
end
yieldto(nexttask, result)
else
if err && !handled
if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
active_repl_backend.backend_task.state == :waiting && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result)
end
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(:red, STDERR) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
end
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(:red, STDERR) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
end
end
end
# if a finished task accidentally gets into the queue, wait()
# could return. in that case just take the next task off the queue.
while true
wait()
end
end
wait()
end


Expand Down Expand Up @@ -201,13 +191,9 @@ function produce(v)
wait()
end

t.state = :runnable
t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable"))
if empty
if isempty(Workqueue)
yieldto(t, v)
else
schedule_and_wait(t, v)
end
schedule_and_wait(t, v)
while true
# wait until there are more consumers
q = ct.consumers
Expand Down Expand Up @@ -254,9 +240,8 @@ function consume(P::Task, values...)
end
push!(P.consumers.waitq, ct)
end
ct.state = :waiting

schedule_and_wait(P)
P.state == :runnable ? schedule_and_wait(P) : wait() # don't attempt to queue it twice
end

start(t::Task) = nothing
Expand All @@ -279,16 +264,12 @@ end
function wait(c::Condition)
ct = current_task()

ct.state = :waiting
push!(c.waitq, ct)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
if ct.state == :waiting
ct.state = :runnable
end
rethrow()
end
end
Expand Down Expand Up @@ -318,7 +299,8 @@ notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)
global const Workqueue = Any[]

function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
t.state == :runnable || error("schedule: Task not runnable")
ccall(:uv_stop, Void, (Ptr{Void},), eventloop())
push!(Workqueue, t)
t.state = :queued
t
Expand All @@ -338,16 +320,13 @@ end

# fast version of schedule(t,v);wait()
function schedule_and_wait(t, v=nothing)
t.state == :runnable || error("schedule: Task not runnable")
if isempty(Workqueue)
if t.state == :runnable
return yieldto(t, v)
end
return yieldto(t, v)
else
if t.state == :runnable
t.result = v
push!(Workqueue, t)
t.state = :queued
end
t.result = v
push!(Workqueue, t)
t.state = :queued
end
wait()
end
Expand All @@ -365,10 +344,12 @@ function wait()
end
else
t = shift!(Workqueue)
t.state == :queued || throw(AssertionError("shift!(Workqueue).state == :queued"))
arg = t.result
t.result = nothing
t.state = :runnable
result = yieldto(t, arg)
current_task().state == :runnable || throw(AssertionError("current_task().state == :runnable"))
process_events(false)
# return when we come out of the queue
return result
Expand Down
3 changes: 0 additions & 3 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ BUILDDIR := .
include $(JULIAHOME)/deps/Versions.make
include $(JULIAHOME)/Make.inc

ifeq ($(USE_COPY_STACKS),1)
JCFLAGS += -DCOPY_STACKS
endif
override CFLAGS += $(JCFLAGS)
override CXXFLAGS += $(JCXXFLAGS)
override CPPFLAGS += $(JCPPFLAGS)
Expand Down
2 changes: 1 addition & 1 deletion src/Windows.mk
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ LIB = $(LIB);C:\Program Files\llvm\lib\Debug
LIB = $(LIB);C:\Program Files\llvm\lib\Release
!endif

CFLAGS = $(CFLAGS) -DCOPY_STACKS -D_CRT_SECURE_NO_WARNINGS
CFLAGS = $(CFLAGS) -D_CRT_SECURE_NO_WARNINGS
CFLAGS = $(CFLAGS) -DJL_SYSTEM_IMAGE_PATH=\"../lib/julia/sys.ji\" -DLIBRARY_EXPORTS

LIBWINDOWS = \
Expand Down
33 changes: 17 additions & 16 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1592,31 +1592,32 @@ NOINLINE static int gc_mark_module(jl_module_t *m, int d)

static void gc_mark_task_stack(jl_task_t *ta, int d)
{
if (ta->stkbuf != NULL || ta == jl_current_task) {
if (ta->stkbuf != NULL) {
gc_setmark_buf(ta->stkbuf, gc_bits(jl_astaggedvalue(ta)));
}
#ifdef COPY_STACKS
int stkbuf = (ta->stkbuf != (void*)(intptr_t)-1 && ta->stkbuf != NULL);
// FIXME - we need to mark stacks on other threads
int curtask = (ta == *jl_all_task_states[0].pcurrent_task);
if (stkbuf) {
#ifndef COPY_STACKS
if (ta != jl_root_task) // stkbuf isn't owned by julia for the root task
#endif
gc_setmark_buf(ta->stkbuf, gc_bits(jl_astaggedvalue(ta)));
}
if (curtask) {
gc_mark_stack((jl_value_t*)ta, *jl_all_pgcstacks[0], 0, d);
}
else if (stkbuf) {
ptrint_t offset;
if (ta == *jl_all_task_states[ta->tid].pcurrent_task) {
offset = 0;
// FIXME - do we need to mark stacks on other threads?
gc_mark_stack((jl_value_t*)ta, *jl_all_pgcstacks[ta->tid], offset, d);
}
else {
offset = (char *)ta->stkbuf - ((char *)jl_stackbase - ta->ssize);
gc_mark_stack((jl_value_t*)ta, ta->gcstack, offset, d);
}
#ifdef COPY_STACKS
offset = (char *)ta->stkbuf - ((char *)jl_stackbase - ta->ssize);
#else
gc_mark_stack((jl_value_t*)ta, ta->gcstack, 0, d);
offset = 0;
#endif
gc_mark_stack((jl_value_t*)ta, ta->gcstack, offset, d);
}
}

NOINLINE static void gc_mark_task(jl_task_t *ta, int d)
{
if (ta->parent) gc_push_root(ta->parent, d);
if (ta->last) gc_push_root(ta->last, d);
gc_push_root(ta->tls, d);
gc_push_root(ta->consumers, d);
gc_push_root(ta->donenotify, d);
Expand Down
4 changes: 0 additions & 4 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1454,7 +1454,6 @@ typedef struct _jl_handler_t {
typedef struct _jl_task_t {
JL_DATA_TYPE
struct _jl_task_t *parent;
struct _jl_task_t *last;
jl_value_t *tls;
jl_sym_t *state;
jl_value_t *consumers;
Expand All @@ -1464,9 +1463,6 @@ typedef struct _jl_task_t {
jl_value_t *backtrace;
jl_function_t *start;
jl_jmp_buf ctx;
#ifndef COPY_STACKS
void *stack;
#endif
size_t bufsz;
void *stkbuf;
size_t ssize;
Expand Down
4 changes: 2 additions & 2 deletions src/signals-unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ static int is_addr_on_stack(void *addr)
return ((char*)addr > (char*)jl_stack_lo-3000000 &&
(char*)addr < (char*)jl_stack_hi);
#else
return ((char*)addr > (char*)jl_current_task->stack-8192 &&
(char*)addr < (char*)jl_current_task->stack+jl_current_task->ssize);
return ((char*)addr > (char*)jl_current_task->stkbuf &&
(char*)addr < (char*)jl_current_task->stkbuf + jl_current_task->ssize);
#endif
}

Expand Down
Loading

1 comment on commit ab9389c

@josefsachsconning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit is suspected of breaking build with JULIA_THREADS=1. See issue #14028.

Please sign in to comment.