Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vine, wq: set a maximum number of task to run per category #3759

Merged
merged 4 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dttools/src/category.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct category *category_create(const char *name)
c->fast_abort = -1;

c->total_tasks = 0;
c->max_concurrent = -1;

c->first_allocation = NULL;
c->max_allocation = rmsummary_create(-1);
Expand Down
4 changes: 4 additions & 0 deletions dttools/src/category.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,15 @@ struct category {
/* manager for bucketing mode, if applicable */
bucketing_manager_t* bucketing_manager;

/* number of tasks completed */
int64_t total_tasks;

/* completions since last time first-allocation was updated. */
int64_t completions_since_last_reset;

/* maximum number of tasks of this category allowed to be running concurrently. If less than 0, unlimited. */
int64_t max_concurrent;

/* category is somewhat confident of the maximum seen value. */
int steady_state;

Expand Down
14 changes: 14 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,20 @@ def set_category_first_allocation_guess(self, category, rmd):
setattr(rm, k, rmd[k])
return cvine.vine_set_category_first_allocation_guess(self._taskvine, category, rm)

##
# Specifies the maximum resources allowed for the given category.
#
# @param self Reference to the current work queue object.
# @param category Name of the category.
# @param max_concurrent Number of maximum concurrent tasks. Less then 0 means unlimited (this is the default).
# For example:
# @code
# >>> # Do not run more than 5 tasks of "my_category" concurrently:
# >>> q.set_category_max_concurrent("my_category", 5)
# @endcode
def set_category_max_concurrent(self, category, max_concurrent):
return cvine.vine_set_category_max_concurrent(self._work_queue, category, max_concurrent)

##
# Initialize first value of categories
#
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,13 @@ int vine_set_draining_by_hostname(struct vine_manager *m, const char *hostname,
*/
int vine_set_category_mode(struct vine_manager *m, const char *category, vine_category_mode_t mode);

/** Set a maximum number of tasks of this category that can execute concurrently. If less than 0, unlimited (this is the default).
@param q A manager object.
@param category A category name.
@param max_concurrent Number of maximum concurrent tasks.
*/
void vine_set_category_max_concurrent(struct vine_manager *m, const char *category, int max_concurrent);

/** Turn on or off first-allocation labeling for a given category and resource. This function should be use to fine-tune the defaults from @ref vine_set_category_mode.
@param m A manager object
@param category A category name.
Expand Down
48 changes: 41 additions & 7 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -3155,6 +3155,12 @@ static int send_one_task(struct vine_manager *q)
continue;
}

// Skip if category already running maximum allowed tasks
struct category *c = vine_category_lookup_or_create(q, t->category);
if (c->max_concurrent > -1 && c->max_concurrent < c->vine_stats->tasks_running) {
continue;
}

// Skip task if temp input files have not been materialized.
if (!vine_manager_check_inputs_available(q, t)) {
continue;
Expand Down Expand Up @@ -4219,19 +4225,47 @@ static vine_task_state_t change_task_state(struct vine_manager *q, struct vine_t
vine_task_state_to_string(new_state),
new_state);

struct category *c = vine_category_lookup_or_create(q, t->category);

/* XXX: update manager task count in the same way */
switch (old_state) {
case VINE_TASK_INITIAL:
break;
case VINE_TASK_READY:
c->vine_stats->tasks_waiting--;
break;
case VINE_TASK_RUNNING:
c->vine_stats->tasks_running--;
break;
case VINE_TASK_WAITING_RETRIEVAL:
c->vine_stats->tasks_with_results--;
break;
case VINE_TASK_RETRIEVED:
break;
case VINE_TASK_DONE:
break;
}

c->vine_stats->tasks_on_workers = c->vine_stats->tasks_running + c->vine_stats->tasks_with_results;
c->vine_stats->tasks_submitted =
c->total_tasks + c->vine_stats->tasks_waiting + c->vine_stats->tasks_on_workers;

switch (new_state) {
case VINE_TASK_INITIAL:
/* should not happen, do nothing */
break;
case VINE_TASK_READY:
vine_task_set_result(t, VINE_RESULT_UNKNOWN);
push_task_to_ready_list(q, t);
c->vine_stats->tasks_waiting++;
break;
case VINE_TASK_RUNNING:
itable_insert(q->running_table, t->task_id, t);
c->vine_stats->tasks_running++;
break;
case VINE_TASK_WAITING_RETRIEVAL:
list_push_head(q->waiting_retrieval_list, t);
c->vine_stats->tasks_with_results++;
break;
case VINE_TASK_RETRIEVED:
if (t->type == VINE_TASK_TYPE_LIBRARY) {
Expand Down Expand Up @@ -5374,13 +5408,6 @@ void vine_get_stats_category(struct vine_manager *q, const char *category, struc
struct vine_stats *cs = c->vine_stats;
memcpy(s, cs, sizeof(*s));

// info about tasks
s->tasks_waiting = task_state_count(q, category, VINE_TASK_READY);
s->tasks_running = task_state_count(q, category, VINE_TASK_RUNNING);
s->tasks_with_results = task_state_count(q, category, VINE_TASK_WAITING_RETRIEVAL);
s->tasks_on_workers = s->tasks_running + s->tasks_with_results;
s->tasks_submitted = c->total_tasks + s->tasks_waiting + s->tasks_on_workers;

s->workers_able = count_workers_for_waiting_tasks(q, largest_seen_resources(q, c->name));
}

Expand Down Expand Up @@ -5668,6 +5695,13 @@ int vine_set_category_mode(struct vine_manager *q, const char *category, vine_ca
return 1;
}

void vine_set_category_max_concurrent(struct vine_manager *m, const char *category, int max_concurrent)
{
struct category *c = vine_category_lookup_or_create(m, category);

c->max_concurrent = MAX(-1, max_concurrent);
}

int vine_enable_category_resource(struct vine_manager *q, const char *category, const char *resource, int autolabel)
{

Expand Down
15 changes: 15 additions & 0 deletions work_queue/src/bindings/python3/ndcctools/work_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,21 @@ def specify_category_first_allocation_guess(self, category, rmd):
setattr(rm, k, rmd[k])
return work_queue_specify_category_first_allocation_guess(self._work_queue, category, rm)

##
# Specifies the maximum resources allowed for the given category.
#
# @param self Reference to the current work queue object.
# @param category Name of the category.
# @param max_concurrent Number of maximum concurrent tasks. Less then 0 means unlimited (this is the default).
# For example:
# @code
# >>> # Do not run more than 5 tasks of "my_category" concurrently:
# >>> q.specify_category_max_concurrent("my_category", 5)
# @endcode
def specify_category_max_concurrent(self, category, max_concurrent):
return work_queue_specify_category_max_concurrent(self._work_queue, category, max_concurrent)


##
# Initialize first value of categories
#
Expand Down
58 changes: 48 additions & 10 deletions work_queue/src/work_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4597,6 +4597,11 @@ static int send_one_task( struct work_queue *q )
continue;
}

struct category *c = work_queue_category_lookup_or_create(q, t->category);
if (c->max_concurrent > -1 && c->max_concurrent < c->wq_stats->tasks_running) {
continue;
}

// Find the best worker for the task at the head of the list
w = find_best_worker(q,t);

Expand Down Expand Up @@ -6456,25 +6461,59 @@ static work_queue_task_state_t change_task_state( struct work_queue *q, struct w
work_queue_task_state_t old_state = (uintptr_t) itable_lookup(q->task_state_map, t->taskid);
itable_insert(q->task_state_map, t->taskid, (void *) new_state);

struct category *c = work_queue_category_lookup_or_create(q, t->category);

/* XXX: update manager task count in the same way */
switch(old_state) {
case WORK_QUEUE_TASK_UNKNOWN:
break;
case WORK_QUEUE_TASK_READY:
c->wq_stats->tasks_waiting--;
break;
case WORK_QUEUE_TASK_RUNNING:
c->wq_stats->tasks_running--;
break;
case WORK_QUEUE_TASK_WAITING_RETRIEVAL:
c->wq_stats->tasks_with_results--;
break;
case WORK_QUEUE_TASK_RETRIEVED:
break;
case WORK_QUEUE_TASK_DONE:
break;
case WORK_QUEUE_TASK_CANCELED:
break;
}

// insert to corresponding table
debug(D_WQ, "Task %d state change: %s (%d) to %s (%d)\n", t->taskid, task_state_str(old_state), old_state, task_state_str(new_state), new_state);

switch(new_state) {
case WORK_QUEUE_TASK_UNKNOWN:
break;
case WORK_QUEUE_TASK_READY:
c->wq_stats->tasks_waiting++;
update_task_result(t, WORK_QUEUE_RESULT_UNKNOWN);
push_task_to_ready_list(q, t);
break;
case WORK_QUEUE_TASK_RUNNING:
c->wq_stats->tasks_running++;
break;
case WORK_QUEUE_TASK_WAITING_RETRIEVAL:
c->wq_stats->tasks_with_results++;
break;
case WORK_QUEUE_TASK_RETRIEVED:
break;
case WORK_QUEUE_TASK_DONE:
case WORK_QUEUE_TASK_CANCELED:
/* tasks are freed when returned to user, thus we remove them from our local record */
fill_deprecated_tasks_stats(t);
itable_remove(q->tasks, t->taskid);
break;
default:
/* do nothing */
break;
}

c->wq_stats->tasks_on_workers = c->wq_stats->tasks_running + c->wq_stats->tasks_with_results;
c->wq_stats->tasks_submitted = c->total_tasks + c->wq_stats->tasks_waiting + c->wq_stats->tasks_on_workers;

log_queue_stats(q, 0);
write_transaction_task(q, t);

Expand Down Expand Up @@ -7671,13 +7710,6 @@ void work_queue_get_stats_category(struct work_queue *q, const char *category, s
struct work_queue_stats *cs = c->wq_stats;
memcpy(s, cs, sizeof(*s));

//info about tasks
s->tasks_waiting = task_state_count(q, category, WORK_QUEUE_TASK_READY);
s->tasks_running = task_state_count(q, category, WORK_QUEUE_TASK_RUNNING);
s->tasks_with_results = task_state_count(q, category, WORK_QUEUE_TASK_WAITING_RETRIEVAL);
s->tasks_on_workers = s->tasks_running + s->tasks_with_results;
s->tasks_submitted = c->total_tasks + s->tasks_waiting + s->tasks_on_workers;

s->workers_able = count_workers_for_waiting_tasks(q, largest_seen_resources(q, c->name));
}

Expand Down Expand Up @@ -8138,6 +8170,12 @@ int work_queue_specify_category_mode(struct work_queue *q, const char *category,
return 1;
}

void work_queue_specify_category_max_concurrent(struct work_queue *q, const char *category, int max_concurrent) {
struct category *c = work_queue_category_lookup_or_create(q, category);

c->max_concurrent = MAX(-1, max_concurrent);
}

int work_queue_enable_category_resource(struct work_queue *q, const char *category, const char *resource, int autolabel) {

struct category *c = work_queue_category_lookup_or_create(q, category);
Expand Down
7 changes: 7 additions & 0 deletions work_queue/src/work_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,13 @@ int work_queue_specify_draining_by_hostname(struct work_queue *q, const char *ho
*/
int work_queue_specify_category_mode(struct work_queue *q, const char *category, work_queue_category_mode_t mode);

/** Set a maximum number of tasks of this category that can execute concurrently. If less than 0, unlimited (this is the default).
@param q A work queue object.
@param category A category name.
@param max_concurrent Number of maximum concurrent tasks.
*/
void work_queue_specify_category_max_concurrent(struct work_queue *q, const char *category, int max_concurrent);

/** Turn on or off first-allocation labeling for a given category and resource. This function should be use to fine-tune the defaults from @ref work_queue_specify_category_mode.
@param q A work queue object.
@param category A category name.
Expand Down
Loading