Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vine: Worker contact address #3767

Merged
merged 10 commits into from
Apr 17, 2024
1 change: 1 addition & 0 deletions doc/man/m4/vine_worker.m4
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ OPTION_ARG_LONG(feature, feature)Specifies a user-defined feature the worker pro
OPTION_ARG_LONG(volatility, chance)Set the percent chance per minute that the worker will shut down (simulates worker failures, for testing only).
OPTION_ARG_LONG(connection-mode, mode)When using -M, override manager preference to resolve its address. One of by_ip, by_hostname, or by_apparent_ip. Default is set by manager.
OPTION_ARG_LONG(transfer-port,port) Listening port for worker-worker transfers. (default: any)
OPTION_ARG_LONG(transfer-address,addr) Explicit contact address for worker-worker transfers. (default: :<transfer_port>)

OPTION_FLAG_LONG(ssl)Enable tls connection to manager (manager should support it).
OPTION_ARG_LONG(tls-sni)SNI domain name if different from manager hostname. Implies --ssl.
Expand Down
1 change: 1 addition & 0 deletions doc/man/md/vine_worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ OPTION_LONG(keep-workspace) Do not delete the contents of the workspace on worke
- **--volatility=_&lt;chance&gt;_**<br />Set the percent chance per minute that the worker will shut down (simulates worker failures, for testing only).
- **--connection-mode=_&lt;mode&gt;_**<br />When using -M, override manager preference to resolve its address. One of by_ip, by_hostname, or by_apparent_ip. Default is set by manager.
- **--transfer-port=_&lt;port&gt;_**<br /> Listening port for worker-worker transfers. (default: any)
- **--transfer-address=_&lt;addr&gt;_**<br /> Explicit contact address for worker-worker transfers. (default: :&lt;transfer_port&gt;)
btovar marked this conversation as resolved.
Show resolved Hide resolved

- **--ssl**<br />Enable tls connection to manager (manager should support it).
- **--tls-sni=_&lt;&gt;_**<br />SNI domain name if different from manager hostname. Implies --ssl.
Expand Down
18 changes: 11 additions & 7 deletions taskvine/src/manager/vine_file_replica_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ struct vine_worker_info *vine_file_replica_table_find_worker(struct vine_manager

if ((replica = hash_table_lookup(peer->current_files, cachename)) &&
replica->state == VINE_FILE_REPLICA_STATE_READY) {
// generate a peer address stub as it would appear in the transfer table
char *peer_addr = string_format("worker://%s:%d", peer->transfer_addr, peer->transfer_port);
int current_transfers = vine_current_transfers_source_in_use(q, peer);
free(peer_addr);

if (current_transfers < q->worker_source_max_transfers) {
peer_selected = peer;
if (random_index < 0) {
Expand Down Expand Up @@ -147,8 +143,7 @@ int vine_file_replica_table_replicate(struct vine_manager *m, struct vine_file *
continue;
}

char *source_addr = string_format(
"worker://%s:%d/%s", source->transfer_addr, source->transfer_port, f->cached_name);
char *source_addr = string_format("%s/%s", source->transfer_addr_url, f->cached_name);
int source_in_use = vine_current_transfers_source_in_use(m, source);

char *id;
Expand Down Expand Up @@ -231,5 +226,14 @@ int vine_file_replica_table_exists_somewhere(struct vine_manager *q, const char
return 0;
}

return set_size(workers) > 0;
struct vine_worker_info *peer;

SET_ITERATE(workers, peer)
{
if (peer->transfer_port_active) {
return 1;
}
}

return 0;
}
29 changes: 20 additions & 9 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ See the file COPYING for details.
#include "load_average.h"
#include "macros.h"
#include "path.h"
#include "pattern.h"
#include "process.h"
#include "random.h"
#include "rmonitor.h"
Expand Down Expand Up @@ -465,13 +466,25 @@ on its own port to receive get requests from other workers.
static int handle_transfer_address(struct vine_manager *q, struct vine_worker_info *w, const char *line)
{
int dummy_port;
if (sscanf(line, "transfer-address %s %d", w->transfer_addr, &w->transfer_port)) {
w->transfer_port_active = 1;
link_address_remote(w->link, w->transfer_addr, &dummy_port);
return VINE_MSG_PROCESSED;
} else {
int explicit;
btovar marked this conversation as resolved.
Show resolved Hide resolved

int n = sscanf(line, "transfer-address %d %s %d", &explicit, w->transfer_addr, &w->transfer_port);
if (n != 3) {
return VINE_MSG_FAILURE;
}

w->transfer_port_active = 1;

if (!explicit) {
link_address_remote(w->link, w->transfer_addr, &dummy_port);
}

int is_ip = pattern_match(w->transfer_addr, "^%d+%.%d+%.%d+%.%d+$") != -1;

free(w->transfer_addr_url);
w->transfer_addr_url = string_format("worker%s://%s:%d", is_ip ? "ip" : "", w->transfer_addr, w->transfer_port);

return VINE_MSG_PROCESSED;
}

/*
Expand Down Expand Up @@ -2934,10 +2947,8 @@ static int vine_manager_transfer_capacity_available(
/* Provide a substitute file object to describe the peer. */
if (!(m->file->flags & VINE_PEER_NOSHARE) && (m->file->cache_level > VINE_CACHE_LEVEL_TASK)) {
if ((peer = vine_file_replica_table_find_worker(q, m->file->cached_name))) {
char *peer_source = string_format("worker://%s:%d/%s",
peer->transfer_addr,
peer->transfer_port,
m->file->cached_name);
char *peer_source =
string_format("%s/%s", peer->transfer_addr_url, m->file->cached_name);
m->substitute = vine_file_substitute_url(m->file, peer_source, peer);
free(peer_source);
found_match = 1;
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_worker_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ void vine_worker_delete(struct vine_worker_info *w)
free(w->workerid);
free(w->addrport);
free(w->hashkey);
free(w->transfer_addr_url);

vine_resources_delete(w->resources);
hash_table_clear(w->features, 0);
Expand Down
4 changes: 3 additions & 1 deletion taskvine/src/manager/vine_worker_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ See the file COPYING for details.
#include "taskvine.h"
#include "vine_resources.h"

#include "domain_name.h"
#include "hash_table.h"
#include "link.h"
#include "itable.h"
Expand Down Expand Up @@ -42,9 +43,10 @@ struct vine_worker_info {
char *hashkey;

/* Address and port where this worker will accept transfers from peers. */
char transfer_addr[LINK_ADDRESS_MAX];
char transfer_addr[DOMAIN_NAME_MAX];
btovar marked this conversation as resolved.
Show resolved Hide resolved
int transfer_port;
int transfer_port_active;
char *transfer_addr_url; /* worker(ip)?://transfer_addr:transfer_port */
btovar marked this conversation as resolved.
Show resolved Hide resolved

/* Worker condition that may affect task start or cancellation. */
int draining; // if 1, worker does not accept anymore tasks. It is shutdown if no task running.
Expand Down
34 changes: 31 additions & 3 deletions taskvine/src/worker/vine_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ See the file COPYING for details.

#include "copy_stream.h"
#include "debug.h"
#include "domain_name_cache.h"
#include "hash_table.h"
#include "link.h"
#include "link_auth.h"
Expand Down Expand Up @@ -448,6 +449,27 @@ static int do_mini_task(struct vine_cache *c, struct vine_cache_file *f, char **
}
}

/*
Rewrite f->source from the hostname form worker://host:port/path into the
resolved form workerip://addr:port/path, with host looked up from this worker.

The result must carry the workerip:// scheme, because that is the only form
do_worker_transfer parses; do_transfer calls it right after this function succeeds.

On success f->source is replaced (the old string is freed) and 1 is returned.
On failure f->source is left untouched, *error_message is set to a newly
formatted string, and 0 is returned.
*/
static int rewrite_source_to_ip(struct vine_cache_file *f, char **error_message)
{
	int port_num;
	char host[VINE_LINE_MAX], source_path[VINE_LINE_MAX];
	char addr[LINK_ADDRESS_MAX];

	// expect the form: worker://host:port/path/to/file
	// All three fields must convert, otherwise host/port_num/source_path would be read uninitialized.
	// NOTE(review): %256[^:] and %s assume VINE_LINE_MAX is larger than 256 and than any source path -- confirm.
	if (sscanf(f->source, "worker://%256[^:]:%d/%s", host, &port_num, source_path) != 3) {
		*error_message = string_format("Couldn't parse worker source %s", f->source);
		return 0;
	}

	if (!domain_name_cache_lookup(host, addr)) {
		*error_message = string_format("Couldn't resolve hostname %s", host);
		return 0;
	}

	// Swap in the resolved source only once the replacement is fully known.
	free(f->source);
	f->source = string_format("workerip://%s:%d/%s", addr, port_num, source_path);

	return 1;
}

/*
Transfer a single input file from a worker url to a local file name.
*/
Expand All @@ -459,8 +481,9 @@ static int do_worker_transfer(
int stoptime;
struct link *worker_link;

// expect the form: worker://addr:port/path/to/file
sscanf(f->source, "worker://%99[^:]:%d/%s", addr, &port_num, source_path);
// expect the form: workerip://host:port/path/to/file
sscanf(f->source, "workerip://%256[^:]:%d/%s", addr, &port_num, source_path);

debug(D_VINE, "cache: setting up worker transfer file %s", f->source);

stoptime = time(0) + 15;
Expand Down Expand Up @@ -516,8 +539,13 @@ static int do_transfer(struct vine_cache *c, struct vine_cache_file *f, const ch

int result = 0;

if (strncmp(f->source, "worker://", 9) == 0) {
if (strncmp(f->source, "workerip://", 11) == 0) {
result = do_worker_transfer(c, f, cachename, error_message);
} else if (strncmp(f->source, "worker://", 9) == 0) {
result = rewrite_source_to_ip(f, error_message);
if (result) {
result = do_worker_transfer(c, f, cachename, error_message);
}
} else {
result = do_curl_transfer(c, f, transfer_path, cache_path, error_message);
}
Expand Down
15 changes: 14 additions & 1 deletion taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,21 @@ static void send_transfer_address(struct link *manager)
{
btovar marked this conversation as resolved.
Show resolved Hide resolved
char addr[LINK_ADDRESS_MAX];
int port;
int explicit_addr = 0;

vine_transfer_server_address(addr, &port);
send_message(manager, "transfer-address %s %d\n", addr, port);

char *addr_to_send = addr;
if (options->contact_address) {
addr_to_send = options->contact_address;
explicit_addr = 1;
}

if (options->contact_port > 0) {
port = options->contact_port;
}

send_message(manager, "transfer-address %d %s %d\n", explicit_addr, addr_to_send, port);
}

/*
Expand Down
48 changes: 48 additions & 0 deletions taskvine/src/worker/vine_worker_options.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
#include "vine_catalog.h"
#include "vine_transfer_server.h"

#include "address.h"
#include "catalog_query.h"
#include "cctools.h"
#include "copy_stream.h"
#include "create_dir.h"
#include "debug.h"
#include "domain_name.h"
#include "hash_table.h"
#include "macros.h"
#include "path.h"
Expand Down Expand Up @@ -54,11 +56,14 @@ struct vine_worker_options *vine_worker_options_create()
self->transfer_port_min = 0;
self->transfer_port_max = 0;

self->contact_address = 0;

return self;
}

void vine_worker_options_delete(struct vine_worker_options *self)
{
/* XXX: if part not needed here, free(NULL) is legal */
if (self->os_name)
free(self->os_name);
if (self->arch_name)
Expand All @@ -69,6 +74,8 @@ void vine_worker_options_delete(struct vine_worker_options *self)
free(self->catalog_hosts);
if (self->factory_name)
free(self->factory_name);
if (self->contact_address)
btovar marked this conversation as resolved.
Show resolved Hide resolved
free(self->contact_address);

hash_table_delete(self->features);
free(self);
Expand Down Expand Up @@ -149,6 +156,8 @@ void vine_worker_options_show_help(const char *cmd, struct vine_worker_options *
printf(" %-30s Single-shot mode -- quit immediately after disconnection.\n", "--single-shot");
printf(" %-30s Listening port for worker-worker transfers. Either port or port_min:port_max (default: any)\n",
"--transfer-port");
printf(" %-30s Explicit contact address for worker-worker transfers. (default: :<transfer_port>)\n",
"--transfer-address");

printf(" %-30s Enable tls connection to manager (manager should support it).\n", "--ssl");
printf(" %-30s SNI domain name if different from manager hostname. Implies --ssl.\n",
Expand Down Expand Up @@ -177,6 +186,7 @@ enum {
LONG_OPT_PYTHON_FUNCTION,
LONG_OPT_FROM_FACTORY,
LONG_OPT_TRANSFER_PORT,
LONG_OPT_CONTACT_ADDRESS,
LONG_OPT_WORKSPACE,
LONG_OPT_KEEP_WORKSPACE,
};
Expand Down Expand Up @@ -219,6 +229,7 @@ static const struct option long_options[] = {{"advertise", no_argument, 0, 'a'},
{"tls-sni", required_argument, 0, LONG_OPT_TLS_SNI},
{"from-factory", required_argument, 0, LONG_OPT_FROM_FACTORY},
{"transfer-port", required_argument, 0, LONG_OPT_TRANSFER_PORT},
{"transfer-address", required_argument, 0, LONG_OPT_CONTACT_ADDRESS},
{0, 0, 0, 0}};

static void vine_worker_options_get_env(const char *name, int64_t *manual_option)
Expand All @@ -240,6 +251,40 @@ static void vine_worker_options_get_envs(struct vine_worker_options *options)
vine_worker_options_get_env("GPUS", &options->gpus_total);
}

/*
Parse the argument of --transfer-address and record it in options.

Two forms are accepted:
  :PORT      only options->contact_port is set; contact_address stays NULL.
  HOST:PORT  parsed by address_parse_hostport; both contact_address and
             contact_port are set.

Any previously stored contact_address is freed first, so the option may be
given more than once. A NULL, empty, or malformed argument is reported
through fatal().
*/
void set_contact_address(struct vine_worker_options *options, const char *hostport)
{
	int error = 0;

	free(options->contact_address);
	options->contact_address = NULL;

	if (!hostport || hostport[0] == '\0') {
		error = 1;
	} else if (hostport[0] == ':') {
		const char *digits = hostport + 1;
		char *end = NULL;

		errno = 0;
		long long port = strtoll(digits, &end, 10);

		/*
		strtoll returns 0 both for "0" and for "no digits at all", and is not
		required to set errno in the latter case, so the end pointer is what
		tells a failed conversion apart. Also reject trailing garbage and
		values that are not a usable TCP port (which would not fit the int
		field either).
		*/
		if (end == digits || *end != '\0' || errno != 0 || port < 1 || port > 65535) {
			error = 1;
		} else {
			options->contact_port = (int)port;
		}
	} else {
		int port;
		char host[DOMAIN_NAME_MAX];
		/* NOTE(review): the trailing 0 is presumably the port used when hostport carries none -- confirm against address.h. */
		if (address_parse_hostport(hostport, host, &port, 0)) {
			options->contact_address = xxstrdup(host);
			options->contact_port = port;
		} else {
			error = 1;
		}
	}

	if (error) {
		fatal("contact address not of the form HOSTNAME:PORT or :PORT");
	}
}

void set_min_max_ports(struct vine_worker_options *options, const char *range)
{
char *r = xxstrdup(range);
Expand Down Expand Up @@ -439,6 +484,9 @@ void vine_worker_options_get(struct vine_worker_options *options, int argc, char
case LONG_OPT_TRANSFER_PORT:
set_min_max_ports(options, optarg);
break;
case LONG_OPT_CONTACT_ADDRESS:
set_contact_address(options, optarg);
break;
default:
vine_worker_options_show_help(argv[0], options);
exit(1);
Expand Down
4 changes: 4 additions & 0 deletions taskvine/src/worker/vine_worker_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ struct vine_worker_options {
/* Range of ports allowed to set the server for transfers between workers. */
int transfer_port_min;
int transfer_port_max;

/* Explicit contact address for transfers between workers. */
char *contact_address;
int contact_port;
};

struct vine_worker_options * vine_worker_options_create();
Expand Down
Loading