Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report errors in WDL using MiniWDL's error location printer #4637

Merged
merged 6 commits into from
Oct 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 77 additions & 7 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@
Iterable, Generator
from urllib.parse import urlsplit, urljoin, quote, unquote

from WDL import Error
from configargparse import ArgParser
from WDL._util import byte_size_units
from WDL.CLI import print_error
from WDL.runtime.task_container import TaskContainer
from WDL.runtime.backend.singularity import SingularityContainer
from WDL.runtime.backend.docker_swarm import SwarmContainer
import WDL.Error
import WDL.runtime.config

from toil.common import Config, Toil, addOptions
Expand All @@ -57,6 +58,58 @@

logger = logging.getLogger(__name__)

@contextmanager
def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]:
"""
Run code in a context where WDL errors will be reported with pretty formatting.
"""

try:
yield
except (
WDL.Error.SyntaxError,
WDL.Error.ImportError,
WDL.Error.ValidationError,
WDL.Error.MultipleValidationErrors,
FileNotFoundError
) as e:
log("Could not " + task)
# These are the errors that MiniWDL's parser can raise and its reporter
# can report. See
# https://github.com/chanzuckerberg/miniwdl/blob/a780b1bf2db61f18de37616068968b2bb4c2d21c/WDL/CLI.py#L91-L97.
#
# We are going to use MiniWDL's pretty printer to print them.
print_error(e)
if exit:
# Stop right now
sys.exit(1)
else:
# Reraise the exception to stop
raise

F = TypeVar('F', bound=Callable[..., Any])
def report_wdl_errors(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Callable[[F], F]:
"""
Create a decorator to report WDL errors with the given task message.

Decorator can then be applied to a function, and if a WDL error happens it
will say that it could not {task}.
"""
def decorator(decoratee: F) -> F:
"""
Decorate a function with WDL error reporting.
"""
def decorated(*args: Any, **kwargs: Any) -> Any:
"""
Run the decoratee and handle WDL errors.
"""
with wdl_error_reporter(task, exit=exit, log=log):
return decoratee(*args, **kwargs)
return cast(F, decorated)
return decorator



def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None) -> Iterator[str]:
"""
Get potential absolute URIs to check for an imported file.
Expand Down Expand Up @@ -832,7 +885,7 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None:
host_path_strip = host_path.rstrip("/")
if host_path not in task_container.input_path_map and host_path_strip not in task_container.input_path_map:
if not os.path.exists(host_path_strip):
raise Error.InputError("input path not found: " + host_path)
raise WDL.Error.InputError("input path not found: " + host_path)
host_paths_by_dir.setdefault(os.path.dirname(host_path_strip), set()).add(host_path)
# for each such partition of files
# - if there are no basename collisions under input subdirectory 0, then mount them there.
Expand Down Expand Up @@ -1071,6 +1124,8 @@ def __init__(self, execution_dir: Optional[str] = None, **kwargs: Any) -> None:
def run(self, file_store: AbstractFileStore) -> Any:
"""
Run a WDL-related job.

Remember to decorate non-trivial overrides with :func:`report_wdl_errors`.
"""
# Make sure that pickle is prepared to save our return values, which
# might take a lot of recursive calls. TODO: This might be because
Expand Down Expand Up @@ -1208,6 +1263,7 @@ def can_fake_root(self) -> bool:
logger.warning('No subuids are assigned to %s; cannot fake root.', username)
return False

@report_wdl_errors("run task")
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Actually run the task.
Expand Down Expand Up @@ -1569,6 +1625,7 @@ def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Prom
if isinstance(self._node, WDL.Tree.Call):
logger.debug("Preparing job for call node %s", self._node.workflow_node_id)

@report_wdl_errors("run workflow node")
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Actually execute the workflow node.
Expand Down Expand Up @@ -1659,6 +1716,7 @@ def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequen
if isinstance(n, (WDL.Tree.Call, WDL.Tree.Scatter, WDL.Tree.Conditional)):
raise RuntimeError("Node cannot be evaluated with other nodes: " + str(n))

@report_wdl_errors("run workflow node list")
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Actually execute the workflow nodes.
Expand Down Expand Up @@ -1701,6 +1759,7 @@ def __init__(self, prev_node_results: Sequence[Promised[WDLBindings]], **kwargs:

self._prev_node_results = prev_node_results

@report_wdl_errors("combine bindings")
def run(self, file_store: AbstractFileStore) -> WDLBindings:
"""
Aggregate incoming results.
Expand Down Expand Up @@ -2116,6 +2175,7 @@ def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promis
self._scatter = scatter
self._prev_node_results = prev_node_results

@report_wdl_errors("run scatter")
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Run the scatter.
Expand Down Expand Up @@ -2198,6 +2258,7 @@ def __init__(self, input_bindings: Sequence[Promised[WDLBindings]], base_binding
self._input_bindings = input_bindings
self._base_bindings = base_bindings

@report_wdl_errors("create array bindings")
def run(self, file_store: AbstractFileStore) -> WDLBindings:
"""
Actually produce the array-ified bindings now that promised values are available.
Expand Down Expand Up @@ -2249,6 +2310,7 @@ def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequenc
self._conditional = conditional
self._prev_node_results = prev_node_results

@report_wdl_errors("run conditional")
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Run the conditional.
Expand Down Expand Up @@ -2311,6 +2373,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Prom
self._workflow_id = workflow_id
self._namespace = namespace

@report_wdl_errors("run workflow")
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Run the workflow. Return the result of the workflow.
Expand Down Expand Up @@ -2361,6 +2424,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings],
self._bindings = bindings
self._workflow = workflow

@report_wdl_errors("evaluate outputs")
def run(self, file_store: AbstractFileStore) -> WDLBindings:
"""
Make bindings for the outputs.
Expand Down Expand Up @@ -2408,6 +2472,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, execution_d
self._workflow = workflow
self._inputs = inputs

@report_wdl_errors("run root job")
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Actually build the subgraph.
Expand Down Expand Up @@ -2456,7 +2521,7 @@ def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base]
WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign]
WDL.Value.String.coerce = old_str_coerce # type: ignore[method-assign]


@report_wdl_errors("run workflow", exit=True)
def main() -> None:
"""
A Toil workflow to interpret WDL input files.
Expand Down Expand Up @@ -2500,8 +2565,10 @@ def main() -> None:
document: WDL.Tree.Document = WDL.load(options.wdl_uri, read_source=toil_read_source)

if document.workflow is None:
logger.critical("No workflow in document!")
sys.exit(1)
# Complain that we need a workflow.
# We need the absolute path or URL to raise the error
wdl_abspath = options.wdl_uri if not os.path.exists(options.wdl_uri) else os.path.abspath(options.wdl_uri)
raise WDL.Error.ValidationError(WDL.Error.SourcePosition(options.wdl_uri, wdl_abspath, 0, 0, 0, 1), "No workflow found in document")

if options.inputs_uri:
# Load the inputs. Use the same loading mechanism, which means we
Expand All @@ -2510,10 +2577,13 @@ def main() -> None:
try:
inputs = json.loads(downloaded.source_text)
except json.JSONDecodeError as e:
logger.critical('Cannot parse JSON at %s: %s', downloaded.abspath, e)
sys.exit(1)
# Complain about the JSON document.
# We need the absolute path or URL to raise the error
inputs_abspath = options.inputs_uri if not os.path.exists(options.inputs_uri) else os.path.abspath(options.inputs_uri)
raise WDL.Error.ValidationError(WDL.Error.SourcePosition(options.inputs_uri, inputs_abspath, e.lineno, e.colno, e.lineno, e.colno + 1), "Cannot parse input JSON: " + e.msg) from e
else:
inputs = {}

# Parse out the available and required inputs. Each key in the
# JSON ought to start with the workflow's name and then a .
# TODO: WDL's Bindings[] isn't variant in the right way, so we
Expand Down