Merge pull request #102 from googlegenomics/dev

Bryan Crampton authored Jan 16, 2018
2 parents b57b088 + 8098b4e commit 43cd7c5
Showing 31 changed files with 1,043 additions and 238 deletions.
8 changes: 8 additions & 0 deletions docs/job_control.md
@@ -131,3 +131,11 @@ When wildcards are used for `--output` parameters or `--output-recursive`
parameters, there is no way for `dsub` to verify that *all* output is
present. The best that `dsub` can do is to verify that *some* output was created
for each such parameter.

While it's allowed to specify `--output` and `--tasks` on the command line
at the same time (for example if output has wildcards and each task writes
a different file that matches the pattern), note that in this scenario
`--skip` will dutifully skip all tasks if any output matching the pattern is
present. In practice this means that it's generally unwise to use all three
of `--output`, `--tasks`, and `--skip` on the command line. Instead, specify
a different output for each task, in the tasks file.
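
For instance (a hedged sketch; the bucket, sample, and file names are hypothetical), a tab-separated tasks file can give every task its own concrete output path:

--env SAMPLE	--output OUTPUT_FILE
NA12878	gs://my-bucket/results/NA12878/counts.txt
NA12891	gs://my-bucket/results/NA12891/counts.txt

Submitting this file with `dsub ... --tasks samples.tsv --skip` (and no `--output` flag on the command line) lets `--skip` evaluate each task's output independently.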
2 changes: 1 addition & 1 deletion dsub/_dsub_version.py
@@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.1.4'
DSUB_VERSION = '0.1.5'
12 changes: 8 additions & 4 deletions dsub/commands/ddel.py
@@ -129,7 +129,7 @@ def main():
job_ids=set(args.jobs) if args.jobs else None,
task_ids=set(args.tasks) if args.tasks else None,
labels=labels,
create_time=create_time)
create_time_min=create_time)
# Emit the count of deleted jobs.
# Only emit anything about tasks if any of the jobs contains a task-id.
deleted_jobs = dsub_util.tasks_to_job_ids(deleted_tasks)
@@ -153,7 +153,8 @@ def ddel_tasks(provider,
job_ids=None,
task_ids=None,
labels=None,
create_time=None):
create_time_min=None,
create_time_max=None):
"""Kill jobs or job tasks.
This function separates ddel logic from flag parsing and user output. Users
@@ -165,14 +166,17 @@
job_ids: a set of job ids to delete.
task_ids: a set of task ids to delete.
labels: a set of LabelParam, each must match the job(s) to be cancelled.
create_time: a UTC value for earliest create time for a task.
create_time_min: a timezone-aware datetime value for the earliest create
time of a task, inclusive.
create_time_max: a timezone-aware datetime value for the most recent create
time of a task, inclusive.
Returns:
list of job ids which were deleted.
"""
# Delete the requested jobs
deleted_tasks, error_messages = provider.delete_jobs(
user_ids, job_ids, task_ids, labels, create_time)
user_ids, job_ids, task_ids, labels, create_time_min, create_time_max)

# Emit any errors canceling jobs
for msg in error_messages:
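For programmatic callers, the new keyword arguments make time-bounded deletes straightforward. A minimal sketch, assuming an already-instantiated provider and hypothetical filter values (neither is part of this diff):

import datetime

import pytz

from dsub.commands import ddel

# `provider` is an instantiated dsub provider; construction is
# provider-specific and omitted here.
now = datetime.datetime.now(pytz.utc)
deleted = ddel.ddel_tasks(
    provider,
    user_ids={'my-user'},  # hypothetical filter value
    create_time_min=now - datetime.timedelta(hours=1),  # inclusive lower bound
    create_time_max=now)  # inclusive upper bound
print('Deleted: %s' % deleted)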
71 changes: 62 additions & 9 deletions dsub/commands/dstat.py
@@ -340,7 +340,7 @@ def main():
args = _parse_arguments()

# Compute the age filter (if any)
create_time = param_util.age_to_create_time(args.age)
create_time_min = param_util.age_to_create_time(args.age)

# Set up the output formatter
if args.format == 'json':
@@ -379,7 +379,7 @@
job_names=set(args.names) if args.names else None,
task_ids=set(args.tasks) if args.tasks else None,
labels=labels if labels else None,
create_time=create_time,
create_time_min=create_time_min,
max_tasks=args.limit,
full_output=args.full,
poll_interval=poll_interval,
@@ -401,16 +401,14 @@ def dstat_job_producer(provider,
job_names=None,
task_ids=None,
labels=None,
create_time=None,
create_time_min=None,
create_time_max=None,
max_tasks=0,
full_output=False,
poll_interval=0,
raw_format=False):
"""Generate jobs as lists of task dicts ready for formatting/output.
This function separates dstat logic from flag parsing and user output. Users
of dstat who intend to access the data programmatically should use this.
Args:
provider: an instantiated dsub provider.
statuses: a set of status strings that eligible jobs may match.
@@ -419,7 +417,10 @@
job_names: a set of job-name strings eligible jobs may match.
task_ids: a set of task-id strings eligible tasks may match.
labels: set of LabelParam that all tasks must match.
create_time: a UTC value for earliest create time for a task.
create_time_min: a timezone-aware datetime value for the earliest create
time of a task, inclusive.
create_time_max: a timezone-aware datetime value for the most recent create
time of a task, inclusive.
max_tasks: (int) maximum number of tasks to return per dstat job lookup.
full_output: (bool) return all dsub fields.
poll_interval: (int) wait time between poll events, dstat will poll jobs
@@ -444,8 +445,10 @@
job_names=job_names,
task_ids=task_ids,
labels=labels,
create_time=create_time,
max_tasks=max_tasks)
create_time_min=create_time_min,
create_time_max=create_time_max,
max_tasks=max_tasks,
page_size=max_tasks)

some_job_running = False

@@ -468,5 +471,55 @@
else:
break


def lookup_job_tasks(provider,
statuses,
user_ids=None,
job_ids=None,
job_names=None,
task_ids=None,
labels=None,
create_time_min=None,
create_time_max=None,
max_tasks=0,
page_size=0):
"""Generate formatted jobs individually, in order of create-time.
Args:
provider: an instantiated dsub provider.
statuses: a set of status strings that eligible jobs may match.
user_ids: a set of user strings that eligible jobs may match.
job_ids: a set of job-id strings eligible jobs may match.
job_names: a set of job-name strings eligible jobs may match.
task_ids: a set of task-id strings eligible tasks may match.
labels: set of LabelParam that all tasks must match.
create_time_min: a timezone-aware datetime value for the earliest create
time of a task, inclusive.
create_time_max: a timezone-aware datetime value for the most recent create
time of a task, inclusive.
max_tasks: (int) maximum number of tasks to return per dstat job lookup.
page_size: the page size to use for each query to the backend. May be
ignored by some provider implementations.
Yields:
Individual task dictionaries with associated metadata
"""
tasks_generator = provider.lookup_job_tasks(
statuses,
user_ids=user_ids,
job_ids=job_ids,
job_names=job_names,
task_ids=task_ids,
labels=labels,
create_time_min=create_time_min,
create_time_max=create_time_max,
max_tasks=max_tasks,
page_size=page_size)

# Yield formatted tasks.
for task in tasks_generator:
yield _prepare_row(task, True)


if __name__ == '__main__':
main()
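
The split between dstat_job_producer() and the new lookup_job_tasks() gives scripts a direct path to formatted task rows. A sketch, assuming an instantiated provider and illustrative filter values (the '*' status wildcard mirrors what the dstat CLI passes for --status '*'):

from dsub.commands import dstat
from dsub.lib import param_util

# '2h' becomes a timezone-aware datetime two hours in the past.
create_time_min = param_util.age_to_create_time('2h')

# `provider` is an instantiated dsub provider; construction omitted.
for task in dstat.lookup_job_tasks(
    provider,
    statuses={'*'},  # assumed wildcard for "any status"
    user_ids={'my-user'},  # hypothetical filter value
    create_time_min=create_time_min,
    max_tasks=100,
    page_size=100):
  print(task.get('job-id'), task.get('status'))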
38 changes: 6 additions & 32 deletions dsub/commands/dsub.py
@@ -47,8 +47,7 @@
# dsub ... --after $JOB_ID
#
# "NO_JOB" will be treated as having completed.
NO_JOB = 'NO_JOB'

#
# The job created by dsub will automatically include a data disk,
# Each provider sets a different DATA_ROOT environment variable.
# The DATA_ROOT is the root directory for the data disk.
@@ -447,7 +446,7 @@ def _wait_after(provider, job_ids, poll_interval, stop_on_failure):
# * stop_on_failure is TRUE AND at least one job returned an error

# remove NO_JOB
job_ids_to_check = {j for j in job_ids if j != NO_JOB}
job_ids_to_check = {j for j in job_ids if j != dsub_util.NO_JOB}
error_messages = []
while job_ids_to_check and (not error_messages or not stop_on_failure):
print('Waiting for: %s.' % (', '.join(job_ids_to_check)))
@@ -573,21 +572,6 @@ def _wait_for_any_job(provider, job_ids, poll_interval):
SLEEP_FUNCTION(poll_interval)


def _job_outputs_are_present(job_data):
"""True if each output contains at least one file."""
# See reference args_to_job_data in param_util.py for a description
# of what's in job_data.
outputs = job_data['outputs']
for o in outputs:
if o.recursive:
if not dsub_util.folder_exists(o.value):
return False
else:
if not dsub_util.simple_pattern_exists_in_gcs(o.value):
return False
return True


def _validate_job_and_task_arguments(job_data, all_task_data):
"""Validates that job and task argument names do not overlap."""

@@ -679,10 +663,6 @@ def run_main(args):

provider_base.check_for_unsupported_flag(args)

if args.tasks and args.skip:
raise ValueError('Output skipping (--skip) not supported for --task '
'commands.')

# Set up job parameters and job data from a tasks file or flags.
input_file_param_util = param_util.InputFileParamUtil(
DEFAULT_INPUT_LOCAL_PATH)
@@ -748,9 +728,6 @@ def run(provider,
if not disable_warning:
raise ValueError('Do not use this unstable API component!')

if len(all_task_data) > 1 and skip:
raise ValueError('The skip option is not supported with multiple tasks')

if command and script:
raise ValueError('Cannot supply both a command and script value.')

@@ -793,17 +770,14 @@
'One or more predecessor jobs completed but did not succeed.',
error_messages)

# If requested, skip running this job if its outputs already exist
if skip and not dry_run:
if _job_outputs_are_present(job_data):
print('Job output already present, skipping new job submission.')
return {'job-id': NO_JOB}

# Launch all the job tasks!
launched_job = provider.submit_job(job_resources, job_metadata, job_data,
all_task_data)
all_task_data, skip)

if not dry_run:
if launched_job['job-id'] == dsub_util.NO_JOB:
print('Job output already present, skipping new job submission.')
return {'job-id': dsub_util.NO_JOB}
print('Launched job-id: %s' % launched_job['job-id'])
if launched_job.get('task-id'):
print('%s task(s)' % len(launched_job['task-id']))
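With NO_JOB moved into dsub_util, the sentinel's contract is unchanged: a fully skipped job reports NO_JOB as its job-id and is treated as already complete by --after handling. A tiny sketch of that filter, mirroring _wait_after() above (the job id is hypothetical):

from dsub.lib import dsub_util

job_ids = {'my-job--171201-123456-78', dsub_util.NO_JOB}
# Drop the sentinel before polling, so a skipped predecessor never blocks.
job_ids_to_check = {j for j in job_ids if j != dsub_util.NO_JOB}
print(job_ids_to_check)  # {'my-job--171201-123456-78'}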
21 changes: 21 additions & 0 deletions dsub/lib/dsub_util.py
@@ -31,6 +31,10 @@
from oauth2client.client import GoogleCredentials


# this is the Job ID for jobs that are skipped.
NO_JOB = 'NO_JOB'


class _Printer(object):
"""File-like stream object that redirects stdout to a file object."""

@@ -257,3 +261,20 @@ def simple_pattern_exists_in_gcs(file_pattern, credentials=None):
return False
items_list = [i['name'] for i in response['items']]
return any(fnmatch.fnmatch(i, prefix) for i in items_list)


def outputs_are_present(outputs):
"""True if each output contains at least one file."""
# outputs are OutputFileParam (see param_util.py)

# If outputs contain a pattern, then there is no way for `dsub` to verify
# that *all* output is present. The best that `dsub` can do is to verify
# that *some* output was created for each such parameter.
for o in outputs:
if o.recursive:
if not folder_exists(o.value):
return False
else:
if not simple_pattern_exists_in_gcs(o.value):
return False
return True
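
A self-contained sketch of the semantics: outputs_are_present() only reads the `recursive` and `value` fields, so a stand-in namedtuple (not the real param_util.OutputFileParam) can illustrate both branches; the bucket paths are hypothetical:

import collections

from dsub.lib import dsub_util

FakeOutput = collections.namedtuple('FakeOutput', ['value', 'recursive'])

outputs = [
    # Wildcard output: satisfied when *some* object matches the pattern.
    FakeOutput('gs://my-bucket/out/*.txt', False),
    # Recursive output: satisfied when the folder exists at all.
    FakeOutput('gs://my-bucket/out/logs/', True),
]

# True only if every entry resolves to at least one existing object.
if dsub_util.outputs_are_present(outputs):
  print('All outputs present; submission would be skipped.')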
43 changes: 22 additions & 21 deletions dsub/lib/param_util.py
@@ -22,9 +22,10 @@
import os
import re
import sys

from . import dsub_util
from .._dsub_version import DSUB_VERSION
from dateutil.tz import tzlocal
import pytz

AUTO_PREFIX_INPUT = 'INPUT_' # Prefix for auto-generated input names
AUTO_PREFIX_OUTPUT = 'OUTPUT_' # Prefix for auto-generated output names
@@ -941,45 +942,45 @@ def handle_version_flag():
sys.exit()


def age_to_create_time(age, from_time=datetime.datetime.utcnow()):
def age_to_create_time(age, from_time=None):
"""Compute the create time (UTC) for the list filter.
If the age is an integer value it is treated as a UTC date.
Otherwise the value must be of the form "<integer><unit>" where supported
units are s, m, h, d, w (seconds, months, hours, days, weeks).
units are s, m, h, d, w (seconds, minutes, hours, days, weeks).
Args:
age: A "<integer><unit>" string or integer value.
from_time:
Returns:
A date value in UTC or None if age parameter is empty.
A timezone-aware datetime or None if age parameter is empty.
"""

if not age:
return None

if not from_time:
from_time = datetime.datetime.now().replace(tzinfo=tzlocal())

try:
last_char = age[-1]

if last_char in 'smhdw':
if last_char == 's':
interval = datetime.timedelta(seconds=int(age[:-1]))
elif last_char == 'm':
interval = datetime.timedelta(minutes=int(age[:-1]))
elif last_char == 'h':
interval = datetime.timedelta(hours=int(age[:-1]))
elif last_char == 'd':
interval = datetime.timedelta(days=int(age[:-1]))
elif last_char == 'w':
interval = datetime.timedelta(weeks=int(age[:-1]))

start = from_time - interval
epoch = datetime.datetime.utcfromtimestamp(0)

return int((start - epoch).total_seconds())
if last_char == 's':
return from_time - datetime.timedelta(seconds=int(age[:-1]))
elif last_char == 'm':
return from_time - datetime.timedelta(minutes=int(age[:-1]))
elif last_char == 'h':
return from_time - datetime.timedelta(hours=int(age[:-1]))
elif last_char == 'd':
return from_time - datetime.timedelta(days=int(age[:-1]))
elif last_char == 'w':
return from_time - datetime.timedelta(weeks=int(age[:-1]))
else:
return int(age)
# If no unit is given treat the age as seconds from epoch, otherwise apply
# the correct time unit.
return datetime.datetime.utcfromtimestamp(
int(age)).replace(tzinfo=pytz.utc)

except (ValueError, OverflowError) as e:
raise ValueError('Unable to parse age string %s: %s' % (age, e))
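
The rewrite has two parse modes, both now returning timezone-aware datetimes. An illustrative sketch (results for the relative form depend on the local clock and timezone):

from dsub.lib import param_util

# "<integer><unit>": relative to now, carrying the local timezone.
three_days_ago = param_util.age_to_create_time('3d')

# A bare integer: seconds since the Unix epoch, returned as UTC.
jan_2017 = param_util.age_to_create_time('1483228800')
# -> datetime.datetime(2017, 1, 1, 0, 0, tzinfo=<UTC>)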
