Skip to content

Commit

Permalink
Added thread-local data init/dtor routines.
Browse files Browse the repository at this point in the history
  • Loading branch information
frobnitzem committed Aug 18, 2017
1 parent 15e64b8 commit 2453321
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 23 deletions.
2 changes: 1 addition & 1 deletion dag.pc.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ Description: A library for executing task dags with pthreads
URL: https://www.predictivestatmech.org/dag
Version: @VERSION@
Requires:
Cflags: -I${includedir}
Cflags: -std=c11 -I${includedir}
Libs: -L${libdir} -ldag
55 changes: 34 additions & 21 deletions src/dag.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,9 @@

// responds to options: DEBUG, TIME_THREADS, NTHREADS

#ifndef NTHREADS
#define NTHREADS 8
#endif

// used by progress (but requires task->info point to an int)
#define minfo(x) ((x)->info == NULL ? 0 : *(int *)(x)->info)

// Used to determine error conditions
// (when available queue entries < -MAX_THREADS).
// Of course, it's not possible to naturally
// have more the > MAX_THREADS simultaneous steal attempts
// required to detect a false error.
// Note: You must also change log naming code around "event-000.log"
// if MAX_THREADS will really be over 1000.
#define MAX_THREADS (1000)

#if NTHREADS > MAX_THREADS
#error "Can't have NTHREADS > MAX_THREADS"
#endif
Expand Down Expand Up @@ -68,11 +55,18 @@ struct GlobalInfo {
thread_queue_t *threads;
int nthreads;
run_fn run;
void *runinfo;

struct TaskList *initial; // where tasks was copied to avail for convenience
};

// This closure is passed to the thread_ctor function.
struct SetupInfo {
struct GlobalInfo *global;
setup_fn init;
dtor_fn dtor;
void *info0;
};

// dag.h: typedef struct ThreadQueue thread_queue_t;
struct ThreadQueue {
task_t **deque;
Expand All @@ -83,6 +77,7 @@ struct ThreadQueue {
int deque_size; // len(deque)
int rank;
struct GlobalInfo *global;
void *local;
#ifdef TIME_THREADS
FILE *event_log;
#endif
Expand Down Expand Up @@ -364,6 +359,7 @@ static int enable_successors(thread_queue_t *thr, task_t *n) {
/***************** Per-Thread Programs *****************/
#define MIN(a,b) ((a) < (b) ? (a) : (b))
static void thread_ctor(int rank, int nthreads, void *data, void *info) {
struct SetupInfo *info4 = info;
thread_queue_t *thr = (thread_queue_t *)data;

thr->deque = malloc(sizeof(task_t *)*INIT_STACK);
Expand All @@ -373,7 +369,10 @@ static void thread_ctor(int rank, int nthreads, void *data, void *info) {

thr->deque_size = INIT_STACK;
thr->rank = rank;
thr->global = (struct GlobalInfo *)info;
thr->global = info4->global;
thr->local = info4->info0;
if(info4->init != NULL)
thr->local = info4->init(rank, nthreads, info4->info0);
if(rank == 0) {
thr->global->threads = thr;
}
Expand Down Expand Up @@ -408,8 +407,11 @@ static void thread_ctor(int rank, int nthreads, void *data, void *info) {
}

static void thread_dtor(int rank, int nthreads, void *data, void *info) {
struct SetupInfo *info4 = info;
thread_queue_t *thr = (thread_queue_t *)data;
free(thr->deque);
if(info4->dtor != NULL)
info4->dtor(rank, nthreads, thr->local, info4->info0);
#ifdef TIME_THREADS
fclose(thr->event_log);
#endif
Expand All @@ -427,8 +429,7 @@ static void *thread_work(void *data) {
progress("%d: working on (%d)\n", thr->rank, minfo(task));
if(info != NULL) {
log_event("Running Task");
start = thr->global->run(get_task_info(task),
thr->global->runinfo);
start = thr->global->run(get_task_info(task), thr->local);
log_event("Adding Deps");
}
if(start == NULL) {
Expand Down Expand Up @@ -506,13 +507,25 @@ int link_task(task_t *task, task_t *dep) {
}

void exec_dag(task_t *start, run_fn run, void *runinfo) {
exec_dag2(start, run, NULL, NULL, NTHREADS, runinfo);
}

void exec_dag2(task_t *start, run_fn run, setup_fn init, dtor_fn dtor,
int threads, void *info0) {
if(threads < 1 || threads > MAX_THREADS) {
fprintf(stderr, "Invalid number of threads: %d -- outside [1,%d]\n",
threads, MAX_THREADS);
if(threads < 1) return;
threads = MAX_THREADS;
}
struct GlobalInfo global = {
.threads = NULL, // filled in by rank 0
.nthreads = NTHREADS,
.nthreads = threads,
.run = run,
.runinfo = runinfo,
.initial = atomic_exchange_explicit(&start->successors, NULL, memory_order_relaxed)
};
struct SetupInfo info4 = {&global, init, dtor, info0};

// TODO: Create global mutex/condition
// for handling task starvation and determining whether
// work is complete.
Expand All @@ -528,8 +541,8 @@ void exec_dag(task_t *start, run_fn run, void *runinfo) {
#ifdef TIME_THREADS
dag_start_time = time(NULL);
#endif
run_threaded(global.nthreads, sizeof(thread_queue_t), &thread_ctor,
&thread_work, &thread_dtor, &global);
run_threaded(threads, sizeof(thread_queue_t), &thread_ctor,
&thread_work, &thread_dtor, &info4);
free(global.initial);
}

28 changes: 27 additions & 1 deletion src/dag.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ struct Task {
_Atomic(int) joins;
};

// Execute a single task (may add new nodes).
/* Callback types */
// Execute a single task (may add new nodes and signal with non-NULL return).
typedef task_t *(*run_fn)(void *a, void *runinfo);
// Run in serial before launching threads. It returns runinfo for that thread.
typedef void *(*setup_fn)(int rank, int threads, void *info0);
// Run in serial after all threads are joined.
typedef void (*dtor_fn)(int rank, int threads, void *runinfo, void *info0);

task_t *new_tasks(int n);
task_t *start_task();
Expand All @@ -28,8 +33,29 @@ void del_tasks(int n, task_t *task);
// so this is just informational and shouldn't be used practically.
int link_task(task_t *task, task_t *dep);
void exec_dag(task_t *start, run_fn run, void *runinfo);
// threads < MAX_THREAD
void exec_dag2(task_t *start, run_fn run, setup_fn init, dtor_fn dtor,
int threads, void *info0);

void *get_task_info(task_t *task);
void *set_task_info(task_t *task, void *info);
int activate_task(task_t *task, void *info, task_t *start);

// default number of threads
#ifndef NTHREADS
#define NTHREADS 8
#endif

// Used to indicate error conditions as
// "available queue entries < -MAX_THREADS".
// This works because it's not possible to
// have more than MAX_THREADS simultaneous steal attempts,
// so available queue entries can't naturally go lower than that.
//
// Note: MAX_THREADS is not used to reserve static memory and can
// thus be arbitrarily large. However, if you have more than
// 1000 threads, you must change the log naming code around
// "event-000.log" in dag.c or else event logging will not work.
#define MAX_THREADS (100000)

#endif

0 comments on commit 2453321

Please sign in to comment.