diff --git a/dttools/src/category.c b/dttools/src/category.c index 6f5ba587d7..6803bf5927 100644 --- a/dttools/src/category.c +++ b/dttools/src/category.c @@ -51,6 +51,7 @@ struct category *category_create(const char *name) c->fast_abort = -1; c->total_tasks = 0; + c->max_concurrent = -1; c->first_allocation = NULL; c->max_allocation = rmsummary_create(-1); diff --git a/dttools/src/category.h b/dttools/src/category.h index 0ab1d05c2f..0779d4fecf 100644 --- a/dttools/src/category.h +++ b/dttools/src/category.h @@ -85,11 +85,15 @@ struct category { /* manager for bucketing mode, if applicable */ bucketing_manager_t* bucketing_manager; + /* number of tasks completed */ int64_t total_tasks; /* completions since last time first-allocation was updated. */ int64_t completions_since_last_reset; + /* maximum number of tasks of this category allowed to be running concurrently. If less than 0, unlimited. */ + int64_t max_concurrent; + /* category is somewhat confident of the maximum seen value. */ int steady_state; diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py b/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py index c062503b98..7c1ca4fc17 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py @@ -676,6 +676,20 @@ def set_category_first_allocation_guess(self, category, rmd): setattr(rm, k, rmd[k]) return cvine.vine_set_category_first_allocation_guess(self._taskvine, category, rm) + ## + # Specifies the maximum number of tasks of the given category allowed to run concurrently. + # + # @param self Reference to the current manager object. + # @param category Name of the category. + # @param max_concurrent Number of maximum concurrent tasks. Less than 0 means unlimited (this is the default). 
+ # For example: + # @code + # >>> # Do not run more than 5 tasks of "my_category" concurrently: + # >>> q.set_category_max_concurrent("my_category", 5) + # @endcode + def set_category_max_concurrent(self, category, max_concurrent): + return cvine.vine_set_category_max_concurrent(self._taskvine, category, max_concurrent) + ## # Initialize first value of categories # diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index bd072cc295..e94477512d 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -1091,6 +1091,13 @@ int vine_set_draining_by_hostname(struct vine_manager *m, const char *hostname, */ int vine_set_category_mode(struct vine_manager *m, const char *category, vine_category_mode_t mode); +/** Set a maximum number of tasks of this category that can execute concurrently. If less than 0, unlimited (this is the default). +@param m A manager object. +@param category A category name. +@param max_concurrent Number of maximum concurrent tasks. +*/ +void vine_set_category_max_concurrent(struct vine_manager *m, const char *category, int max_concurrent); + /** Turn on or off first-allocation labeling for a given category and resource. This function should be use to fine-tune the defaults from @ref vine_set_category_mode. @param m A manager object @param category A category name. diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index e9a5c98192..2cf5ae937c 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -3155,6 +3155,12 @@ static int send_one_task(struct vine_manager *q) continue; } + // Skip if category already running maximum allowed tasks + struct category *c = vine_category_lookup_or_create(q, t->category); + if (c->max_concurrent > -1 && c->max_concurrent <= c->vine_stats->tasks_running) { + continue; + } + // Skip task if temp input files have not been materialized. 
if (!vine_manager_check_inputs_available(q, t)) { continue; @@ -4219,6 +4225,31 @@ static vine_task_state_t change_task_state(struct vine_manager *q, struct vine_t vine_task_state_to_string(new_state), new_state); + struct category *c = vine_category_lookup_or_create(q, t->category); + + /* XXX: update manager task count in the same way */ + switch (old_state) { + case VINE_TASK_INITIAL: + break; + case VINE_TASK_READY: + c->vine_stats->tasks_waiting--; + break; + case VINE_TASK_RUNNING: + c->vine_stats->tasks_running--; + break; + case VINE_TASK_WAITING_RETRIEVAL: + c->vine_stats->tasks_with_results--; + break; + case VINE_TASK_RETRIEVED: + break; + case VINE_TASK_DONE: + break; + } + + c->vine_stats->tasks_on_workers = c->vine_stats->tasks_running + c->vine_stats->tasks_with_results; + c->vine_stats->tasks_submitted = + c->total_tasks + c->vine_stats->tasks_waiting + c->vine_stats->tasks_on_workers; + switch (new_state) { case VINE_TASK_INITIAL: /* should not happen, do nothing */ @@ -4226,12 +4257,15 @@ static vine_task_state_t change_task_state(struct vine_manager *q, struct vine_t case VINE_TASK_READY: vine_task_set_result(t, VINE_RESULT_UNKNOWN); push_task_to_ready_list(q, t); + c->vine_stats->tasks_waiting++; break; case VINE_TASK_RUNNING: itable_insert(q->running_table, t->task_id, t); + c->vine_stats->tasks_running++; break; case VINE_TASK_WAITING_RETRIEVAL: list_push_head(q->waiting_retrieval_list, t); + c->vine_stats->tasks_with_results++; break; case VINE_TASK_RETRIEVED: if (t->type == VINE_TASK_TYPE_LIBRARY) { @@ -5374,13 +5408,6 @@ void vine_get_stats_category(struct vine_manager *q, const char *category, struc struct vine_stats *cs = c->vine_stats; memcpy(s, cs, sizeof(*s)); - // info about tasks - s->tasks_waiting = task_state_count(q, category, VINE_TASK_READY); - s->tasks_running = task_state_count(q, category, VINE_TASK_RUNNING); - s->tasks_with_results = task_state_count(q, category, VINE_TASK_WAITING_RETRIEVAL); - s->tasks_on_workers = 
s->tasks_running + s->tasks_with_results; - s->tasks_submitted = c->total_tasks + s->tasks_waiting + s->tasks_on_workers; - s->workers_able = count_workers_for_waiting_tasks(q, largest_seen_resources(q, c->name)); } @@ -5668,6 +5695,13 @@ int vine_set_category_mode(struct vine_manager *q, const char *category, vine_ca return 1; } +void vine_set_category_max_concurrent(struct vine_manager *m, const char *category, int max_concurrent) +{ + struct category *c = vine_category_lookup_or_create(m, category); + + c->max_concurrent = MAX(-1, max_concurrent); +} + int vine_enable_category_resource(struct vine_manager *q, const char *category, const char *resource, int autolabel) { diff --git a/work_queue/src/bindings/python3/ndcctools/work_queue.py b/work_queue/src/bindings/python3/ndcctools/work_queue.py index a6b9d2526e..12c42cd1b0 100644 --- a/work_queue/src/bindings/python3/ndcctools/work_queue.py +++ b/work_queue/src/bindings/python3/ndcctools/work_queue.py @@ -1704,6 +1704,21 @@ def specify_category_first_allocation_guess(self, category, rmd): setattr(rm, k, rmd[k]) return work_queue_specify_category_first_allocation_guess(self._work_queue, category, rm) + ## + # Specifies the maximum number of tasks of the given category allowed to run concurrently. + # + # @param self Reference to the current work queue object. + # @param category Name of the category. + # @param max_concurrent Number of maximum concurrent tasks. Less than 0 means unlimited (this is the default). 
+ # For example: + # @code + # >>> # Do not run more than 5 tasks of "my_category" concurrently: + # >>> q.specify_category_max_concurrent("my_category", 5) + # @endcode + def specify_category_max_concurrent(self, category, max_concurrent): + return work_queue_specify_category_max_concurrent(self._work_queue, category, max_concurrent) + + ## # Initialize first value of categories # diff --git a/work_queue/src/work_queue.c b/work_queue/src/work_queue.c index 2bee3c2c45..ebf9e223f8 100644 --- a/work_queue/src/work_queue.c +++ b/work_queue/src/work_queue.c @@ -4597,6 +4597,11 @@ static int send_one_task( struct work_queue *q ) continue; } + struct category *c = work_queue_category_lookup_or_create(q, t->category); + if (c->max_concurrent > -1 && c->max_concurrent <= c->wq_stats->tasks_running) { + continue; + } + // Find the best worker for the task at the head of the list w = find_best_worker(q,t); @@ -6456,25 +6461,59 @@ static work_queue_task_state_t change_task_state( struct work_queue *q, struct w work_queue_task_state_t old_state = (uintptr_t) itable_lookup(q->task_state_map, t->taskid); itable_insert(q->task_state_map, t->taskid, (void *) new_state); + struct category *c = work_queue_category_lookup_or_create(q, t->category); + + /* XXX: update manager task count in the same way */ + switch(old_state) { + case WORK_QUEUE_TASK_UNKNOWN: + break; + case WORK_QUEUE_TASK_READY: + c->wq_stats->tasks_waiting--; + break; + case WORK_QUEUE_TASK_RUNNING: + c->wq_stats->tasks_running--; + break; + case WORK_QUEUE_TASK_WAITING_RETRIEVAL: + c->wq_stats->tasks_with_results--; + break; + case WORK_QUEUE_TASK_RETRIEVED: + break; + case WORK_QUEUE_TASK_DONE: + break; + case WORK_QUEUE_TASK_CANCELED: + break; + } + // insert to corresponding table debug(D_WQ, "Task %d state change: %s (%d) to %s (%d)\n", t->taskid, task_state_str(old_state), old_state, task_state_str(new_state), new_state); switch(new_state) { + case WORK_QUEUE_TASK_UNKNOWN: + break; case WORK_QUEUE_TASK_READY: + 
c->wq_stats->tasks_waiting++; update_task_result(t, WORK_QUEUE_RESULT_UNKNOWN); push_task_to_ready_list(q, t); break; + case WORK_QUEUE_TASK_RUNNING: + c->wq_stats->tasks_running++; + break; + case WORK_QUEUE_TASK_WAITING_RETRIEVAL: + c->wq_stats->tasks_with_results++; + break; + case WORK_QUEUE_TASK_RETRIEVED: + break; case WORK_QUEUE_TASK_DONE: case WORK_QUEUE_TASK_CANCELED: /* tasks are freed when returned to user, thus we remove them from our local record */ fill_deprecated_tasks_stats(t); itable_remove(q->tasks, t->taskid); break; - default: - /* do nothing */ - break; } + c->wq_stats->tasks_on_workers = c->wq_stats->tasks_running + c->wq_stats->tasks_with_results; + c->wq_stats->tasks_submitted = c->total_tasks + c->wq_stats->tasks_waiting + c->wq_stats->tasks_on_workers; + log_queue_stats(q, 0); write_transaction_task(q, t); @@ -7671,13 +7710,6 @@ void work_queue_get_stats_category(struct work_queue *q, const char *category, s struct work_queue_stats *cs = c->wq_stats; memcpy(s, cs, sizeof(*s)); - //info about tasks - s->tasks_waiting = task_state_count(q, category, WORK_QUEUE_TASK_READY); - s->tasks_running = task_state_count(q, category, WORK_QUEUE_TASK_RUNNING); - s->tasks_with_results = task_state_count(q, category, WORK_QUEUE_TASK_WAITING_RETRIEVAL); - s->tasks_on_workers = s->tasks_running + s->tasks_with_results; - s->tasks_submitted = c->total_tasks + s->tasks_waiting + s->tasks_on_workers; - s->workers_able = count_workers_for_waiting_tasks(q, largest_seen_resources(q, c->name)); } @@ -8138,6 +8170,12 @@ int work_queue_specify_category_mode(struct work_queue *q, const char *category, return 1; } +void work_queue_specify_category_max_concurrent(struct work_queue *q, const char *category, int max_concurrent) { + struct category *c = work_queue_category_lookup_or_create(q, category); + + c->max_concurrent = MAX(-1, max_concurrent); +} + int work_queue_enable_category_resource(struct work_queue *q, const char *category, const char *resource, int 
autolabel) { struct category *c = work_queue_category_lookup_or_create(q, category); diff --git a/work_queue/src/work_queue.h b/work_queue/src/work_queue.h index bf606277aa..a237aade3d 100644 --- a/work_queue/src/work_queue.h +++ b/work_queue/src/work_queue.h @@ -957,6 +957,13 @@ int work_queue_specify_draining_by_hostname(struct work_queue *q, const char *ho */ int work_queue_specify_category_mode(struct work_queue *q, const char *category, work_queue_category_mode_t mode); +/** Set a maximum number of tasks of this category that can execute concurrently. If less than 0, unlimited (this is the default). +@param q A work queue object. +@param category A category name. +@param max_concurrent Number of maximum concurrent tasks. +*/ +void work_queue_specify_category_max_concurrent(struct work_queue *q, const char *category, int max_concurrent); + /** Turn on or off first-allocation labeling for a given category and resource. This function should be use to fine-tune the defaults from @ref work_queue_specify_category_mode. @param q A work queue object. @param category A category name.