Skip to content

Commit

Permalink
Merge pull request #2561 from dhermes/bigquery-iterators-alt
Browse files Browse the repository at this point in the history
Updating Client.list_* methods in BigQuery to use Iterators.
  • Loading branch information
dhermes authored Oct 18, 2016
2 parents e575f81 + f8ab149 commit b402216
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 75 deletions.
130 changes: 72 additions & 58 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from google.cloud.bigquery.job import LoadTableFromStorageJob
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.query import QueryResults
from google.cloud.iterator import Iterator


class Project(object):
Expand Down Expand Up @@ -87,26 +88,13 @@ def list_projects(self, max_results=None, page_token=None):
not passed, the API will return the first page of
projects.
:rtype: tuple, (list, str)
:returns: list of :class:`~google.cloud.bigquery.client.Project`,
plus a "next page token" string: if the token is not None,
indicates that more projects can be retrieved with another
call (pass that value as ``page_token``).
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.bigquery.client.Project`
accessible to the current client.
"""
params = {}

if max_results is not None:
params['maxResults'] = max_results

if page_token is not None:
params['pageToken'] = page_token

path = '/projects'
resp = self.connection.api_request(method='GET', path=path,
query_params=params)
projects = [Project.from_api_repr(resource)
for resource in resp.get('projects', ())]
return projects, resp.get('nextPageToken')
return Iterator(client=self, path='/projects',
items_key='projects', item_to_value=_item_to_project,
page_token=page_token, max_results=max_results)

def list_datasets(self, include_all=False, max_results=None,
page_token=None):
Expand All @@ -127,29 +115,18 @@ def list_datasets(self, include_all=False, max_results=None,
not passed, the API will return the first page of
datasets.
:rtype: tuple, (list, str)
:returns: list of :class:`~google.cloud.bigquery.dataset.Dataset`,
plus a "next page token" string: if the token is not None,
indicates that more datasets can be retrieved with another
call (pass that value as ``page_token``).
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.bigquery.dataset.Dataset`
accessible to the current client.
"""
params = {}

extra_params = {}
if include_all:
params['all'] = True

if max_results is not None:
params['maxResults'] = max_results

if page_token is not None:
params['pageToken'] = page_token

extra_params['all'] = True
path = '/projects/%s/datasets' % (self.project,)
resp = self.connection.api_request(method='GET', path=path,
query_params=params)
datasets = [Dataset.from_api_repr(resource, self)
for resource in resp.get('datasets', ())]
return datasets, resp.get('nextPageToken')
return Iterator(
client=self, path=path, items_key='datasets',
item_to_value=_item_to_dataset, page_token=page_token,
max_results=max_results, extra_params=extra_params)

def dataset(self, dataset_name):
"""Construct a dataset bound to this client.
Expand Down Expand Up @@ -215,32 +192,22 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
* ``"pending"``
* ``"running"``
:rtype: tuple, (list, str)
:returns: list of job instances, plus a "next page token" string:
if the token is not ``None``, indicates that more jobs can be
retrieved with another call, passing that value as
``page_token``).
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterable of job instances.
"""
params = {'projection': 'full'}

if max_results is not None:
params['maxResults'] = max_results

if page_token is not None:
params['pageToken'] = page_token
extra_params = {'projection': 'full'}

if all_users is not None:
params['allUsers'] = all_users
extra_params['allUsers'] = all_users

if state_filter is not None:
params['stateFilter'] = state_filter
extra_params['stateFilter'] = state_filter

path = '/projects/%s/jobs' % (self.project,)
resp = self.connection.api_request(method='GET', path=path,
query_params=params)
jobs = [self.job_from_resource(resource)
for resource in resp.get('jobs', ())]
return jobs, resp.get('nextPageToken')
return Iterator(
client=self, path=path, items_key='jobs',
item_to_value=_item_to_job, page_token=page_token,
max_results=max_results, extra_params=extra_params)

def load_table_from_storage(self, job_name, destination, *source_uris):
"""Construct a job for loading data into a table from CloudStorage.
Expand Down Expand Up @@ -334,3 +301,50 @@ def run_sync_query(self, query):
:returns: a new ``QueryResults`` instance
"""
return QueryResults(query, client=self)


# pylint: disable=unused-argument
def _item_to_project(iterator, resource):
    """Convert a JSON project resource into a :class:`.Project`.

    :type iterator: :class:`~google.cloud.iterator.Iterator`
    :param iterator: The iterator that is currently in use (unused here,
                     but required by the item-to-value callback signature).

    :type resource: dict
    :param resource: A project resource to be converted.

    :rtype: :class:`.Project`
    :returns: The next project in the page.
    """
    project = Project.from_api_repr(resource)
    return project
# pylint: enable=unused-argument


def _item_to_dataset(iterator, resource):
    """Convert a JSON dataset resource into a :class:`.Dataset`.

    :type iterator: :class:`~google.cloud.iterator.Iterator`
    :param iterator: The iterator that is currently in use; its ``client``
                     is bound to the returned dataset.

    :type resource: dict
    :param resource: A dataset resource to be converted.

    :rtype: :class:`.Dataset`
    :returns: The next dataset in the page.
    """
    client = iterator.client
    return Dataset.from_api_repr(resource, client)


def _item_to_job(iterator, resource):
"""Convert a JSON job to the native object.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type resource: dict
:param resource: An item to be converted to a job.
:rtype: job instance.
:returns: The next job in the page.
"""
return iterator.client.job_from_resource(resource)
37 changes: 29 additions & 8 deletions bigquery/unit_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ def test_list_projects_defaults(self):
client = self._makeOne(PROJECT_1, creds)
conn = client.connection = _Connection(DATA)

projects, token = client.list_projects()
iterator = client.list_projects()
iterator.update_page()
projects = list(iterator.page)
token = iterator.next_page_token

self.assertEqual(len(projects), len(DATA['projects']))
for found, expected in zip(projects, DATA['projects']):
Expand All @@ -83,7 +86,10 @@ def test_list_projects_explicit_response_missing_projects_key(self):
client = self._makeOne(PROJECT, creds)
conn = client.connection = _Connection(DATA)

projects, token = client.list_projects(max_results=3, page_token=TOKEN)
iterator = client.list_projects(max_results=3, page_token=TOKEN)
iterator.update_page()
projects = list(iterator.page)
token = iterator.next_page_token

self.assertEqual(len(projects), 0)
self.assertIsNone(token)
Expand Down Expand Up @@ -121,7 +127,10 @@ def test_list_datasets_defaults(self):
client = self._makeOne(PROJECT, creds)
conn = client.connection = _Connection(DATA)

datasets, token = client.list_datasets()
iterator = client.list_datasets()
iterator.update_page()
datasets = list(iterator.page)
token = iterator.next_page_token

self.assertEqual(len(datasets), len(DATA['datasets']))
for found, expected in zip(datasets, DATA['datasets']):
Expand All @@ -144,8 +153,11 @@ def test_list_datasets_explicit_response_missing_datasets_key(self):
client = self._makeOne(PROJECT, creds)
conn = client.connection = _Connection(DATA)

datasets, token = client.list_datasets(
iterator = client.list_datasets(
include_all=True, max_results=3, page_token=TOKEN)
iterator.update_page()
datasets = list(iterator.page)
token = iterator.next_page_token

self.assertEqual(len(datasets), 0)
self.assertIsNone(token)
Expand Down Expand Up @@ -288,7 +300,10 @@ def test_list_jobs_defaults(self):
client = self._makeOne(PROJECT, creds)
conn = client.connection = _Connection(DATA)

jobs, token = client.list_jobs()
iterator = client.list_jobs()
iterator.update_page()
jobs = list(iterator.page)
token = iterator.next_page_token

self.assertEqual(len(jobs), len(DATA['jobs']))
for found, expected in zip(jobs, DATA['jobs']):
Expand Down Expand Up @@ -340,7 +355,10 @@ def test_list_jobs_load_job_wo_sourceUris(self):
client = self._makeOne(PROJECT, creds)
conn = client.connection = _Connection(DATA)

jobs, token = client.list_jobs()
iterator = client.list_jobs()
iterator.update_page()
jobs = list(iterator.page)
token = iterator.next_page_token

self.assertEqual(len(jobs), len(DATA['jobs']))
for found, expected in zip(jobs, DATA['jobs']):
Expand All @@ -364,8 +382,11 @@ def test_list_jobs_explicit_missing(self):
client = self._makeOne(PROJECT, creds)
conn = client.connection = _Connection(DATA)

jobs, token = client.list_jobs(max_results=1000, page_token=TOKEN,
all_users=True, state_filter='done')
iterator = client.list_jobs(max_results=1000, page_token=TOKEN,
all_users=True, state_filter='done')
iterator.update_page()
jobs = list(iterator.page)
token = iterator.next_page_token

self.assertEqual(len(jobs), 0)
self.assertIsNone(token)
Expand Down
9 changes: 2 additions & 7 deletions docs/bigquery_snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,8 @@ def do_something_with(_):
pass

# [START client_list_datasets]
datasets, token = client.list_datasets() # API request
while True:
for dataset in datasets:
do_something_with(dataset)
if token is None:
break
datasets, token = client.list_datasets(page_token=token) # API request
for dataset in client.list_datasets(): # API request(s)
do_something_with(dataset)
# [END client_list_datasets]


Expand Down
5 changes: 3 additions & 2 deletions system_tests/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ def test_list_datasets(self):
self.to_delete.append(dataset)

# Retrieve the datasets.
all_datasets, token = Config.CLIENT.list_datasets()
self.assertIsNone(token)
iterator = Config.CLIENT.list_datasets()
all_datasets = list(iterator)
self.assertIsNone(iterator.next_page_token)
created = [dataset for dataset in all_datasets
if dataset.name in datasets_to_create and
dataset.project == Config.CLIENT.project]
Expand Down

0 comments on commit b402216

Please sign in to comment.