Merge pull request #113 from DataBiosphere/dev
PR for 0.1.9 release
eap authored Jun 27, 2018
2 parents dbd4d70 + 4f4b27a commit 898f6d1
Showing 23 changed files with 561 additions and 216 deletions.
2 changes: 1 addition & 1 deletion dsub/_dsub_version.py
@@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.1.8'
DSUB_VERSION = '0.1.9'
22 changes: 12 additions & 10 deletions dsub/commands/ddel.py
@@ -71,19 +71,21 @@ def _parse_arguments():
help='User labels to match. Tasks returned must match all labels.',
metavar='KEY=VALUE')

# Add provider-specific arguments
google = parser.add_argument_group(
title='google',
description='Options for the Google provider (Pipelines API)')
google.add_argument(
# Shared arguments between the "google" and "google-v2" providers
google_common = parser.add_argument_group(
title='google-common',
description='Options common to the "google" and "google-v2" providers')
google_common.add_argument(
'--project',
help='Cloud project ID in which to find and delete the job(s)')

return provider_base.parse_args(parser, {
'google': ['project'],
'test-fails': [],
'local': [],
}, sys.argv[1:])
return provider_base.parse_args(
parser, {
'google': ['project'],
'google-v2': ['project'],
'test-fails': [],
'local': [],
}, sys.argv[1:])
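The change above routes the shared --project flag through a single "google-common" argument group and marks it as required for both the "google" and "google-v2" providers. A minimal standalone sketch of that pattern; parse_args below is a hypothetical stand-in for provider_base.parse_args, which is not shown in this diff:

import argparse
import sys


def parse_args(parser, provider_required_args, argv):
  """Hypothetical stand-in: parse argv, then enforce per-provider required flags."""
  args = parser.parse_args(argv)
  for arg in provider_required_args.get(args.provider, []):
    if getattr(args, arg, None) is None:
      parser.error('--%s is required for the "%s" provider' % (arg, args.provider))
  return args


parser = argparse.ArgumentParser(prog='ddel-sketch')
parser.add_argument('--provider', default='google', help='Backend provider name.')

# Options shared by both Google providers live in one group, as in ddel.py above.
google_common = parser.add_argument_group(
    title='google-common',
    description='Options common to the "google" and "google-v2" providers')
google_common.add_argument(
    '--project', help='Cloud project ID in which to find and delete the job(s)')

args = parse_args(parser, {
    'google': ['project'],
    'google-v2': ['project'],
    'local': [],
}, sys.argv[1:])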


def _emit_search_criteria(user_ids, job_ids, task_ids, labels):
1 change: 1 addition & 0 deletions dsub/commands/dstat.py
@@ -288,6 +288,7 @@ def _prepare_row(task, full, summary):
row_spec('labels', True, {}),
row_spec('provider', True, None),
row_spec('provider-attributes', True, {}),
row_spec('events', True, []),
row_spec('dsub-version', False, None),
]
summary_columns = default_columns + [
36 changes: 21 additions & 15 deletions dsub/commands/dsub.py
@@ -359,18 +359,7 @@ def _parse_arguments(prog, argv):
help="""Space-separated scopes for Google Compute Engine instances.
If unspecified, provider will use '%s'""" % ','.join(
google_base.DEFAULT_SCOPES))

google = parser.add_argument_group(
title='"google" provider options',
description='See also the "google-common" options listed above')
google.add_argument(
'--keep-alive',
type=int,
help="""Time (in seconds) to keep a tasks's virtual machine (VM) running
after a localization, docker command, or delocalization failure.
Allows for connecting to the VM for debugging.
Default is 0; maximum allowed value is 86400 (1 day).""")
google.add_argument(
google_common.add_argument(
'--accelerator-type',
help="""The Compute Engine accelerator type. By specifying this parameter,
you will download and install the following third-party software onto
@@ -380,7 +369,7 @@ def _parse_arguments(prog, argv):
and
https://cloud.google.com/genomics/reference/rest/v1alpha2/pipelines#pipelineresources
for more details.""")
google.add_argument(
google_common.add_argument(
'--accelerator-count',
type=int,
default=0,
@@ -389,6 +378,17 @@ def _parse_arguments(prog, argv):
following third-party software onto your job's Compute Engine
instances: NVIDIA(R) Tesla(R) drivers and NVIDIA(R) CUDA toolkit.""")

google = parser.add_argument_group(
title='"google" provider options',
description='See also the "google-common" options listed above')
google.add_argument(
'--keep-alive',
type=int,
help="""Time (in seconds) to keep a tasks's virtual machine (VM) running
after a localization, docker command, or delocalization failure.
Allows for connecting to the VM for debugging.
Default is 0; maximum allowed value is 86400 (1 day).""")

google_v2 = parser.add_argument_group(
title='"google-v2" provider options',
description='See also the "google-common" options listed above')
@@ -818,8 +818,14 @@ def call(argv):
def main(prog=sys.argv[0], argv=sys.argv[1:]):
try:
dsub_main(prog, argv)
except dsub_errors.PredecessorJobFailureError as e:
# Never tried to launch. Failure occurred in the --after wait.
print(dsub_util.NO_JOB)
sys.exit(1)
except dsub_errors.JobError as e:
# Job was launched, but there was failure in --wait
print('%s: %s' % (type(e).__name__, str(e)), file=sys.stderr)
print(e.launched_job['job-id'])
sys.exit(1)
return 0

@@ -958,7 +964,7 @@ def run(provider,
print_error(msg)
raise dsub_errors.PredecessorJobFailureError(
'One or more predecessor jobs completed but did not succeed.',
error_messages)
error_messages, None)

# Launch all the job tasks!
job_descriptor = job_model.JobDescriptor(job_metadata, job_params,
@@ -995,7 +1001,7 @@ def run(provider,
print_error(msg)
raise dsub_errors.JobExecutionError(
'One or more jobs finished with status FAILURE or CANCELED'
' during wait.', error_messages)
' during wait.', error_messages, launched_job)

return launched_job

11 changes: 10 additions & 1 deletion dsub/lib/dsub_errors.py
@@ -17,10 +17,19 @@
class JobError(Exception):
"""Exception containing error information of one or more jobs."""

def __init__(self, message, error_list):
def __init__(self, message, error_list, launched_job):
"""Create a JobError to indicate something went wrong.
Args:
message: user-friendly message
error_list: what went wrong
launched_job: if the job is launched, but has errors in
"--wait"ing on the tasks.
"""
super(JobError, self).__init__(message)
self.message = message
self.error_list = error_list
self.launched_job = launched_job


class PredecessorJobFailureError(JobError):
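With this change, run() in dsub.py passes either None (nothing was launched) or the launched_job dict as the new third constructor argument, and main() uses it to print either dsub_util.NO_JOB or the launched job-id before exiting with status 1. A hedged sketch of how a programmatic caller might use the new attribute; the behavior of the dsub.call() entry point and the argv flags below are assumptions beyond what this diff shows, and only the 'job-id' key of launched_job is taken from the diff:

from dsub.commands import dsub
from dsub.lib import dsub_errors

try:
  # Assumed programmatic entry point (call() appears in the hunk header above);
  # argv mirrors a dsub command line that uses --after and --wait.
  launched = dsub.call([
      '--provider', 'google-v2',
      '--project', 'my-project',          # hypothetical project ID
      '--command', 'echo "hello world"',
      '--after', 'prior-job-id',          # hypothetical predecessor job
      '--wait',
  ])
except dsub_errors.PredecessorJobFailureError:
  # The --after predecessor failed, so this job was never launched.
  print('predecessor job failed; nothing was launched')
except dsub_errors.JobError as e:
  # The job launched, but a task failed during --wait; the new launched_job
  # attribute still identifies what was started.
  print('job %s launched but failed during wait' % e.launched_job['job-id'])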
5 changes: 5 additions & 0 deletions dsub/lib/providers_util.py
@@ -198,3 +198,8 @@ def get_task_metadata(job_metadata, task_id):
task_metadata['task-id'] = task_id

return task_metadata


def get_job_and_task_param(job_params, task_params, field):
"""Returns a dict combining the field for job and task params."""
return job_params.get(field, set()) | task_params.get(field, set())
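A quick illustration of the new helper's set-union semantics (the values below are simplified; in dsub these sets normally hold param objects rather than plain tuples):

from dsub.lib import providers_util

job_params = {'labels': {('env', 'prod')}}    # job-level values for the field
task_params = {'labels': {('shard', '7')}}    # task-level values for the field

labels = providers_util.get_job_and_task_param(job_params, task_params, 'labels')
# labels == {('env', 'prod'), ('shard', '7')}; a missing key contributes an empty set.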
2 changes: 1 addition & 1 deletion dsub/providers/base.py
@@ -203,7 +203,7 @@ def get_field(self, field, default=None):
'job-name', 'job-id', 'task-id', 'task-attempt', 'user-id', 'task-status',
'error-message', 'create-time', 'start-time', 'end-time', 'inputs',
'outputs'
'outputs', 'events'
The following are required by dstat:
- status: The task status ('RUNNING', 'CANCELED', 'FAILED', 'SUCCESS')
139 changes: 19 additions & 120 deletions dsub/providers/google.py
@@ -123,15 +123,6 @@
fi
""")

# When attempting to cancel an operation that is already completed
# (succeeded, failed, or canceled), the response will include:
# "error": {
# "code": 400,
# "status": "FAILED_PRECONDITION",
# }
FAILED_PRECONDITION_CODE = 400
FAILED_PRECONDITION_STATUS = 'FAILED_PRECONDITION'


class _Pipelines(object):
"""Utilty methods for creating pipeline operations."""
@@ -281,7 +272,7 @@ def build_pipeline(cls, project, zones, min_cores, min_ram, disk_size,
pipeline_name: string name of pipeline.
Returns:
A nested dictionary with one entry under the key emphemeralPipeline
A nested dictionary with one entry under the key ephemeralPipeline
containing the pipeline configuration.
"""
if min_cores is None:
@@ -614,106 +605,6 @@ def list(cls, service, ops_filter, page_size=0):
page_token = response.get('nextPageToken')
more_operations = bool(page_token)

@classmethod
def _cancel_batch(cls, service, ops):
"""Cancel a batch of operations.
Args:
service: Google Genomics API service object.
ops: A list of operations to cancel.
Returns:
A list of operations canceled and a list of error messages.
"""

# We define an inline callback which will populate a list of
# successfully canceled operations as well as a list of operations
# which were not successfully canceled.

canceled = []
failed = []

def handle_cancel(request_id, response, exception):
"""Callback for the cancel response."""
del response # unused

if exception:
# We don't generally expect any failures here, except possibly trying
# to cancel an operation that is already canceled or finished.
#
# If the operation is already finished, provide a clearer message than
# "error 400: Bad Request".

msg = 'error %s: %s' % (exception.resp.status, exception.resp.reason)
if exception.resp.status == FAILED_PRECONDITION_CODE:
detail = json.loads(exception.content)
status = detail.get('error', {}).get('status')
if status == FAILED_PRECONDITION_STATUS:
msg = 'Not running'

failed.append({'name': request_id, 'msg': msg})
else:
canceled.append({'name': request_id})

return

# Set up the batch object
batch = service.new_batch_http_request(callback=handle_cancel)

# The callback gets a "request_id" which is the operation name.
# Build a dict such that after the callback, we can lookup the operation
# objects by name
ops_by_name = {}
for op in ops:
op_name = op.get_field('internal-id')
ops_by_name[op_name] = op
batch.add(
service.operations().cancel(name=op_name, body={}),
request_id=op_name)

# Cancel the operations
batch.execute()

# Iterate through the canceled and failed lists to build our return lists
canceled_ops = [ops_by_name[cancel['name']] for cancel in canceled]
error_messages = []
for fail in failed:
message = "Error canceling '%s': %s"
op = ops_by_name[fail['name']]
message %= (op.get_operation_full_job_id(), fail['msg'])
error_messages.append(message)

return canceled_ops, error_messages

@classmethod
def cancel(cls, service, ops):
"""Cancel operations.
Args:
service: Google Genomics API service object.
ops: A list of operations to cancel.
Returns:
A list of operations canceled and a list of error messages.
"""

# Canceling many operations one-by-one can be slow.
# The Pipelines API doesn't directly support a list of operations to cancel,
# but the requests can be performed in batch.

canceled_ops = []
error_messages = []

max_batch = 256
total_ops = len(ops)
for first_op in range(0, total_ops, max_batch):
batch_canceled, batch_messages = cls._cancel_batch(
service, ops[first_op:first_op + max_batch])
canceled_ops.extend(batch_canceled)
error_messages.extend(batch_messages)

return canceled_ops, error_messages


class GoogleJobProvider(base.JobProvider):
"""Interface to dsub and related tools for managing Google cloud jobs."""
@@ -986,7 +877,8 @@ def delete_jobs(self,

print 'Found %d tasks to delete.' % len(tasks)

return _Operations.cancel(self._service, tasks)
return google_base.cancel(self._service.new_batch_http_request,
self._service.operations().cancel, tasks)

def get_tasks_completion_messages(self, tasks):
completion_messages = []
@@ -1094,6 +986,22 @@ def get_field(self, field, default=None):
'instance-name': instance_name,
'zone': instance_zone,
}
elif field == 'events':
events = metadata.get('events', [])
value = []
for event in events:
event_value = {
'name':
event.get('description', ''),
'start-time':
google_base.parse_rfc3339_utc_string(event['startTime'])
}
if 'endTime' in event:
event_value['end-time'] = google_base.parse_rfc3339_utc_string(
event['endTime'])

value.append(event_value)

else:
raise ValueError('Unsupported field: "%s"' % field)

@@ -1144,15 +1052,6 @@ def _operation_status_message(self):

return (msg, google_base.parse_rfc3339_utc_string(ds))

def get_operation_full_job_id(self):
"""Returns the job-id or job-id.task-id for the operation."""
job_id = self.get_field('job-id')
task_id = self.get_field('task-id')
if task_id:
return '%s.%s' % (job_id, task_id)
else:
return job_id

def _get_operation_input_field_values(self, metadata, file_input):
"""Returns a dictionary of envs or file inputs for an operation.
(Diffs for the remaining 15 changed files are not shown.)
