diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index c29afb4bdc..a29d78677a 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -37,12 +37,13 @@ Iterable, Generator from urllib.parse import urlsplit, urljoin, quote, unquote -from WDL import Error from configargparse import ArgParser from WDL._util import byte_size_units +from WDL.CLI import print_error from WDL.runtime.task_container import TaskContainer from WDL.runtime.backend.singularity import SingularityContainer from WDL.runtime.backend.docker_swarm import SwarmContainer +import WDL.Error import WDL.runtime.config from toil.common import Config, Toil, addOptions @@ -57,6 +58,58 @@ logger = logging.getLogger(__name__) +@contextmanager +def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]: + """ + Run code in a context where WDL errors will be reported with pretty formatting. + """ + + try: + yield + except ( + WDL.Error.SyntaxError, + WDL.Error.ImportError, + WDL.Error.ValidationError, + WDL.Error.MultipleValidationErrors, + FileNotFoundError + ) as e: + log("Could not " + task) + # These are the errors that MiniWDL's parser can raise and its reporter + # can report. See + # https://github.com/chanzuckerberg/miniwdl/blob/a780b1bf2db61f18de37616068968b2bb4c2d21c/WDL/CLI.py#L91-L97. + # + # We are going to use MiniWDL's pretty printer to print them. + print_error(e) + if exit: + # Stop right now + sys.exit(1) + else: + # Reraise the exception to stop + raise + +F = TypeVar('F', bound=Callable[..., Any]) +def report_wdl_errors(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Callable[[F], F]: + """ + Create a decorator to report WDL errors with the given task message. + + Decorator can then be applied to a function, and if a WDL error happens it + will say that it could not {task}. + """ + def decorator(decoratee: F) -> F: + """ + Decorate a function with WDL error reporting. + """ + def decorated(*args: Any, **kwargs: Any) -> Any: + """ + Run the decoratee and handle WDL errors. + """ + with wdl_error_reporter(task, exit=exit, log=log): + return decoratee(*args, **kwargs) + return cast(F, decorated) + return decorator + + + def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None) -> Iterator[str]: """ Get potential absolute URIs to check for an imported file. @@ -832,7 +885,7 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None: host_path_strip = host_path.rstrip("/") if host_path not in task_container.input_path_map and host_path_strip not in task_container.input_path_map: if not os.path.exists(host_path_strip): - raise Error.InputError("input path not found: " + host_path) + raise WDL.Error.InputError("input path not found: " + host_path) host_paths_by_dir.setdefault(os.path.dirname(host_path_strip), set()).add(host_path) # for each such partition of files # - if there are no basename collisions under input subdirectory 0, then mount them there. @@ -1071,6 +1124,8 @@ def __init__(self, execution_dir: Optional[str] = None, **kwargs: Any) -> None: def run(self, file_store: AbstractFileStore) -> Any: """ Run a WDL-related job. + + Remember to decorate non-trivial overrides with :func:`report_wdl_errors`. """ # Make sure that pickle is prepared to save our return values, which # might take a lot of recursive calls. TODO: This might be because @@ -1208,6 +1263,7 @@ def can_fake_root(self) -> bool: logger.warning('No subuids are assigned to %s; cannot fake root.', username) return False + @report_wdl_errors("run task") def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ Actually run the task. @@ -1569,6 +1625,7 @@ def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Prom if isinstance(self._node, WDL.Tree.Call): logger.debug("Preparing job for call node %s", self._node.workflow_node_id) + @report_wdl_errors("run workflow node") def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ Actually execute the workflow node. @@ -1659,6 +1716,7 @@ def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequen if isinstance(n, (WDL.Tree.Call, WDL.Tree.Scatter, WDL.Tree.Conditional)): raise RuntimeError("Node cannot be evaluated with other nodes: " + str(n)) + @report_wdl_errors("run workflow node list") def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ Actually execute the workflow nodes. @@ -1701,6 +1759,7 @@ def __init__(self, prev_node_results: Sequence[Promised[WDLBindings]], **kwargs: self._prev_node_results = prev_node_results + @report_wdl_errors("combine bindings") def run(self, file_store: AbstractFileStore) -> WDLBindings: """ Aggregate incoming results. @@ -2116,6 +2175,7 @@ def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promis self._scatter = scatter self._prev_node_results = prev_node_results + @report_wdl_errors("run scatter") def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ Run the scatter. @@ -2198,6 +2258,7 @@ def __init__(self, input_bindings: Sequence[Promised[WDLBindings]], base_binding self._input_bindings = input_bindings self._base_bindings = base_bindings + @report_wdl_errors("create array bindings") def run(self, file_store: AbstractFileStore) -> WDLBindings: """ Actually produce the array-ified bindings now that promised values are available. @@ -2249,6 +2310,7 @@ def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequenc self._conditional = conditional self._prev_node_results = prev_node_results + @report_wdl_errors("run conditional") def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ Run the conditional. @@ -2311,6 +2373,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Prom self._workflow_id = workflow_id self._namespace = namespace + @report_wdl_errors("run workflow") def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ Run the workflow. Return the result of the workflow. @@ -2361,6 +2424,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], self._bindings = bindings self._workflow = workflow + @report_wdl_errors("evaluate outputs") def run(self, file_store: AbstractFileStore) -> WDLBindings: """ Make bindings for the outputs. @@ -2408,6 +2472,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, execution_d self._workflow = workflow self._inputs = inputs + @report_wdl_errors("run root job") def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ Actually build the subgraph. @@ -2456,7 +2521,7 @@ def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign] WDL.Value.String.coerce = old_str_coerce # type: ignore[method-assign] - +@report_wdl_errors("run workflow", exit=True) def main() -> None: """ A Toil workflow to interpret WDL input files. @@ -2500,8 +2565,10 @@ def main() -> None: document: WDL.Tree.Document = WDL.load(options.wdl_uri, read_source=toil_read_source) if document.workflow is None: - logger.critical("No workflow in document!") - sys.exit(1) + # Complain that we need a workflow. + # We need the absolute path or URL to raise the error + wdl_abspath = options.wdl_uri if not os.path.exists(options.wdl_uri) else os.path.abspath(options.wdl_uri) + raise WDL.Error.ValidationError(WDL.Error.SourcePosition(options.wdl_uri, wdl_abspath, 0, 0, 0, 1), "No workflow found in document") if options.inputs_uri: # Load the inputs. Use the same loading mechanism, which means we @@ -2510,10 +2577,13 @@ def main() -> None: try: inputs = json.loads(downloaded.source_text) except json.JSONDecodeError as e: - logger.critical('Cannot parse JSON at %s: %s', downloaded.abspath, e) - sys.exit(1) + # Complain about the JSON document. + # We need the absolute path or URL to raise the error + inputs_abspath = options.inputs_uri if not os.path.exists(options.inputs_uri) else os.path.abspath(options.inputs_uri) + raise WDL.Error.ValidationError(WDL.Error.SourcePosition(options.inputs_uri, inputs_abspath, e.lineno, e.colno, e.lineno, e.colno + 1), "Cannot parse input JSON: " + e.msg) from e else: inputs = {} + # Parse out the available and required inputs. Each key in the # JSON ought to start with the workflow's name and then a . # TODO: WDL's Bindings[] isn't variant in the right way, so we