Merge pull request #136 from DataBiosphere/dev
PR for 0.2.3 release
wnojopra authored Nov 20, 2018
2 parents 1126dc1 + 1f55fe2 commit d86041c
Showing 15 changed files with 244 additions and 61 deletions.
2 changes: 1 addition & 1 deletion dsub/_dsub_version.py
@@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.2.2'
DSUB_VERSION = '0.2.3'
2 changes: 2 additions & 0 deletions dsub/commands/ddel.py
@@ -126,6 +126,8 @@ def main():

# Let the user know which jobs we are going to look up
with dsub_util.replace_print():
provider_base.emit_provider_message(provider)

_emit_search_criteria(user_ids, args.jobs, args.tasks, args.label)
# Delete the requested jobs
deleted_tasks = ddel_tasks(
2 changes: 2 additions & 0 deletions dsub/commands/dstat.py
@@ -470,6 +470,8 @@ def main():

# Set up the Genomics Pipelines service interface
provider = provider_base.get_provider(args, resources)
with dsub_util.replace_print():
provider_base.emit_provider_message(provider)

# Set poll interval to zero if --wait is not set.
poll_interval = args.poll_interval if args.wait else 0
3 changes: 3 additions & 0 deletions dsub/commands/dsub.py
@@ -1011,6 +1011,9 @@ def run(provider,
project=None,
disable_warning=False):
"""Actual dsub body, post-stdout-redirection."""
if not dry_run:
provider_base.emit_provider_message(provider)

if not disable_warning:
raise ValueError('Do not use this unstable API component!')

23 changes: 13 additions & 10 deletions dsub/providers/base.py
@@ -30,16 +30,19 @@
explicit tasks.
"""

from abc import ABCMeta
from abc import abstractmethod
import abc
import six


@six.add_metaclass(ABCMeta)
@six.add_metaclass(abc.ABCMeta)
class JobProvider(object):
"""Interface all job providers should inherit from."""

@abstractmethod
# A member field that a provider can use to expose a status message
# such as a deprecation notice.
status_message = None

@abc.abstractmethod
def prepare_job_metadata(self, script, job_name, user_id, create_time):
"""Returns a dictionary of metadata fields for the job.
@@ -77,7 +80,7 @@ def prepare_job_metadata(self, script, job_name, user_id, create_time):
"""
raise NotImplementedError()

@abstractmethod
@abc.abstractmethod
def submit_job(self, job_descriptor, skip_if_output_present):
"""Submit the job to be executed.
@@ -101,7 +104,7 @@ def submit_job(self, job_descriptor, skip_if_output_present):
"""
raise NotImplementedError()

@abstractmethod
@abc.abstractmethod
def delete_jobs(self,
user_ids,
job_ids,
@@ -132,7 +135,7 @@ def delete_jobs(self,
"""
raise NotImplementedError()

@abstractmethod
@abc.abstractmethod
def lookup_job_tasks(self,
statuses,
user_ids=None,
@@ -177,7 +180,7 @@ def lookup_job_tasks(self,
"""
raise NotImplementedError()

@abstractmethod
@abc.abstractmethod
def get_tasks_completion_messages(self, tasks):
"""List of the error message of each given task."""
raise NotImplementedError()
@@ -186,7 +189,7 @@ def get_tasks_completion_messages(self, tasks):
class Task(object):
"""Basic container for task metadata."""

@abstractmethod
@abc.abstractmethod
def raw_task_data(self):
"""Return a provider-specific representation of task data.
@@ -195,7 +198,7 @@ def raw_task_data(self):
"""
raise NotImplementedError()

@abstractmethod
@abc.abstractmethod
def get_field(self, field, default=None):
"""Return a metadata-field for the task.
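For context on the base.py change: JobProvider subclasses can now surface a user-facing notice through the plain status_message class attribute, which provider_base.emit_provider_message() prints when it is set. Below is a minimal, self-contained sketch of that pattern, assuming only abc and six; the StubProvider class and its message are illustrative stand-ins, not part of dsub.

import abc

import six


@six.add_metaclass(abc.ABCMeta)
class JobProvider(object):
  """Trimmed-down stand-in for dsub.providers.base.JobProvider."""

  # Providers may set this to surface a notice (such as a deprecation
  # warning) to the user.
  status_message = None

  @abc.abstractmethod
  def submit_job(self, job_descriptor, skip_if_output_present):
    raise NotImplementedError()


class StubProvider(JobProvider):
  """Hypothetical provider that advertises a deprecation notice."""

  status_message = 'The stub provider is deprecated; use stub-v2 instead.'

  def submit_job(self, job_descriptor, skip_if_output_present):
    return {'job-id': 'stub-job-0'}


provider = StubProvider()
if provider.status_message:  # the same check emit_provider_message() performs
  print(provider.status_message)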
15 changes: 15 additions & 0 deletions dsub/providers/google.py
@@ -610,6 +610,21 @@ def list(cls, service, ops_filter, page_size=0):
class GoogleJobProvider(base.JobProvider):
"""Interface to dsub and related tools for managing Google cloud jobs."""

status_message = textwrap.dedent("""
** WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING **
The google provider is deprecated.
The underlying service (Pipelines API v1alpha2) is scheduled for turndown
at the end of 2018.
Please use the google-v2 provider.
See:
https://github.com/DataBiosphere/dsub#deprecation-of-the-google-provider
** WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING **
""")

def __init__(self, verbose, dry_run, project, zones=None, credentials=None):
self._verbose = verbose
self._dry_run = dry_run
50 changes: 36 additions & 14 deletions dsub/providers/google_v2.py
@@ -150,10 +150,15 @@

_LOG_CP_CMD = textwrap.dedent("""\
gsutil_cp /google/logs/action/{user_action}/stdout "${{STDOUT_PATH}}" "true" "text/plain" "${{USER_PROJECT}}" &
STDOUT_PID=$!
gsutil_cp /google/logs/action/{user_action}/stderr "${{STDERR_PATH}}" "true" "text/plain" "${{USER_PROJECT}}" &
STDERR_PID=$!
gsutil_cp /google/logs/output "${{LOGGING_PATH}}" "false" "text/plain" "${{USER_PROJECT}}" &
LOG_PID=$!
wait
wait "${{STDOUT_PID}}"
wait "${{STDERR_PID}}"
wait "${{LOG_PID}}"
""")

_LOGGING_CMD = textwrap.dedent("""\
@@ -407,7 +412,7 @@ def execute(self):
try:
response = cancel_fn.execute()
except: # pylint: disable=bare-except
exception = sys.exc_info()[0]
exception = sys.exc_info()[1]

self._response_handler(request_id, response, exception)

@@ -1140,27 +1145,38 @@ def _operation_status(self):
self._op['name']))

def _operation_status_message(self):
"""Returns the most relevant status string and last updated date string.
"""Returns the most relevant status string and failed action.
This string is meant for display only.
Returns:
A printable status string and date string.
A printable status string and name of failed action (if any).
"""
msg = None
action = None
if not google_v2_operations.is_done(self._op):
last_event = google_v2_operations.get_last_event(self._op)
if last_event:
msg = last_event['description']
action_id = last_event.get('details', {}).get('actionId')
action = google_v2_operations.get_action_by_id(self._op, action_id)
else:
msg = 'Pending'
else:
error = google_v2_operations.get_error(self._op)
if error:
msg = error['message']
else:
msg = 'Success'

return msg
failed_events = google_v2_operations.get_failed_events(self._op)
if failed_events:
failed_event = failed_events[-1]
msg = failed_event.get('details', {}).get('stderr')
action_id = failed_event.get('details', {}).get('actionId')
action = google_v2_operations.get_action_by_id(self._op, action_id)
if not msg:
error = google_v2_operations.get_error(self._op)
if error:
msg = error['message']
else:
msg = 'Success'

return msg, action

def error_message(self):
"""Returns an error message if the operation failed for any reason.
@@ -1259,9 +1275,15 @@ def get_field(self, field, default=None):
value = google_base.parse_rfc3339_utc_string(ds)
elif field == 'status':
value = self._operation_status()
elif field in ['status-message', 'status-detail']:
status = self._operation_status_message()
value = status
elif field == 'status-message':
msg, action = self._operation_status_message()
value = msg
elif field == 'status-detail':
msg, action = self._operation_status_message()
if action:
value = '{}:\n{}'.format(action.get('name'), msg)
else:
value = msg
elif field == 'last-update':
last_update = google_v2_operations.get_last_update(self._op)
if last_update:
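With this change, _operation_status_message() returns a (message, action) pair instead of a bare string, and get_field() uses the action only for 'status-detail', where the failing action's name is prefixed to the message. A short sketch of that mapping, using invented values rather than real Pipelines API output:

def format_status_fields(msg, action):
  """Mirror of how get_field() maps the (msg, action) pair to dstat fields."""
  status_message = msg
  if action:
    status_detail = '{}:\n{}'.format(action.get('name'), msg)
  else:
    status_detail = msg
  return status_message, status_detail


# Hypothetical failure from a user command; values are for illustration only.
msg = '/bin/sh: 1: bogus-command: not found'
action = {'name': 'user-command', 'imageUri': 'ubuntu:16.04'}

status_message, status_detail = format_status_fields(msg, action)
print(status_message)  # just the stderr snippet
print(status_detail)   # 'user-command:' on its own line, then the stderr snippet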
25 changes: 21 additions & 4 deletions dsub/providers/google_v2_operations.py
@@ -98,24 +98,31 @@ def get_actions(op):
return op.get('metadata', {}).get('pipeline').get('actions', [])


def get_action(op, name):
def get_action_by_id(op, action_id):
"""Return the operation's array of actions."""
actions = get_actions(op)
if actions and 1 <= action_id < len(actions):
return actions[action_id - 1]


def _get_action_by_name(op, name):
"""Return the value for the specified action."""
actions = get_actions(op)
for action in actions:
if action['name'] == name:
if action.get('name') == name:
return action


def get_action_environment(op, name):
"""Return the environment for the operation."""
action = get_action(op, name)
action = _get_action_by_name(op, name)
if action:
return action.get('environment')


def get_action_image(op, name):
"""Return the image for the operation."""
action = get_action(op, name)
action = _get_action_by_name(op, name)
if action:
return action.get('imageUri')

@@ -133,6 +140,16 @@ def get_last_event(op):
return None


def get_failed_events(op):
"""Return the events (if any) with a non-zero exitStatus."""
events = get_events(op)
if events:
return [
e for e in events if int(e.get('details', {}).get('exitStatus', 0)) != 0
]
return None


def get_event_of_type(op, event_type):
"""Return all events of a particular type."""
events = get_events(op)
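The new helpers assume an operation layout in which metadata.pipeline.actions is an ordered list and each event's details.actionId is a 1-based index into that list (hence actions[action_id - 1]). Below is a self-contained sketch against a hand-written operation skeleton: the field names follow the shape the new code reads, but the values are invented, and the helpers are re-implemented here (get_actions with a defensive default) so the snippet runs standalone.

def get_actions(op):
  return op.get('metadata', {}).get('pipeline', {}).get('actions', [])


def get_events(op):
  return op.get('metadata', {}).get('events', [])


def get_action_by_id(op, action_id):
  # Event actionId values are 1-based indexes into the actions array.
  actions = get_actions(op)
  if actions and 1 <= action_id < len(actions):
    return actions[action_id - 1]


def get_failed_events(op):
  events = get_events(op)
  if events:
    return [
        e for e in events
        if int(e.get('details', {}).get('exitStatus', 0)) != 0
    ]
  return None


# Invented operation skeleton: three actions, one failed event pointing at
# the second action.
op = {
    'metadata': {
        'pipeline': {
            'actions': [
                {'name': 'prepare', 'imageUri': 'python:2.7-slim'},
                {'name': 'user-command', 'imageUri': 'ubuntu:16.04'},
                {'name': 'final_logging', 'imageUri': 'google/cloud-sdk:slim'},
            ],
        },
        'events': [
            {'description': 'Unexpected exit status 127',
             'details': {'actionId': 2, 'exitStatus': 127,
                         'stderr': '/bin/sh: 1: bogus-command: not found'}},
            {'description': 'Worker released', 'details': {}},
        ],
    },
}

failed = get_failed_events(op)[-1]
action = get_action_by_id(op, failed['details']['actionId'])
print(action['name'])               # user-command
print(failed['details']['stderr'])  # /bin/sh: 1: bogus-command: not found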
7 changes: 4 additions & 3 deletions dsub/providers/local.py
@@ -83,6 +83,7 @@
# status.txt: File for the runner script to record task status (RUNNING,
# FAILURE, etc.)
# log.txt: File for the runner script to write log messages
# runner-log.txt: Capture of all stderr/stdout from the runner script
# task.pid: Process ID file for task runner
#
# From task directory, the data directory is made available to the Docker
@@ -453,7 +454,7 @@ def delete_jobs(self,
with open(os.path.join(task_dir, 'end-time.txt'), 'wt') as f:
f.write(today)
msg = 'Operation canceled at %s\n' % today
with open(os.path.join(task_dir, 'log.txt'), 'a') as f:
with open(os.path.join(task_dir, 'runner-log.txt'), 'a') as f:
f.write(msg)

return (canceled, cancel_errors)
@@ -587,7 +588,7 @@ def _get_end_time_from_task_dir(self, task_dir):

def _get_last_update_time_from_task_dir(self, task_dir):
last_update = 0
for filename in ['status.txt', 'log.txt', 'meta.yaml']:
for filename in ['status.txt', 'runner-log.txt', 'meta.yaml']:
try:
mtime = os.path.getmtime(os.path.join(task_dir, filename))
last_update = max(last_update, mtime)
@@ -622,7 +623,7 @@ def _get_status_from_task_dir(self, task_dir):

def _get_log_detail_from_task_dir(self, task_dir):
try:
with open(os.path.join(task_dir, 'log.txt'), 'r') as f:
with open(os.path.join(task_dir, 'runner-log.txt'), 'r') as f:
return f.read().splitlines()
except (IOError, OSError):
return None
8 changes: 8 additions & 0 deletions dsub/providers/provider_base.py
@@ -13,6 +13,9 @@
# limitations under the License.

"""Interface for job providers."""

from __future__ import print_function

import argparse
import os

@@ -132,6 +135,11 @@ def get_ddel_provider_args(provider_type, project):
return get_dstat_provider_args(provider_type, project)


def emit_provider_message(provider):
if provider.status_message:
print(provider.status_message)


def check_for_unsupported_flag(args):
"""Raise an error if the provider doesn't support a provided flag."""
if args.label and args.provider not in [
29 changes: 15 additions & 14 deletions test/integration/e2e_dstat.sh
@@ -64,33 +64,34 @@ readonly -f verify_dstat_output

function verify_dstat_google_provider_fields() {
local dstat_out="${1}"
for (( job=0; job < 3; job++ )); do
# Skip the test if the job's state is not yet running or completed.
local status="$(python "${SCRIPT_DIR}"/get_data_value.py "yaml" "${dstat_out}" "[${job}].status")"
if ! [[ "${status}" =~ ^(RUNNING|SUCCESS)$ ]]; then
echo " - NOTICE: status '${status}' may not have compute metadata,"
echo " skipping provider fields tests."
exit 0
fi

for (( task=0; task < 3; task++ )); do
# Run the provider test.
local job_name="$(python "${SCRIPT_DIR}"/get_data_value.py "yaml" "${dstat_out}" "[${job}].job-name")"
local job_provider="$(python "${SCRIPT_DIR}"/get_data_value.py "yaml" "${dstat_out}" "[${job}].provider")"
local job_name="$(python "${SCRIPT_DIR}"/get_data_value.py "yaml" "${dstat_out}" "[${task}].job-name")"
local job_provider="$(python "${SCRIPT_DIR}"/get_data_value.py "yaml" "${dstat_out}" "[${task}].provider")"

# Validate provider.
if [[ "${job_provider}" != "${DSUB_PROVIDER}" ]]; then
echo " - FAILURE: provider ${job_provider} does not match '${DSUB_PROVIDER}'"
echo "${dstat_out}"
exit 1
fi
local job_zone=$(python "${SCRIPT_DIR}"/get_data_value.py "yaml" "${dstat_out}" "[${job}].provider-attributes.zone")

# Provider fields are both metadata set on task submission and machine
# information set when the Pipelines API starts processing the task.
local events="$(python "${SCRIPT_DIR}"/get_data_value.py "yaml" "${dstat_out}" "[${task}].events")"
if [[ "${events}" == "[]" ]]; then
echo " - NOTICE: task $((task+1)) not started; skipping provider instance fields tests."
exit 0
fi

local job_zone=$(python "${SCRIPT_DIR}"/get_data_value.py "yaml" "${dstat_out}" "[${task}].provider-attributes.zone")
if ! [[ "${job_zone}" =~ ^[a-z]{1,4}-[a-z]{2,15}[0-9]-[a-z]$ ]]; then
echo " - FAILURE: Job zone ${job_zone} for job ${job_name} not valid."
echo " - FAILURE: Zone ${job_zone} for job ${job_name}, task $((task+1)) not valid."
echo "${dstat_out}"
exit 1
fi
done
echo " - google provider fields verified"
echo " - ${DSUB_PROVIDER} provider fields verified"
}
readonly -f verify_dstat_google_provider_fields

2 changes: 1 addition & 1 deletion test/integration/e2e_dstat_tasks.sh
@@ -235,7 +235,7 @@ if [[ "${CHECK_RESULTS_ONLY:-0}" -eq 0 ]]; then
echo "Waiting 5 seconds and checking 'dstat --age 5s'..."
sleep 5s

DSTAT_OUTPUT="$(run_dstat_age "5s" --status '*' --jobs "${JOBID}" 2>&1)"
DSTAT_OUTPUT="$(run_dstat_age "5s" --status '*' --jobs "${JOBID}")"
if [[ -n "${DSTAT_OUTPUT}" ]]; then
echo "dstat output not empty as expected:"
echo "${DSTAT_OUTPUT}"