diff --git a/MANIFEST.in b/MANIFEST.in index c93cf86..0a33425 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,3 @@ # Include the license and version files. include LICENSE -include dsub/VERSION \ No newline at end of file +include dsub/providers/local/runner.sh diff --git a/dsub/_dsub_version.py b/dsub/_dsub_version.py index 05b5eca..e4a5632 100644 --- a/dsub/_dsub_version.py +++ b/dsub/_dsub_version.py @@ -20,4 +20,4 @@ file could break setup.py. """ -DSUB_VERSION = '0.1.1' +DSUB_VERSION = '0.1.2' diff --git a/dsub/commands/ddel.py b/dsub/commands/ddel.py index 5e56bbd..b096234 100755 --- a/dsub/commands/ddel.py +++ b/dsub/commands/ddel.py @@ -16,14 +16,15 @@ Follows the model of qdel. """ -import argparse +import sys from ..lib import dsub_util from ..lib import param_util +from ..lib import resources from ..providers import provider_base -def parse_arguments(): +def _parse_arguments(): """Parses command line arguments. Returns: @@ -32,25 +33,11 @@ def parse_arguments(): # Handle version flag and exit if it was passed. param_util.handle_version_flag() - provider_required_args = { - 'google': ['project'], - 'test-fails': [], - 'local': [], - } - epilog = 'Provider-required arguments:\n' - for provider in provider_required_args: - epilog += ' %s: %s\n' % (provider, provider_required_args[provider]) - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter, epilog=epilog) - provider_base.add_provider_argument(parser) + parser = provider_base.create_parser(sys.argv[0]) + parser.add_argument( '--version', '-v', default=False, help='Print the dsub version and exit.') - google = parser.add_argument_group( - title='google', - description='Options for the Google provider (Pipelines API)') - google.add_argument( - '--project', - help='Cloud project ID in which to find and delete the job(s)') + parser.add_argument( '--jobs', '-j', @@ -82,10 +69,23 @@ def parse_arguments(): default=[], help='User labels to match. Tasks returned must match all labels.', metavar='KEY=VALUE') - return parser.parse_args() + + # Add provider-specific arguments + google = parser.add_argument_group( + title='google', + description='Options for the Google provider (Pipelines API)') + google.add_argument( + '--project', + help='Cloud project ID in which to find and delete the job(s)') + + return provider_base.parse_args(parser, { + 'google': ['project'], + 'test-fails': [], + 'local': [], + }, sys.argv[1:]) -def emit_search_criteria(users, jobs, tasks, labels): +def _emit_search_criteria(users, jobs, tasks, labels): """Print the filters used to delete tasks. Use raw flags as arguments.""" print 'Delete running jobs:' print ' user:' @@ -103,13 +103,13 @@ def emit_search_criteria(users, jobs, tasks, labels): def main(): # Parse args and validate - args = parse_arguments() + args = _parse_arguments() # Compute the age filter (if any) create_time = param_util.age_to_create_time(args.age) # Set up the Genomics Pipelines service interface - provider = provider_base.get_provider(args) + provider = provider_base.get_provider(args, resources) # Make sure users were provided, or try to fill from OS user. 
This cannot # be made into a default argument since some environments lack the ability @@ -121,7 +121,7 @@ def main(): # Let the user know which jobs we are going to look up with dsub_util.replace_print(): - emit_search_criteria(user_list, args.jobs, args.tasks, args.label) + _emit_search_criteria(user_list, args.jobs, args.tasks, args.label) # Delete the requested jobs deleted_tasks = ddel_tasks(provider, user_list, args.jobs, args.tasks, labels, create_time) diff --git a/dsub/commands/dstat.py b/dsub/commands/dstat.py index 38500d0..7348378 100755 --- a/dsub/commands/dstat.py +++ b/dsub/commands/dstat.py @@ -28,15 +28,16 @@ from __future__ import print_function -import argparse import collections from datetime import datetime import json +import sys import time from dateutil.tz import tzlocal from ..lib import dsub_util from ..lib import param_util +from ..lib import resources from ..providers import provider_base import tabulate @@ -191,7 +192,7 @@ def print_table(self, table): print(json.dumps(table, indent=2, default=self.serialize)) -def prepare_row(task, full): +def _prepare_row(task, full): """return a dict with the task's info (more if "full" is set).""" # Would like to include the Job ID in the default set of columns, but @@ -236,7 +237,7 @@ def prepare_row(task, full): return row -def parse_arguments(): +def _parse_arguments(): """Parses command line arguments. Returns: @@ -245,21 +246,11 @@ def parse_arguments(): # Handle version flag and exit if it was passed. param_util.handle_version_flag() - provider_required_args = { - 'google': ['project'], - 'test-fails': [], - 'local': [], - } - epilog = 'Provider-required arguments:\n' - for provider in provider_required_args: - epilog += ' %s: %s\n' % (provider, provider_required_args[provider]) - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter, epilog=epilog) + parser = provider_base.create_parser(sys.argv[0]) + parser.add_argument( '--version', '-v', default=False, help='Print the dsub version and exit.') - parser.add_argument( - '--project', - help='Cloud project ID in which to query pipeline operations') + parser.add_argument( '--jobs', '-j', @@ -289,7 +280,9 @@ def parse_arguments(): default=['RUNNING'], choices=['RUNNING', 'SUCCESS', 'FAILURE', 'CANCELED', '*'], help="""Lists only those jobs which match the specified status(es). - Use "*" to list jobs of any status.""") + Choose from {'RUNNING', 'SUCCESS', 'FAILURE', 'CANCELED'}. + Use "*" to list jobs of any status.""", + metavar='STATUS') parser.add_argument( '--age', help="""List only those jobs newer than the specified age. 
Ages can be @@ -325,21 +318,25 @@ def parse_arguments(): '--format', choices=['text', 'json', 'yaml', 'provider-json'], help='Set the output format.') - # Add provider-specific arguments - provider_base.add_provider_argument(parser) - args = parser.parse_args() + # Add provider-specific arguments + google = parser.add_argument_group( + title='google', + description='Options for the Google provider (Pipelines API)') + google.add_argument( + '--project', + help='Cloud project ID in which to find and delete the job(s)') - # check special flag rules - for arg in provider_required_args[args.provider]: - if not args.__getattribute__(arg): - parser.error('argument --%s is required' % arg) - return args + return provider_base.parse_args(parser, { + 'google': ['project'], + 'test-fails': [], + 'local': [], + }, sys.argv[1:]) def main(): # Parse args and validate - args = parse_arguments() + args = _parse_arguments() # Compute the age filter (if any) create_time = param_util.age_to_create_time(args.age) @@ -362,7 +359,7 @@ def main(): output_formatter = TextOutput(args.full) # Set up the Genomics Pipelines service interface - provider = provider_base.get_provider(args) + provider = provider_base.get_provider(args, resources) # Set poll interval to zero if --wait is not set. poll_interval = args.poll_interval if args.wait else 0 @@ -458,7 +455,7 @@ def dstat_job_producer(provider, if raw_format: formatted_tasks.append(task.raw_task_data()) else: - formatted_tasks.append(prepare_row(task, full_output)) + formatted_tasks.append(_prepare_row(task, full_output)) # Determine if any of the jobs are running. if task.get_field('task-status') == 'RUNNING': diff --git a/dsub/commands/dsub.py b/dsub/commands/dsub.py index 3f0c492..c40b5cf 100644 --- a/dsub/commands/dsub.py +++ b/dsub/commands/dsub.py @@ -30,6 +30,7 @@ from ..lib import dsub_util from ..lib import job_util from ..lib import param_util +from ..lib import resources from ..lib.dsub_util import print_error from ..providers import provider_base @@ -182,22 +183,12 @@ def _parse_arguments(prog, argv): # Handle version flag and exit if it was passed. param_util.handle_version_flag() - provider_required_args = { - 'google': ['project', 'zones', 'logging'], - 'test-fails': [], - 'local': ['logging'], - } - epilog = 'Provider-required arguments:\n' - for provider in provider_required_args: - epilog += ' %s: %s\n' % (provider, provider_required_args[provider]) - parser = argparse.ArgumentParser( - prog=prog, - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=epilog) + parser = provider_base.create_parser(prog) # Add dsub core job submission arguments parser.add_argument( '--version', '-v', default=False, help='Print the dsub version and exit.') + parser.add_argument( '--name', help="""Name for pipeline. Defaults to the script name or @@ -331,7 +322,6 @@ def _parse_arguments(prog, argv): ' (either a folder, or file ending in ".log")') # Add provider-specific arguments - provider_base.add_provider_argument(parser) google = parser.add_argument_group( title='google', description='Options for the Google provider (Pipelines API)') @@ -362,13 +352,12 @@ def _parse_arguments(prog, argv): Allows for connecting to the VM for debugging. 
Default is 0; maximum allowed value is 86400 (1 day).""") - args = parser.parse_args(argv) - - # check special flag rules - for arg in provider_required_args[args.provider]: - if not args.__getattribute__(arg): - parser.error('argument --%s is required' % arg) - return args + return provider_base.parse_args( + parser, { + 'google': ['project', 'zones', 'logging'], + 'test-fails': [], + 'local': ['logging'], + }, argv) def _get_job_resources(args): @@ -704,7 +693,7 @@ def run_main(args): }] return run( - provider_base.get_provider(args), + provider_base.get_provider(args, resources), _get_job_resources(args), job_data, all_task_data, diff --git a/dsub/lib/resources.py b/dsub/lib/resources.py new file mode 100644 index 0000000..8198500 --- /dev/null +++ b/dsub/lib/resources.py @@ -0,0 +1,40 @@ +# Copyright 2017 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Enable dsub methods to access resources in dsub packages. + +dsub, dstat, and ddel are designed to run under a few different packaging +environments. This module implements access to resources (such as static +text files) for the setuptools distribution. + +This module is imported by dsub.py, dstat.py, and ddel.py and passed down to +other classes that may need it. This module should not otherwise be imported +directly. + +This mechanism allows users of dsub.py, dstat.py, and ddel.py to replace the +resources module with their own resource package after important and before +calling main() or other entrypoints. +""" + +import os + +# The resource root is the root dsub directory. +# For example: +# my_dir/dsub/dsub/lib/resources.py --> my_dir/dsub +_RESOURCE_ROOT = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) + + +def get_resource(resource_path, mode='rb'): + with open(os.path.join(_RESOURCE_ROOT, resource_path), mode=mode) as f: + return f.read() diff --git a/dsub/providers/local.py b/dsub/providers/local.py index 04eb425..a010f9a 100644 --- a/dsub/providers/local.py +++ b/dsub/providers/local.py @@ -97,12 +97,15 @@ _PROVIDER_NAME = 'local' -DATA_SUBDIR = 'data' +# Relative path to the runner.sh file within dsub +_RUNNER_SH_RESOURCE = 'dsub/providers/local/runner.sh' -SCRIPT_DIR = 'script' -WORKING_DIR = 'workingdir' +_DATA_SUBDIR = 'data' -DATA_MOUNT_POINT = '/mnt/data' +_SCRIPT_DIR = 'script' +_WORKING_DIR = 'workingdir' + +_DATA_MOUNT_POINT = '/mnt/data' # Set file provider whitelist. _SUPPORTED_FILE_PROVIDERS = frozenset([param_util.P_GCS, param_util.P_LOCAL]) @@ -152,8 +155,15 @@ def label_char_transform(char): class LocalJobProvider(base.JobProvider): """Docker jobs running locally (i.e. on the caller's computer).""" - def __init__(self): + def __init__(self, resources): + """Run jobs on your local machine. 
+ + Args: + resources: module providing access to files packaged with dsub + (See dsub/libs/resources.py) + """ self._operations = [] + self._resources = resources def prepare_job_metadata(self, script, job_name, user_id): job_name_value = job_name or os.path.basename(script) @@ -213,19 +223,20 @@ def submit_job(self, job_resources, job_metadata, job_data, all_task_data): 'task-id': launched_tasks } + def _write_source_file(self, dest, body): + with open(dest, 'wt') as f: + f.write(body) + os.chmod(dest, 0500) + def _run_docker_via_script(self, task_dir, env, job_resources, task_metadata, job_data, task_data): script_header = textwrap.dedent("""\ - #!/bin/bash - - # dsub-generated script to start the local Docker container - # and keep a running status. - - set -o nounset + # dsub-generated script containing data for task execution readonly VOLUMES=({volumes}) readonly NAME='{name}' readonly IMAGE='{image}' + # Absolute path to the user's script file inside Docker. readonly SCRIPT_FILE='{script}' # Mount point for the volume on Docker. @@ -238,8 +249,6 @@ def _run_docker_via_script(self, task_dir, env, job_resources, task_metadata, readonly ENV_FILE='{env_file}' # Date format used in the logging message prefix. readonly DATE_FORMAT='{date_format}' - # Absolute path to this script's directory. - readonly TASK_DIR="$(dirname $0)" # User to run as (by default) readonly MY_UID='{uid}' # Set environment variables for recursive input directories @@ -273,287 +282,24 @@ def _run_docker_via_script(self, task_dir, env, job_resources, task_metadata, delocalize_logs_function "${{cp_cmd}}" "${{prefix}}" }} """) - script_body = textwrap.dedent("""\ - # Delete local files - function cleanup() { - local rm_data_dir="${1:-true}" - - log_info "Copying the logs before cleanup" - delocalize_logs - - # Clean up files staged from outside Docker - if [[ "${rm_data_dir}" == "true" ]]; then - echo "cleaning up ${DATA_DIR}" - - # Clean up files written from inside Docker - 2>&1 docker run \\ - --name "${NAME}-cleanup" \\ - --workdir "${DATA_MOUNT_POINT}/${WORKING_DIR}" \\ - "${VOLUMES[@]}" \\ - --env-file "${ENV_FILE}" \\ - "${IMAGE}" \\ - rm -rf "${DATA_MOUNT_POINT}/*" | tee -a "${TASK_DIR}/log.txt" - - rm -rf "${DATA_DIR}" || echo "sorry, unable to delete ${DATA_DIR}." - fi - } - readonly -f cleanup - - function delocalize_logs_function() { - local cp_cmd="${1}" - local prefix="${2}" - - if [[ -f "${TASK_DIR}/stdout.txt" ]]; then - ${cp_cmd} "${TASK_DIR}/stdout.txt" "${prefix}-stdout.log" - fi - if [[ -f "${TASK_DIR}/stderr.txt" ]]; then - ${cp_cmd} "${TASK_DIR}/stderr.txt" "${prefix}-stderr.log" - fi - if [[ -f "${TASK_DIR}/log.txt" ]]; then - ${cp_cmd} "${TASK_DIR}/log.txt" "${prefix}.log" - fi - } - readonly -f delocalize_logs_function - - function get_datestamp() { - date "${DATE_FORMAT}" - } - readonly -f get_datestamp - - function write_status() { - local status="${1}" - echo "${status}" > "${TASK_DIR}/status.txt" - case "${status}" in - SUCCESS|FAILURE|CANCELED) - # Record the finish time (with microseconds) - # Prepend "10#" so numbers like 0999... 
are not treated as octal - local nanos=$(echo "10#"$(date "+%N")) - echo $(date "+%Y-%m-%d %H:%M:%S").$((nanos/1000)) \ - > "${TASK_DIR}/end-time.txt" - ;; - RUNNING) - ;; - *) - echo 2>&1 "Unexpected status: ${status}" - exit 1 - ;; - esac - } - readonly -f write_status - - function log_info() { - echo "$(get_datestamp) I: $@" | tee -a "${TASK_DIR}/log.txt" - } - readonly -f log_info - - function log_error() { - echo "$(get_datestamp) E: $@" | tee -a "${TASK_DIR}/log.txt" - } - readonly -f log_error - - # Correctly log failures and nounset exits - function error() { - local parent_lineno="$1" - local code="$2" - local message="${3:-Error}" - - # Disable further traps - trap EXIT - trap ERR - - if [[ $code != "0" ]]; then - write_status "FAILURE" - log_error "${message} on or near line ${parent_lineno}; exiting with status ${code}" - fi - cleanup "false" - exit "${code}" - } - readonly -f error - - function fetch_image() { - local image="$1" - - for ((attempt=0; attempt < 3; attempt++)); do - log_info "Using gcloud to fetch ${image}." - if gcloud docker -- pull "${image}"; then - return - fi - log_info "Sleeping 30s before the next attempt." - sleep 30s - done - - log_error "FAILED to fetch ${image}" - exit 1 - } - readonly -f fetch_image - - function fetch_image_if_necessary() { - local image="$1" - - # Remove everything from the first / on - local prefix="${image%%/*}" - - # Check that the prefix is gcr.io or .gcr.io - if [[ "${prefix}" == "gcr.io" ]] || - [[ "${prefix}" == *.gcr.io ]]; then - fetch_image "${image}" - fi - } - readonly -f fetch_image_if_necessary - - function get_docker_user() { - # Get the userid and groupid the Docker image is set to run as. - docker run \\ - --name "${NAME}-get-docker-userid" \\ - "${IMAGE}" \\ - bash -c 'echo "$(id -u):$(id -g)"' 2>> "${TASK_DIR}/stderr.txt" - } - readonly -f get_docker_user - - function docker_recursive_chown() { - # Calls, in Docker: chown -R $1 $2 - local usergroup="$1" - local docker_directory="$2" - # Not specifying a name because Docker refuses to run if two containers - # have the same name, and it keeps them around for a little bit - # after they return. - docker run \\ - --user 0 \\ - "${VOLUMES[@]}" \\ - "${IMAGE}" \\ - chown -R "${usergroup}" "${docker_directory}" \\ - >> "${TASK_DIR}/stdout.txt" 2>> "${TASK_DIR}/stderr.txt" - } - readonly -f docker_recursive_chown - - function exit_if_canceled() { - if [[ -f die ]]; then - log_info "Job is canceled, stopping Docker container ${NAME}." - docker stop "${NAME}" - write_status "CANCELED" - log_info "Delocalize logs and cleanup" - cleanup "false" - trap EXIT - log_info "Canceled, exiting." - exit 1 - fi - } - readonly -f exit_if_canceled - - - # This will trigger whenever a command returns an error code - # (exactly like set -e) - trap 'error ${LINENO} $? Error' ERR - - # This will trigger on all other exits. We disable it before normal - # exit so we know if it fires it means there's a problem. - trap 'error ${LINENO} $? "Exit (undefined variable or kill?)"' EXIT - - # Make sure that ERR traps are inherited by shell functions - set -o errtrace - - # Beginning main execution - - # Copy inputs - cd "${TASK_DIR}" - write_status "RUNNING" - log_info "Localizing inputs." - localize_data - - # Handle gcr.io images - fetch_image_if_necessary "${IMAGE}" - - log_info "Checking image userid." - DOCKER_USERGROUP="$(get_docker_user)" - if [[ "${DOCKER_USERGROUP}" != "0:0" ]]; then - log_info "Ensuring docker user (${DOCKER_USERGROUP} can access ${DATA_MOUNT_POINT}." 
- docker_recursive_chown "${DOCKER_USERGROUP}" "${DATA_MOUNT_POINT}" - fi - - # Begin execution of user script - FAILURE_MESSAGE='' - # Disable ERR trap, we want to copy the logs even if Docker fails. - trap ERR - log_info "Running Docker image." - docker run \\ - --detach \\ - --name "${NAME}" \\ - --workdir "${DATA_MOUNT_POINT}/${WORKING_DIR}" \\ - "${VOLUMES[@]}" \\ - --env-file "${ENV_FILE}" \\ - "${IMAGE}" \\ - "${SCRIPT_FILE}" - - # Start a log writer in the background - docker logs --follow "${NAME}" \ - >> "${TASK_DIR}/stdout.txt" 2>> "${TASK_DIR}/stderr.txt" & - - # Wait for completion - DOCKER_EXITCODE=$(docker wait "${NAME}") - log_info "Docker exit code ${DOCKER_EXITCODE}." - if [[ "${DOCKER_EXITCODE}" != 0 ]]; then - FAILURE_MESSAGE="Docker exit code ${DOCKER_EXITCODE} (check stderr)." - fi - - # If we were canceled during execution, be sure to process as such - exit_if_canceled - - # Re-enable trap - trap 'error ${LINENO} $? Error' ERR - - # Prepare data for delocalization. - HOST_USERGROUP="$(id -u):$(id -g)" - log_info "Ensure host user (${HOST_USERGROUP}) owns Docker-written data" - # Disable ERR trap, we want to copy the logs even if Docker fails. - trap ERR - docker_recursive_chown "${HOST_USERGROUP}" "${DATA_MOUNT_POINT}" - DOCKER_EXITCODE_2=$? - # Re-enable trap - trap 'error ${LINENO} $? Error' ERR - if [[ "${DOCKER_EXITCODE_2}" != 0 ]]; then - # Ensure we report failure at the end of the execution - FAILURE_MESSAGE="chown failed, Docker returned ${DOCKER_EXITCODE_2}." - log_error "${FAILURE_MESSAGE}" - fi - - log_info "Copying outputs." - delocalize_data - - # Delocalize logs & cleanup - # - # Disable further traps (if cleanup fails we don't want to call it - # recursively) - trap EXIT - log_info "Delocalize logs and cleanup." - cleanup "true" - if [[ -z "${FAILURE_MESSAGE}" ]]; then - write_status "SUCCESS" - log_info "Done" - else - write_status "FAILURE" - # we want this to be the last line in the log, for dstat to work right. 
- log_error "${FAILURE_MESSAGE}" - exit 1 - fi - """) # Build the local runner script - volumes = ('-v ' + task_dir + '/' + DATA_SUBDIR + '/' - ':' + DATA_MOUNT_POINT) + volumes = ('-v ' + task_dir + '/' + _DATA_SUBDIR + '/' + ':' + _DATA_MOUNT_POINT) - script = script_header.format( + script_data = script_header.format( volumes=volumes, name=_format_task_name( task_metadata.get('job-id'), task_metadata.get('task-id')), image=job_resources.image, - script=DATA_MOUNT_POINT + '/' + SCRIPT_DIR + '/' + + script=_DATA_MOUNT_POINT + '/' + _SCRIPT_DIR + '/' + task_metadata['script'].name, env_file=task_dir + '/' + 'docker.env', uid=os.getuid(), - data_mount_point=DATA_MOUNT_POINT, - data_dir=task_dir + '/' + DATA_SUBDIR, + data_mount_point=_DATA_MOUNT_POINT, + data_dir=task_dir + '/' + _DATA_SUBDIR, date_format='+%Y-%m-%d %H:%M:%S', - workingdir=WORKING_DIR, + workingdir=_WORKING_DIR, export_input_dirs=providers_util.build_recursive_localize_env( task_dir, job_data['inputs'] + task_data['inputs']), recursive_localize_command=self._localize_inputs_recursive_command( @@ -568,19 +314,19 @@ def _run_docker_via_script(self, task_dir, env, job_resources, task_metadata, task_dir, job_data['outputs'] + task_data['outputs']), delocalize_logs_command=self._delocalize_logging_command( job_resources.logging.file_provider, task_metadata), - ) + script_body + ) - # Write the local runner script - script_fname = task_dir + '/runner.sh' - f = open(script_fname, 'wt') - f.write(script) - f.close() - os.chmod(script_fname, 0500) + # Write the runner script and data file to the task_dir + script_path = os.path.join(task_dir, 'runner.sh') + script_data_path = os.path.join(task_dir, 'data.sh') + self._write_source_file(script_path, + self._resources.get_resource(_RUNNER_SH_RESOURCE)) + self._write_source_file(script_data_path, script_data) # Write the environment variables env_vars = env.items() + job_data['envs'] + task_data['envs'] + [ - param_util.EnvParam('DATA_ROOT', DATA_MOUNT_POINT), - param_util.EnvParam('TMPDIR', DATA_MOUNT_POINT + '/tmp') + param_util.EnvParam('DATA_ROOT', _DATA_MOUNT_POINT), + param_util.EnvParam('TMPDIR', _DATA_MOUNT_POINT + '/tmp') ] env_fname = task_dir + '/docker.env' with open(env_fname, 'wt') as f: @@ -592,7 +338,7 @@ def _run_docker_via_script(self, task_dir, env, job_resources, task_metadata, # JOBID=$(dsub ...) doesn't block until docker returns. runner_log = open(task_dir + '/runner-log.txt', 'wt') runner = subprocess.Popen( - [script_fname], stderr=runner_log, stdout=runner_log) + [script_path, script_data_path], stderr=runner_log, stdout=runner_log) pid = runner.pid f = open(task_dir + '/task.pid', 'wt') f.write(str(pid) + '\n') @@ -637,7 +383,6 @@ def delete_jobs(self, 'Unable to cancel %s: docker error %s:\n%s' % (docker_name, cpe.returncode, cpe.output) ] - continue # The script should have quit in response. If it hasn't, kill it. 
pid = task.get_field('pid', 0) @@ -979,14 +724,14 @@ def _make_environment(self, inputs, outputs): """Return a dictionary of environment variables for the VM.""" ret = {} for i in inputs: - ret[i.name] = DATA_MOUNT_POINT + '/' + i.docker_path + ret[i.name] = _DATA_MOUNT_POINT + '/' + i.docker_path for o in outputs: - ret[o.name] = DATA_MOUNT_POINT + '/' + o.docker_path + ret[o.name] = _DATA_MOUNT_POINT + '/' + o.docker_path return ret def _localize_inputs_recursive_command(self, task_dir, inputs): """Returns a command that will stage recursive inputs.""" - data_dir = os.path.join(task_dir, DATA_SUBDIR) + data_dir = os.path.join(task_dir, _DATA_SUBDIR) provider_commands = [ providers_util.build_recursive_localize_command(data_dir, inputs, file_provider) @@ -1023,7 +768,7 @@ def _localize_inputs_command(self, task_dir, inputs): continue source_file_path = i.uri - local_file_path = task_dir + '/' + DATA_SUBDIR + '/' + i.docker_path + local_file_path = task_dir + '/' + _DATA_SUBDIR + '/' + i.docker_path dest_file_path = self._get_input_target_path(local_file_path) commands.append('mkdir -p "%s"' % os.path.dirname(local_file_path)) @@ -1042,10 +787,10 @@ def _localize_inputs_command(self, task_dir, inputs): return '\n'.join(commands) def _mkdir_outputs(self, task_dir, outputs): - os.makedirs(task_dir + '/' + DATA_SUBDIR + '/' + WORKING_DIR) - os.makedirs(task_dir + '/' + DATA_SUBDIR + '/tmp') + os.makedirs(task_dir + '/' + _DATA_SUBDIR + '/' + _WORKING_DIR) + os.makedirs(task_dir + '/' + _DATA_SUBDIR + '/tmp') for o in outputs: - local_file_path = task_dir + '/' + DATA_SUBDIR + '/' + o.docker_path + local_file_path = task_dir + '/' + _DATA_SUBDIR + '/' + o.docker_path # makedirs errors out if the folder already exists, so check. if not os.path.isdir(os.path.dirname(local_file_path)): os.makedirs(os.path.dirname(local_file_path)) @@ -1059,10 +804,10 @@ def _delocalize_outputs_recursive_command(self, task_dir, outputs): # Generate local and GCS delocalize commands. cmd_lines.append( providers_util.build_recursive_delocalize_command( - os.path.join(task_dir, DATA_SUBDIR), outputs, param_util.P_GCS)) + os.path.join(task_dir, _DATA_SUBDIR), outputs, param_util.P_GCS)) cmd_lines.append( providers_util.build_recursive_delocalize_command( - os.path.join(task_dir, DATA_SUBDIR), outputs, param_util.P_LOCAL)) + os.path.join(task_dir, _DATA_SUBDIR), outputs, param_util.P_LOCAL)) return '\n'.join(cmd_lines) def _delocalize_outputs_commands(self, task_dir, outputs): @@ -1075,7 +820,7 @@ def _delocalize_outputs_commands(self, task_dir, outputs): # The destination path is o.uri.path, which is the target directory # (rather than o.uri, which includes the filename or wildcard). 
dest_path = o.uri.path - local_path = task_dir + '/' + DATA_SUBDIR + '/' + o.docker_path + local_path = task_dir + '/' + _DATA_SUBDIR + '/' + o.docker_path if o.file_provider == param_util.P_LOCAL: commands.append('mkdir -p "%s"' % dest_path) @@ -1087,7 +832,8 @@ def _delocalize_outputs_commands(self, task_dir, outputs): return '\n'.join(commands) def _stage_script(self, task_dir, script_name, script_text): - path = (task_dir + '/' + DATA_SUBDIR + '/' + SCRIPT_DIR + '/' + script_name) + path = ( + task_dir + '/' + _DATA_SUBDIR + '/' + _SCRIPT_DIR + '/' + script_name) os.makedirs(os.path.dirname(path)) f = open(path, 'w') f.write(script_text) diff --git a/dsub/providers/local/runner.sh b/dsub/providers/local/runner.sh new file mode 100644 index 0000000..d441f19 --- /dev/null +++ b/dsub/providers/local/runner.sh @@ -0,0 +1,287 @@ +#!/bin/bash + +# Copyright 2017 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# dsub runner script to orchestrate local docker execution + +set -o nounset +set -o errexit + +readonly RUNNER_DATA=${1} +source "${RUNNER_DATA}" + +# Absolute path to this script's directory. +readonly TASK_DIR="$(dirname $0)" + +function get_datestamp() { + date "${DATE_FORMAT}" +} +readonly -f get_datestamp + +function log_info() { + echo "$(get_datestamp) I: $@" | tee -a "${TASK_DIR}/log.txt" +} +readonly -f log_info + +function log_error() { + echo "$(get_datestamp) E: $@" | tee -a "${TASK_DIR}/log.txt" +} +readonly -f log_error + +# Delete local files +function cleanup() { + local rm_data_dir="${1:-true}" + + log_info "Copying the logs before cleanup" + delocalize_logs + + # Clean up files staged from outside Docker + if [[ "${rm_data_dir}" == "true" ]]; then + echo "cleaning up ${DATA_DIR}" + + # Clean up files written from inside Docker + 2>&1 docker run \ + --name "${NAME}-cleanup" \ + --workdir "${DATA_MOUNT_POINT}/${WORKING_DIR}" \ + "${VOLUMES[@]}" \ + --env-file "${ENV_FILE}" \ + "${IMAGE}" \ + rm -rf "${DATA_MOUNT_POINT}/*" | tee -a "${TASK_DIR}/log.txt" + + rm -rf "${DATA_DIR}" || echo "sorry, unable to delete ${DATA_DIR}." + fi +} +readonly -f cleanup + +function delocalize_logs_function() { + local cp_cmd="${1}" + local prefix="${2}" + + if [[ -f "${TASK_DIR}/stdout.txt" ]]; then + ${cp_cmd} "${TASK_DIR}/stdout.txt" "${prefix}-stdout.log" + fi + if [[ -f "${TASK_DIR}/stderr.txt" ]]; then + ${cp_cmd} "${TASK_DIR}/stderr.txt" "${prefix}-stderr.log" + fi + if [[ -f "${TASK_DIR}/log.txt" ]]; then + ${cp_cmd} "${TASK_DIR}/log.txt" "${prefix}.log" + fi +} +readonly -f delocalize_logs_function + +function write_status() { + local status="${1}" + echo "${status}" > "${TASK_DIR}/status.txt" + case "${status}" in + SUCCESS|FAILURE|CANCELED) + # Record the finish time (with microseconds) + # Prepend "10#" so numbers like 0999... 
are not treated as octal + local nanos=$(echo "10#"$(date "+%N")) + echo $(date "+%Y-%m-%d %H:%M:%S").$((nanos/1000)) \ + > "${TASK_DIR}/end-time.txt" + ;; + RUNNING) + ;; + *) + echo 2>&1 "Unexpected status: ${status}" + exit 1 + ;; + esac +} +readonly -f write_status + +# Correctly log failures and nounset exits +function error() { + local parent_lineno="$1" + local code="$2" + local message="${3:-Error}" + + # Disable further traps + trap EXIT + trap ERR + + if [[ $code != "0" ]]; then + write_status "FAILURE" + log_error "${message} on or near line ${parent_lineno}; exiting with status ${code}" + fi + cleanup "false" + exit "${code}" +} +readonly -f error + +function fetch_image() { + local image="$1" + + for ((attempt=0; attempt < 3; attempt++)); do + log_info "Using gcloud to fetch ${image}." + if gcloud docker -- pull "${image}"; then + return + fi + log_info "Sleeping 30s before the next attempt." + sleep 30s + done + + log_error "FAILED to fetch ${image}" + exit 1 +} +readonly -f fetch_image + +function fetch_image_if_necessary() { + local image="$1" + + # Remove everything from the first / on + local prefix="${image%%/*}" + + # Check that the prefix is gcr.io or .gcr.io + if [[ "${prefix}" == "gcr.io" ]] || + [[ "${prefix}" == *.gcr.io ]]; then + fetch_image "${image}" + fi +} +readonly -f fetch_image_if_necessary + +function get_docker_user() { + # Get the userid and groupid the Docker image is set to run as. + docker run \ + --name "${NAME}-get-docker-userid" \ + "${IMAGE}" \ + bash -c 'echo "$(id -u):$(id -g)"' 2>> "${TASK_DIR}/stderr.txt" +} +readonly -f get_docker_user + +function docker_recursive_chown() { + # Calls, in Docker: chown -R $1 $2 + local usergroup="$1" + local docker_directory="$2" + # Not specifying a name because Docker refuses to run if two containers + # have the same name, and it keeps them around for a little bit + # after they return. + docker run \ + --user 0 \ + "${VOLUMES[@]}" \ + "${IMAGE}" \ + chown -R "${usergroup}" "${docker_directory}" \ + >> "${TASK_DIR}/stdout.txt" 2>> "${TASK_DIR}/stderr.txt" +} +readonly -f docker_recursive_chown + +function exit_if_canceled() { + if [[ -f die ]]; then + log_info "Job is canceled, stopping Docker container ${NAME}." + docker stop "${NAME}" + write_status "CANCELED" + log_info "Delocalize logs and cleanup" + cleanup "false" + trap EXIT + log_info "Canceled, exiting." + exit 1 + fi +} +readonly -f exit_if_canceled + +# Begin main execution + +# Trap errors and handle them instead of using errexit +set +o errexit +trap 'error ${LINENO} $? Error' ERR + +# This will trigger on all other exits. We disable it before normal +# exit so we know if it fires it means there's a problem. +trap 'error ${LINENO} $? "Exit (undefined variable or kill?)"' EXIT + +# Make sure that ERR traps are inherited by shell functions +set -o errtrace + +# Copy inputs +cd "${TASK_DIR}" +write_status "RUNNING" +log_info "Localizing inputs." +localize_data + +# Handle gcr.io images +fetch_image_if_necessary "${IMAGE}" + +log_info "Checking image userid." +DOCKER_USERGROUP="$(get_docker_user)" +if [[ "${DOCKER_USERGROUP}" != "0:0" ]]; then + log_info "Ensuring docker user (${DOCKER_USERGROUP} can access ${DATA_MOUNT_POINT}." + docker_recursive_chown "${DOCKER_USERGROUP}" "${DATA_MOUNT_POINT}" +fi + +# Begin execution of user script +FAILURE_MESSAGE='' +# Disable ERR trap, we want to copy the logs even if Docker fails. +trap ERR +log_info "Running Docker image." 
+docker run \ + --detach \ + --name "${NAME}" \ + --workdir "${DATA_MOUNT_POINT}/${WORKING_DIR}" \ + "${VOLUMES[@]}" \ + --env-file "${ENV_FILE}" \ + "${IMAGE}" \ + "${SCRIPT_FILE}" + +# Start a log writer in the background +docker logs --follow "${NAME}" \ + >> "${TASK_DIR}/stdout.txt" 2>> "${TASK_DIR}/stderr.txt" & + +# Wait for completion +DOCKER_EXITCODE=$(docker wait "${NAME}") +log_info "Docker exit code ${DOCKER_EXITCODE}." +if [[ "${DOCKER_EXITCODE}" != 0 ]]; then + FAILURE_MESSAGE="Docker exit code ${DOCKER_EXITCODE} (check stderr)." +fi + +# If we were canceled during execution, be sure to process as such +exit_if_canceled + +# Re-enable trap +trap 'error ${LINENO} $? Error' ERR + +# Prepare data for delocalization. +HOST_USERGROUP="$(id -u):$(id -g)" +log_info "Ensure host user (${HOST_USERGROUP}) owns Docker-written data" +# Disable ERR trap, we want to copy the logs even if Docker fails. +trap ERR +docker_recursive_chown "${HOST_USERGROUP}" "${DATA_MOUNT_POINT}" +DOCKER_EXITCODE_2=$? +# Re-enable trap +trap 'error ${LINENO} $? Error' ERR +if [[ "${DOCKER_EXITCODE_2}" != 0 ]]; then + # Ensure we report failure at the end of the execution + FAILURE_MESSAGE="chown failed, Docker returned ${DOCKER_EXITCODE_2}." + log_error "${FAILURE_MESSAGE}" +fi + +log_info "Copying outputs." +delocalize_data + +# Delocalize logs & cleanup +# +# Disable further traps (if cleanup fails we don't want to call it +# recursively) +trap EXIT +log_info "Delocalize logs and cleanup." +cleanup "true" +if [[ -z "${FAILURE_MESSAGE}" ]]; then + write_status "SUCCESS" + log_info "Done" +else + write_status "FAILURE" + # we want this to be the last line in the log, for dstat to work right. + log_error "${FAILURE_MESSAGE}" + exit 1 +fi diff --git a/dsub/providers/provider_base.py b/dsub/providers/provider_base.py index ff7fd3c..4a4bd5f 100644 --- a/dsub/providers/provider_base.py +++ b/dsub/providers/provider_base.py @@ -13,6 +13,7 @@ # limitations under the License. """Interface for job providers.""" +import argparse from . import google from . import local @@ -26,7 +27,7 @@ } -def get_provider(args): +def get_provider(args, resources): """Returns a provider for job submission requests.""" provider = getattr(args, 'provider', 'google') @@ -36,7 +37,7 @@ def get_provider(args): getattr(args, 'verbose', False), getattr(args, 'dry_run', False), args.project) elif provider == 'local': - return local.LocalJobProvider() + return local.LocalJobProvider(resources) elif provider == 'test-fails': return test_fails.FailsJobProvider() else: @@ -48,7 +49,22 @@ def get_provider_name(provider): return PROVIDER_NAME_MAP[provider.__class__] -def add_provider_argument(parser): +class DsubHelpFormatter(argparse.ArgumentDefaultsHelpFormatter, + argparse.RawDescriptionHelpFormatter): + """Display defaults in help and display the epilog in its raw format. + + As described in https://bugs.python.org/issue13023, there is not a built-in + class to provide both display of defaults as well as displaying the epilog + just as you want it to. The recommended approach is to create a simple + subclass of both Formatters. 
+ """ + pass + + +def create_parser(prog): + """Create an argument parser, adding in the list of providers.""" + parser = argparse.ArgumentParser(prog=prog, formatter_class=DsubHelpFormatter) + parser.add_argument( '--provider', default='google', @@ -58,6 +74,28 @@ def add_provider_argument(parser): are for testing purposes only.""", metavar='PROVIDER') + return parser + + +def parse_args(parser, provider_required_args, argv): + """Add provider required arguments epilog message, parse, and validate.""" + + # Add the provider required arguments epilog message + epilog = 'Provider-required arguments:\n' + for provider in provider_required_args: + epilog += ' %s: %s\n' % (provider, provider_required_args[provider]) + parser.epilog = epilog + + # Parse arguments + args = parser.parse_args(argv) + + # For the selected provider, check the required arguments + for arg in provider_required_args[args.provider]: + if not args.__getattribute__(arg): + parser.error('argument --%s is required' % arg) + + return args + def get_dstat_provider_args(provider, project): """A string with the arguments to point dstat to the same provider+project.""" diff --git a/setup.py b/setup.py index 1eb8a9d..cfb8778 100644 --- a/setup.py +++ b/setup.py @@ -85,9 +85,7 @@ def get_dsub_version(): # Packages to distribute. packages=find_packages(), - package_data={ - 'dsub': ['VERSION'], - }, + include_package_data=True, # List run-time dependencies here. These will be installed by pip when # your project is installed. diff --git a/test/integration/e2e_ddel.sh b/test/integration/e2e_ddel.sh index 745e1ea..5d55764 100755 --- a/test/integration/e2e_ddel.sh +++ b/test/integration/e2e_ddel.sh @@ -65,7 +65,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then echo echo "Sleeping 10 seconds before exercising 'ddel --age 5s'" sleep 10s - run_ddel --jobs "${JOB_ID}" --age 5s + run_ddel_age "5s" --jobs "${JOB_ID}" # Make sure dstat still shows the job as not canceled. echo diff --git a/test/integration/e2e_dstat.sh b/test/integration/e2e_dstat.sh index 6562d0f..528d655 100755 --- a/test/integration/e2e_dstat.sh +++ b/test/integration/e2e_dstat.sh @@ -50,7 +50,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then echo "Checking dstat (by status)..." - if ! DSTAT_OUTPUT="$(test_dstat --status 'RUNNING' --jobs "${JOBID}" --full)"; then + if ! DSTAT_OUTPUT="$(run_dstat --status 'RUNNING' --jobs "${JOBID}" --full)"; then echo "dstat exited with a non-zero exit code!" echo "Output:" echo "${DSTAT_OUTPUT}" @@ -65,7 +65,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then echo "Checking dstat (by job-name)..." - if ! DSTAT_OUTPUT="$(test_dstat --status '*' --full --names "${JOB_NAME}")"; then + if ! DSTAT_OUTPUT="$(run_dstat --status '*' --full --names "${JOB_NAME}")"; then echo "dstat exited with a non-zero exit code!" echo "Output:" echo "${DSTAT_OUTPUT}" @@ -80,7 +80,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then echo "Checking dstat (by job-id: default)..." - if ! DSTAT_OUTPUT="$(test_dstat --status '*' --jobs "${JOBID}")"; then + if ! DSTAT_OUTPUT="$(run_dstat --status '*' --jobs "${JOBID}")"; then echo "dstat exited with a non-zero exit code!" echo "Output:" echo "${DSTAT_OUTPUT}" @@ -95,7 +95,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then echo "Checking dstat (by job-id: full)..." - if ! DSTAT_OUTPUT=$(test_dstat --status '*' --full --jobs "${JOBID}"); then + if ! DSTAT_OUTPUT=$(run_dstat --status '*' --full --jobs "${JOBID}"); then echo "dstat exited with a non-zero exit code!" 
echo "Output:" echo "${DSTAT_OUTPUT}" @@ -111,7 +111,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then echo "Waiting 5 seconds and checking 'dstat --age 5s'..." sleep 5s - DSTAT_OUTPUT="$(run_dstat --status '*' --jobs "${JOBID}" --age 5s --full)" + DSTAT_OUTPUT="$(run_dstat_age "5s" --status '*' --jobs "${JOBID}" --full)" if [[ "${DSTAT_OUTPUT}" != "[]" ]]; then echo "dstat output not empty as expected:" echo "${DSTAT_OUTPUT}" @@ -120,7 +120,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then echo "Verifying that the job didn't disappear completely." - DSTAT_OUTPUT="$(test_dstat --status '*' --jobs "${JOBID}" --full)" + DSTAT_OUTPUT="$(run_dstat --status '*' --jobs "${JOBID}" --full)" if [[ "$(dstat_output_job_name "${DSTAT_OUTPUT}")" != "${JOB_NAME}" ]]; then echo "Job ${JOB_NAME} not found in the dstat output!" echo "${DSTAT_OUTPUT}" diff --git a/test/integration/e2e_dstat_tasks.sh b/test/integration/e2e_dstat_tasks.sh index 207a25d..c96488d 100755 --- a/test/integration/e2e_dstat_tasks.sh +++ b/test/integration/e2e_dstat_tasks.sh @@ -161,7 +161,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then echo "Waiting 5 seconds and checking 'dstat --age 5s'..." sleep 5s - DSTAT_OUTPUT="$(run_dstat --status '*' --jobs "${JOBID}" --age 5s 2>&1)" + DSTAT_OUTPUT="$(run_dstat_age "5s" --status '*' --jobs "${JOBID}" 2>&1)" if [[ -n "${DSTAT_OUTPUT}" ]]; then echo "dstat output not empty as expected:" echo "${DSTAT_OUTPUT}" diff --git a/test/integration/e2e_logging_paths.sh b/test/integration/e2e_logging_paths.sh index eb2c29f..9349210 100755 --- a/test/integration/e2e_logging_paths.sh +++ b/test/integration/e2e_logging_paths.sh @@ -30,7 +30,6 @@ function dstat_get_logging() { run_dstat \ --jobs "${job_id}" \ --status "*" \ - --age 30m \ --full \ --format json) diff --git a/test/integration/e2e_logging_paths_tasks.sh b/test/integration/e2e_logging_paths_tasks.sh index a66dbfe..cdb068e 100755 --- a/test/integration/e2e_logging_paths_tasks.sh +++ b/test/integration/e2e_logging_paths_tasks.sh @@ -31,7 +31,6 @@ function dstat_get_logging() { run_dstat \ --jobs "${job_id}" \ --status "*" \ - --age 30m \ --full \ --format json) @@ -50,7 +49,7 @@ readonly -f ddel_task readonly LOGGING_BASE="$(dirname "${LOGGING}")" declare LOGGING_OVERRIDE -readonly JOB_NAME="log-test" +readonly JOB_NAME="log-tasks" readonly JOB_USER="${USER}" # Test a basic job with base logging path @@ -64,10 +63,10 @@ JOB_ID=$(run_dsub \ LOGGING_PATH=$(dstat_get_logging "${JOB_ID}" "1") -if [[ ! "${LOGGING_PATH}" == "${LOGGING_OVERRIDE}/log-test"*.1.log ]]; then +if [[ ! "${LOGGING_PATH}" == "${LOGGING_OVERRIDE}/${JOB_NAME}"*.1.log ]]; then echo "ERROR: Unexpected logging path." echo "Received: ${LOGGING_PATH}" - echo "Expected: ${LOGGING_OVERRIDE}/log-test*.1.log" + echo "Expected: ${LOGGING_OVERRIDE}/${JOB_NAME}*.1.log" exit 1 fi diff --git a/test/integration/test_setup.sh b/test/integration/test_setup.sh index f97704f..5e6d2b7 100644 --- a/test/integration/test_setup.sh +++ b/test/integration/test_setup.sh @@ -103,8 +103,20 @@ function dsub_test-fails() { # dstat +function run_dstat_age() { + # Call dstat, allowing the caller to set a "--age" + local age="${1}" + shift + + dstat_"${DSUB_PROVIDER}" --age "${age}" "${@}" +} + function run_dstat() { - dstat_"${DSUB_PROVIDER}" "${@}" + # Call dstat and automatically add "--age 30m". + # This speeds up tests and helps avoid dstat calls that return jobs + # from other test runs. + # If a test takes longer than 30 minutes, then we should fix the test. 
+ run_dstat_age "30m" "${@}" } function dstat_google() { @@ -120,20 +132,22 @@ function dstat_local() { "${@}" } -function test_dstat() { - # Version of dstat that automatically adds "--age 5m". - # This speeds up tests and helps avoid dstat calls that return jobs - # from other test runs. - # If a test takes longer than 5 minutes, then we should fix the test. - dstat_"${DSUB_PROVIDER}" --age "5m" "${@}" -} - # ddel -function run_ddel() { - local provider=${DSUB_PROVIDER:-google} +function run_ddel_age() { + # Call ddel, allowing the caller to set a "--age" + local age="${1}" + shift - ddel_"${provider}" "${@}" + ddel_"${DSUB_PROVIDER}" --age "${age}" "${@}" +} + +function run_ddel() { + # Call ddel and automatically add "--age 30m". + # This speeds up tests and helps avoid ddel calls that return jobs + # from other test runs. + # If a test takes longer than 30 minutes, then we should fix the test. + run_ddel_age "30m" "${@}" } function ddel_google() { diff --git a/test/integration/test_util.sh b/test/integration/test_util.sh index 8670876..f343276 100644 --- a/test/integration/test_util.sh +++ b/test/integration/test_util.sh @@ -103,7 +103,6 @@ function util::get_job_status() { run_dstat \ --jobs "${job_id}" \ --status "*" \ - --age 30m \ --full \ --format json); then return 1
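
For readers following the argument-parsing changes above: ddel, dstat, and dsub now delegate parser construction and provider validation to provider_base.create_parser() and provider_base.parse_args(). Below is a minimal, self-contained sketch of that pattern in plain argparse, for reference only; the names build_parser, parse_and_validate, and PROVIDER_REQUIRED_ARGS are illustrative and not code from this patch.

import argparse
import sys

# Per-provider required flags, as passed to provider_base.parse_args()
# by the commands in this patch.
PROVIDER_REQUIRED_ARGS = {
    'google': ['project'],
    'test-fails': [],
    'local': [],
}


def build_parser(prog):
  # Mirrors provider_base.create_parser(): every command shares --provider.
  parser = argparse.ArgumentParser(
      prog=prog, formatter_class=argparse.RawDescriptionHelpFormatter)
  parser.add_argument(
      '--provider',
      default='google',
      metavar='PROVIDER',
      help='Job service provider (google, local, or test-fails).')
  # Provider-specific options live in their own argument group.
  google = parser.add_argument_group(title='google')
  google.add_argument('--project', help='Cloud project ID.')
  return parser


def parse_and_validate(parser, argv):
  # Mirrors provider_base.parse_args(): generate the epilog from the
  # required-args table, parse, then enforce the selected provider's flags.
  parser.epilog = 'Provider-required arguments:\n' + ''.join(
      '  %s: %s\n' % (p, req) for p, req in PROVIDER_REQUIRED_ARGS.items())
  args = parser.parse_args(argv)
  for arg in PROVIDER_REQUIRED_ARGS[args.provider]:
    if not getattr(args, arg):
      parser.error('argument --%s is required' % arg)
  return args


if __name__ == '__main__':
  print(parse_and_validate(build_parser(sys.argv[0]), sys.argv[1:]))

Keeping the required-args table out of each command and passing it to a shared parse_args() is what lets ddel, dstat, and dsub declare different requirements (for example, dsub also requires zones and logging for the google provider) without duplicating the epilog and validation code.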
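
The other structural change is to the local provider: the runner script that was previously embedded as a Python string now ships as a packaged file (dsub/providers/local/runner.sh) read at run time through the new dsub/lib/resources.py module, while the task-specific values are written to a separate data.sh that runner.sh sources as its first argument. A rough sketch of that flow follows, assuming an existing task_dir and a pre-rendered script_data string; launch_task is an illustrative name, not a function in dsub.

import os
import subprocess

from dsub.lib import resources

# Path of the packaged runner, relative to the resource root (the directory
# above the dsub package), as defined in dsub/providers/local.py.
_RUNNER_SH_RESOURCE = 'dsub/providers/local/runner.sh'


def launch_task(task_dir, script_data):
  """Stage runner.sh and its generated data file, then start the runner."""
  runner_path = os.path.join(task_dir, 'runner.sh')
  data_path = os.path.join(task_dir, 'data.sh')

  # runner.sh is static and ships with the package; data.sh carries the
  # per-task variables (VOLUMES, NAME, IMAGE, ENV_FILE, ...) that
  # runner.sh sources before doing any work.
  for dest, body in [
      (runner_path, resources.get_resource(_RUNNER_SH_RESOURCE, mode='r')),
      (data_path, script_data),
  ]:
    with open(dest, 'wt') as f:
      f.write(body)
    os.chmod(dest, 0o500)

  # The runner takes the data file as its only argument and logs to
  # runner-log.txt, matching the Popen call in _run_docker_via_script.
  runner_log = open(os.path.join(task_dir, 'runner-log.txt'), 'wt')
  return subprocess.Popen(
      [runner_path, data_path], stdout=runner_log, stderr=runner_log)

Because runner.sh begins with readonly RUNNER_DATA=${1} followed by source "${RUNNER_DATA}", anything its bash functions need (volumes, image, env file, localize and delocalize commands) must be emitted into data.sh by the Python side, which is exactly what the trimmed-down script_header now produces.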