Skip to content

Commit

Permalink
Merge pull request #228 from DataBiosphere/dev2
Browse files Browse the repository at this point in the history
PR for 0.4.5 release
  • Loading branch information
wnojopra authored Aug 26, 2021
2 parents 8278b37 + ad1c654 commit a01408d
Show file tree
Hide file tree
Showing 15 changed files with 139 additions and 126 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: python
python:
- "3.7"
- "3.8"
# command to install dependencies
install: "python setup.py install"
# command to run tests
Expand Down
2 changes: 1 addition & 1 deletion dsub/_dsub_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.4.4'
DSUB_VERSION = '0.4.5'
15 changes: 8 additions & 7 deletions dsub/commands/dsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@
import sys
import time
import uuid
from dateutil.tz import tzlocal

import dateutil
from ..lib import dsub_errors
from ..lib import dsub_util
from ..lib import job_model
from ..lib import output_formatter
from ..lib import param_util
from ..lib import resources
from ..lib.dsub_util import print_error
from ..providers import google_base
from ..providers import provider_base

Expand Down Expand Up @@ -660,7 +659,8 @@ def _get_job_metadata(provider, user_id, job_name, script, task_ids,
Returns:
A dictionary of job-specific metadata (such as job id, name, etc.)
"""
create_time = dsub_util.replace_timezone(datetime.datetime.now(), tzlocal())
create_time = dsub_util.replace_timezone(datetime.datetime.now(),
dateutil.tz.tzlocal())
user_id = user_id or dsub_util.get_os_user()
job_metadata = provider.prepare_job_metadata(script.name, job_name, user_id)
if unique_job_id:
Expand Down Expand Up @@ -811,7 +811,7 @@ def _wait_after(provider, job_ids, poll_interval, stop_on_failure, summary):
jobs_not_found = jobs_completed.difference(jobs_found)
for j in jobs_not_found:
error = '%s: not found' % j
print_error(' %s' % error)
dsub_util.print_error(' %s' % error)
error_messages += [error]

# Print the dominant task for the completed jobs
Expand Down Expand Up @@ -997,7 +997,8 @@ def _importance_of_task(task):
return (importance[task.get_field('task-status')],
task.get_field(
'end-time',
dsub_util.replace_timezone(datetime.datetime.max, tzlocal())))
dsub_util.replace_timezone(datetime.datetime.max,
dateutil.tz.tzlocal())))


def _wait_for_any_job(provider, job_ids, poll_interval, summary):
Expand Down Expand Up @@ -1289,7 +1290,7 @@ def run(provider,
summary)
if error_messages:
for msg in error_messages:
print_error(msg)
dsub_util.print_error(msg)
raise dsub_errors.PredecessorJobFailureError(
'One or more predecessor jobs completed but did not succeed.',
error_messages, None)
Expand Down Expand Up @@ -1331,7 +1332,7 @@ def run(provider,
poll_interval, False, summary)
if error_messages:
for msg in error_messages:
print_error(msg)
dsub_util.print_error(msg)
raise dsub_errors.JobExecutionError(
'One or more jobs finished with status FAILURE or CANCELED'
' during wait.', error_messages, launched_job)
Expand Down
28 changes: 16 additions & 12 deletions dsub/lib/dsub_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
import pwd
import sys
import warnings
from . import retry_util

from . import retry_util
import google.auth
import googleapiclient.discovery
import googleapiclient.errors
import googleapiclient.http
import tenacity

import google.auth


# this is the Job ID for jobs that are skipped.
NO_JOB = 'NO_JOB'
Expand All @@ -56,6 +55,9 @@ def __init__(self, fileobj):
def write(self, buf):
self._fileobj.write(buf)

def flush(self):
self._fileobj.flush()


@contextlib.contextmanager
def replace_print(fileobj=sys.stderr):
Expand Down Expand Up @@ -140,23 +142,25 @@ def get_storage_service(credentials):
'ignore', 'Your application has authenticated using end user credentials')
if credentials is None:
credentials, _ = google.auth.default()
# Set cache_discovery to False because we use google-auth
# See https://github.com/googleapis/google-api-python-client/issues/299
return googleapiclient.discovery.build(
'storage', 'v1', credentials=credentials)
'storage', 'v1', credentials=credentials, cache_discovery=False)


# Exponential backoff retrying downloads of GCS object chunks.
# Maximum 23 retries. Wait 1, 2, 4 ... 64, 64, 64... seconds.
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_API_ATTEMPTS),
retry=retry_util.retry_api_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=64),
wait=tenacity.wait_exponential(multiplier=1, max=64),
retry_error_callback=retry_util.on_give_up)
# For API errors dealing with auth, we want to retry, but not as often
# Maximum 4 retries. Wait 1, 2, 4, 8 seconds.
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_AUTH_ATTEMPTS),
retry=retry_util.retry_auth_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=8),
wait=tenacity.wait_exponential(multiplier=1, max=8),
retry_error_callback=retry_util.on_give_up)
def _downloader_next_chunk(downloader):
"""Downloads the next chunk."""
Expand Down Expand Up @@ -214,14 +218,14 @@ def load_file(file_path, credentials=None):
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_API_ATTEMPTS),
retry=retry_util.retry_api_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=64),
wait=tenacity.wait_exponential(multiplier=1, max=64),
retry_error_callback=retry_util.on_give_up)
# For API errors dealing with auth, we want to retry, but not as often
# Maximum 4 retries. Wait 1, 2, 4, 8 seconds.
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_AUTH_ATTEMPTS),
retry=retry_util.retry_auth_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=8),
wait=tenacity.wait_exponential(multiplier=1, max=8),
retry_error_callback=retry_util.on_give_up)
def _file_exists_in_gcs(gcs_file_path, credentials=None, storage_service=None):
"""Check whether the file exists, in GCS.
Expand Down Expand Up @@ -252,14 +256,14 @@ def _file_exists_in_gcs(gcs_file_path, credentials=None, storage_service=None):
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_API_ATTEMPTS),
retry=retry_util.retry_api_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=64),
wait=tenacity.wait_exponential(multiplier=1, max=64),
retry_error_callback=retry_util.on_give_up)
# For API errors dealing with auth, we want to retry, but not as often
# Maximum 4 retries. Wait 1, 2, 4, 8 seconds.
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_AUTH_ATTEMPTS),
retry=retry_util.retry_auth_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=8),
wait=tenacity.wait_exponential(multiplier=1, max=8),
retry_error_callback=retry_util.on_give_up)
def _prefix_exists_in_gcs(gcs_prefix, credentials=None, storage_service=None):
"""Check whether there is a GCS object whose name starts with the prefix.
Expand Down Expand Up @@ -302,14 +306,14 @@ def folder_exists(folder_path, credentials=None, storage_service=None):
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_API_ATTEMPTS),
retry=retry_util.retry_api_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=64),
wait=tenacity.wait_exponential(multiplier=1, max=64),
retry_error_callback=retry_util.on_give_up)
# For API errors dealing with auth, we want to retry, but not as often
# Maximum 4 retries. Wait 1, 2, 4, 8 seconds.
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_AUTH_ATTEMPTS),
retry=retry_util.retry_auth_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=8),
wait=tenacity.wait_exponential(multiplier=1, max=8),
retry_error_callback=retry_util.on_give_up)
def simple_pattern_exists_in_gcs(file_pattern,
credentials=None,
Expand Down
2 changes: 1 addition & 1 deletion dsub/lib/job_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ def get_complete_descriptor(cls, task_metadata, task_params, task_resources):
return task_descriptor

def __str__(self):
return 'task-id: {}'.format(self.job_metadata.get('task-id'))
return 'task-id: {}'.format(self.task_metadata.get('task-id'))

def __repr__(self):
return ('task_metadata: {}, task_params: {}').format(
Expand Down
2 changes: 1 addition & 1 deletion dsub/lib/output_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def prepare_summary_table(rows):
# Use the original table as the driver in order to preserve the order.
new_rows = []
for job_key in sorted(grouped.keys()):
group = grouped.get(job_key, None)
group = grouped[job_key]
canonical_status = ['RUNNING', 'SUCCESS', 'FAILURE', 'CANCEL']
# Written this way to ensure that if somehow a new status is introduced,
# it shows up in our output.
Expand Down
4 changes: 2 additions & 2 deletions dsub/lib/retry_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import sys

import googleapiclient.errors
from httplib2 import ServerNotFoundError
import httplib2
import tenacity

import google.auth
Expand Down Expand Up @@ -127,7 +127,7 @@ def retry_api_check(retry_state: tenacity.RetryCallState) -> bool:

# This has been observed as a transient error:
# ServerNotFoundError: Unable to find the server at genomics.googleapis.com
if isinstance(exception, ServerNotFoundError):
if isinstance(exception, httplib2.ServerNotFoundError):
_print_retry_error(attempt_number, MAX_API_ATTEMPTS, exception)
return True

Expand Down
2 changes: 1 addition & 1 deletion dsub/providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def get_tasks_completion_messages(self, tasks):
raise NotImplementedError()


class Task(object):
class Task(object, metaclass=abc.ABCMeta):
"""Basic container for task metadata."""

@abc.abstractmethod
Expand Down
14 changes: 8 additions & 6 deletions dsub/providers/google_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def parse_rfc3339_utc_string(rfc3339_utc_string):
# When nanoseconds are provided, we truncate to whole microseconds
micros = int(round(int(fraction) // 1000))
else:
assert False, 'Fraction length not 0, 6, or 9: {}'.len(fraction)
assert False, 'Fraction length not 0, 6, or 9: {}'.format(len(fraction))

try:
return datetime.datetime(
Expand Down Expand Up @@ -424,14 +424,14 @@ def cancel(batch_fn, cancel_fn, ops):
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_API_ATTEMPTS),
retry=retry_util.retry_api_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=64),
wait=tenacity.wait_exponential(multiplier=1, max=64),
retry_error_callback=retry_util.on_give_up)
# For API errors dealing with auth, we want to retry, but not as often
# Maximum 4 retries. Wait 1, 2, 4, 8 seconds.
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_AUTH_ATTEMPTS),
retry=retry_util.retry_auth_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=8),
wait=tenacity.wait_exponential(multiplier=1, max=8),
retry_error_callback=retry_util.on_give_up)
def setup_service(api_name, api_version, credentials=None):
"""Configures genomics API client.
Expand All @@ -449,8 +449,10 @@ def setup_service(api_name, api_version, credentials=None):
'ignore', 'Your application has authenticated using end user credentials')
if not credentials:
credentials, _ = google.auth.default()
# Set cache_discovery to False because we use google-auth
# See https://github.com/googleapis/google-api-python-client/issues/299
return googleapiclient.discovery.build(
api_name, api_version, credentials=credentials)
api_name, api_version, cache_discovery=False, credentials=credentials)


def credentials_from_service_account_info(credentials_file):
Expand All @@ -467,14 +469,14 @@ class Api(object):
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_API_ATTEMPTS),
retry=retry_util.retry_api_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=64),
wait=tenacity.wait_exponential(multiplier=1, max=64),
retry_error_callback=retry_util.on_give_up)
# For API errors dealing with auth, we want to retry, but not as often
# Maximum 4 retries. Wait 1, 2, 4, 8 seconds.
@tenacity.retry(
stop=tenacity.stop_after_attempt(retry_util.MAX_AUTH_ATTEMPTS),
retry=retry_util.retry_auth_check,
wait=tenacity.wait_exponential(multiplier=0.5, max=8),
wait=tenacity.wait_exponential(multiplier=1, max=8),
retry_error_callback=retry_util.on_give_up)
def execute(self, api):
"""Executes operation.
Expand Down
2 changes: 1 addition & 1 deletion dsub/providers/google_v2_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def get_filtered_normalized_events(self):
continue

if name == 'pulling-image':
if match.group(1) != user_image:
if match and match.group(1) != user_image:
continue

events[name] = mapped
Expand Down
2 changes: 1 addition & 1 deletion dsub/providers/provider_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def parse_args(parser, provider_required_args, argv):

# For the selected provider, check the required arguments
for arg in provider_required_args[args.provider]:
if not args.__getattribute__(arg):
if not vars(args)[arg]:
parser.error('argument --%s is required' % arg)

return args
Expand Down
22 changes: 9 additions & 13 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"""

import os
import sys
import unittest
# Always prefer setuptools over distutils
from setuptools import find_packages
Expand All @@ -15,22 +14,22 @@
# dependencies for dsub, ddel, dstat
# Pin to known working versions to prevent episodic breakage from library
# version mismatches.
# This version list generated: 02/01/2021
# This version list generated: 08/18/2021

# direct dependencies
'google-api-python-client<=1.12.8',
'google-auth<=1.24.0',
'python-dateutil<=2.8.1',
'google-api-python-client<=2.17.0',
'google-auth<=1.35.0',
'python-dateutil<=2.8.2',
'pytz<=2021.1',
'pyyaml<=5.4.1',
'tenacity<=5.0.4',
'tabulate<=0.8.7',
'tenacity<=7.0.0',
'tabulate<=0.8.9',

# downstream dependencies
'funcsigs<=1.0.2',
'google-api-core<=1.25.1',
'google-auth-httplib2<=0.0.4',
'httplib2<=0.19.0',
'google-api-core<=1.31.2',
'google-auth-httplib2<=0.1.0',
'httplib2<=0.19.1',
'pyasn1<=0.4.8',
'pyasn1-modules<=0.2.8',
'rsa<=4.7',
Expand All @@ -41,9 +40,6 @@
'mock<=4.0.3',
]

if sys.version_info[0] == 2:
_DEPENDENCIES.append('cachetools==3.1.1')


def unittest_suite():
"""Get test suite (Python unit tests only)."""
Expand Down
2 changes: 1 addition & 1 deletion test/integration/script_python.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/python3
"""Minimal Python script."""

from __future__ import print_function
Expand Down
8 changes: 7 additions & 1 deletion test/unit/batch_handling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ def callback_mock(request_id, response, exception):
raise exception


class ResponseMock(object):

def __init__(self):
self.reason = None


class CancelMock(object):

def __init__(self):
pass

def execute(self):
raise apiclient.errors.HttpError(None, b'test_exception')
raise apiclient.errors.HttpError(ResponseMock(), b'test_exception')


class TestBatchHandling(unittest.TestCase):
Expand Down
Loading

0 comments on commit a01408d

Please sign in to comment.