Skip to content

Commit

Permalink
Merge pull request #170 from DataBiosphere/dev
Browse files (browse the repository at this point in the history)
PR for 0.3.4 release
  • Loading branch information
wnojopra authored Oct 1, 2019
2 parents d4925d9 + cb321e7 commit 76bfb7b
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 68 deletions.
2 changes: 1 addition & 1 deletion dsub/_dsub_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.3.3'
DSUB_VERSION = '0.3.4'
8 changes: 4 additions & 4 deletions dsub/lib/dsub_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
from apiclient import discovery
from apiclient import errors
from apiclient.http import MediaIoBaseDownload
import oauth2client.client
import retrying
import six

import google.auth


# this is the Job ID for jobs that are skipped.
NO_JOB = 'NO_JOB'
Expand Down Expand Up @@ -131,8 +132,7 @@ def compact_interval_string(value_list):
def _get_storage_service(credentials):
  """Get a storage client using the provided credentials or defaults.

  Args:
    credentials: Credentials used to authorize the client. When None, the
      Application Default Credentials are looked up with google.auth.default().

  Returns:
    The 'storage' v1 service object produced by discovery.build().
  """
  if credentials is None:
    # google.auth.default() returns a (credentials, project_id) pair; only the
    # credentials are needed to build the client.
    credentials, _ = google.auth.default()
  return discovery.build('storage', 'v1', credentials=credentials)


Expand All @@ -141,7 +141,7 @@ def _retry_storage_check(exception):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
print_error(
'%s: Exception %s: %s' % (now, type(exception).__name__, str(exception)))
return isinstance(exception, oauth2client.client.AccessTokenRefreshError)
return isinstance(exception, google.auth.exceptions.RefreshError)


# Exponential backoff retrying downloads of GCS object chunks.
Expand Down
11 changes: 9 additions & 2 deletions dsub/lib/param_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,12 @@ def tasks_file_to_task_descriptors(tasks, retries, input_file_param_util,
task_min = tasks.get('min')
task_max = tasks.get('max')

# First check for any empty lines
param_file = dsub_util.load_file(path)
if any([not line for line in param_file]):
raise ValueError('Blank line(s) found in {}'.format(path))
param_file.close()

# Load the file and set up a Reader that tokenizes the fields
param_file = dsub_util.load_file(path)
reader = csv.reader(param_file, delimiter='\t')
Expand All @@ -537,8 +543,9 @@ def tasks_file_to_task_descriptors(tasks, retries, input_file_param_util,
continue

if len(row) != len(job_params):
dsub_util.print_error('Unexpected number of fields %s vs %s: line %s' %
(len(row), len(job_params), reader.line_num))
raise ValueError(
'Unexpected number of fields {} vs {}: in {} line {}'.format(
len(row), len(job_params), path, reader.line_num))

# Each row can contain "envs", "inputs", "outputs"
envs = set()
Expand Down
9 changes: 5 additions & 4 deletions dsub/providers/google_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import apiclient.errors
from httplib2 import ServerNotFoundError
from ..lib import job_model
import oauth2client.client
import pytz
import retrying
from six.moves import range

import google.auth


# The google v1 provider directly added the bigquery scope, but the v1alpha2
# API automatically added:
# - https://www.googleapis.com/auth/compute
Expand Down Expand Up @@ -505,7 +507,7 @@ def retry_api_check(exception, verbose):
_print_retry_error(exception, verbose)
return True

if isinstance(exception, oauth2client.client.AccessTokenRefreshError):
if isinstance(exception, google.auth.exceptions.RefreshError):
_print_retry_error(exception, verbose)
return True

Expand Down Expand Up @@ -587,8 +589,7 @@ def setup_service(api_name, api_version, credentials=None):
A configured Google Genomics API client with appropriate credentials.
"""
if not credentials:
credentials = oauth2client.client.GoogleCredentials.get_application_default(
)
credentials, _ = google.auth.default()
return apiclient.discovery.build(
api_name, api_version, credentials=credentials)

Expand Down
18 changes: 6 additions & 12 deletions dsub/providers/google_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,9 @@
_SUPPORTED_INPUT_PROVIDERS = _SUPPORTED_FILE_PROVIDERS
_SUPPORTED_OUTPUT_PROVIDERS = _SUPPORTED_FILE_PROVIDERS

# Action steps that interact with GCS need gsutil.
# Use the 'slim' variant of the cloud-sdk Docker image as it is much smaller.
_CLOUD_SDK_IMAGE = 'google/cloud-sdk:slim'

# The prepare step needs Python.
# Use the 'slim' variant of the python Docker image as it is much smaller.
_PYTHON_IMAGE = 'python:2.7-slim'
# Action steps that interact with GCS need gsutil and Python.
# Use the 'slim' variant of the cloud-sdk image as it is much smaller.
_CLOUD_SDK_IMAGE = 'gcr.io/google.com/cloudsdktool/cloud-sdk:264.0.0-slim'

# This image is for an optional ssh container.
_SSH_IMAGE = 'gcr.io/cloud-genomics-pipelines/tools'
Expand Down Expand Up @@ -468,8 +464,7 @@ def execute(self):
class GoogleV2JobProvider(base.JobProvider):
"""dsub provider implementation managing Jobs on Google Cloud."""

def __init__(self, verbose, dry_run, project, credentials=None):
self._verbose = verbose
def __init__(self, dry_run, project, credentials=None):
self._dry_run = dry_run

self._project = project
Expand Down Expand Up @@ -766,7 +761,7 @@ def _build_pipeline_request(self, task_view):
actions.append(
google_v2_pipelines.build_action(
name='prepare',
image_uri=_PYTHON_IMAGE,
image_uri=_CLOUD_SDK_IMAGE,
mounts=[mnt_datadisk],
environment=prepare_env,
entrypoint='/bin/bash',
Expand Down Expand Up @@ -887,8 +882,7 @@ def _submit_pipeline(self, request):
google_base_api = google_base.Api(verbose=True)
operation = google_base_api.execute(
self._service.pipelines().run(body=request))
if self._verbose:
print('Launched operation {}'.format(operation['name']))
print('Provider internal-id (operation): {}'.format(operation['name']))

return GoogleOperation(operation).get_field('task-id')

Expand Down
3 changes: 1 addition & 2 deletions dsub/providers/provider_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def get_provider(args, resources):
getattr(args, 'dry_run', False), args.project)
elif provider == 'google-v2':
return google_v2.GoogleV2JobProvider(
getattr(args, 'verbose', False), getattr(args, 'dry_run', False),
args.project)
getattr(args, 'dry_run', False), args.project)
elif provider == 'local':
return local.LocalJobProvider(resources)
elif provider == 'test-fails':
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def get_dsub_version():
install_requires=[
# dependencies for dsub, ddel, dstat
'google-api-python-client',
'oauth2client',
'google-auth',
'python-dateutil',
'pytz',
'pyyaml',
Expand All @@ -107,6 +107,7 @@ def get_dsub_version():

# dependencies for test code
'parameterized',
'mock',
],

# Define a test suite for Python unittests only.
Expand Down
2 changes: 1 addition & 1 deletion test/integration/e2e_python_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def get_dsub_provider():
elif test.DSUB_PROVIDER == 'google':
return google.GoogleJobProvider(False, False, test.PROJECT_ID)
elif test.DSUB_PROVIDER == 'google-v2':
return google_v2.GoogleV2JobProvider(False, False, test.PROJECT_ID)
return google_v2.GoogleV2JobProvider(False, test.PROJECT_ID)
else:
print('Invalid provider specified.', file=sys.stderr)
sys.exit(1)
Expand Down
94 changes: 53 additions & 41 deletions test/unit/retrying_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@
# limitations under the License.
"""Unit tests for exponential backoff retrying."""

import time
import unittest

import apiclient.errors
from dsub.providers import google_base
import fake_time
from mock import patch
import parameterized


def current_time_ms():
  """Return the current wall-clock time as a whole number of milliseconds."""
  # time.time() is in (fractional) seconds; scale first, then round, so the
  # result is the nearest millisecond rather than a truncated one.
  return int(round(time.time() * 1000))
def chronology():
  """Simulate the passing of time for fake_time.

  Yields:
    The constant 1, forever; each yielded value stands for one second passing.
  """
  while True:
    yield 1  # Simulates 1 second passing.


def elapsed_time_in_seconds(fake_time_object):
  """Return the fake clock's current reading rounded to a whole number.

  Args:
    fake_time_object: An object exposing a now() method that returns a number
      (in the tests below, a fake_time.FakeTime instance).

  Returns:
    The value of fake_time_object.now(), rounded and converted to int.
  """
  return int(round(fake_time_object.now()))


class GoogleApiMock(object):
Expand Down Expand Up @@ -51,14 +59,15 @@ class TestRetrying(unittest.TestCase):

@parameterized.parameterized.expand([(True,), (False,)])
def test_success(self, verbose):
  """A call with no queued exceptions runs without retries or waiting."""
  ft = fake_time.FakeTime(chronology())
  # Swap time.sleep for the fake clock's sleep so the test does not depend on
  # (or wait for) the wall clock.
  with patch('time.sleep', new=ft.sleep):
    exception_list = []
    api_wrapper_to_test = google_base.Api(verbose)
    mock_api_object = GoogleApiMock(exception_list)
    api_wrapper_to_test.execute(mock_api_object)
    # No expected retries, and no expected wait time
    self.assertEqual(mock_api_object.retry_counter, 0)
    self.assertLess(elapsed_time_in_seconds(ft), 1)

@parameterized.parameterized.expand(
[(True, error_code)
Expand All @@ -68,18 +77,19 @@ def test_success(self, verbose):
for error_code in list(google_base.TRANSIENT_HTTP_ERROR_CODES) +
list(google_base.HTTP_AUTH_ERROR_CODES)])
def test_retry_once(self, verbose, error_code):
  """A single HttpError carrying error_code leads to exactly one retry."""
  ft = fake_time.FakeTime(chronology())
  # Swap time.sleep for the fake clock's sleep so the backoff is measured on
  # the fake clock instead of being waited out for real.
  with patch('time.sleep', new=ft.sleep):
    exception_list = [
        apiclient.errors.HttpError(
            ResponseMock(error_code, None), b'test_exception'),
    ]
    api_wrapper_to_test = google_base.Api(verbose)
    mock_api_object = GoogleApiMock(exception_list)
    api_wrapper_to_test.execute(mock_api_object)
    # Expected to retry once, for about 1 second
    self.assertEqual(mock_api_object.retry_counter, 1)
    self.assertGreaterEqual(elapsed_time_in_seconds(ft), 1)
    self.assertLess(elapsed_time_in_seconds(ft), 1.5)

@parameterized.parameterized.expand([(True,), (False,)])
def test_auth_failure(self, verbose):
Expand All @@ -90,31 +100,33 @@ def test_auth_failure(self, verbose):
apiclient.errors.HttpError(ResponseMock(401, None), b'test_exception'),
apiclient.errors.HttpError(ResponseMock(403, None), b'test_exception'),
]
api_wrapper_to_test = google_base.Api(verbose)
mock_api_object = GoogleApiMock(exception_list)
# We don't want to retry auth errors as aggressively, so we expect
# this exception to be raised after 4 retries,
# for a total of 1 + 2 + 4 + 8 = 15 seconds
start = current_time_ms()
with self.assertRaises(apiclient.errors.HttpError):
api_wrapper_to_test.execute(mock_api_object)
self.assertGreaterEqual(current_time_ms() - start, 15000)
self.assertLess(current_time_ms() - start, 15500)
ft = fake_time.FakeTime(chronology())
with patch('time.sleep', new=ft.sleep):
api_wrapper_to_test = google_base.Api(verbose)
mock_api_object = GoogleApiMock(exception_list)
# We don't want to retry auth errors as aggressively, so we expect
# this exception to be raised after 4 retries,
# for a total of 1 + 2 + 4 + 8 = 15 seconds
with self.assertRaises(apiclient.errors.HttpError):
api_wrapper_to_test.execute(mock_api_object)
self.assertGreaterEqual(elapsed_time_in_seconds(ft), 15)
self.assertLess(elapsed_time_in_seconds(ft), 15.5)

@parameterized.parameterized.expand([(True,), (False,)])
def test_transient_retries(self, verbose):
  """Two queued transient errors (500, 503) are retried without raising."""
  exception_list = [
      apiclient.errors.HttpError(ResponseMock(500, None), b'test_exception'),
      apiclient.errors.HttpError(ResponseMock(503, None), b'test_exception'),
  ]
  ft = fake_time.FakeTime(chronology())
  # Swap time.sleep for the fake clock's sleep so the backoff is measured on
  # the fake clock instead of being waited out for real.
  with patch('time.sleep', new=ft.sleep):
    api_wrapper_to_test = google_base.Api(verbose)
    mock_api_object = GoogleApiMock(exception_list)
    api_wrapper_to_test.execute(mock_api_object)
    # Expected to retry twice, for a total of 1 + 2 = 3 seconds
    self.assertEqual(mock_api_object.retry_counter, 2)
    self.assertGreaterEqual(elapsed_time_in_seconds(ft), 3)
    self.assertLess(elapsed_time_in_seconds(ft), 3.5)


if __name__ == '__main__':
Expand Down

0 comments on commit 76bfb7b

Please sign in to comment.