From 7d829ba94fbe56a5775900a972705565ed99afc4 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Tue, 31 May 2016 16:19:22 -0400
Subject: [PATCH 1/5] Ensure total row count is integer or None for
 'Query.fetch_data'.

---
 gcloud/bigquery/query.py      | 2 ++
 gcloud/bigquery/test_query.py | 7 ++++---
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/gcloud/bigquery/query.py b/gcloud/bigquery/query.py
index b54d7fccf8ac..4dd378af9c95 100644
--- a/gcloud/bigquery/query.py
+++ b/gcloud/bigquery/query.py
@@ -343,6 +343,8 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
         self._set_properties(response)
 
         total_rows = response.get('totalRows')
+        if total_rows is not None:
+            total_rows = int(total_rows)
         page_token = response.get('pageToken')
         rows_data = _rows_from_json(response.get('rows', ()), self.schema)
 
diff --git a/gcloud/bigquery/test_query.py b/gcloud/bigquery/test_query.py
index 4fcb3274c12c..7138e63016d5 100644
--- a/gcloud/bigquery/test_query.py
+++ b/gcloud/bigquery/test_query.py
@@ -47,7 +47,7 @@ def _makeResource(self, complete=False):
         }
 
         if complete:
-            resource['totalRows'] = 1000
+            resource['totalRows'] = '1000'
             resource['rows'] = [
                 {'f': [
                     {'v': 'Phred Phlyntstone'},
@@ -240,6 +240,7 @@ def test_fetch_data_w_bound_client(self):
         PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME)
         BEFORE = self._makeResource(complete=False)
         AFTER = self._makeResource(complete=True)
+        del AFTER['totalRows']
 
         conn = _Connection(AFTER)
         client = _Client(project=self.PROJECT, connection=conn)
@@ -255,7 +256,7 @@ def test_fetch_data_w_bound_client(self):
         self.assertEqual(rows[1], ('Bharney Rhubble', 33))
         self.assertEqual(rows[2], ('Wylma Phlyntstone', 29))
         self.assertEqual(rows[3], ('Bhettye Rhubble', 27))
-        self.assertEqual(total_rows, AFTER['totalRows'])
+        self.assertEqual(total_rows, None)
         self.assertEqual(page_token, AFTER['pageToken'])
 
         self.assertEqual(len(conn._requested), 1)
@@ -290,7 +291,7 @@ def test_fetch_data_w_alternate_client(self):
         self.assertEqual(rows[1], ('Bharney Rhubble', 33))
         self.assertEqual(rows[2], ('Wylma Phlyntstone', 29))
         self.assertEqual(rows[3], ('Bhettye Rhubble', 27))
-        self.assertEqual(total_rows, AFTER['totalRows'])
+        self.assertEqual(total_rows, int(AFTER['totalRows']))
         self.assertEqual(page_token, AFTER['pageToken'])
 
         self.assertEqual(len(conn1._requested), 0)
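Note on patch 1 (illustration, not part of the diff): the BigQuery REST API encodes
int64 fields such as 'totalRows' as JSON strings, so the raw value cannot be compared
against an integer without coercion. A minimal sketch of the behaviour the patch
introduces, using a hypothetical helper name:

    def total_rows_from_response(response):
        """Return the total row count as an int, or None when the key is absent."""
        total_rows = response.get('totalRows')  # arrives as a string, e.g. '1000'
        if total_rows is not None:
            total_rows = int(total_rows)
        return total_rows

    assert total_rows_from_response({'totalRows': '1000'}) == 1000
    assert total_rows_from_response({}) is None

The updated tests mirror exactly these two cases: a response carrying '1000' and a
response with no 'totalRows' key at all.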
From f9a834b7b833eb8ff658bd0a30d57c35dbbc98d6 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Tue, 31 May 2016 16:37:56 -0400
Subject: [PATCH 2/5] Harden 'Client.list_jobs' against missing 'jobs' key.

---
 gcloud/bigquery/client.py      | 3 ++-
 gcloud/bigquery/test_client.py | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py
index 0d429ecc8dec..d831e248682a 100644
--- a/gcloud/bigquery/client.py
+++ b/gcloud/bigquery/client.py
@@ -178,7 +178,8 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
         path = '/projects/%s/jobs' % (self.project,)
         resp = self.connection.api_request(method='GET', path=path,
                                            query_params=params)
-        jobs = [self.job_from_resource(resource) for resource in resp['jobs']]
+        jobs = [self.job_from_resource(resource)
+                for resource in resp.get('jobs', ())]
         return jobs, resp.get('nextPageToken')
 
     def load_table_from_storage(self, job_name, destination, *source_uris):
diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py
index aa72834f560f..98fb72681fdd 100644
--- a/gcloud/bigquery/test_client.py
+++ b/gcloud/bigquery/test_client.py
@@ -294,10 +294,10 @@ def test_list_jobs_load_job_wo_sourceUris(self):
         self.assertEqual(req['path'], '/%s' % PATH)
         self.assertEqual(req['query_params'], {'projection': 'full'})
 
-    def test_list_jobs_explicit_empty(self):
+    def test_list_jobs_explicit_missing(self):
         PROJECT = 'PROJECT'
         PATH = 'projects/%s/jobs' % PROJECT
-        DATA = {'jobs': []}
+        DATA = {}
         TOKEN = 'TOKEN'
         creds = _Credentials()
         client = self._makeOne(PROJECT, creds)
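Note on patch 2 (illustration, not part of the diff): the jobs.list response may omit
the 'jobs' key entirely when a project has no matching jobs, so the old resp['jobs']
lookup raised KeyError on an empty listing. The defensive pattern, sketched standalone
with a hypothetical helper name:

    def jobs_from_response(client, resp):
        # 'jobs' can be absent (not just an empty list) when nothing matches,
        # so fall back to an empty tuple before building job objects.
        return [client.job_from_resource(resource)
                for resource in resp.get('jobs', ())]

    # No job_from_resource call is made when the key is missing.
    assert jobs_from_response(object(), {}) == []

The renamed test, test_list_jobs_explicit_missing, drives exactly this case by
returning a response dict with no 'jobs' key.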
From d024df4f087adc03d04ac5a23e6c637c1719f933 Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Tue, 31 May 2016 16:45:45 -0400
Subject: [PATCH 3/5] Whitespace between sections.

---
 docs/bigquery-usage.rst | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/docs/bigquery-usage.rst b/docs/bigquery-usage.rst
index f2d0b4f6d21b..ca3d6e092c94 100644
--- a/docs/bigquery-usage.rst
+++ b/docs/bigquery-usage.rst
@@ -25,6 +25,7 @@ Authentication / Configuration
    >>> from gcloud import bigquery
    >>> client = bigquery.Client()
 
+
 Projects
 --------
 
@@ -43,6 +44,7 @@ To override the project inferred from the environment, pass an explicit
    >>> from gcloud import bigquery
    >>> client = bigquery.Client(project='PROJECT_ID')
 
+
 Project ACLs
 ~~~~~~~~~~~~
 
@@ -50,6 +52,7 @@ Each project has an access control list granting reader / writer / owner
 permission to one or more entities.  This list cannot be queried or set
 via the API:  it must be managed using the Google Developer Console.
 
+
 Datasets
 --------
 
@@ -62,6 +65,7 @@ policies to tables as they are created:
 - A default table expiration period.  If set, tables created within the
   dataset will have the value as their expiration period.
 
+
 Dataset operations
 ~~~~~~~~~~~~~~~~~~
 
@@ -431,6 +435,7 @@ Poll until the job is complete:
    >>> job.ended
    datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)
 
+
 Exporting data (async)
 ~~~~~~~~~~~~~~~~~~~~~~

From 42f44977f605183442677470fe5488c45b4b411e Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Tue, 31 May 2016 16:46:54 -0400
Subject: [PATCH 4/5] Remove duplicated section for feature handled earlier.

---
 docs/bigquery-usage.rst | 19 -------------------
 1 file changed, 19 deletions(-)

diff --git a/docs/bigquery-usage.rst b/docs/bigquery-usage.rst
index ca3d6e092c94..26423e95088a 100644
--- a/docs/bigquery-usage.rst
+++ b/docs/bigquery-usage.rst
@@ -354,25 +354,6 @@ Poll until the job is complete:
    >>> job.ended
    datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)
 
-Inserting data (synchronous)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Load data synchronously from a local CSV file into a new table:
-
-.. doctest::
-
-   >>> import csv
-   >>> from gcloud import bigquery
-   >>> from gcloud.bigquery import SchemaField
-   >>> client = bigquery.Client()
-   >>> table = dataset.table(name='person_ages')
-   >>> table.schema = [
-   ...     SchemaField('full_name', 'STRING', mode='required'),
-   ...     SchemaField('age', 'INTEGER', mode='required)]
-   >>> with open('/path/to/person_ages.csv', 'rb') as file_obj:
-   ...     reader = csv.reader(file_obj)
-   ...     rows = list(reader)
-   >>> table.insert_data(rows)  # API request
 
 Inserting data (asynchronous)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

From 9da9d820bb6f25f41020746d2deea0d62f24769c Mon Sep 17 00:00:00 2001
From: Tres Seaver
Date: Tue, 31 May 2016 16:47:32 -0400
Subject: [PATCH 5/5] Add snippets for 'Client.list_jobs',
 'Client.run_sync_query'.

---
 docs/bigquery-usage.rst   |  94 ++++------------------------
 docs/bigquery_snippets.py | 127 ++++++++++++++++++++++++++++++++++++--
 2 files changed, 133 insertions(+), 88 deletions(-)

diff --git a/docs/bigquery-usage.rst b/docs/bigquery-usage.rst
index 26423e95088a..6ff904da5876 100644
--- a/docs/bigquery-usage.rst
+++ b/docs/bigquery-usage.rst
@@ -195,104 +195,34 @@ Jobs describe actions peformed on data in BigQuery tables:
 
 List jobs for a project:
 
-.. doctest::
-
-   >>> from gcloud import bigquery
-   >>> client = bigquery.Client()
-   >>> jobs, token = client.list_jobs()  # API request
-   >>> [(job.name, job.job_type, job.created, job.state) for job in jobs]
-   ['load-table-job', 'load', (datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>), 'done')]
+.. literalinclude:: bigquery_snippets.py
+   :start-after: [START client_list_jobs]
+   :end-before: [END client_list_jobs]
 
 
 Querying data (synchronous)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Run a query which can be expected to complete within bounded time:
 
-.. doctest::
-
-   >>> from gcloud import bigquery
-   >>> client = bigquery.Client()
-   >>> QUERY = """\
-   ... SELECT count(*) AS age_count FROM dataset_name.person_ages
-   ... """
-   >>> query = client.run_sync_query(QUERY)
-   >>> query.timeout_ms = 1000
-   >>> query.run()  # API request
-   >>> query.complete
-   True
-   >>> len(query.schema)
-   1
-   >>> field = query.schema[0]
-   >>> field.name
-   u'count'
-   >>> field.field_type
-   u'INTEGER'
-   >>> field.mode
-   u'NULLABLE'
-   >>> query.rows
-   [(15,)]
-   >>> query.total_rows
-   1
+.. literalinclude:: bigquery_snippets.py
+   :start-after: [START client_run_sync_query]
+   :end-before: [END client_run_sync_query]
 
 If the rows returned by the query do not fit into the inital response,
 then we need to fetch the remaining rows via ``fetch_data``:
 
-.. doctest::
-
-   >>> from gcloud import bigquery
-   >>> client = bigquery.Client()
-   >>> QUERY = """\
-   ... SELECT * FROM dataset_name.person_ages
-   ... """
-   >>> query = client.run_sync_query(QUERY)
-   >>> query.timeout_ms = 1000
-   >>> query.run()  # API request
-   >>> query.complete
-   True
-   >>> query.total_rows
-   1234
-   >>> query.page_token
-   '8d6e452459238eb0fe87d8eb191dd526ee70a35e'
-   >>> do_something_with(query.schema, query.rows)
-   >>> token = query.page_token  # for initial request
-   >>> while True:
-   ...     do_something_with(query.schema, rows)
-   ...     if token is None:
-   ...         break
-   ...     rows, _, token = query.fetch_data(page_token=token)
-
+.. literalinclude:: bigquery_snippets.py
+   :start-after: [START client_run_sync_query_paged]
+   :end-before: [END client_run_sync_query_paged]
 
 If the query takes longer than the timeout allowed, ``query.complete``
 will be ``False``.  In that case, we need to poll the associated job until
 it is done, and then fetch the reuslts:
 
-.. doctest::
-
-   >>> from gcloud import bigquery
-   >>> client = bigquery.Client()
-   >>> QUERY = """\
-   ... SELECT * FROM dataset_name.person_ages
-   ... """
-   >>> query = client.run_sync_query(QUERY)
-   >>> query.timeout_ms = 1000
-   >>> query.run()  # API request
-   >>> query.complete
-   False
-   >>> job = query.job
-   >>> retry_count = 100
-   >>> while retry_count > 0 and job.state == 'running':
-   ...     retry_count -= 1
-   ...     time.sleep(10)
-   ...     job.reload()  # API call
-   >>> job.state
-   'done'
-   >>> token = None  # for initial request
-   >>> while True:
-   ...     rows, _, token = query.fetch_data(page_token=token)
-   ...     do_something_with(query.schema, rows)
-   ...     if token is None:
-   ...         break
-
+.. literalinclude:: bigquery_snippets.py
+   :start-after: [START client_run_sync_query_timeout]
+   :end-before: [END client_run_sync_query_timeout]
 
 
 Querying data (asynchronous)
diff --git a/docs/bigquery_snippets.py b/docs/bigquery_snippets.py
index 1da17826043a..811493623e16 100644
--- a/docs/bigquery_snippets.py
+++ b/docs/bigquery_snippets.py
@@ -68,10 +68,10 @@ def delete(self):
 
 
 @snippet
-def client_list_datasets(client, to_delete):  # pylint: disable=unused-argument
+def client_list_datasets(client, _):
     """List datasets for a project."""
 
-    def do_something_with(sub):  # pylint: disable=unused-argument
+    def do_something_with(_):
         pass
 
     # [START client_list_datasets]
@@ -182,7 +182,7 @@ def dataset_update(client, to_delete):
 
 
 @snippet
-def dataset_delete(client, to_delete):  # pylint: disable=unused-argument
+def dataset_delete(client, _):
     """Delete a dataset."""
     DATASET_NAME = 'dataset_delete_%d' % (_millis(),)
     dataset = client.dataset(DATASET_NAME)
@@ -439,13 +439,12 @@ def table_upload_from_file(client, to_delete):
 
 
 @snippet
-def table_delete(client, to_delete):  # pylint: disable=unused-argument
+def table_delete(client, _):
     """Delete a table."""
     DATASET_NAME = 'table_delete_dataset_%d' % (_millis(),)
     TABLE_NAME = 'table_create_table_%d' % (_millis(),)
     dataset = client.dataset(DATASET_NAME)
     dataset.create()
-    to_delete.append(dataset)
 
     table = dataset.table(TABLE_NAME, SCHEMA)
     table.create()
@@ -457,6 +456,122 @@ def table_delete(client, to_delete):  # pylint: disable=unused-argument
     # [END table_delete]
 
 
+@snippet
+def client_list_jobs(client, _):
+    """List jobs for a project."""
+
+    def do_something_with(_):
+        pass
+
+    # [START client_list_jobs]
+    jobs, token = client.list_jobs()  # API request
+    while True:
+        for job in jobs:
+            do_something_with(job)
+        if token is None:
+            break
+        jobs, token = client.list_jobs(page_token=token)  # API request
+    # [END client_list_jobs]
+
+
+@snippet
+def client_run_sync_query(client, _):
+    """Run a synchronous query."""
+    LIMIT = 100
+    LIMITED = '%s LIMIT %d' % (QUERY, LIMIT)
+    TIMEOUT_MS = 1000
+
+    # [START client_run_sync_query]
+    query = client.run_sync_query(LIMITED)
+    query.timeout_ms = TIMEOUT_MS
+    query.run()  # API request
+
+    assert query.complete
+    assert len(query.rows) == LIMIT
+    assert [field.name for field in query.schema] == ['name']
+    # [END client_run_sync_query]
+
+
+@snippet
+def client_run_sync_query_paged(client, _):
+    """Run a synchronous query with paged results."""
+    TIMEOUT_MS = 1000
+    PAGE_SIZE = 100
+    LIMIT = 1000
+    LIMITED = '%s LIMIT %d' % (QUERY, LIMIT)
+
+    all_rows = []
+
+    def do_something_with(rows):
+        all_rows.extend(rows)
+
+    # [START client_run_sync_query_paged]
+    query = client.run_sync_query(LIMITED)
+    query.timeout_ms = TIMEOUT_MS
+    query.max_results = PAGE_SIZE
+    query.run()  # API request
+
+    assert query.complete
+    assert query.page_token is not None
+    assert len(query.rows) == PAGE_SIZE
+    assert [field.name for field in query.schema] == ['name']
+
+    rows = query.rows
+    token = query.page_token
+
+    while True:
+        do_something_with(rows)
+        if token is None:
+            break
+        rows, total_count, token = query.fetch_data(
+            page_token=token)  # API request
+    # [END client_run_sync_query_paged]
+
+    assert total_count == LIMIT
+    assert len(all_rows) == LIMIT
+
+
+@snippet
+def client_run_sync_query_timeout(client, _):
+    """Run a synchronous query w/ timeout."""
+    TIMEOUT_MS = 10
+
+    all_rows = []
+
+    def do_something_with(rows):
+        all_rows.extend(rows)
+
+    # [START client_run_sync_query_timeout]
+    query = client.run_sync_query(QUERY)
+    query.timeout_ms = TIMEOUT_MS
+    query.use_query_cache = False
+    query.run()  # API request
+
+    assert not query.complete
+
+    job = query.job
+    job.reload()  # API request
+    retry_count = 0
+
+    while retry_count < 10 and job.state != u'DONE':
+        time.sleep(1.5**retry_count)  # exponential backoff
+        retry_count += 1
+        job.reload()  # API request
+
+    assert job.state == u'DONE'
+
+    rows, total_count, token = query.fetch_data()  # API request
+    while True:
+        do_something_with(rows)
+        if token is None:
+            break
+        rows, total_count, token = query.fetch_data(
+            page_token=token)  # API request
+    # [END client_run_sync_query_timeout]
+
+    assert len(all_rows) == total_count
+
+
 def _find_examples():
     funcs = [obj for obj in globals().values()
              if getattr(obj, '_snippet', False)]
@@ -468,7 +583,7 @@ def main():
     client = Client()
     for example in _find_examples():
         to_delete = []
-        print('%-25s: %s' % (
+        print('%-30s: %s' % (
             example.func_name, example.func_doc))
         try:
             example(client, to_delete)
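Note on patch 5 (illustration, not part of the diff): the new snippets all share one
paging idiom, which is to process the rows already in hand and then keep calling
``Query.fetch_data`` with the previous ``page_token`` until it comes back as ``None``.
A condensed, standalone sketch of that loop, assuming a default client and the
``dataset_name.person_ages`` table used throughout the docs:

    from gcloud import bigquery

    client = bigquery.Client()
    query = client.run_sync_query('SELECT name FROM dataset_name.person_ages')
    query.timeout_ms = 1000
    query.run()  # API request

    rows = query.rows
    token = query.page_token  # None when the first response held everything

    while True:
        for row in rows:
            pass  # replace with real row handling
        if token is None:
            break
        rows, total_rows, token = query.fetch_data(page_token=token)  # API request

With patch 1 applied, ``total_rows`` in this loop is an ``int`` (or ``None``) rather
than the raw string returned by the API.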