From caf707e3400665632aa334e5514128680a04c6a4 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Mon, 28 Aug 2023 20:35:18 -0700 Subject: [PATCH 1/7] monkeypatch coerce for workflow related nodes --- src/toil/wdl/wdltoil.py | 165 ++++++++++++++++++++++++---------------- 1 file changed, 99 insertions(+), 66 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 7cd6951a80..e85739c850 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -31,7 +31,7 @@ import tempfile import uuid -from contextlib import ExitStack +from contextlib import ExitStack, contextmanager from graphlib import TopologicalSorter from typing import cast, Any, Callable, Union, Dict, List, Optional, Set, Sequence, Tuple, Type, TypeVar, Iterator from urllib.parse import urlsplit, urljoin, quote, unquote @@ -480,7 +480,7 @@ def _virtualize_filename(self, filename: str) -> str: if self._is_url(filename): # Already virtual - logger.debug('Virtualized %s as WDL file %s', filename, filename) + logger.debug('Already virtualized %s as WDL file %s', filename, filename) return filename # Otherwise this is a local file and we want to fake it as a Toil file store file @@ -1467,6 +1467,10 @@ def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Prom Make a new job to run a workflow node to completion. """ super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, **kwargs) + # Always run workflow nodes on the leader for issue #4554 + # https://github.com/DataBiosphere/toil/issues/4554 + if 'local' not in kwargs: + kwargs['local'] = True self._node = node self._prev_node_results = prev_node_results @@ -1486,58 +1490,58 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: incoming_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library standard_library = ToilWDLStdLibBase(file_store) - - if isinstance(self._node, WDL.Tree.Decl): - # This is a variable assignment - logger.info('Setting %s to %s', self._node.name, self._node.expr) - value = evaluate_decl(self._node, incoming_bindings, standard_library) - return self.postprocess(incoming_bindings.bind(self._node.name, value)) - elif isinstance(self._node, WDL.Tree.Call): - # This is a call of a task or workflow - - # Fetch all the inputs we are passing and bind them. - # The call is only allowed to use these. - logger.debug("Evaluating step inputs") - input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library) - - # Bindings may also be added in from the enclosing workflow inputs - # TODO: this is letting us also inject them from the workflow body. - # TODO: Can this result in picking up non-namespaced values that - # aren't meant to be inputs, by not changing their names? - passed_down_bindings = incoming_bindings.enter_namespace(self._node.name) - - if isinstance(self._node.callee, WDL.Tree.Workflow): - # This is a call of a workflow - subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + with monkeypatch_coerce(standard_library): + if isinstance(self._node, WDL.Tree.Decl): + # This is a variable assignment + logger.info('Setting %s to %s', self._node.name, self._node.expr) + value = evaluate_decl(self._node, incoming_bindings, standard_library) + return self.postprocess(incoming_bindings.bind(self._node.name, value)) + elif isinstance(self._node, WDL.Tree.Call): + # This is a call of a task or workflow + + # Fetch all the inputs we are passing and bind them. + # The call is only allowed to use these. + logger.debug("Evaluating step inputs") + input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library) + + # Bindings may also be added in from the enclosing workflow inputs + # TODO: this is letting us also inject them from the workflow body. + # TODO: Can this result in picking up non-namespaced values that + # aren't meant to be inputs, by not changing their names? + passed_down_bindings = incoming_bindings.enter_namespace(self._node.name) + + if isinstance(self._node.callee, WDL.Tree.Workflow): + # This is a call of a workflow + subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + self.addChild(subjob) + elif isinstance(self._node.callee, WDL.Tree.Task): + # This is a call of a task + subjob = WDLTaskJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + self.addChild(subjob) + else: + raise WDL.Error.InvalidType(self._node, "Cannot call a " + str(type(self._node.callee))) + + # We need to agregate outputs namespaced with our node name, and existing bindings + subjob.then_namespace(self._node.name) + subjob.then_overlay(incoming_bindings) + self.defer_postprocessing(subjob) + return subjob.rv() + elif isinstance(self._node, WDL.Tree.Scatter): + subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace) self.addChild(subjob) - elif isinstance(self._node.callee, WDL.Tree.Task): - # This is a call of a task - subjob = WDLTaskJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + # Scatters don't really make a namespace, just kind of a scope? + # TODO: Let stuff leave scope! + self.defer_postprocessing(subjob) + return subjob.rv() + elif isinstance(self._node, WDL.Tree.Conditional): + subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace) self.addChild(subjob) + # Conditionals don't really make a namespace, just kind of a scope? + # TODO: Let stuff leave scope! + self.defer_postprocessing(subjob) + return subjob.rv() else: - raise WDL.Error.InvalidType(self._node, "Cannot call a " + str(type(self._node.callee))) - - # We need to agregate outputs namespaced with our node name, and existing bindings - subjob.then_namespace(self._node.name) - subjob.then_overlay(incoming_bindings) - self.defer_postprocessing(subjob) - return subjob.rv() - elif isinstance(self._node, WDL.Tree.Scatter): - subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace) - self.addChild(subjob) - # Scatters don't really make a namespace, just kind of a scope? - # TODO: Let stuff leave scope! - self.defer_postprocessing(subjob) - return subjob.rv() - elif isinstance(self._node, WDL.Tree.Conditional): - subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace) - self.addChild(subjob) - # Conditionals don't really make a namespace, just kind of a scope? - # TODO: Let stuff leave scope! - self.defer_postprocessing(subjob) - return subjob.rv() - else: - raise WDL.Error.InvalidType(self._node, "Unimplemented WorkflowNode: " + str(type(self._node))) + raise WDL.Error.InvalidType(self._node, "Unimplemented WorkflowNode: " + str(type(self._node))) class WDLWorkflowNodeListJob(WDLBaseJob): """ @@ -1551,6 +1555,10 @@ def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequen Make a new job to run a list of workflow nodes to completion. """ super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', **kwargs) + # Always run workflow nodes on the leader for issue #4554 + # https://github.com/DataBiosphere/toil/issues/4554 + if 'local' not in kwargs: + kwargs['local'] = True self._nodes = nodes self._prev_node_results = prev_node_results @@ -1571,14 +1579,15 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Set up the WDL standard library standard_library = ToilWDLStdLibBase(file_store) - for node in self._nodes: - if isinstance(node, WDL.Tree.Decl): - # This is a variable assignment - logger.info('Setting %s to %s', node.name, node.expr) - value = evaluate_decl(node, current_bindings, standard_library) - current_bindings = current_bindings.bind(node.name, value) - else: - raise WDL.Error.InvalidType(node, "Unimplemented WorkflowNode: " + str(type(node))) + with monkeypatch_coerce(standard_library): + for node in self._nodes: + if isinstance(node, WDL.Tree.Decl): + # This is a variable assignment + logger.info('Setting %s to %s', node.name, node.expr) + value = evaluate_decl(node, current_bindings, standard_library) + current_bindings = current_bindings.bind(node.name, value) + else: + raise WDL.Error.InvalidType(node, "Unimplemented WorkflowNode: " + str(type(node))) return self.postprocess(current_bindings) @@ -2031,7 +2040,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: standard_library = ToilWDLStdLibBase(file_store) # Get what to scatter over - scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) + with monkeypatch_coerce(standard_library): + scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) assert isinstance(scatter_value, WDL.Value.Array) @@ -2162,7 +2172,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: standard_library = ToilWDLStdLibBase(file_store) # Get the expression value. Fake a name. - expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) + with monkeypatch_coerce(standard_library): + expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) if expr_value.value: # Evaluated to true! @@ -2223,9 +2234,10 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: standard_library = ToilWDLStdLibBase(file_store) if self._workflow.inputs: - for input_decl in self._workflow.inputs: - # Evaluate all the inputs that aren't pre-set - bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) + with monkeypatch_coerce(standard_library): + for input_decl in self._workflow.inputs: + # Evaluate all the inputs that aren't pre-set + bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) @@ -2266,8 +2278,9 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # Evaluate all the outputs in the normal, non-task-outputs library context standard_library = ToilWDLStdLibBase(file_store) output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() - for output_decl in self._outputs: - output_bindings = output_bindings.bind(output_decl.name, evaluate_decl(output_decl, unwrap(self._bindings), standard_library)) + with monkeypatch_coerce(standard_library): + for output_decl in self._outputs: + output_bindings = output_bindings.bind(output_decl.name, evaluate_decl(output_decl, unwrap(self._bindings), standard_library)) return self.postprocess(output_bindings) @@ -2303,6 +2316,26 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: self.defer_postprocessing(workflow_job) return workflow_job.rv() +# monkey patch miniwdl's WDL.Value.Base.coerce() function +# miniwdl recognizes when a string needs to be converted into a file +# However miniwdl's string to file conversions is to just store the filepath +# Toil needs to virtualize the file into the jobstore +# So monkey patch coerce to always virtualize whenever a file is expected +# _virtualize_filename should detect if the value is already a file and return immediately if so +@contextmanager +def monkeypatch_coerce(standard_library: ToilWDLStdLibBase): + def coerce(self, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: + if isinstance(desired_type, WDL.Type.File): + self.value = standard_library._virtualize_filename(self.value) + return self + return old_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce + old_coerce = WDL.Value.Base.coerce + try: + WDL.Value.Base.coerce = coerce + yield + finally: + WDL.Value.Base.coerce = old_coerce + def main() -> None: """ A Toil workflow to interpret WDL input files. From 62b19d90700b4d10e728bc926b34f2f974f9bb9a Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 31 Aug 2023 16:16:36 -0700 Subject: [PATCH 2/7] Fix task inputs string coerce --- src/toil/wdl/wdltoil.py | 138 +++++++++++++++++++++++++--------------- 1 file changed, 87 insertions(+), 51 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index e85739c850..905eed90e4 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -408,7 +408,7 @@ class ToilWDLStdLibBase(WDL.StdLib.Base): Standard library implementation for WDL as run on Toil. """ - def __init__(self, file_store: AbstractFileStore): + def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None): """ Set up the standard library. """ @@ -426,6 +426,8 @@ def __init__(self, file_store: AbstractFileStore): # Keep the file store around so we can access files. self._file_store = file_store + self._execution_dir = execution_dir + def _is_url(self, filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: """ Decide if a filename is a known kind of URL @@ -465,7 +467,12 @@ def _devirtualize_filename(self, filename: str) -> str: result = self._file_store.readGlobalFile(imported) else: # This is a local file - result = filename + # To support relative paths, join the execution dir and filename + # if filename is already an abs path, join() will do nothing + if self._execution_dir is not None: + result = os.path.join(self._execution_dir, filename) + else: + result = filename logger.debug('Devirtualized %s as openable file %s', filename, result) assert os.path.exists(result), f"Virtualized file {filename} looks like a local file but isn't!" @@ -484,7 +491,13 @@ def _virtualize_filename(self, filename: str) -> str: return filename # Otherwise this is a local file and we want to fake it as a Toil file store file - file_id = self._file_store.writeGlobalFile(filename) + + # To support relative paths from execution directory, join the execution dir and filename + # If filename is already an abs path, join() will not do anything + if self._execution_dir is not None: + file_id = self._file_store.writeGlobalFile(os.path.join(self._execution_dir, filename)) + else: + file_id = self._file_store.writeGlobalFile(filename) result = pack_toil_uri(file_id, os.path.basename(filename)) logger.debug('Virtualized %s as WDL file %s', filename, result) return result @@ -716,15 +729,20 @@ def evaluate_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.Std return evaluate_named_expression(node, node.name, node.type, node.expr, environment, stdlib) -def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDLBindings: +def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base, inputs_list: Optional[List[WDL.Tree.Decl]] = None) -> WDLBindings: """ Evaluate a bunch of expressions with names, and make them into a fresh set of bindings. """ new_bindings: WDLBindings = WDL.Env.Bindings() + inputs_dict = {e.name: e.type for e in inputs_list or []} for k, v in expressions.items(): # Add each binding in turn - new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, None, v, environment, stdlib)) + # If the expected type is optional, then don't type check the lhs and rhs as miniwdl will return a StaticTypeMismatch error, so pass in None + expected_type = None + if not v.type.optional: + expected_type = inputs_dict.get(k, None) + new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, expected_type, v, environment, stdlib)) return new_bindings def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDL.Value.Base: @@ -735,7 +753,10 @@ def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, std try: if node.name in environment and not isinstance(environment[node.name], WDL.Value.Null): logger.debug('Name %s is already defined with a non-null value, not using default', node.name) - return environment[node.name] + if not isinstance(environment[node.name], type(node.type)): + return environment[node.name].coerce(node.type) + else: + return environment[node.name] else: if node.type is not None and not node.type.optional and node.expr is None: # We need a value for this but there isn't one. @@ -944,7 +965,7 @@ class WDLBaseJob(Job): as the job's run method calls postprocess(). """ - def __init__(self, **kwargs: Any) -> None: + def __init__(self, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a WDL-related job. @@ -971,6 +992,8 @@ def __init__(self, **kwargs: Any) -> None: # jobs returning other jobs' promised RVs. self._postprocessing_steps: List[Tuple[str, Union[str, Promised[WDLBindings]]]] = [] + self._execution_dir = execution_dir + # TODO: We're not allowed by MyPy to override a method and widen the return # type, so this has to be Any. def run(self, file_store: AbstractFileStore) -> Any: @@ -1462,15 +1485,11 @@ class WDLWorkflowNodeJob(WDLBaseJob): Job that evaluates a WDL workflow node. """ - def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a workflow node to completion. """ - super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, **kwargs) - # Always run workflow nodes on the leader for issue #4554 - # https://github.com/DataBiosphere/toil/issues/4554 - if 'local' not in kwargs: - kwargs['local'] = True + super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, execution_dir=execution_dir, **kwargs) self._node = node self._prev_node_results = prev_node_results @@ -1489,7 +1508,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs incoming_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) with monkeypatch_coerce(standard_library): if isinstance(self._node, WDL.Tree.Decl): # This is a variable assignment @@ -1502,7 +1521,12 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Fetch all the inputs we are passing and bind them. # The call is only allowed to use these. logger.debug("Evaluating step inputs") - input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library) + if self._node.callee is None: + # This should never be None, but mypy gets unhappy and this is better than an assert + input_decls = None + else: + input_decls = self._node.callee.inputs + input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library, input_decls) # Bindings may also be added in from the enclosing workflow inputs # TODO: this is letting us also inject them from the workflow body. @@ -1512,7 +1536,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: if isinstance(self._node.callee, WDL.Tree.Workflow): # This is a call of a workflow - subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}', self._execution_dir) self.addChild(subjob) elif isinstance(self._node.callee, WDL.Tree.Task): # This is a call of a task @@ -1527,14 +1551,14 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: self.defer_postprocessing(subjob) return subjob.rv() elif isinstance(self._node, WDL.Tree.Scatter): - subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace) + subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace, self._execution_dir) self.addChild(subjob) # Scatters don't really make a namespace, just kind of a scope? # TODO: Let stuff leave scope! self.defer_postprocessing(subjob) return subjob.rv() elif isinstance(self._node, WDL.Tree.Conditional): - subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace) + subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace, self._execution_dir) self.addChild(subjob) # Conditionals don't really make a namespace, just kind of a scope? # TODO: Let stuff leave scope! @@ -1550,15 +1574,11 @@ class WDLWorkflowNodeListJob(WDLBaseJob): workflows or tasks or sections. """ - def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a list of workflow nodes to completion. """ - super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', **kwargs) - # Always run workflow nodes on the leader for issue #4554 - # https://github.com/DataBiosphere/toil/issues/4554 - if 'local' not in kwargs: - kwargs['local'] = True + super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', execution_dir=execution_dir, **kwargs) self._nodes = nodes self._prev_node_results = prev_node_results @@ -1577,7 +1597,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs current_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) with monkeypatch_coerce(standard_library): for node in self._nodes: @@ -1756,12 +1776,12 @@ class WDLSectionJob(WDLBaseJob): Job that can create more graph for a section of the wrokflow. """ - def __init__(self, namespace: str, **kwargs: Any) -> None: + def __init__(self, namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a WDLSectionJob where the interior runs in the given namespace, starting with the root workflow. """ - super().__init__(**kwargs) + super().__init__(execution_dir, **kwargs) self._namespace = namespace @staticmethod @@ -1907,10 +1927,10 @@ def get_job_set_any(wdl_ids: Set[str]) -> List[WDLBaseJob]: if len(node_ids) == 1: # Make a one-node job - job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace) + job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace, self._execution_dir) else: # Make a multi-node job - job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace) + job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace, self._execution_dir) for prev_job in prev_jobs: # Connect up the happens-after relationships to make sure the # return values are available. @@ -2007,11 +2027,11 @@ class WDLScatterJob(WDLSectionJob): instance of the body. If an instance of the body doesn't create a binding, it gets a null value in the corresponding array. """ - def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL scatter. The scatter itself and the contents live in the given namespace. """ - super().__init__(namespace, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id) + super().__init__(namespace, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id, execution_dir=execution_dir) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -2143,11 +2163,11 @@ class WDLConditionalJob(WDLSectionJob): """ Job that evaluates a conditional in a WDL workflow. """ - def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL conditional. The conditional itself and its contents live in the given namespace. """ - super().__init__(namespace, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id) + super().__init__(namespace, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id, execution_dir=execution_dir) # Once again we need to ship the whole body template to be instantiated # into Toil jobs only if it will actually run. @@ -2194,7 +2214,7 @@ class WDLWorkflowJob(WDLSectionJob): Job that evaluates an entire WDL workflow. """ - def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL workflow. The job returns the return value of the workflow. @@ -2202,7 +2222,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Prom :param namespace: the namespace that the workflow's *contents* will be in. Caller has already added the workflow's own name. """ - super().__init__(namespace, **kwargs) + super().__init__(namespace, execution_dir, **kwargs) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -2231,7 +2251,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # For a task we only see the insode-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) if self._workflow.inputs: with monkeypatch_coerce(standard_library): @@ -2244,7 +2264,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: if self._workflow.outputs: # Add evaluating the outputs after the sink - outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv()) + outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv(), self._execution_dir) sink.addFollowOn(outputs_job) # Caller is responsible for making sure namespaces are applied self.defer_postprocessing(outputs_job) @@ -2260,11 +2280,11 @@ class WDLOutputsJob(WDLBaseJob): Returns an environment with just the outputs bound, in no namespace. """ - def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], **kwargs: Any): + def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any): """ Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs. """ - super().__init__(**kwargs) + super().__init__(execution_dir, **kwargs) self._outputs = outputs self._bindings = bindings @@ -2276,7 +2296,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: super().run(file_store) # Evaluate all the outputs in the normal, non-task-outputs library context - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() with monkeypatch_coerce(standard_library): for output_decl in self._outputs: @@ -2284,6 +2304,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: return self.postprocess(output_bindings) + class WDLRootJob(WDLSectionJob): """ Job that evaluates an entire WDL workflow, and returns the workflow outputs @@ -2291,13 +2312,13 @@ class WDLRootJob(WDLSectionJob): the workflow name; both forms are accepted. """ - def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree to run the workflow and namespace the outputs. """ # The root workflow names the root namespace - super().__init__(workflow.name, **kwargs) + super().__init__(workflow.name, execution_dir, **kwargs) self._workflow = workflow self._inputs = inputs @@ -2310,31 +2331,43 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Run the workflow. We rely in this to handle entering the input # namespace if needed, or handling free-floating inputs. - workflow_job = WDLWorkflowJob(self._workflow, [self._inputs], [self._workflow.name], self._namespace) + workflow_job = WDLWorkflowJob(self._workflow, [self._inputs], [self._workflow.name], self._namespace, self._execution_dir) workflow_job.then_namespace(self._namespace) self.addChild(workflow_job) self.defer_postprocessing(workflow_job) return workflow_job.rv() -# monkey patch miniwdl's WDL.Value.Base.coerce() function +# Monkey patch miniwdl's WDL.Value.Base.coerce() function # miniwdl recognizes when a string needs to be converted into a file # However miniwdl's string to file conversions is to just store the filepath # Toil needs to virtualize the file into the jobstore # So monkey patch coerce to always virtualize whenever a file is expected # _virtualize_filename should detect if the value is already a file and return immediately if so +# Sometimes string coerce is called instead, so monkeypatch string coerce @contextmanager -def monkeypatch_coerce(standard_library: ToilWDLStdLibBase): - def coerce(self, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: +def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Any: + def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: if isinstance(desired_type, WDL.Type.File): self.value = standard_library._virtualize_filename(self.value) return self - return old_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce - old_coerce = WDL.Value.Base.coerce + return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce + def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: + if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): + return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) + return old_str_coerce(self, desired_type) + + old_base_coerce = WDL.Value.Base.coerce + old_str_coerce = WDL.Value.String.coerce try: - WDL.Value.Base.coerce = coerce + # Mypy does not like monkeypatching: + # https://github.com/python/mypy/issues/2427#issuecomment-1419206807 + WDL.Value.Base.coerce = base_coerce # type: ignore[method-assign] + WDL.Value.String.coerce = string_coerce # type: ignore[method-assign] yield finally: - WDL.Value.Base.coerce = old_coerce + WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign] + WDL.Value.String.coerce = old_str_coerce # type: ignore[method-assign] + def main() -> None: """ @@ -2428,8 +2461,11 @@ def main() -> None: # TODO: Automatically set a good MINIWDL__SINGULARITY__IMAGE_CACHE ? + # Get the execution directory + execution_dir = os.getcwd() + # Run the workflow and get its outputs namespaced with the workflow name. - root_job = WDLRootJob(document.workflow, input_bindings) + root_job = WDLRootJob(document.workflow, input_bindings, execution_dir) output_bindings = toil.start(root_job) assert isinstance(output_bindings, WDL.Env.Bindings) From 723246d5dbf029d9eac1779106006def8f046a3f Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 8 Sep 2023 16:45:08 -0700 Subject: [PATCH 3/7] Disable kubernetes --- .gitlab-ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d74fef5650..b198658021 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -9,7 +9,7 @@ variables: # Used to tell pytest which tests to be run by specifying markers, # Allows partitioning of tests to prevent duplicate running of tests in different jobs. # Currently specifies special tests that are not run by quick_test_offline. - MARKER: "tes or integrative or encryption or kubernetes or mesos or server_mode or fetchable_appliance or appliance or slow or docker or cwl or singularity or rsync3" + MARKER: "tes or integrative or encryption or mesos or server_mode or fetchable_appliance or appliance or slow or docker or cwl or singularity or rsync3 and not kubernetes" TEST_THREADS: "3" before_script: # Log where we are running, in case some Kubernetes hosts are busted. IPs are assigned per host. @@ -239,7 +239,6 @@ cwl_v1.1_kubernetes: - make test threads="${TEST_THREADS}" tests="src/toil/test/cwl/cwlTest.py::CWLv11Test::test_kubernetes_cwl_conformance src/toil/test/cwl/cwlTest.py::CWLv11Test::test_kubernetes_cwl_conformance_with_caching" cwl_v1.2_kubernetes: - stage: main_tests script: - pwd - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws,kubernetes] From af4f0c38dd591cdc82eda894516ead5ec22fd2e6 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 8 Sep 2023 16:56:05 -0700 Subject: [PATCH 4/7] Comment out cwl kubernetes --- .gitlab-ci.yml | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b198658021..a09ff21d55 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -238,25 +238,26 @@ cwl_v1.1_kubernetes: - mkdir -p ${TOIL_WORKDIR} - make test threads="${TEST_THREADS}" tests="src/toil/test/cwl/cwlTest.py::CWLv11Test::test_kubernetes_cwl_conformance src/toil/test/cwl/cwlTest.py::CWLv11Test::test_kubernetes_cwl_conformance_with_caching" -cwl_v1.2_kubernetes: - script: - - pwd - - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws,kubernetes] - - export TOIL_KUBERNETES_OWNER=toiltest - - export TOIL_AWS_SECRET_NAME=shared-s3-credentials - - export TOIL_KUBERNETES_HOST_PATH=/data/scratch - - export TOIL_WORKDIR=/var/lib/toil - - export SINGULARITY_CACHEDIR=/var/lib/toil/singularity-cache - - if [[ ! -z "${KUBERNETES_DOCKER_HUB_MIRROR}" ]] ; then export SINGULARITY_DOCKER_HUB_MIRROR="${KUBERNETES_DOCKER_HUB_MIRROR}" ; fi - - mkdir -p ${TOIL_WORKDIR} - - make test threads="${TEST_THREADS}" tests="src/toil/test/cwl/cwlTest.py::CWLv12Test::test_kubernetes_cwl_conformance src/toil/test/cwl/cwlTest.py::CWLv12Test::test_kubernetes_cwl_conformance_with_caching" - artifacts: - reports: - junit: "*.junit.xml" - paths: - - "*.junit.xml" - when: always - expire_in: 14 days +#cwl_v1.2_kubernetes: +# stage: main_tests +# script: +# - pwd +# - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws,kubernetes] +# - export TOIL_KUBERNETES_OWNER=toiltest +# - export TOIL_AWS_SECRET_NAME=shared-s3-credentials +# - export TOIL_KUBERNETES_HOST_PATH=/data/scratch +# - export TOIL_WORKDIR=/var/lib/toil +# - export SINGULARITY_CACHEDIR=/var/lib/toil/singularity-cache +# - if [[ ! -z "${KUBERNETES_DOCKER_HUB_MIRROR}" ]] ; then export SINGULARITY_DOCKER_HUB_MIRROR="${KUBERNETES_DOCKER_HUB_MIRROR}" ; fi +# - mkdir -p ${TOIL_WORKDIR} +# - make test threads="${TEST_THREADS}" tests="src/toil/test/cwl/cwlTest.py::CWLv12Test::test_kubernetes_cwl_conformance src/toil/test/cwl/cwlTest.py::CWLv12Test::test_kubernetes_cwl_conformance_with_caching" +# artifacts: +# reports: +# junit: "*.junit.xml" +# paths: +# - "*.junit.xml" +# when: always +# expire_in: 14 days wdl: stage: main_tests From 0a35ba3ed19a7d71482fd702dd094185a59a572a Mon Sep 17 00:00:00 2001 From: stxue1 Date: Mon, 11 Sep 2023 11:27:28 -0700 Subject: [PATCH 5/7] Maybe markers are wrong and comment out cactus-on-kubernetes --- .gitlab-ci.yml | 58 +++++++++++++++++++++++++------------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a09ff21d55..f2ebf3394c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -9,7 +9,7 @@ variables: # Used to tell pytest which tests to be run by specifying markers, # Allows partitioning of tests to prevent duplicate running of tests in different jobs. # Currently specifies special tests that are not run by quick_test_offline. - MARKER: "tes or integrative or encryption or mesos or server_mode or fetchable_appliance or appliance or slow or docker or cwl or singularity or rsync3 and not kubernetes" + MARKER: "(tes or integrative or encryption or mesos or server_mode or fetchable_appliance or appliance or slow or docker or cwl or singularity or rsync3) and not kubernetes" TEST_THREADS: "3" before_script: # Log where we are running, in case some Kubernetes hosts are busted. IPs are assigned per host. @@ -323,31 +323,31 @@ google_jobstore: - make test threads="${TEST_THREADS}" tests=src/toil/test/jobStores/jobStoreTest.py::GoogleJobStoreTest # Cactus-on-Kubernetes integration (as a script and not a pytest test) -cactus_integration: - stage: integration - script: - - set -e - - ${MAIN_PYTHON_PKG} -m virtualenv --system-site-packages venv - - . venv/bin/activate - - pip install -U pip wheel - - pip install .[aws,kubernetes] - - export TOIL_KUBERNETES_OWNER=toiltest - - export TOIL_AWS_SECRET_NAME=shared-s3-credentials - - export TOIL_KUBERNETES_HOST_PATH=/data/scratch - - export TOIL_WORKDIR=/var/lib/toil - - export SINGULARITY_CACHEDIR=/var/lib/toil/singularity-cache - - mkdir -p ${TOIL_WORKDIR} - - BUCKET_NAME=toil-test-$RANDOM-$RANDOM-$RANDOM - - cd - - git clone https://github.com/ComparativeGenomicsToolkit/cactus.git --recursive - - cd cactus - - git fetch origin - - git checkout f5adf4013326322ae58ef1eccb8409b71d761583 - - git submodule update --init --recursive - # We can't use setuptools 66 on Ubuntu due to https://github.com/pypa/setuptools/issues/3772 - - pip install --upgrade 'setuptools<66' pip - - pip install --upgrade . - - pip install --upgrade numpy psutil # Cactus installs an old psutil that Toil isn't compatible with. TODO: Do we really need Numpy? - - if [[ ! -z "${KUBERNETES_DOCKER_HUB_MIRROR}" ]] ; then export SINGULARITY_DOCKER_HUB_MIRROR="${KUBERNETES_DOCKER_HUB_MIRROR}" ; fi - - toil clean aws:us-west-2:${BUCKET_NAME} - - time cactus --setEnv SINGULARITY_DOCKER_HUB_MIRROR --batchSystem kubernetes --retryCount=3 --consCores 2 --binariesMode singularity --clean always aws:us-west-2:${BUCKET_NAME} examples/evolverMammals.txt examples/evolverMammals.hal --root mr --defaultDisk "8G" --logDebug +#cactus_integration: +# stage: integration +# script: +# - set -e +# - ${MAIN_PYTHON_PKG} -m virtualenv --system-site-packages venv +# - . venv/bin/activate +# - pip install -U pip wheel +# - pip install .[aws] +# - export TOIL_KUBERNETES_OWNER=toiltest +# - export TOIL_AWS_SECRET_NAME=shared-s3-credentials +# - export TOIL_KUBERNETES_HOST_PATH=/data/scratch +# - export TOIL_WORKDIR=/var/lib/toil +# - export SINGULARITY_CACHEDIR=/var/lib/toil/singularity-cache +# - mkdir -p ${TOIL_WORKDIR} +# - BUCKET_NAME=toil-test-$RANDOM-$RANDOM-$RANDOM +# - cd +# - git clone https://github.com/ComparativeGenomicsToolkit/cactus.git --recursive +# - cd cactus +# - git fetch origin +# - git checkout f5adf4013326322ae58ef1eccb8409b71d761583 +# - git submodule update --init --recursive +# # We can't use setuptools 66 on Ubuntu due to https://github.com/pypa/setuptools/issues/3772 +# - pip install --upgrade 'setuptools<66' pip +# - pip install --upgrade . +# - pip install --upgrade numpy psutil # Cactus installs an old psutil that Toil isn't compatible with. TODO: Do we really need Numpy? +# - if [[ ! -z "${KUBERNETES_DOCKER_HUB_MIRROR}" ]] ; then export SINGULARITY_DOCKER_HUB_MIRROR="${KUBERNETES_DOCKER_HUB_MIRROR}" ; fi +# - toil clean aws:us-west-2:${BUCKET_NAME} +# - time cactus --setEnv SINGULARITY_DOCKER_HUB_MIRROR --batchSystem kubernetes --retryCount=3 --consCores 2 --binariesMode singularity --clean always aws:us-west-2:${BUCKET_NAME} examples/evolverMammals.txt examples/evolverMammals.hal --root mr --defaultDisk "8G" --logDebug From ecc67152f9337b91535c89fa4b24ee0166dac3ef Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 19 Sep 2023 11:58:38 -0700 Subject: [PATCH 6/7] Add docstrings to changed functions + change input list to dict --- src/toil/wdl/wdltoil.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 74c06cdd7d..9c01363962 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -33,7 +33,8 @@ from contextlib import ExitStack, contextmanager from graphlib import TopologicalSorter -from typing import cast, Any, Callable, Union, Dict, List, Optional, Set, Sequence, Tuple, Type, TypeVar, Iterator +from typing import cast, Any, Callable, Union, Dict, List, Optional, Set, Sequence, Tuple, Type, TypeVar, Iterator, \ + Generator from urllib.parse import urlsplit, urljoin, quote, unquote import WDL @@ -748,18 +749,18 @@ def evaluate_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.Std return evaluate_named_expression(node, node.name, node.type, node.expr, environment, stdlib) -def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base, inputs_list: Optional[List[WDL.Tree.Decl]] = None) -> WDLBindings: +def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base, inputs_dict: Optional[Dict[str, WDL.Type.Base]] = None) -> WDLBindings: """ - Evaluate a bunch of expressions with names, and make them into a fresh set of bindings. + Evaluate a bunch of expressions with names, and make them into a fresh set of bindings. `inputs_dict` is a mapping of + variable names to their expected type for the input decls in a task. """ - new_bindings: WDLBindings = WDL.Env.Bindings() - inputs_dict = {e.name: e.type for e in inputs_list or []} for k, v in expressions.items(): # Add each binding in turn # If the expected type is optional, then don't type check the lhs and rhs as miniwdl will return a StaticTypeMismatch error, so pass in None expected_type = None if not v.type.optional: + # This is done to enable passing in a string into a task input of file type expected_type = inputs_dict.get(k, None) new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, expected_type, v, environment, stdlib)) return new_bindings @@ -1540,10 +1541,10 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: logger.debug("Evaluating step inputs") if self._node.callee is None: # This should never be None, but mypy gets unhappy and this is better than an assert - input_decls = None + inputs_mapping = None else: - input_decls = self._node.callee.inputs - input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library, input_decls) + inputs_mapping = {e.name: e.type for e in self._node.callee.inputs or []} + input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library, inputs_mapping) # Bindings may also be added in from the enclosing workflow inputs # TODO: this is letting us also inject them from the workflow body. @@ -2353,21 +2354,23 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: self.defer_postprocessing(workflow_job) return workflow_job.rv() -# Monkey patch miniwdl's WDL.Value.Base.coerce() function -# miniwdl recognizes when a string needs to be converted into a file -# However miniwdl's string to file conversions is to just store the filepath -# Toil needs to virtualize the file into the jobstore -# So monkey patch coerce to always virtualize whenever a file is expected -# _virtualize_filename should detect if the value is already a file and return immediately if so -# Sometimes string coerce is called instead, so monkeypatch string coerce @contextmanager -def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Any: +def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]: + """ + Monkeypatch miniwdl's WDL.Value.Base.coerce() function. Calls _virtualize_filename from a given standard library object + :param standard_library: a standard library object + :return + """ + # We're doing this because while miniwdl recognizes when a string needs to be converted into a file, it's method of + # conversion is to just store the local filepath. Toil needs to virtualize the file into the jobstore so until + # there is an internal entrypoint, monkeypatch it. def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: if isinstance(desired_type, WDL.Type.File): self.value = standard_library._virtualize_filename(self.value) return self return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: + # Sometimes string coerce is called instead, so monkeypatch this one as well if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) return old_str_coerce(self, desired_type) From c73df30ed0fa395f7cdff2e675bbf62bcc0891dc Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 19 Sep 2023 18:10:06 -0700 Subject: [PATCH 7/7] Deal with nonetype --- src/toil/wdl/wdltoil.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 9c01363962..4042a797e1 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -759,7 +759,7 @@ def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePo # Add each binding in turn # If the expected type is optional, then don't type check the lhs and rhs as miniwdl will return a StaticTypeMismatch error, so pass in None expected_type = None - if not v.type.optional: + if not v.type.optional and inputs_dict is not None: # This is done to enable passing in a string into a task input of file type expected_type = inputs_dict.get(k, None) new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, expected_type, v, environment, stdlib)) @@ -2357,7 +2357,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: @contextmanager def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]: """ - Monkeypatch miniwdl's WDL.Value.Base.coerce() function. Calls _virtualize_filename from a given standard library object + Monkeypatch miniwdl's WDL.Value.Base.coerce() function to virtualize files when they are represented as Strings. + Calls _virtualize_filename from a given standard library object. :param standard_library: a standard library object :return """