Skip to content

Commit

Permalink
vine, wq: set a maximum number of task to run per category (#3759)
Browse files Browse the repository at this point in the history
* wq: move category task counts to change_task_state

* wq: adds q.specify_category_max_concurrent("category", max)

* vine: move category task counts to change_task_state

* vine: adds m.specify_category_max_concurrent("category", max)
  • Loading branch information
btovar authored Apr 17, 2024
1 parent e5ed488 commit a933943
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 17 deletions.
1 change: 1 addition & 0 deletions dttools/src/category.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct category *category_create(const char *name)
c->fast_abort = -1;

c->total_tasks = 0;
c->max_concurrent = -1;

c->first_allocation = NULL;
c->max_allocation = rmsummary_create(-1);
Expand Down
4 changes: 4 additions & 0 deletions dttools/src/category.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,15 @@ struct category {
/* manager for bucketing mode, if applicable */
bucketing_manager_t* bucketing_manager;

/* number of tasks completed */
int64_t total_tasks;

/* completions since last time first-allocation was updated. */
int64_t completions_since_last_reset;

/* maximum number of tasks of this category allowed to be running concurrently. If less than 0, unlimited. */
int64_t max_concurrent;

/* category is somewhat confident of the maximum seen value. */
int steady_state;

Expand Down
14 changes: 14 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,20 @@ def set_category_first_allocation_guess(self, category, rmd):
setattr(rm, k, rmd[k])
return cvine.vine_set_category_first_allocation_guess(self._taskvine, category, rm)

##
# Specifies the maximum resources allowed for the given category.
#
# @param self Reference to the current work queue object.
# @param category Name of the category.
# @param max_concurrent Number of maximum concurrent tasks. Less then 0 means unlimited (this is the default).
# For example:
# @code
# >>> # Do not run more than 5 tasks of "my_category" concurrently:
# >>> q.set_category_max_concurrent("my_category", 5)
# @endcode
def set_category_max_concurrent(self, category, max_concurrent):
return cvine.vine_set_category_max_concurrent(self._work_queue, category, max_concurrent)

##
# Initialize first value of categories
#
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,13 @@ int vine_set_draining_by_hostname(struct vine_manager *m, const char *hostname,
*/
int vine_set_category_mode(struct vine_manager *m, const char *category, vine_category_mode_t mode);

/** Set a maximum number of tasks of this category that can execute concurrently. If less than 0, unlimited (this is the default).
@param q A manager object.
@param category A category name.
@param max_concurrent Number of maximum concurrent tasks.
*/
void vine_set_category_max_concurrent(struct vine_manager *m, const char *category, int max_concurrent);

/** Turn on or off first-allocation labeling for a given category and resource. This function should be use to fine-tune the defaults from @ref vine_set_category_mode.
@param m A manager object
@param category A category name.
Expand Down
48 changes: 41 additions & 7 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -3191,6 +3191,12 @@ static int send_one_task(struct vine_manager *q)
continue;
}

// Skip if category already running maximum allowed tasks
struct category *c = vine_category_lookup_or_create(q, t->category);
if (c->max_concurrent > -1 && c->max_concurrent < c->vine_stats->tasks_running) {
continue;
}

// Skip task if temp input files have not been materialized.
if (!vine_manager_check_inputs_available(q, t)) {
continue;
Expand Down Expand Up @@ -4255,19 +4261,47 @@ static vine_task_state_t change_task_state(struct vine_manager *q, struct vine_t
vine_task_state_to_string(new_state),
new_state);

struct category *c = vine_category_lookup_or_create(q, t->category);

/* XXX: update manager task count in the same way */
switch (old_state) {
case VINE_TASK_INITIAL:
break;
case VINE_TASK_READY:
c->vine_stats->tasks_waiting--;
break;
case VINE_TASK_RUNNING:
c->vine_stats->tasks_running--;
break;
case VINE_TASK_WAITING_RETRIEVAL:
c->vine_stats->tasks_with_results--;
break;
case VINE_TASK_RETRIEVED:
break;
case VINE_TASK_DONE:
break;
}

c->vine_stats->tasks_on_workers = c->vine_stats->tasks_running + c->vine_stats->tasks_with_results;
c->vine_stats->tasks_submitted =
c->total_tasks + c->vine_stats->tasks_waiting + c->vine_stats->tasks_on_workers;

switch (new_state) {
case VINE_TASK_INITIAL:
/* should not happen, do nothing */
break;
case VINE_TASK_READY:
vine_task_set_result(t, VINE_RESULT_UNKNOWN);
push_task_to_ready_list(q, t);
c->vine_stats->tasks_waiting++;
break;
case VINE_TASK_RUNNING:
itable_insert(q->running_table, t->task_id, t);
c->vine_stats->tasks_running++;
break;
case VINE_TASK_WAITING_RETRIEVAL:
list_push_head(q->waiting_retrieval_list, t);
c->vine_stats->tasks_with_results++;
break;
case VINE_TASK_RETRIEVED:
if (t->type == VINE_TASK_TYPE_LIBRARY) {
Expand Down Expand Up @@ -5410,13 +5444,6 @@ void vine_get_stats_category(struct vine_manager *q, const char *category, struc
struct vine_stats *cs = c->vine_stats;
memcpy(s, cs, sizeof(*s));

// info about tasks
s->tasks_waiting = task_state_count(q, category, VINE_TASK_READY);
s->tasks_running = task_state_count(q, category, VINE_TASK_RUNNING);
s->tasks_with_results = task_state_count(q, category, VINE_TASK_WAITING_RETRIEVAL);
s->tasks_on_workers = s->tasks_running + s->tasks_with_results;
s->tasks_submitted = c->total_tasks + s->tasks_waiting + s->tasks_on_workers;

s->workers_able = count_workers_for_waiting_tasks(q, largest_seen_resources(q, c->name));
}

Expand Down Expand Up @@ -5704,6 +5731,13 @@ int vine_set_category_mode(struct vine_manager *q, const char *category, vine_ca
return 1;
}

void vine_set_category_max_concurrent(struct vine_manager *m, const char *category, int max_concurrent)
{
struct category *c = vine_category_lookup_or_create(m, category);

c->max_concurrent = MAX(-1, max_concurrent);
}

int vine_enable_category_resource(struct vine_manager *q, const char *category, const char *resource, int autolabel)
{

Expand Down
15 changes: 15 additions & 0 deletions work_queue/src/bindings/python3/ndcctools/work_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,21 @@ def specify_category_first_allocation_guess(self, category, rmd):
setattr(rm, k, rmd[k])
return work_queue_specify_category_first_allocation_guess(self._work_queue, category, rm)

##
# Specifies the maximum resources allowed for the given category.
#
# @param self Reference to the current work queue object.
# @param category Name of the category.
# @param max_concurrent Number of maximum concurrent tasks. Less then 0 means unlimited (this is the default).
# For example:
# @code
# >>> # Do not run more than 5 tasks of "my_category" concurrently:
# >>> q.specify_category_max_concurrent("my_category", 5)
# @endcode
def specify_category_max_concurrent(self, category, max_concurrent):
return work_queue_specify_category_max_concurrent(self._work_queue, category, max_concurrent)


##
# Initialize first value of categories
#
Expand Down
58 changes: 48 additions & 10 deletions work_queue/src/work_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4597,6 +4597,11 @@ static int send_one_task( struct work_queue *q )
continue;
}

struct category *c = work_queue_category_lookup_or_create(q, t->category);
if (c->max_concurrent > -1 && c->max_concurrent < c->wq_stats->tasks_running) {
continue;
}

// Find the best worker for the task at the head of the list
w = find_best_worker(q,t);

Expand Down Expand Up @@ -6456,25 +6461,59 @@ static work_queue_task_state_t change_task_state( struct work_queue *q, struct w
work_queue_task_state_t old_state = (uintptr_t) itable_lookup(q->task_state_map, t->taskid);
itable_insert(q->task_state_map, t->taskid, (void *) new_state);

struct category *c = work_queue_category_lookup_or_create(q, t->category);

/* XXX: update manager task count in the same way */
switch(old_state) {
case WORK_QUEUE_TASK_UNKNOWN:
break;
case WORK_QUEUE_TASK_READY:
c->wq_stats->tasks_waiting--;
break;
case WORK_QUEUE_TASK_RUNNING:
c->wq_stats->tasks_running--;
break;
case WORK_QUEUE_TASK_WAITING_RETRIEVAL:
c->wq_stats->tasks_with_results--;
break;
case WORK_QUEUE_TASK_RETRIEVED:
break;
case WORK_QUEUE_TASK_DONE:
break;
case WORK_QUEUE_TASK_CANCELED:
break;
}

// insert to corresponding table
debug(D_WQ, "Task %d state change: %s (%d) to %s (%d)\n", t->taskid, task_state_str(old_state), old_state, task_state_str(new_state), new_state);

switch(new_state) {
case WORK_QUEUE_TASK_UNKNOWN:
break;
case WORK_QUEUE_TASK_READY:
c->wq_stats->tasks_waiting++;
update_task_result(t, WORK_QUEUE_RESULT_UNKNOWN);
push_task_to_ready_list(q, t);
break;
case WORK_QUEUE_TASK_RUNNING:
c->wq_stats->tasks_running++;
break;
case WORK_QUEUE_TASK_WAITING_RETRIEVAL:
c->wq_stats->tasks_with_results++;
break;
case WORK_QUEUE_TASK_RETRIEVED:
break;
case WORK_QUEUE_TASK_DONE:
case WORK_QUEUE_TASK_CANCELED:
/* tasks are freed when returned to user, thus we remove them from our local record */
fill_deprecated_tasks_stats(t);
itable_remove(q->tasks, t->taskid);
break;
default:
/* do nothing */
break;
}

c->wq_stats->tasks_on_workers = c->wq_stats->tasks_running + c->wq_stats->tasks_with_results;
c->wq_stats->tasks_submitted = c->total_tasks + c->wq_stats->tasks_waiting + c->wq_stats->tasks_on_workers;

log_queue_stats(q, 0);
write_transaction_task(q, t);

Expand Down Expand Up @@ -7671,13 +7710,6 @@ void work_queue_get_stats_category(struct work_queue *q, const char *category, s
struct work_queue_stats *cs = c->wq_stats;
memcpy(s, cs, sizeof(*s));

//info about tasks
s->tasks_waiting = task_state_count(q, category, WORK_QUEUE_TASK_READY);
s->tasks_running = task_state_count(q, category, WORK_QUEUE_TASK_RUNNING);
s->tasks_with_results = task_state_count(q, category, WORK_QUEUE_TASK_WAITING_RETRIEVAL);
s->tasks_on_workers = s->tasks_running + s->tasks_with_results;
s->tasks_submitted = c->total_tasks + s->tasks_waiting + s->tasks_on_workers;

s->workers_able = count_workers_for_waiting_tasks(q, largest_seen_resources(q, c->name));
}

Expand Down Expand Up @@ -8138,6 +8170,12 @@ int work_queue_specify_category_mode(struct work_queue *q, const char *category,
return 1;
}

void work_queue_specify_category_max_concurrent(struct work_queue *q, const char *category, int max_concurrent) {
struct category *c = work_queue_category_lookup_or_create(q, category);

c->max_concurrent = MAX(-1, max_concurrent);
}

int work_queue_enable_category_resource(struct work_queue *q, const char *category, const char *resource, int autolabel) {

struct category *c = work_queue_category_lookup_or_create(q, category);
Expand Down
7 changes: 7 additions & 0 deletions work_queue/src/work_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,13 @@ int work_queue_specify_draining_by_hostname(struct work_queue *q, const char *ho
*/
int work_queue_specify_category_mode(struct work_queue *q, const char *category, work_queue_category_mode_t mode);

/** Set a maximum number of tasks of this category that can execute concurrently. If less than 0, unlimited (this is the default).
@param q A work queue object.
@param category A category name.
@param max_concurrent Number of maximum concurrent tasks.
*/
void work_queue_specify_category_max_concurrent(struct work_queue *q, const char *category, int max_concurrent);

/** Turn on or off first-allocation labeling for a given category and resource. This function should be use to fine-tune the defaults from @ref work_queue_specify_category_mode.
@param q A work queue object.
@param category A category name.
Expand Down

0 comments on commit a933943

Please sign in to comment.