diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 2e1efb211f..43476d32d1 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -105,8 +105,11 @@ def __init__(self, manager_config: TaskVineManagerConfig = TaskVineManagerConfig(), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(), provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1), + working_dir: str = '.', storage_access: Optional[List[Staging]] = None): + self.working_dir = working_dir + # Set worker launch option for this executor if worker_launch_method == 'factory' or worker_launch_method == 'manual': provider = None @@ -215,9 +218,9 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) - self._function_data_dir = os.path.join(run_dir, self.label, "function_data") + self._function_data_dir = os.path.join("/tmp/function_data/", self.label, str(uuid.uuid4())) os.makedirs(log_dir) - os.makedirs(self._function_data_dir) + os.makedirs(self._function_data_dir, exist_ok=True) # put TaskVine logs outside of a Parsl run as TaskVine caches between runs while # Parsl does not. @@ -363,6 +366,7 @@ def submit(self, func, resource_specification, *args, **kwargs): # Also consider any *arg that looks like a file as an input: input_files += [self._register_file(f) for f in args if isinstance(f, File)] + logger.debug(f"registered input files {input_files}") for kwarg, maybe_file in kwargs.items(): # Add appropriate input and output files from "stdout" and "stderr" keyword arguments @@ -504,15 +508,18 @@ def _register_file(self, parsl_file): if parsl_file.scheme == 'file' or \ (parsl_file.local_path and os.path.exists(parsl_file.local_path)): to_stage = not os.path.isabs(parsl_file.filepath) - - return ParslFileToVine(parsl_file.filepath, to_stage, to_cache) + return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, stage=to_stage, cache=to_cache, protocol=parsl_file.scheme) + else: + # we must stage url and temp files + ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, stage=True, cache=to_cache, protocol=parsl_file.scheme) + return ptv def _std_output_to_vine(self, fdname, stdfspec): """Find the name of the file that will contain stdout or stderr and return a ParslFileToVine with it. These files are never cached""" fname, mode = putils.get_std_fname_mode(fdname, stdfspec) to_stage = not os.path.isabs(fname) - return ParslFileToVine(fname, stage=to_stage, cache=False) + return ParslFileToVine(fname, fname, stage=to_stage, cache=False, protocol="file") def _prepare_package(self, fn, extra_pkgs): """ Look at source code of apps to figure out their package depedencies diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index e5a4986062..5515eb3d2e 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -128,6 +128,15 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file, t.add_environment(poncho_env_file) +def _handle_file_declaration_protocol(m, spec): + if "http" in spec.protocol: + return m.declare_url(spec.parsl_name, cache=spec.cache, peer_transfer=True) + elif spec.protocol == "taskvinetemp": + return m.declare_temp() + else: + return m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) + + @wrap_with_logs def _taskvine_submit_wait(ready_task_queue=None, finished_task_queue=None, @@ -364,18 +373,26 @@ def _taskvine_submit_wait(ready_task_queue=None, if spec.parsl_name in parsl_file_name_to_vine_file: task_in_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_in_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) + task_in_file = _handle_file_declaration_protocol(m, spec) parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file - t.add_input(task_in_file, spec.parsl_name) + logger.debug("Adding input file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) + if spec.remote_name == '': + t.add_input(task_in_file, spec.parsl_name) + else: + t.add_input(task_in_file, spec.remote_name) for spec in task.output_files: if spec.stage: if spec.parsl_name in parsl_file_name_to_vine_file: task_out_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_out_file = m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) + task_out_file = _handle_file_declaration_protocol(m, spec) parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file - t.add_output(task_out_file, spec.parsl_name) + logger.debug("Adding output file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) + if spec.remote_name == '': + t.add_output(task_out_file, spec.parsl_name) + else: + t.add_output(task_out_file, spec.remote_name) # Submit the task to the TaskVine object logger.debug("Submitting executor task {}, {} to TaskVine".format(task.executor_id, t)) diff --git a/parsl/executors/taskvine/taskvine_staging_provider.py b/parsl/executors/taskvine/taskvine_staging_provider.py new file mode 100644 index 0000000000..68aef0049f --- /dev/null +++ b/parsl/executors/taskvine/taskvine_staging_provider.py @@ -0,0 +1,35 @@ +import logging +from concurrent.futures import Future +from typing import Optional + +from parsl.app.futures import DataFuture +from parsl.data_provider.files import File +from parsl.data_provider.staging import Staging +from parsl.utils import RepresentationMixin + +logger = logging.getLogger(__name__) + +known_url_schemes = ["http", "https", "taskvinetemp"] + + +class TaskVineStaging(Staging, RepresentationMixin): + + def can_stage_in(self, file): + logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file))) + return file.scheme in known_url_schemes + + def can_stage_out(self, file): + logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file))) + return file.scheme in known_url_schemes + + def stage_in(self, dm, executor: str, file: File, parent_fut: Optional[Future]) -> Optional[DataFuture]: + if file.scheme in ["taskvinetemp", "https", "http"]: + file.local_path = file.url.split('/')[-1] + logger.debug("Task vine staging provider stage in for {}".format(repr(file))) + return None + + def stage_out(self, dm, executor: str, file: File, app_fu: Future) -> Optional[Future]: + if file.scheme in ["taskvinetemp", "https", "http"]: + file.local_path = file.url.split('/')[-1] + logger.debug("Task vine staging provider stage out for {}".format(repr(file))) + return None diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 86cf446b1a..e3f1d31782 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -69,12 +69,16 @@ class ParslFileToVine: """ def __init__(self, parsl_name: str, # name of file + remote_name: str, # name of file if url stage: bool, # whether TaskVine should know about this file - cache: bool # whether TaskVine should cache this file + cache: bool, # whether TaskVine should cache this file + protocol: str, # protocol if url ): self.parsl_name = parsl_name self.stage = stage self.cache = cache + self.remote_name = remote_name + self.protocol = protocol def run_parsl_function(map_file, function_file, argument_file, result_file): diff --git a/parsl/tests/configs/taskvine_ex.py b/parsl/tests/configs/taskvine_ex.py index 276e37cf4e..f42ed9ccbb 100644 --- a/parsl/tests/configs/taskvine_ex.py +++ b/parsl/tests/configs/taskvine_ex.py @@ -2,9 +2,12 @@ from parsl.data_provider.file_noop import NoOpFileStaging from parsl.data_provider.ftp import FTPInTaskStaging from parsl.data_provider.http import HTTPInTaskStaging +from parsl.data_provider.zip import ZipFileStaging from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig +from parsl.executors.taskvine.taskvine_staging_provider import TaskVineStaging def fresh_config(): return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), - worker_launch_method='factory')]) + worker_launch_method='factory', + storage_access=[FTPInTaskStaging(), TaskVineStaging(), NoOpFileStaging(), ZipFileStaging()])])