Skip to content

Commit

Permalink
Sniffing for and workaround for keeping output of croo workflows (#5096)
Browse files Browse the repository at this point in the history
* Stack up scatters and conditionals to find tasks and fix #5094

* Use conformance tests with correct failure expectation

* Add --allCallOutputs to add call outputs to workflow output

* Default to including all task outputs for croo workflows

* Document new option and missing option

* Improve option doc wording

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Lon Blauvelt <lblauvel@ucsc.edu>
  • Loading branch information
3 people authored Sep 26, 2024
1 parent 561f5ba commit a698f45
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 11 deletions.
12 changes: 12 additions & 0 deletions docs/wdl/running.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,22 @@ input JSON file, for compatibility with other WDL runners.
``cromwell`` to just return the workflow's output values as JSON or ``miniwdl``
to nest that under an ``outputs`` key and includes a ``dir`` key.

``--referenceInputs``: Specifies whether input files to Toil should be passed
around by URL reference instead of being imported into Toil's storage. Defaults
to off. Can be ``True`` or ``False`` or other similar words.

``--container``: Specifies the container engine to use to run tasks. By default
this is ``auto``, which tries Singularity if it is installed and Docker if it
isn't. Can also be set to ``docker`` or ``singularity`` explicitly.

``--allCallOutputs``: Specifies whether outputs from all calls in a workflow
should be included alongside the outputs from the ``output`` section, when an
``output`` section is defined. For strict WDL spec compliance, should be set to
``False``. Usually defaults to ``False``. If the workflow includes metadata for
the `Cromwell Output Organizer (croo)`_, will default to ``True``.

.. _`Cromwell Output Organizer (croo)`: https://github.com/ENCODE-DCC/croo

Any number of other Toil options may also be specified. For defined Toil options,
see :ref:`commandRef`.

Expand Down
7 changes: 6 additions & 1 deletion src/toil/options/wdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from configargparse import SUPPRESS

from toil.lib.conversions import strtobool


def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None:
"""
Expand Down Expand Up @@ -30,8 +32,11 @@ def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None:
parser.add_argument(*output_file_arguments, dest="output_file", type=str, default=None,
help=suppress_help or "File or URI to save output JSON to.")
reference_inputs_arguments = ["--wdlReferenceInputs"] + (["--referenceInputs"] if not suppress else [])
parser.add_argument(*reference_inputs_arguments, dest="reference_inputs", type=bool, default=False,
parser.add_argument(*reference_inputs_arguments, dest="reference_inputs", type=strtobool, default=False,
help=suppress_help or "Pass input files by URL")
container_arguments = ["--wdlContainer"] + (["--container"] if not suppress else [])
parser.add_argument(*container_arguments, dest="container", type=str, choices=["singularity", "docker", "auto"], default="auto",
help=suppress_help or "Container engine to use to run WDL tasks")
all_call_outputs_arguments = ["--wdlAllCallOutputs"] + (["--allCallOutputs"] if not suppress else [])
parser.add_argument(*all_call_outputs_arguments, dest="all_call_outputs", type=strtobool, default=None,
help=suppress_help or "Keep and return all call outputs as workflow outputs")
38 changes: 38 additions & 0 deletions src/toil/test/wdl/testfiles/croo.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
version 1.0

workflow wf {
meta {
# Advertise as needing the Cromwell Output Organizer
croo_out_def: 'https://storage.googleapis.com/encode-pipeline-output-definition/atac.croo.v5.json'
}

input {
}

call do_math {
input:
number = 3
}

Int should_never_output = do_math.cube - 1

output {
Int only_result = do_math.square
}
}

task do_math {
input {
Int number
}

# Not allowed to not have a command
command <<<
>>>

output {
Int square = number * number
Int cube = number * number * number
}
}

33 changes: 33 additions & 0 deletions src/toil/test/wdl/testfiles/not_enough_outputs.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
version 1.0

workflow wf {
input {
}

call do_math {
input:
number = 3
}

Int should_never_output = do_math.cube - 1

output {
Int only_result = do_math.square
}
}

task do_math {
input {
Int number
}

# Not allowed to not have a command
command <<<
>>>

output {
Int square = number * number
Int cube = number * number * number
}
}

66 changes: 65 additions & 1 deletion src/toil/test/wdl/wdltoil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def test_url_to_file(self):
assert 'url_to_file.first_line' in result
assert isinstance(result['url_to_file.first_line'], str)
self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328')

@needs_docker
def test_wait(self):
"""
Expand All @@ -182,6 +182,70 @@ def test_wait(self):
assert isinstance(result['wait.result'], str)
self.assertEqual(result['wait.result'], 'waited')

@needs_singularity_or_docker
def test_all_call_outputs(self):
"""
Test if Toil can collect all call outputs from a workflow that doesn't expose them.
"""
wdl = os.path.abspath('src/toil/test/wdl/testfiles/not_enough_outputs.wdl')

# With no flag we don't include the call outputs
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' not in result
assert 'wf.do_math.cube' not in result
assert 'wf.should_never_output' not in result

# With flag off we don't include the call outputs
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=false'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' not in result
assert 'wf.do_math.cube' not in result
assert 'wf.should_never_output' not in result

# With flag on we do include the call outputs
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=on'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' in result
assert 'wf.do_math.cube' in result
assert 'wf.should_never_output' not in result

@needs_singularity_or_docker
def test_croo_detection(self):
"""
Test if Toil can detect and do something sensible with Cromwell Output Organizer workflows.
"""
wdl = os.path.abspath('src/toil/test/wdl/testfiles/croo.wdl')

# With no flag we should include all task outputs
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' in result
assert 'wf.do_math.cube' in result
assert 'wf.should_never_output' not in result

# With flag off we obey the WDL spec even if we're suspicious
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=off'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' not in result
assert 'wf.do_math.cube' not in result
assert 'wf.should_never_output' not in result

def test_url_to_optional_file(self):
"""
Test if missing and error-producing URLs are handled correctly for optional File? values.
Expand Down
44 changes: 35 additions & 9 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3381,9 +3381,20 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings:
standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir"))

try:
if self._workflow.outputs is None:
# The output section is not declared
# So get all task outputs and return that
if self._workflow.outputs is not None:
# Output section is declared and is nonempty, so evaluate normally

# Combine the bindings from the previous job
with monkeypatch_coerce(standard_library):
output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library)
else:
# If no output section is present, start with an empty bindings
output_bindings = WDL.Env.Bindings()

if self._workflow.outputs is None or self._wdl_options.get("all_call_outputs", False):
# The output section is not declared, or we want to keep task outputs anyway.

# Get all task outputs and return that
# First get all task output names
output_set = set()
# We need to recurse down through scatters and conditionals to find all the task names.
Expand All @@ -3409,12 +3420,6 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings:
if binding.name in output_set:
# The bindings will already be namespaced with the task namespaces
output_bindings = output_bindings.bind(binding.name, binding.value)
else:
# Output section is declared and is nonempty, so evaluate normally

# Combine the bindings from the previous job
with monkeypatch_coerce(standard_library):
output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library)
finally:
# We don't actually know when all our files are downloaded since
# anything we evaluate might devirtualize inside any expression.
Expand Down Expand Up @@ -3553,6 +3558,26 @@ def main() -> None:
else:
raise WDL.Error.InputError("WDL document is empty!")

if "croo_out_def" in target.meta:
# This workflow or task wants to have its outputs
# "organized" by the Cromwell Output Organizer:
# <https://github.com/ENCODE-DCC/croo>.
#
# TODO: We don't support generating anything that CROO can read.
logger.warning("This WDL expects to be used with the Cromwell Output Organizer (croo) <https://github.com/ENCODE-DCC/croo>. Toil cannot yet produce the outputs that croo requires. You will not be able to use croo on the output of this Toil run!")

# But we can assume that we need to preserve individual
# taks outputs since the point of CROO is fetching those
# from Cromwell's output directories.
#
# This isn't quite WDL spec compliant but it will rescue
# runs of the popular
# <https://github.com/ENCODE-DCC/atac-seq-pipeline>
if options.all_call_outputs is None:
logger.warning("Inferring --allCallOutputs=True to preserve probable actual outputs of a croo WDL file.")
options.all_call_outputs = True


if options.inputs_uri:
# Load the inputs. Use the same loading mechanism, which means we
# have to break into async temporarily.
Expand Down Expand Up @@ -3612,6 +3637,7 @@ def main() -> None:
wdl_options["execution_dir"] = execution_dir
wdl_options["container"] = options.container
assert wdl_options.get("container") is not None
wdl_options["all_call_outputs"] = options.all_call_outputs

# Run the workflow and get its outputs namespaced with the workflow name.
root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options)
Expand Down

0 comments on commit a698f45

Please sign in to comment.