Make synchronous query snippets testable #1836

Merged
merged 5 commits on Jul 14, 2016
118 changes: 17 additions & 101 deletions docs/bigquery-usage.rst
@@ -25,6 +25,7 @@ Authentication / Configuration
>>> from gcloud import bigquery
>>> client = bigquery.Client()


Projects
--------

@@ -43,13 +44,15 @@ To override the project inferred from the environment, pass an explicit
>>> from gcloud import bigquery
>>> client = bigquery.Client(project='PROJECT_ID')


Project ACLs
~~~~~~~~~~~~

Each project has an access control list granting reader / writer / owner
permission to one or more entities. This list cannot be queried or set
via the API: it must be managed using the Google Developer Console.


Datasets
--------

@@ -62,6 +65,7 @@ policies to tables as they are created:
- A default table expiration period. If set, tables created within the
  dataset will have the value as their expiration period (see the sketch
  just below).
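
A minimal sketch of setting such a policy, assuming the dataset object
exposes a ``default_table_expiration_ms`` property (that attribute name is an
assumption here, not something shown in this diff):

.. doctest::

   >>> from gcloud import bigquery
   >>> client = bigquery.Client()
   >>> dataset = client.dataset('dataset_name')
   >>> dataset.default_table_expiration_ms = 3600000  # one hour, in milliseconds
   >>> dataset.update()  # API request; new tables now default to this expiration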


Dataset operations
~~~~~~~~~~~~~~~~~~

@@ -191,104 +195,34 @@ Jobs describe actions performed on data in BigQuery tables:

List jobs for a project:

.. doctest::
.. literalinclude:: bigquery_snippets.py
:start-after: [START client_list_jobs]
:end-before: [END client_list_jobs]

>>> from gcloud import bigquery
>>> client = bigquery.Client()
>>> jobs, token = client.list_jobs() # API request
>>> [(job.name, job.job_type, job.created, job.state) for job in jobs]
['load-table-job', 'load', (datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>), 'done')]

Querying data (synchronous)
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Run a query which can be expected to complete within bounded time:

.. doctest::

>>> from gcloud import bigquery
>>> client = bigquery.Client()
>>> QUERY = """\
... SELECT count(*) AS age_count FROM dataset_name.person_ages
... """
>>> query = client.run_sync_query(QUERY)
>>> query.timeout_ms = 1000
>>> query.run() # API request
>>> query.complete
True
>>> len(query.schema)
1
>>> field = query.schema[0]
>>> field.name
u'count'
>>> field.field_type
u'INTEGER'
>>> field.mode
u'NULLABLE'
>>> query.rows
[(15,)]
>>> query.total_rows
1
.. literalinclude:: bigquery_snippets.py
:start-after: [START client_run_sync_query]
:end-before: [END client_run_sync_query]

If the rows returned by the query do not fit into the initial response,
then we need to fetch the remaining rows via ``fetch_data``:

.. doctest::

>>> from gcloud import bigquery
>>> client = bigquery.Client()
>>> QUERY = """\
... SELECT * FROM dataset_name.person_ages
... """
>>> query = client.run_sync_query(QUERY)
>>> query.timeout_ms = 1000
>>> query.run() # API request
>>> query.complete
True
>>> query.total_rows
1234
>>> query.page_token
'8d6e452459238eb0fe87d8eb191dd526ee70a35e'
>>> do_something_with(query.schema, query.rows)
>>> token = query.page_token # for initial request
>>> while True:
... do_something_with(query.schema, rows)
... if token is None:
... break
... rows, _, token = query.fetch_data(page_token=token)

.. literalinclude:: bigquery_snippets.py
:start-after: [START client_run_sync_query_paged]
:end-before: [END client_run_sync_query_paged]

If the query takes longer than the timeout allowed, ``query.complete``
will be ``False``. In that case, we need to poll the associated job until
it is done, and then fetch the results:

.. doctest::

>>> from gcloud import bigquery
>>> client = bigquery.Client()
>>> QUERY = """\
... SELECT * FROM dataset_name.person_ages
... """
>>> query = client.run_sync_query(QUERY)
>>> query.timeout_ms = 1000
>>> query.run() # API request
>>> query.complete
False
>>> job = query.job
>>> retry_count = 100
>>> while retry_count > 0 and job.state == 'running':
... retry_count -= 1
... time.sleep(10)
... job.reload() # API call
>>> job.state
'done'
>>> token = None # for initial request
>>> while True:
... rows, _, token = query.fetch_data(page_token=token)
... do_something_with(query.schema, rows)
... if token is None:
... break

.. literalinclude:: bigquery_snippets.py
:start-after: [START client_run_sync_query_timeout]
:end-before: [END client_run_sync_query_timeout]


Querying data (asynchronous)
@@ -350,25 +284,6 @@ Poll until the job is complete:
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)

Inserting data (synchronous)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Load data synchronously from a local CSV file into a new table:

.. doctest::

>>> import csv
>>> from gcloud import bigquery
>>> from gcloud.bigquery import SchemaField
>>> client = bigquery.Client()
>>> table = dataset.table(name='person_ages')
>>> table.schema = [
... SchemaField('full_name', 'STRING', mode='required'),
... SchemaField('age', 'INTEGER', mode='required)]
>>> with open('/path/to/person_ages.csv', 'rb') as file_obj:
... reader = csv.reader(file_obj)
... rows = list(reader)
>>> table.insert_data(rows) # API request

Inserting data (asynchronous)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -431,6 +346,7 @@ Poll until the job is complete:
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)


Exporting data (async)
~~~~~~~~~~~~~~~~~~~~~~

127 changes: 121 additions & 6 deletions docs/bigquery_snippets.py
@@ -68,10 +68,10 @@ def delete(self):


@snippet
def client_list_datasets(client, to_delete): # pylint: disable=unused-argument
def client_list_datasets(client, _):

"""List datasets for a project."""

def do_something_with(sub): # pylint: disable=unused-argument
def do_something_with(_):
pass

# [START client_list_datasets]
@@ -182,7 +182,7 @@ def dataset_update(client, to_delete):


@snippet
def dataset_delete(client, to_delete): # pylint: disable=unused-argument
def dataset_delete(client, _):
"""Delete a dataset."""
DATASET_NAME = 'dataset_delete_%d' % (_millis(),)
dataset = client.dataset(DATASET_NAME)
@@ -439,13 +439,12 @@ def table_upload_from_file(client, to_delete):


@snippet
def table_delete(client, to_delete): # pylint: disable=unused-argument
def table_delete(client, _):
"""Delete a table."""
DATASET_NAME = 'table_delete_dataset_%d' % (_millis(),)
TABLE_NAME = 'table_create_table_%d' % (_millis(),)
dataset = client.dataset(DATASET_NAME)
dataset.create()
to_delete.append(dataset)

table = dataset.table(TABLE_NAME, SCHEMA)
table.create()
@@ -457,6 +456,122 @@ def table_delete(client, to_delete):  # pylint: disable=unused-argument
# [END table_delete]


@snippet
def client_list_jobs(client, _):
"""List jobs for a project."""

def do_something_with(_):
pass

# [START client_list_jobs]
jobs, token = client.list_jobs() # API request
while True:
for job in jobs:
do_something_with(job)
if token is None:
break
jobs, token = client.list_jobs(page_token=token) # API request
# [END client_list_jobs]


@snippet
def client_run_sync_query(client, _):
"""Run a synchronous query."""
LIMIT = 100
LIMITED = '%s LIMIT %d' % (QUERY, LIMIT)
TIMEOUT_MS = 1000

# [START client_run_sync_query]
query = client.run_sync_query(LIMITED)
query.timeout_ms = TIMEOUT_MS
query.run() # API request

assert query.complete
assert len(query.rows) == LIMIT
assert [field.name for field in query.schema] == ['name']
# [END client_run_sync_query]


@snippet
def client_run_sync_query_paged(client, _):
"""Run a synchronous query with paged results."""
TIMEOUT_MS = 1000
PAGE_SIZE = 100
LIMIT = 1000
LIMITED = '%s LIMIT %d' % (QUERY, LIMIT)

all_rows = []

def do_something_with(rows):
all_rows.extend(rows)

# [START client_run_sync_query_paged]
query = client.run_sync_query(LIMITED)
query.timeout_ms = TIMEOUT_MS
query.max_results = PAGE_SIZE
query.run() # API request

assert query.complete
assert query.page_token is not None
assert len(query.rows) == PAGE_SIZE
assert [field.name for field in query.schema] == ['name']

rows = query.rows
token = query.page_token

while True:
do_something_with(rows)
if token is None:
break
rows, total_count, token = query.fetch_data(
page_token=token) # API request
# [END client_run_sync_query_paged]

assert total_count == LIMIT
assert len(all_rows) == LIMIT


@snippet
def client_run_sync_query_timeout(client, _):
"""Run a synchronous query w/ timeout"""
TIMEOUT_MS = 10

all_rows = []

def do_something_with(rows):
all_rows.extend(rows)

# [START client_run_sync_query_timeout]
query = client.run_sync_query(QUERY)
query.timeout_ms = TIMEOUT_MS
query.use_query_cache = False
query.run() # API request

assert not query.complete

job = query.job
job.reload() # API request
retry_count = 0

while retry_count < 10 and job.state != u'DONE':
time.sleep(1.5**retry_count) # exponential backoff
retry_count += 1
job.reload() # API request

assert job.state == u'DONE'

rows, total_count, token = query.fetch_data() # API request
while True:
do_something_with(rows)
if token is None:
break
rows, total_count, token = query.fetch_data(
page_token=token) # API request
# [END client_run_sync_query_timeout]

assert len(all_rows) == total_count


def _find_examples():
funcs = [obj for obj in globals().values()
if getattr(obj, '_snippet', False)]
@@ -468,7 +583,7 @@ def main():
client = Client()
for example in _find_examples():
to_delete = []
print('%-25s: %s' % (
print('%-30s: %s' % (
example.func_name, example.func_doc))
try:
example(client, to_delete)
3 changes: 2 additions & 1 deletion gcloud/bigquery/client.py
@@ -178,7 +178,8 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
path = '/projects/%s/jobs' % (self.project,)
resp = self.connection.api_request(method='GET', path=path,
query_params=params)
jobs = [self.job_from_resource(resource) for resource in resp['jobs']]
jobs = [self.job_from_resource(resource)
for resource in resp.get('jobs', ())]
return jobs, resp.get('nextPageToken')

def load_table_from_storage(self, job_name, destination, *source_uris):
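A quick illustration of what the new ``resp.get('jobs', ())`` guard handles:
for a project with no jobs, the API response may omit the ``jobs`` key
entirely, and ``list_jobs`` should return an empty list rather than raise
``KeyError``. The response dict below is a hypothetical stand-in; the renamed
``test_list_jobs_explicit_missing`` test further down exercises the same case:

    resp = {}  # hypothetical API response: no 'jobs' key, no 'nextPageToken'

    # Mirrors the guard above: iterate over an empty tuple when 'jobs' is missing.
    jobs = [resource for resource in resp.get('jobs', ())]

    assert jobs == []
    assert resp.get('nextPageToken') is None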
2 changes: 2 additions & 0 deletions gcloud/bigquery/query.py
@@ -343,6 +343,8 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
self._set_properties(response)

total_rows = response.get('totalRows')
if total_rows is not None:
total_rows = int(total_rows)
page_token = response.get('pageToken')
rows_data = _rows_from_json(response.get('rows', ()), self.schema)

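A short sketch of why the ``int()`` conversion helps: the BigQuery REST API
serializes 64-bit integer fields such as ``totalRows`` as JSON strings, so
without the guard callers would get back ``'1234'`` rather than ``1234``. The
response dict below is a hypothetical stand-in:

    response = {'totalRows': '1234'}  # hypothetical payload; int64 arrives as a string

    total_rows = response.get('totalRows')
    if total_rows is not None:
        total_rows = int(total_rows)

    assert total_rows == 1234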
4 changes: 2 additions & 2 deletions gcloud/bigquery/test_client.py
@@ -294,10 +294,10 @@ def test_list_jobs_load_job_wo_sourceUris(self):
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(req['query_params'], {'projection': 'full'})

def test_list_jobs_explicit_empty(self):
def test_list_jobs_explicit_missing(self):
PROJECT = 'PROJECT'
PATH = 'projects/%s/jobs' % PROJECT
DATA = {'jobs': []}
DATA = {}
TOKEN = 'TOKEN'
creds = _Credentials()
client = self._makeOne(PROJECT, creds)