Skip to content

Commit

Permalink
Add Robust Error Checking to Library and Worker's Communication Patte…
Browse files Browse the repository at this point in the history
…rns (#3974)

* scheduling with library protocol version checked

* add check exec_mode of library to worker

* fix bug

* fix bug 2

* fix jx lookup int

* format code

* add doc

* remove conversion from int

* remove library version
  • Loading branch information
tphung3 authored Nov 22, 2024
1 parent 96a03b1 commit 8530039
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 19 deletions.
15 changes: 13 additions & 2 deletions doc/manuals/taskvine/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2815,13 +2815,24 @@ The `compute` call above may receive the following keyword arguments:
| resources\_mode | [Automatic resource management](#automatic-resource-management) to use, e.g., "fixed", "max", or "max throughput"|
| task\_mode | Mode to execute individual tasks, such as [function calls](#serverless-computing). to use, e.g., "tasks", or "function-calls"|

## Appendix for Developers

### Library - Worker Communication Patterns

### Further Information
This subsection describes the communication patterns between a library and a worker, agnostic of programming languages a library is implemented in.

Upon library startup, it should send to its worker a json object as a byte stream.
The json object should have the following keys and associated values' types: `{"name": type-string, "taskid": type-int, "exec\_mode": type-string}`.
`"name"` should be the name of the library.
`"taskid"` should be the library' taskid as assigned by a taskvine manager.
`"exec\_mode"` should be the function execution mode of the library.
A worker upon receiving a proper library startup message should check all keys against what it knows about the library, and mark the library as ready to receive function calls if the library passes the worker's startup check.

## Further Information

For more information, please see [Getting Help](../help.md) or visit the [Cooperative Computing Lab](http://ccl.cse.nd.edu) website.

### Copyright
## Copyright

CCTools is Copyright (C) 2022 The University of Notre Dame. This software is distributed under the GNU General Public License Version 2. See the file COPYING for
details.
12 changes: 7 additions & 5 deletions poncho/src/poncho/library_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,16 +379,18 @@ def main():
if context_vars:
(load_variable_from_library.__globals__).update(context_vars)

# set execution mode of functions in this library
global exec_method
exec_method = library_info['exec_mode']

# send configuration of library, just its name for now
config = {
"name": library_info['library_name']
"name": library_info['library_name'],
"taskid": args.task_id,
"exec_mode": exec_method,
}
send_configuration(config, out_pipe_fd, args.worker_pid)

# set execution mode of functions in this library
global exec_method
exec_method = library_info['exec_mode']

# register sigchld handler to turn a sigchld signal into an I/O event
signal.signal(signal.SIGCHLD, sigchld_handler)

Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_manager_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ vine_result_code_t vine_manager_put_task(
if (t->provides_library) {
vine_manager_send(q, w, "provides_library %s\n", t->provides_library);
vine_manager_send(q, w, "function_slots %d\n", t->function_slots_total);
vine_manager_send(q, w, "func_exec_mode %d\n", t->func_exec_mode);
}

vine_manager_send(q, w, "category %s\n", t->category);
Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/manager/vine_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ worker, and catalog, but should not be visible to the public user API.
#ifndef VINE_PROTOCOL_H
#define VINE_PROTOCOL_H

#define VINE_PROTOCOL_VERSION 11
#define VINE_PROTOCOL_VERSION 12

#define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */

Expand Down
19 changes: 14 additions & 5 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,7 @@ void vine_task_set_function_exec_mode(struct vine_task *t, vine_task_func_exec_m
void vine_task_set_function_exec_mode_from_string(struct vine_task *t, const char *exec_mode)
{
if (exec_mode && t->provides_library) {
if (!strncmp(exec_mode, "fork", strlen("fork"))) {
t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_FORK;
} else {
t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_DIRECT;
}
t->func_exec_mode = vine_task_func_exec_mode_from_string(exec_mode);
}
}

Expand Down Expand Up @@ -1008,3 +1004,16 @@ char *vine_task_to_json(struct vine_task *t)
buffer_free(&b);
return json;
}

/* Converts a string to a valid vine_task_func_exec_mode_t.
* Returns VINE_TASK_FUNC_EXEC_MODE_INVALID if there's no valid mode for the string. */
vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode)
{
if (!strncmp(exec_mode, "direct", strlen("direct"))) {
return VINE_TASK_FUNC_EXEC_MODE_DIRECT;
}
if (!strncmp(exec_mode, "fork", strlen("fork"))) {
return VINE_TASK_FUNC_EXEC_MODE_FORK;
}
return VINE_TASK_FUNC_EXEC_MODE_INVALID;
}
5 changes: 4 additions & 1 deletion taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ typedef enum {
} vine_task_state_t;

typedef enum {
VINE_TASK_FUNC_EXEC_MODE_DIRECT = 0, /**< A library task will execute function calls directly in its process **/
VINE_TASK_FUNC_EXEC_MODE_INVALID = -1,
VINE_TASK_FUNC_EXEC_MODE_DIRECT = 1, /**< A library task will execute function calls directly in its process **/
VINE_TASK_FUNC_EXEC_MODE_FORK, /**< A library task will fork and execute each function call. **/
} vine_task_func_exec_mode_t;

Expand Down Expand Up @@ -164,6 +165,8 @@ const char *vine_task_state_to_string( vine_task_state_t task_state );
struct jx * vine_task_to_jx( struct vine_manager *q, struct vine_task *t );
char * vine_task_to_json(struct vine_task *t);

vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode);


/** Attach an input or outputs to tasks without declaring files to manager.
* Only really useful at the worker where tasks are created without a manager. */
Expand Down
32 changes: 27 additions & 5 deletions taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,14 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t
task->function_slots_requested = n;
/* Also set the total number determined by the manager. */
task->function_slots_total = n;
} else if (sscanf(line, "func_exec_mode %" PRId64, &n) == 1) {
vine_task_func_exec_mode_t func_exec_mode = n;
if (func_exec_mode == VINE_TASK_FUNC_EXEC_MODE_INVALID) {
debug(D_VINE | D_NOTICE, "invalid func_exec_mode from manager: %s", line);
vine_task_delete(task);
return 0;
}
task->func_exec_mode = func_exec_mode;
} else if (sscanf(line, "infile %s %s %d", localname, taskname_encoded, &flags)) {
url_decode(taskname_encoded, taskname, VINE_LINE_MAX);
vine_hack_do_not_compute_cached_name = 1;
Expand Down Expand Up @@ -1546,10 +1554,25 @@ static int check_library_startup(struct vine_process *p)
struct jx *response = jx_parse_string(buffer);

const char *name = jx_lookup_string(response, "name");
int taskid = jx_lookup_integer(response, "taskid");
const char *exec_mode = jx_lookup_string(response, "exec_mode");

int ok = 1;

int ok = 0;
if (!strcmp(name, p->task->provides_library)) {
ok = 1;
if (!name || !taskid || !exec_mode) {
ok = 0;
} else {
vine_task_func_exec_mode_t converted_exec_mode = vine_task_func_exec_mode_from_string(exec_mode);

if (!p->task->provides_library || strcmp(name, p->task->provides_library)) {
ok = 0;
}
if (taskid != p->task->task_id) {
ok = 0;
}
if (p->task->func_exec_mode && converted_exec_mode != p->task->func_exec_mode) {
ok = 0;
}
}
if (response) {
jx_delete(response);
Expand Down Expand Up @@ -1585,8 +1608,7 @@ static void check_libraries_ready(struct link *manager)
debug(D_VINE, "Library %s reports ready to execute functions.", library_process->task->provides_library);
library_process->library_ready = 1;
} else {
/* Kill library if the name reported back doesn't match its name or
* if there's any problem. */
/* Kill library if it fails the startup check. */
debug(D_VINE,
"Library %s task id %" PRIu64 " verification failed (unexpected response). Killing it.",
library_process->task->provides_library,
Expand Down

0 comments on commit 8530039

Please sign in to comment.