From 8530039e2e7560b6a2a93e9468b552ecce343e9e Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Fri, 22 Nov 2024 08:58:49 -0500 Subject: [PATCH] Add Robust Error Checking to Library and Worker's Communication Patterns (#3974) * scheduling with library protocol version checked * add check exec_mode of library to worker * fix bug * fix bug 2 * fix jx lookup int * format code * add doc * remove conversion from int * remove library version --- doc/manuals/taskvine/index.md | 15 +++++++++-- poncho/src/poncho/library_network_code.py | 12 +++++---- taskvine/src/manager/vine_manager_put.c | 1 + taskvine/src/manager/vine_protocol.h | 2 +- taskvine/src/manager/vine_task.c | 19 ++++++++++---- taskvine/src/manager/vine_task.h | 5 +++- taskvine/src/worker/vine_worker.c | 32 +++++++++++++++++++---- 7 files changed, 67 insertions(+), 19 deletions(-) diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md index 2c7ed24335..655eb88d4d 100644 --- a/doc/manuals/taskvine/index.md +++ b/doc/manuals/taskvine/index.md @@ -2815,13 +2815,24 @@ The `compute` call above may receive the following keyword arguments: | resources\_mode | [Automatic resource management](#automatic-resource-management) to use, e.g., "fixed", "max", or "max throughput"| | task\_mode | Mode to execute individual tasks, such as [function calls](#serverless-computing). to use, e.g., "tasks", or "function-calls"| +## Appendix for Developers +### Library - Worker Communication Patterns -### Further Information +This subsection describes the communication patterns between a library and a worker, agnostic of programming languages a library is implemented in. + +Upon library startup, it should send to its worker a json object as a byte stream. +The json object should have the following keys and associated values' types: `{"name": type-string, "taskid": type-int, "exec\_mode": type-string}`. +`"name"` should be the name of the library. 
+`"taskid"` should be the library's taskid as assigned by a taskvine manager. +`"exec\_mode"` should be the function execution mode of the library. +A worker upon receiving a proper library startup message should check all keys against what it knows about the library, and mark the library as ready to receive function calls if the library passes the worker's startup check. + +## Further Information For more information, please see [Getting Help](../help.md) or visit the [Cooperative Computing Lab](http://ccl.cse.nd.edu) website. -### Copyright +## Copyright CCTools is Copyright (C) 2022 The University of Notre Dame. This software is distributed under the GNU General Public License Version 2. See the file COPYING for details. diff --git a/poncho/src/poncho/library_network_code.py b/poncho/src/poncho/library_network_code.py index 159d16dd45..29a5ae9238 100755 --- a/poncho/src/poncho/library_network_code.py +++ b/poncho/src/poncho/library_network_code.py @@ -379,16 +379,18 @@ def main(): if context_vars: (load_variable_from_library.__globals__).update(context_vars) + # set execution mode of functions in this library + global exec_method + exec_method = library_info['exec_mode'] + # send configuration of library, just its name for now config = { - "name": library_info['library_name'] + "name": library_info['library_name'], + "taskid": args.task_id, + "exec_mode": exec_method, } send_configuration(config, out_pipe_fd, args.worker_pid) - # set execution mode of functions in this library - global exec_method - exec_method = library_info['exec_mode'] - # register sigchld handler to turn a sigchld signal into an I/O event signal.signal(signal.SIGCHLD, sigchld_handler) diff --git a/taskvine/src/manager/vine_manager_put.c b/taskvine/src/manager/vine_manager_put.c index 83dbaeb64f..e8e22b22bf 100644 --- a/taskvine/src/manager/vine_manager_put.c +++ b/taskvine/src/manager/vine_manager_put.c @@ -509,6 +509,7 @@ vine_result_code_t vine_manager_put_task( if (t->provides_library) { 
vine_manager_send(q, w, "provides_library %s\n", t->provides_library); vine_manager_send(q, w, "function_slots %d\n", t->function_slots_total); + vine_manager_send(q, w, "func_exec_mode %d\n", t->func_exec_mode); } vine_manager_send(q, w, "category %s\n", t->category); diff --git a/taskvine/src/manager/vine_protocol.h b/taskvine/src/manager/vine_protocol.h index 7a3dca9fcd..0fe03196cf 100644 --- a/taskvine/src/manager/vine_protocol.h +++ b/taskvine/src/manager/vine_protocol.h @@ -13,7 +13,7 @@ worker, and catalog, but should not be visible to the public user API. #ifndef VINE_PROTOCOL_H #define VINE_PROTOCOL_H -#define VINE_PROTOCOL_VERSION 11 +#define VINE_PROTOCOL_VERSION 12 #define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */ diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index 4d9c4dae97..147f8170d3 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -329,11 +329,7 @@ void vine_task_set_function_exec_mode(struct vine_task *t, vine_task_func_exec_m void vine_task_set_function_exec_mode_from_string(struct vine_task *t, const char *exec_mode) { if (exec_mode && t->provides_library) { - if (!strncmp(exec_mode, "fork", strlen("fork"))) { - t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_FORK; - } else { - t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_DIRECT; - } + t->func_exec_mode = vine_task_func_exec_mode_from_string(exec_mode); } } @@ -1008,3 +1004,16 @@ char *vine_task_to_json(struct vine_task *t) buffer_free(&b); return json; } + +/* Converts a string to a valid vine_task_func_exec_mode_t. + * Returns VINE_TASK_FUNC_EXEC_MODE_INVALID if there's no valid mode for the string. 
*/ +vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode) +{ + if (!strncmp(exec_mode, "direct", strlen("direct"))) { + return VINE_TASK_FUNC_EXEC_MODE_DIRECT; + } + if (!strncmp(exec_mode, "fork", strlen("fork"))) { + return VINE_TASK_FUNC_EXEC_MODE_FORK; + } + return VINE_TASK_FUNC_EXEC_MODE_INVALID; +} diff --git a/taskvine/src/manager/vine_task.h b/taskvine/src/manager/vine_task.h index 4c53ece509..ec6889c096 100644 --- a/taskvine/src/manager/vine_task.h +++ b/taskvine/src/manager/vine_task.h @@ -38,7 +38,8 @@ typedef enum { } vine_task_state_t; typedef enum { - VINE_TASK_FUNC_EXEC_MODE_DIRECT = 0, /**< A library task will execute function calls directly in its process **/ + VINE_TASK_FUNC_EXEC_MODE_INVALID = -1, + VINE_TASK_FUNC_EXEC_MODE_DIRECT = 1, /**< A library task will execute function calls directly in its process **/ VINE_TASK_FUNC_EXEC_MODE_FORK, /**< A library task will fork and execute each function call. **/ } vine_task_func_exec_mode_t; @@ -164,6 +165,8 @@ const char *vine_task_state_to_string( vine_task_state_t task_state ); struct jx * vine_task_to_jx( struct vine_manager *q, struct vine_task *t ); char * vine_task_to_json(struct vine_task *t); +vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode); + /** Attach an input or outputs to tasks without declaring files to manager. * Only really useful at the worker where tasks are created without a manager. */ diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index dd2eddc555..016eb6ef69 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -865,6 +865,14 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t task->function_slots_requested = n; /* Also set the total number determined by the manager. 
*/ task->function_slots_total = n; + } else if (sscanf(line, "func_exec_mode %" PRId64, &n) == 1) { + vine_task_func_exec_mode_t func_exec_mode = n; + if (func_exec_mode == VINE_TASK_FUNC_EXEC_MODE_INVALID) { + debug(D_VINE | D_NOTICE, "invalid func_exec_mode from manager: %s", line); + vine_task_delete(task); + return 0; + } + task->func_exec_mode = func_exec_mode; } else if (sscanf(line, "infile %s %s %d", localname, taskname_encoded, &flags)) { url_decode(taskname_encoded, taskname, VINE_LINE_MAX); vine_hack_do_not_compute_cached_name = 1; @@ -1546,10 +1554,25 @@ static int check_library_startup(struct vine_process *p) struct jx *response = jx_parse_string(buffer); const char *name = jx_lookup_string(response, "name"); + int taskid = jx_lookup_integer(response, "taskid"); + const char *exec_mode = jx_lookup_string(response, "exec_mode"); + + int ok = 1; - int ok = 0; - if (!strcmp(name, p->task->provides_library)) { - ok = 1; + if (!name || !taskid || !exec_mode) { + ok = 0; + } else { + vine_task_func_exec_mode_t converted_exec_mode = vine_task_func_exec_mode_from_string(exec_mode); + + if (!p->task->provides_library || strcmp(name, p->task->provides_library)) { + ok = 0; + } + if (taskid != p->task->task_id) { + ok = 0; + } + if (p->task->func_exec_mode && converted_exec_mode != p->task->func_exec_mode) { + ok = 0; + } } if (response) { jx_delete(response); @@ -1585,8 +1608,7 @@ static void check_libraries_ready(struct link *manager) debug(D_VINE, "Library %s reports ready to execute functions.", library_process->task->provides_library); library_process->library_ready = 1; } else { - /* Kill library if the name reported back doesn't match its name or - * if there's any problem. */ + /* Kill library if it fails the startup check. */ debug(D_VINE, "Library %s task id %" PRIu64 " verification failed (unexpected response). Killing it.", library_process->task->provides_library,