Merge pull request #119 from DataBiosphere/dev
PR for 0.1.10 release
wnojopra authored Aug 27, 2018
2 parents 898f6d1 + eab6182 commit 3046eef
Showing 40 changed files with 1,351 additions and 360 deletions.
47 changes: 34 additions & 13 deletions README.md
@@ -3,21 +3,18 @@

## Overview

dsub is a command-line tool that makes it easy to submit and run batch scripts
`dsub` is a command-line tool that makes it easy to submit and run batch scripts
in the cloud.

The dsub user experience is modeled after traditional high-performance
The `dsub` user experience is modeled after traditional high-performance
computing job schedulers like Grid Engine and Slurm. You write a script and
then submit it to a job scheduler from a shell prompt on your local machine.

Today dsub supports Google Cloud as the backend batch job runner, along with a
Today `dsub` supports Google Cloud as the backend batch job runner, along with a
local provider for development and testing. With help from the community, we'd
like to add other backends, such as Grid Engine, Slurm, Amazon Batch,
and Azure Batch.

If others find dsub useful, our hope is to contribute dsub to an open-source
foundation for use by the wider batch computing community.

## Getting started

You can install `dsub` from [PyPI](pypi.python.org), or you can clone and
@@ -158,6 +155,26 @@ charges using it.
gsutil cat gs://my-bucket/output/out.txt


### Getting started with the `google-v2` provider

Google Cloud has made available a new version of the Google Genomics
Pipelines API. This version, `v2alpha1`, will soon replace the `v1alpha2`
version that `dsub`'s `google` provider uses.

To use the `google-v2` provider:

- Add `--provider google-v2` to your command line
- Use `--machine-type` (default is `n1-standard-1`).

The `--machine-type` value can be one of the
[Predefined Machine Types](https://cloud.google.com/compute/docs/machine-types#predefined_machine_types)
or a
[Custom Machine Type](https://cloud.google.com/compute/docs/machine-types#custom_machine_types).

The `google` provider supports `--min-cores` and `--min-ram`. Support for
these flags with the `google-v2` provider is being evaluated.
See [google-v2 support](https://github.com/DataBiosphere/dsub/issues/114).
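
For example, a minimal `google-v2` invocation might look like the following
sketch (the project, bucket, and region values are placeholders to replace
with your own):

    dsub \
      --provider google-v2 \
      --project my-cloud-project \
      --regions us-central1 \
      --machine-type n1-standard-2 \
      --logging gs://my-bucket/logging/ \
      --command 'echo "Hello world"' \
      --wait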

## `dsub` features

The following sections show how to run more complex jobs.
@@ -307,8 +324,8 @@ If you use a Red Hat or CentOS Docker image, you are encouraged to use the
`dsub` tasks run using the `local` provider will use the resources available on
your local machine.

`dsub` tasks run using the `google` provider can take advantage of a wide range
of CPU, RAM, disk, and hardware accelerator (eg. GPU) options.
`dsub` tasks run using the `google` or `google-v2` providers can take advantage
of a wide range of CPU, RAM, disk, and hardware accelerator (e.g. GPU) options.

See the [Compute Resources](docs/compute_resources.md) documentation for
details.
@@ -334,13 +351,17 @@ parameters. For example:

--env SAMPLE_ID<tab>--input VCF_FILE<tab>--output OUTPUT_PATH

The first line also supports bare-word variables which are treated as
the names of environment variables. This example is equivalent to the previous:
Each additional line in the file provides the variable, input, and output
values for one task; that is, each line beyond the header represents the
values for a separate task.

SAMPLE_ID<tab>--input VCF_FILE<tab>--output OUTPUT_PATH
Multiple `--env`, `--input`, and `--output` parameters can be specified, in
any order. For example:

--env SAMPLE<tab>--input A<tab>--input B<tab>--env REFNAME<tab>--output O
S1<tab>gs://path/A1.txt<tab>gs://path/B1.txt<tab>R1<tab>gs://path/O1.txt
S2<tab>gs://path/A2.txt<tab>gs://path/B2.txt<tab>R2<tab>gs://path/O2.txt

Each additional line in the file should provide the variable, input, and output
values for each task. Each line represents the values for a separate task.
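
As a sketch, a job using a tasks file like the one above might be submitted
as follows (the script and tasks file names are hypothetical placeholders):

    dsub \
      --provider google-v2 \
      --project my-cloud-project \
      --regions us-central1 \
      --logging gs://my-bucket/logging/ \
      --script my-script.sh \
      --tasks my-tasks.tsv \
      --wait

Each task runs the same script; `dsub` sets the `--env` variables and
localizes the `--input` and `--output` paths separately for each task.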

#### Tasks parameter

2 changes: 1 addition & 1 deletion docs/code.md
@@ -110,7 +110,7 @@ For information on building Docker images, see the Docker documentation:

* [Build your own image](https://docs.docker.com/engine/getstarted/step_four/)
* [Best practices for writing Dockerfiles](https://docs.docker.com/engine/userguide/eng-image/dockerfile_best-practices/)
* [Google Cloud Container Builder](https://cloud.google.com/container-builder/docs/)
* [Google Cloud Build](https://cloud.google.com/cloud-build/docs/)


## --input "path to file in cloud storage"
15 changes: 13 additions & 2 deletions docs/compute_resources.md
@@ -11,13 +11,24 @@ tasks will need.
By default, `dsub` launches a Compute Engine VM with a single CPU core and
3.75 GB of memory.

To change the minimum RAM, use the `--min-ram` flag.
### With the `google` provider (the default):

To change the minimum number of CPU cores, use the `--min-cores` flag.
To change the minimum RAM for the virtual machine, use the `--min-ram` flag.
To change the minimum number of CPU cores, use the `--min-cores` flag.

The machine type selected will be the smallest matching VM from the
[predefined machine types](https://cloud.google.com/compute/docs/machine-types#predefined_machine_types).

### With the `google-v2` provider:

To change the virtual machine RAM and number of CPU cores, use the
`--machine-type` flag.

The `--machine-type` value can be one of the
[predefined machine types](https://cloud.google.com/compute/docs/machine-types#predefined_machine_types)
or a
[custom machine type](https://cloud.google.com/compute/docs/machine-types#custom_machine_types).
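
For instance, here are sketches of roughly equivalent requests under each
provider (the flag values are illustrative only):

    # google provider: state minimums and let dsub select the machine type
    dsub --provider google --min-cores 4 --min-ram 16 ...

    # google-v2 provider: name the machine type directly
    dsub --provider google-v2 --machine-type n1-highmem-4 ...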

## Disks

By default, `dsub` launches a Compute Engine VM with a boot disk of 10 GB and an
13 changes: 13 additions & 0 deletions docs/logging.md
@@ -42,6 +42,12 @@ or for `--tasks` jobs:
- `gs://my-bucket/my-path/my-pipeline/{job-id}.{task-id}-stderr.log`
- `gs://my-bucket/my-path/my-pipeline/{job-id}.{task-id}-stdout.log`

or for `--retries` jobs:

- `gs://my-bucket/my-path/my-pipeline/{job-id}.{task-id}.{task-attempt}.log`
- `gs://my-bucket/my-path/my-pipeline/{job-id}.{task-id}.{task-attempt}-stderr.log`
- `gs://my-bucket/my-path/my-pipeline/{job-id}.{task-id}.{task-attempt}-stdout.log`

### A path + file ending in ".log"

You can also specify a path that ends in ".log". For example, if you specify:
@@ -58,6 +64,12 @@ or for `--tasks` jobs:
- `gs://my-bucket/my-path/my-pipeline.{task-id}-stderr.log`
- `gs://my-bucket/my-path/my-pipeline.{task-id}-stdout.log`

or for `--retries` jobs:

- `gs://my-bucket/my-path/my-pipeline.{task-id}.{task-attempt}.log`
- `gs://my-bucket/my-path/my-pipeline.{task-id}.{task-attempt}-stderr.log`
- `gs://my-bucket/my-path/my-pipeline.{task-id}.{task-attempt}-stdout.log`
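
As a sketch, attempt-numbered logs like those above would result from a
submission such as the following (project, bucket, and file names are
placeholders):

    dsub \
      --provider google-v2 \
      --project my-cloud-project \
      --regions us-central1 \
      --logging gs://my-bucket/my-path/my-pipeline.log \
      --script my-script.sh \
      --tasks my-tasks.tsv \
      --retries 2 \
      --wait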

### Inserting job data

You may want to structure your logging output differently than either of the
@@ -76,4 +88,5 @@ Supported variables are:
- `job-name`
- `task-id`
- `user-id`
- `task-attempt`
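
For example, a sketch of a logging template built from these variables (the
bucket name is a placeholder):

    --logging gs://my-bucket/{job-name}/{task-id}.{task-attempt}.log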

2 changes: 1 addition & 1 deletion dsub/_dsub_version.py
@@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.1.9'
DSUB_VERSION = '0.1.10'
87 changes: 74 additions & 13 deletions dsub/commands/dsub.py
@@ -170,11 +170,29 @@ def __call__(self, parser, namespace, values, option_string=None):
    setattr(namespace, self.dest, tasks)


def _google_parse_arguments(args):
  """Validates google arguments."""
  if args.machine_type:
    raise ValueError('Not supported with the google provider: --machine-type. '
                     'Use --min-cores and --min-ram instead.')


def _google_v2_parse_arguments(args):
  """Validates google-v2 arguments."""
  if (args.zones and args.regions) or (not args.zones and not args.regions):
    raise ValueError('Exactly one of --regions and --zones must be specified')

  if args.min_cores:
    raise ValueError('Not supported with the google-v2 provider: --min-cores. '
                     'Use --machine-type instead.')

  if args.min_ram:
    raise ValueError('Not supported with the google-v2 provider: --min-ram. '
                     'Use --machine-type instead.')


def _parse_arguments(prog, argv):
"""Parses command line arguments.
@@ -314,12 +332,10 @@ def _parse_arguments(prog, argv):
  # Add dsub resource requirement arguments
  parser.add_argument(
      '--min-cores',
      default=job_model.DEFAULT_MIN_CORES,
      type=int,
      help='Minimum CPU cores for each job')
  parser.add_argument(
      '--min-ram',
      default=job_model.DEFAULT_MIN_RAM,
      type=float,
      help='Minimum RAM per job in GB')
  parser.add_argument(
@@ -399,6 +415,33 @@ def _parse_arguments(prog, argv):
      Only one of --zones and --regions may be specified.""")
  google_v2.add_argument(
      '--machine-type', help='Provider-specific machine type')
  google_v2.add_argument(
      '--cpu-platform',
      help="""The CPU platform to request. Supported values can be found at
      https://cloud.google.com/compute/docs/instances/specify-min-cpu-platform"""
  )
  google_v2.add_argument(
      '--network',
      help="""The Compute Engine VPC network name to attach the VM's network
      interface to. The value will be prefixed with global/networks/ unless
      it contains a /, in which case it is assumed to be a fully specified
      network resource URL.""")
  google_v2.add_argument(
      '--subnetwork',
      help="""The name of the Compute Engine subnetwork to attach the instance
      to.""")
  google_v2.add_argument(
      '--use-private-address',
      default=False,
      action='store_true',
      help='If set to true, do not attach a public IP address to the VM.')
  google_v2.add_argument(
      '--timeout',
      help="""The maximum amount of time to give the pipeline to complete.
      This includes the time spent waiting for a worker to be allocated.
      Time can be listed using a number followed by a unit. Supported units are
      s (seconds), m (minutes), h (hours), d (days), w (weeks).
      For example: '7d' (7 days).""")

  args = provider_base.parse_args(
      parser, {
@@ -408,6 +451,8 @@ def _parse_arguments(prog, argv):
          'local': ['logging'],
      }, argv)

  if args.provider == 'google':
    _google_parse_arguments(args)
  if args.provider == 'google-v2':
    _google_v2_parse_arguments(args)

@@ -425,6 +470,7 @@ def _get_job_resources(args):
"""
logging = param_util.build_logging_param(
args.logging) if args.logging else None
timeout = param_util.timeout_in_seconds(args.timeout)

return job_model.Resources(
min_cores=args.min_cores,
@@ -440,8 +486,13 @@ def _get_job_resources(args):
      logging_path=None,
      scopes=args.scopes,
      keep_alive=args.keep_alive,
      cpu_platform=args.cpu_platform,
      network=args.network,
      subnetwork=args.subnetwork,
      use_private_address=args.use_private_address,
      accelerator_type=args.accelerator_type,
      accelerator_count=args.accelerator_count)
      accelerator_count=args.accelerator_count,
      timeout=timeout)


def _get_job_metadata(provider, user_id, job_name, script, task_ids):
@@ -507,9 +558,11 @@ def _resolve_task_resources(job_metadata, job_resources, task_descriptors):
    task_descriptors: Task metadata, parameters, and resources.

  This function exists to be called at the point that all job properties have
  been validated and resolved. The only property to be resolved right now is
  the logging path, which may have substitution parameters such as
  job-id, task-id, user-id, and job-name.
  been validated and resolved. It is also called prior to re-trying a task.
  The only property to be resolved right now is the logging path,
  which may have substitution parameters such as
  job-id, task-id, task-attempt, user-id, and job-name.
  """
  _resolve_task_logging(job_metadata, job_resources, task_descriptors)

@@ -650,7 +703,8 @@ def _wait_and_retry(provider, job_id, poll_interval, retries, job_descriptor):
    return []

  for task_id in retry_tasks:
    print(' %s failed. Retrying.' % (task_id if task_id else 'Task'))
    print(' %s failed. Retrying.' % ('Task %s' % task_id
                                     if task_id else 'Task'))
    _retry_task(provider, job_descriptor, task_id,
                task_fail_count[task_id] + 1)

@@ -660,15 +714,22 @@ def _wait_and_retry(provider, job_id, poll_interval, retries, job_descriptor):
def _retry_task(provider, job_descriptor, task_id, task_attempt):
  """Retry task_id (numeric id) assigning it task_attempt."""
  td_orig = job_descriptor.find_task_descriptor(task_id)

  new_task_descriptors = [
      job_model.TaskDescriptor({
          'task-id': task_id,
          'task-attempt': task_attempt
      }, td_orig.task_params, td_orig.task_resources)
  ]

  # Update the logging path.
  _resolve_task_resources(job_descriptor.job_metadata,
                          job_descriptor.job_resources, new_task_descriptors)

  provider.submit_job(
      job_model.JobDescriptor(
          job_descriptor.job_metadata, job_descriptor.job_params,
          job_descriptor.job_resources, [
              job_model.TaskDescriptor({
                  'task-id': task_id,
                  'task-attempt': task_attempt
              }, td_orig.task_params, td_orig.task_resources)
          ]), False)
          job_descriptor.job_resources, new_task_descriptors), False)


def _dominant_task_for_jobs(tasks):
19 changes: 16 additions & 3 deletions dsub/lib/job_model.py
@@ -326,7 +326,9 @@ class Resources(
    collections.namedtuple('Resources', [
        'min_cores', 'min_ram', 'machine_type', 'disk_size', 'boot_disk_size',
        'preemptible', 'image', 'logging', 'logging_path', 'regions', 'zones',
        'scopes', 'keep_alive', 'accelerator_type', 'accelerator_count'
        'scopes', 'keep_alive', 'cpu_platform', 'network', 'subnetwork',
        'use_private_address', 'accelerator_type', 'accelerator_count',
        'timeout'
    ])):
  """Job resource parameters related to CPUs, memory, and disk.
@@ -344,9 +346,14 @@ class Resources(
    zones (List[str]): zone list in which to run the job
    scopes (List[str]): OAuth2 scopes for the job
    keep_alive (int): Seconds to keep VM alive on failure
    cpu_platform (string): The CPU platform to request (e.g. 'Intel Skylake')
    network (string): The network name to attach the VM's network interface to.
    subnetwork (string): The name of the subnetwork to attach the instance to.
    use_private_address (bool): Do not attach a public IP address to the VM
    accelerator_type (string): Accelerator type (e.g. 'nvidia-tesla-k80').
    accelerator_count (int): Number of accelerators of the specified type to
      attach.
    timeout (string): The max amount of time to give the pipeline to complete.
  """
  __slots__ = ()

@@ -364,12 +371,18 @@ def __new__(cls,
              zones=None,
              scopes=None,
              keep_alive=None,
              cpu_platform=None,
              network=None,
              subnetwork=None,
              use_private_address=None,
              accelerator_type=None,
              accelerator_count=0):
              accelerator_count=0,
              timeout=None):
    return super(Resources, cls).__new__(
        cls, min_cores, min_ram, machine_type, disk_size, boot_disk_size,
        preemptible, image, logging, logging_path, regions, zones, scopes,
        keep_alive, accelerator_type, accelerator_count)
        keep_alive, cpu_platform, network, subnetwork, use_private_address,
        accelerator_type, accelerator_count, timeout)


def ensure_job_params_are_complete(job_params):
