Skip to content

Commit

Permalink
Making unit tests pass after regression3 test fixes.
Browse files Browse the repository at this point in the history
- Adding test for branch miss in credentials._get_signed_query_params
- Making storage.batch._unpack_batch_response work correctly in
  Python2 and Python3 (parser expects str in both)

NOTE: Yet again %s caused issues between Py2 and Py3 (as in the PR googleapis#126
      in oauth2client).
  • Loading branch information
dhermes committed Mar 27, 2015
1 parent 11904c3 commit fdb1d23
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 12 deletions.
20 changes: 13 additions & 7 deletions gcloud/storage/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,22 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def _unpack_batch_response(response, content):
"""Convert response, content -> [(status, reason, payload)]."""
parser = Parser()
if isinstance(content, six.binary_type):
content = content.decode('utf-8')
faux_message = ''.join([
'Content-Type: ',
response['content-type'],
'\nMIME-Version: 1.0\n\n',
if not isinstance(content, six.binary_type):
content = content.encode('utf-8')
content_type = response['content-type']
if not isinstance(content_type, six.binary_type):
content_type = content_type.encode('utf-8')
faux_message = b''.join([
b'Content-Type: ',
content_type,
b'\nMIME-Version: 1.0\n\n',
content,
])

message = parser.parsestr(faux_message)
if six.PY2:
message = parser.parsestr(faux_message)
else: # pragma: NO COVER Python3
message = parser.parsestr(faux_message.decode('utf-8'))

if not isinstance(message._payload, list):
raise ValueError('Bad response: not multi-part')
Expand Down
27 changes: 26 additions & 1 deletion gcloud/storage/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,32 @@ def test_as_context_mgr_w_error(self):
self.assertEqual(len(batch._responses), 0)


_THREE_PART_MIME_RESPONSE = """\
class Test__unpack_batch_response(unittest2.TestCase):

def _callFUT(self, response, content):
from gcloud.storage.batch import _unpack_batch_response
return _unpack_batch_response(response, content)

def test_bytes(self):
RESPONSE = {'content-type': b'multipart/mixed; boundary="DEADBEEF="'}
CONTENT = _THREE_PART_MIME_RESPONSE
result = list(self._callFUT(RESPONSE, CONTENT))
self.assertEqual(len(result), 3)
self.assertEqual(result[0], ('200', 'OK', {u'bar': 2, u'foo': 1}))
self.assertEqual(result[1], ('200', 'OK', {u'foo': 1, u'bar': 3}))
self.assertEqual(result[2], ('204', 'No Content', ''))

def test_unicode(self):
RESPONSE = {'content-type': u'multipart/mixed; boundary="DEADBEEF="'}
CONTENT = _THREE_PART_MIME_RESPONSE.decode('utf-8')
result = list(self._callFUT(RESPONSE, CONTENT))
self.assertEqual(len(result), 3)
self.assertEqual(result[0], ('200', 'OK', {u'bar': 2, u'foo': 1}))
self.assertEqual(result[1], ('200', 'OK', {u'foo': 1, u'bar': 3}))
self.assertEqual(result[2], ('204', 'No Content', ''))


_THREE_PART_MIME_RESPONSE = b"""\
--DEADBEEF=
Content-Type: application/http
Content-ID: <response-8a09ca85-8d1d-4f45-9eb0-da8e8b07ec83+1>
Expand Down
26 changes: 22 additions & 4 deletions gcloud/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,13 @@ def _get_pem_key(credentials):
SIGNATURE_STRING = 'dummy_signature'
with _Monkey(MUT, RSA=rsa, PKCS1_v1_5=pkcs_v1_5,
SHA256=sha256, _get_pem_key=_get_pem_key):
self.assertRaises(NameError, self._callFUT,
self.assertRaises(UnboundLocalError, self._callFUT,
BAD_CREDENTIALS, EXPIRATION, SIGNATURE_STRING)

def _run_test_with_credentials(self, credentials, account_name):
def _run_test_with_credentials(self, credentials, account_name,
signature_string=None):
import base64
import six
from gcloud._testing import _Monkey
from gcloud import credentials as MUT

Expand All @@ -190,7 +192,7 @@ def _run_test_with_credentials(self, credentials, account_name):
sha256 = _SHA256()

EXPIRATION = '100'
SIGNATURE_STRING = b'dummy_signature'
SIGNATURE_STRING = signature_string or b'dummy_signature'
with _Monkey(MUT, crypt=crypt, RSA=rsa, PKCS1_v1_5=pkcs_v1_5,
SHA256=sha256):
result = self._callFUT(credentials, EXPIRATION, SIGNATURE_STRING)
Expand All @@ -199,7 +201,12 @@ def _run_test_with_credentials(self, credentials, account_name):
self.assertEqual(crypt._private_key_text,
base64.b64encode(b'dummy_private_key_text'))
self.assertEqual(crypt._private_key_password, 'notasecret')
self.assertEqual(sha256._signature_string, SIGNATURE_STRING)
# sha256._signature_string is always bytes.
if isinstance(SIGNATURE_STRING, six.binary_type):
self.assertEqual(sha256._signature_string, SIGNATURE_STRING)
else:
self.assertEqual(sha256._signature_string,
SIGNATURE_STRING.encode('utf-8'))
SIGNED = base64.b64encode(b'DEADBEEF')
expected_query = {
'Expires': EXPIRATION,
Expand All @@ -217,6 +224,17 @@ def test_signed_jwt_for_p12(self):
ACCOUNT_NAME, b'dummy_private_key_text', scopes)
self._run_test_with_credentials(credentials, ACCOUNT_NAME)

def test_signature_non_bytes(self):
from oauth2client import client

scopes = []
ACCOUNT_NAME = 'dummy_service_account_name'
SIGNATURE_STRING = u'dummy_signature'
credentials = client.SignedJwtAssertionCredentials(
ACCOUNT_NAME, b'dummy_private_key_text', scopes)
self._run_test_with_credentials(credentials, ACCOUNT_NAME,
signature_string=SIGNATURE_STRING)

def test_service_account_via_json_key(self):
from oauth2client import service_account
from gcloud._testing import _Monkey
Expand Down

0 comments on commit fdb1d23

Please sign in to comment.