Skip to content

Commit

Permalink
Merge pull request #168 from DataBiosphere/dev
Browse files Browse the repository at this point in the history
PR for 0.3.3 release
  • Loading branch information
wnojopra authored Sep 3, 2019
2 parents f7f0ed4 + 108fa65 commit d4925d9
Show file tree
Hide file tree
Showing 31 changed files with 503 additions and 156 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ and Azure Batch.
You can install `dsub` from [PyPI](https://pypi.org/), or you can clone and
install from this github repository.

Note: `dsub` was written for Python 2.7 and production users of `dsub`
should continue using Python 2.7. As of `dsub` v0.2.0, we have enabled
experimental support of Python 3.5+.

### Pre-installation steps

1. This is optional, but whether installing from PyPI or from github,
Expand All @@ -34,7 +30,7 @@ you are encouraged to use a [Python virtualenv](https://virtualenv.pypa.io).
1. Create and activate a Python virtualenv.

# (You can do this in a directory of your choosing.)
virtualenv --python=python2.7 dsub_libs
virtualenv dsub_libs
source dsub_libs/bin/activate

### Install `dsub`
Expand Down
28 changes: 27 additions & 1 deletion docs/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,38 @@ A task is considered to be retryable if:
- the latest attempt was not canceled (status CANCELED)
- the task has not been retried the maximum number of times

Note that no attempt is made to interpret the reason for failure.
If a task fails due to a "permanent" error such as permissions or insufficient
disk space, the task will still be retried.

### --retries flag

To specify the maximum number of per-task retries, use the `--retries` flag.
For example `--retries 3` will retry each task 3 times before failing.

### --preemptible flag

Using the `--preemptible` flag will make your jobs run on a preemptible VM.
The `--preemptible` flag can be used as a boolean flag where all attempts are
made using preemptible VMs, or with an integer such that a maximum specified
number of attempts are executed using preemptible VMs.

The following will make at most 4 total attempts (one initial and 3 retries)
all on preemptible VMs:

dsub --preemptible --retries 3 --wait ...

The following will make at most 4 total attempts (one initial and 3 retries)
with the first 3 attempts on a preemptible VM and the last on a full-priced VM:

dsub --preemptible 3 --retries 3 --wait ...

Note that no attempt is made to interpret the reason for task failure.
If a preemptible task fails for any reason other than VM preemption, that
attempt still counts against the preemptible attempt limit.
In practice, transient failure rates should be much lower than preemption
rates, so more complex retry logic is not clearly worthwhile.

## Tracking task attempts

When viewing tasks with `dstat --full` the attempt number will be available
Expand Down Expand Up @@ -75,5 +102,4 @@ features:
- Enable retries with changes to the runtime environment:
- more disk
- more memory
- switch from preemptible to non-preemptible VMs
- Add a maximum total number of retries for all tasks for a job
2 changes: 1 addition & 1 deletion dsub/_dsub_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.3.2'
DSUB_VERSION = '0.3.3'
60 changes: 46 additions & 14 deletions dsub/commands/dsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,13 @@ def _parse_arguments(prog, argv):
help='Size (in GB) of the boot disk')
google_common.add_argument(
'--preemptible',
default=False,
action='store_true',
help='Use a preemptible VM for the job')
const=param_util.preemptile_param_type(True),
default=param_util.preemptile_param_type(False),
nargs='?', # Be careful if we ever add positional arguments
type=param_util.preemptile_param_type,
help="""If --preemptible is given without a number, enables preemptible
VMs for all attempts for all tasks. If a number value N is used,
enables preemptible VMs for up to N attempts for each task.""")
google_common.add_argument(
'--zones', nargs='+', help='List of Google Compute Engine zones.')
google_common.add_argument(
Expand Down Expand Up @@ -542,7 +546,6 @@ def _get_job_resources(args):
disk_size=args.disk_size,
disk_type=args.disk_type,
boot_disk_size=args.boot_disk_size,
preemptible=args.preemptible,
image=args.image,
regions=args.regions,
zones=args.zones,
Expand All @@ -561,7 +564,9 @@ def _get_job_resources(args):
timeout=timeout,
log_interval=log_interval,
ssh=args.ssh,
enable_stackdriver_monitoring=args.enable_stackdriver_monitoring)
enable_stackdriver_monitoring=args.enable_stackdriver_monitoring,
max_retries=args.retries,
max_preemptible_attempts=args.preemptible)


def _get_job_metadata(provider, user_id, job_name, script, task_ids,
Expand Down Expand Up @@ -624,22 +629,44 @@ def _resolve_task_logging(job_metadata, job_resources, task_descriptors):
logging_path=logging_path)


def _resolve_preemptible(job_resources, task_descriptors):
"""Resolve whether or not to use a preemptible machine.
Args:
job_resources: Resources specified such as max_preemptible_attempts.
task_descriptors: Task metadata, parameters, and resources.
"""
# Determine if the next attempt should be preemptible
for task_descriptor in task_descriptors:
# The original attempt is attempt number 1.
# The first retry is attempt number 2.
attempt_number = task_descriptor.task_metadata.get('task-attempt', 1)
max_preemptible_attempts = job_resources.max_preemptible_attempts
if max_preemptible_attempts:
use_preemptible = max_preemptible_attempts.should_use_preemptible(
attempt_number)
else:
use_preemptible = job_model.DEFAULT_PREEMPTIBLE
task_descriptor.task_resources = task_descriptor.task_resources._replace(
preemptible=use_preemptible)


def _resolve_task_resources(job_metadata, job_resources, task_descriptors):
  """Resolve per-task properties from job-level properties.

  This function exists to be called at the point that all job properties
  have been validated and resolved. It is also called prior to re-trying a
  task. Two task properties are resolved:

  1) The logging path, which may have substitution parameters such as
     job-id, task-id, task-attempt, user-id, and job-name.
  2) The preemptible field, which depends on how many preemptible attempts
     have already been made for the task.

  Args:
    job_metadata: Job metadata, such as job-id, job-name, and user-id.
    job_resources: Resources specified such as ram, cpu, and logging path.
    task_descriptors: Task metadata, parameters, and resources.
  """
  _resolve_task_logging(job_metadata, job_resources, task_descriptors)
  _resolve_preemptible(job_resources, task_descriptors)


def _wait_after(provider, job_ids, poll_interval, stop_on_failure):
Expand Down Expand Up @@ -806,7 +833,7 @@ def _retry_task(provider, job_descriptor, task_id, task_attempt):
}, td_orig.task_params, td_orig.task_resources)
]

# Update the logging path.
# Update the logging path and preemptible field.
_resolve_task_resources(job_descriptor.job_metadata,
job_descriptor.job_resources, new_task_descriptors)

Expand Down Expand Up @@ -1035,6 +1062,7 @@ def run_main(args):
user_project=args.user_project,
wait=args.wait,
retries=args.retries,
max_preemptible_attempts=args.preemptible,
poll_interval=args.poll_interval,
after=args.after,
skip=args.skip,
Expand All @@ -1055,6 +1083,7 @@ def run(provider,
user_project=None,
wait=False,
retries=0,
max_preemptible_attempts=None,
poll_interval=10,
after=None,
skip=False,
Expand Down Expand Up @@ -1089,6 +1118,9 @@ def run(provider,
if retries and not wait:
raise ValueError('Requesting retries requires requesting wait')

if max_preemptible_attempts:
max_preemptible_attempts.validate(retries)

# The contract with providers and downstream code is that the job_params
# and task_params contain 'labels', 'envs', 'inputs', and 'outputs'.
job_model.ensure_job_params_are_complete(job_params)
Expand Down
58 changes: 42 additions & 16 deletions dsub/lib/job_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,34 @@ def __new__(cls, name, value, docker_path, uri):

class Resources(
collections.namedtuple('Resources', [
'min_cores', 'min_ram', 'machine_type', 'disk_size', 'disk_type',
'boot_disk_size', 'preemptible', 'image', 'logging', 'logging_path',
'regions', 'zones', 'service_account', 'scopes', 'keep_alive',
'cpu_platform', 'network', 'subnetwork', 'use_private_address',
'accelerator_type', 'accelerator_count', 'nvidia_driver_version',
'timeout', 'log_interval', 'ssh', 'enable_stackdriver_monitoring'
'min_cores',
'min_ram',
'machine_type',
'disk_size',
'disk_type',
'boot_disk_size',
'preemptible',
'image',
'logging',
'logging_path',
'regions',
'zones',
'service_account',
'scopes',
'keep_alive',
'cpu_platform',
'network',
'subnetwork',
'use_private_address',
'accelerator_type',
'accelerator_count',
'nvidia_driver_version',
'timeout',
'log_interval',
'ssh',
'enable_stackdriver_monitoring',
'max_retries',
'max_preemptible_attempts',
])):
"""Job resource parameters related to CPUs, memory, and disk.
Expand Down Expand Up @@ -445,6 +467,10 @@ class Resources(
ssh (bool): Start an SSH container in the background.
enable_stackdriver_monitoring (bool): Enable stackdriver monitoring
on the VM.
max_retries (int): Maximum allowed number of retry attempts.
max_preemptible_attempts (param_util.PreemptibleParam): Int representing
maximum allowed number of attempts on a preemptible machine, or boolean
representing always preemtible.
"""
__slots__ = ()

Expand Down Expand Up @@ -474,16 +500,16 @@ def __new__(cls,
timeout=None,
log_interval=None,
ssh=None,
enable_stackdriver_monitoring=None):
return super(Resources,
cls).__new__(cls, min_cores, min_ram, machine_type, disk_size,
disk_type, boot_disk_size, preemptible, image,
logging, logging_path, regions, zones,
service_account, scopes, keep_alive, cpu_platform,
network, subnetwork, use_private_address,
accelerator_type, accelerator_count,
nvidia_driver_version, timeout, log_interval, ssh,
enable_stackdriver_monitoring)
enable_stackdriver_monitoring=None,
max_retries=None,
max_preemptible_attempts=None):
return super(Resources, cls).__new__(
cls, min_cores, min_ram, machine_type, disk_size, disk_type,
boot_disk_size, preemptible, image, logging, logging_path, regions,
zones, service_account, scopes, keep_alive, cpu_platform, network,
subnetwork, use_private_address, accelerator_type, accelerator_count,
nvidia_driver_version, timeout, log_interval, ssh,
enable_stackdriver_monitoring, max_retries, max_preemptible_attempts)


def ensure_job_params_are_complete(job_params):
Expand Down
52 changes: 52 additions & 0 deletions dsub/lib/param_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,3 +923,55 @@ def timeout_in_seconds(timeout):

def log_interval_in_seconds(log_interval):
return _interval_to_seconds(log_interval, valid_units='smh')


class PreemptibleParam(object):
  """Represents the user-specified --preemptible argument.

  The --preemptible arg can be set to one of three 'modes':

  1) Not given. Never run on a preemptible VM. Internally stored as 'False'.
  2) Given with no value. Always run on a preemptible VM. Internally stored
     as 'True'.
  3) Given with an integer value p. Run on a preemptible VM up to p times
     before falling back to a full-price VM. Internally stored as an integer.
  """

  def __init__(self, p):
    # Either a bool (modes 1 and 2 above) or an int (mode 3).
    self._max_preemptible_attempts = p

  def should_use_preemptible(self, attempt_number):
    """Return whether the given (1-based) attempt should be preemptible."""
    value = self._max_preemptible_attempts
    # In the boolean modes, the answer is the same for every attempt.
    if isinstance(value, bool):
      return value
    return attempt_number <= value

  def validate(self, retries):
    """Validates that preemptible arguments make sense with retries.

    Args:
      retries: the number of per-task retries requested via --retries.

    Raises:
      ValueError: if a numeric --preemptible was given without retries,
        exceeds retries, or either value is negative.
    """
    attempts = self._max_preemptible_attempts
    if type(attempts) is not int:
      # Boolean modes need no cross-validation against --retries.
      return

    if not retries:
      # A numeric --preemptible is meaningless without --retries.
      raise ValueError(
          'Requesting 1 or more preemptible attempts requires setting retries'
      )

    if attempts > retries:
      raise ValueError(
          'Value passed for --preemptible cannot be larger than --retries.')

    if retries < 0 or attempts < 0:
      raise ValueError('--retries and --preemptible must be 0 or greater')


def preemptile_param_type(preemptible):
  """Create a PreemptibleParam from an argparse --preemptible value.

  Args:
    preemptible: a bool (flag omitted, or given with no value) or the raw
      string value the user supplied on the command line.

  Returns:
    A PreemptibleParam wrapping the bool or the parsed integer.

  Raises:
    argparse.ArgumentTypeError: if the value is neither a bool nor a
      string that parses as an integer.
  """
  if type(preemptible) is bool:
    return PreemptibleParam(preemptible)
  if type(preemptible) is str:
    try:
      return PreemptibleParam(int(preemptible))
    except ValueError:
      raise argparse.ArgumentTypeError(
          'Invalid value {} for --preemptible.'.format(preemptible))
  raise argparse.ArgumentTypeError(
      'Invalid value {} for --preemptible.'.format(preemptible))
4 changes: 2 additions & 2 deletions dsub/lib/providers_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def build_recursive_localize_command(destination, inputs, file_provider):
if {command} {source_uri} {data_mount}/{docker_path}; then
break
elif ((i == 2)); then
2>&1 echo "Recursive localization failed."
1>&2 echo "Recursive localization failed."
exit 1
fi
done
Expand Down Expand Up @@ -184,7 +184,7 @@ def build_recursive_delocalize_command(source, outputs, file_provider):
if {command} {data_mount}/{docker_path} {destination_uri}; then
break
elif ((i == 2)); then
2>&1 echo "Recursive de-localization failed."
1>&2 echo "Recursive de-localization failed."
exit 1
fi
done
Expand Down
4 changes: 2 additions & 2 deletions dsub/providers/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ def _build_pipeline_request(self, task_view):
min_ram=job_resources.min_ram,
disk_size=job_resources.disk_size,
boot_disk_size=job_resources.boot_disk_size,
preemptible=job_resources.preemptible,
preemptible=task_resources.preemptible,
accelerator_type=job_resources.accelerator_type,
accelerator_count=job_resources.accelerator_count,
image=job_resources.image,
Expand All @@ -683,7 +683,7 @@ def _build_pipeline_request(self, task_view):
pipeline.update(
_Pipelines.build_pipeline_args(self._project, script.value, job_params,
task_params, reserved_labels,
job_resources.preemptible, logging_uri,
task_resources.preemptible, logging_uri,
scopes, job_resources.keep_alive))

return pipeline
Expand Down
Loading

0 comments on commit d4925d9

Please sign in to comment.