From 0343a644acf5646b31a7693262ada4a5af8d053f Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 7 Nov 2017 10:40:46 -0800 Subject: [PATCH 01/21] adds to_dataframe() to QueryJob --- bigquery/google/cloud/bigquery/job.py | 9 ++++ bigquery/tests/system.py | 17 +++++++ bigquery/tests/unit/test_job.py | 64 +++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 91301b1ed8d2..faeb86ceec25 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -1949,6 +1949,15 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): return self._client.list_rows(dest_table, selected_fields=schema, retry=retry) + def to_dataframe(self): + import pandas as pd + + iterator = self.result() + column_headers = [field.name for field in iterator.schema] + rows = [row.values() for row in iterator] + + return pd.DataFrame(rows, columns=column_headers) + def __iter__(self): return iter(self.result()) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 6a26922f3564..7290c68b8ee4 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1244,6 +1244,23 @@ def test_query_iter(self): row_tuples = [r.values() for r in query_job] self.assertEqual(row_tuples, [(1,)]) + def test_query_to_dataframe(self): + import pandas as pd + + query = """ + SELECT corpus AS title, COUNT(*) AS unique_words + FROM `bigquery-public-data.samples.shakespeare` + GROUP BY title + ORDER BY unique_words DESC + LIMIT 10""" + + query_job = Config.CLIENT.query(query) + df = query_job.to_dataframe() + + self.assertIsInstance(df, pd.DataFrame) + self.assertEqual(list(df), ['title', 'unique_words']) + self.assertEqual(len(df), 10) + def test_query_table_def(self): gs_url = self._write_csv_to_storage( 'bq_external_test' + unique_resource_id(), 'person_ages.csv', diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 470835533181..fa39ed3a0329 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -2720,6 +2720,70 @@ def test_reload_w_alternate_client(self): self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) + def test_to_dataframe(self): + import pandas as pd + + begun_resource = self._makeResource() + query_resource = { + 'jobComplete': True, + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_ID, + }, + 'schema': { + 'fields': [ + {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'}, + {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'}, + ], + }, + 'rows': [ + {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]}, + {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]}, + {'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]}, + {'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]}, + ], + } + done_resource = copy.deepcopy(begun_resource) + done_resource['status'] = {'state': 'DONE'} + connection = _Connection( + begun_resource, query_resource, done_resource, query_resource) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + df = job.to_dataframe() + + self.assertIsInstance(df, pd.DataFrame) + self.assertEqual(len(df), 4) + self.assertEqual(list(df), ['name', 'age']) + + def test_to_dataframe_w_empty_results(self): + import pandas as pd + + begun_resource = self._makeResource() + query_resource = { + 'jobComplete': True, + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_ID, + }, + 'schema': { + 'fields': [ + {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'}, + {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'}, + ], + }, + } + done_resource = copy.deepcopy(begun_resource) + done_resource['status'] = {'state': 'DONE'} + connection = _Connection( + begun_resource, query_resource, done_resource, query_resource) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + df = job.to_dataframe() + + self.assertIsInstance(df, pd.DataFrame) + self.assertEqual(len(df), 0) + self.assertEqual(list(df), ['name', 'age']) + def test_iter(self): import types From b74e6d4b955cf9de2574cb4f9b20d743ba192f3d Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 7 Nov 2017 13:43:35 -0800 Subject: [PATCH 02/21] removes unnecessary system test --- bigquery/tests/system.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 7290c68b8ee4..6a26922f3564 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1244,23 +1244,6 @@ def test_query_iter(self): row_tuples = [r.values() for r in query_job] self.assertEqual(row_tuples, [(1,)]) - def test_query_to_dataframe(self): - import pandas as pd - - query = """ - SELECT corpus AS title, COUNT(*) AS unique_words - FROM `bigquery-public-data.samples.shakespeare` - GROUP BY title - ORDER BY unique_words DESC - LIMIT 10""" - - query_job = Config.CLIENT.query(query) - df = query_job.to_dataframe() - - self.assertIsInstance(df, pd.DataFrame) - self.assertEqual(list(df), ['title', 'unique_words']) - self.assertEqual(len(df), 10) - def test_query_table_def(self): gs_url = self._write_csv_to_storage( 'bq_external_test' + unique_resource_id(), 'person_ages.csv', From e89b8deb397e1101bc82f8140ca938927a9f0135 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 7 Nov 2017 14:23:59 -0800 Subject: [PATCH 03/21] adds docstring to to_dataframe() --- bigquery/google/cloud/bigquery/job.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index faeb86ceec25..5608847b4087 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -1950,6 +1950,14 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): retry=retry) def to_dataframe(self): + """Create a pandas DataFrame from the query results. + + :rtype: ``pandas.DataFrame`` + :returns: A ``pandas.DataFrame`` object populated with row data + and column headers from the query results. The column + headers are derived from the destination table's + schema. + """ import pandas as pd iterator = self.result() From 81847165fa3d504ead1b6b36c32d3940915168f6 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 7 Nov 2017 14:34:29 -0800 Subject: [PATCH 04/21] updates to _make_resource() after rebasing for #4355 --- bigquery/tests/unit/test_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index fa39ed3a0329..6b24ae3b1d19 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -2723,7 +2723,7 @@ def test_reload_w_alternate_client(self): def test_to_dataframe(self): import pandas as pd - begun_resource = self._makeResource() + begun_resource = self._make_resource() query_resource = { 'jobComplete': True, 'jobReference': { @@ -2758,7 +2758,7 @@ def test_to_dataframe(self): def test_to_dataframe_w_empty_results(self): import pandas as pd - begun_resource = self._makeResource() + begun_resource = self._make_resource() query_resource = { 'jobComplete': True, 'jobReference': { From bc20f917af0cef1424b269c660a62c4331c6cbf4 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 7 Nov 2017 14:45:20 -0800 Subject: [PATCH 05/21] skips to_dataframe() tests if pandas is not installed --- bigquery/tests/unit/test_job.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 6b24ae3b1d19..fc47f595a02a 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -16,6 +16,10 @@ from six.moves import http_client import unittest +try: + import pandas +except ImportError: + pandas = None from google.cloud.bigquery.job import ExtractJobConfig, CopyJobConfig from google.cloud.bigquery.job import LoadJobConfig @@ -2720,9 +2724,8 @@ def test_reload_w_alternate_client(self): self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) + @unittest.skipIf(pandas is None, 'Requires `pandas`') def test_to_dataframe(self): - import pandas as pd - begun_resource = self._make_resource() query_resource = { 'jobComplete': True, @@ -2751,13 +2754,12 @@ def test_to_dataframe(self): job = self._make_one(self.JOB_ID, self.QUERY, client) df = job.to_dataframe() - self.assertIsInstance(df, pd.DataFrame) - self.assertEqual(len(df), 4) - self.assertEqual(list(df), ['name', 'age']) + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 4) # verify the number of rows + self.assertEqual(list(df), ['name', 'age']) # verify the column names + @unittest.skipIf(pandas is None, 'Requires `pandas`') def test_to_dataframe_w_empty_results(self): - import pandas as pd - begun_resource = self._make_resource() query_resource = { 'jobComplete': True, @@ -2780,9 +2782,9 @@ def test_to_dataframe_w_empty_results(self): job = self._make_one(self.JOB_ID, self.QUERY, client) df = job.to_dataframe() - self.assertIsInstance(df, pd.DataFrame) - self.assertEqual(len(df), 0) - self.assertEqual(list(df), ['name', 'age']) + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 0) # verify the number of rows + self.assertEqual(list(df), ['name', 'age']) # verify the column names def test_iter(self): import types From 2b8ca850e6b7a5afc68943c8909b63dcd8aed146 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Thu, 9 Nov 2017 16:33:54 -0800 Subject: [PATCH 06/21] imports pandas at module level and raises exception in to_dataframe() if it isn't installed --- bigquery/google/cloud/bigquery/job.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 5608847b4087..3b3abda2332a 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -19,6 +19,10 @@ import six from six.moves import http_client +try: + import pandas +except ImportError: + pandas = None import google.api_core.future.polling from google.cloud import exceptions @@ -1957,14 +1961,20 @@ def to_dataframe(self): and column headers from the query results. The column headers are derived from the destination table's schema. + + :raises: :exc:`ValueError` if the ``pandas`` library cannot be + imported. """ - import pandas as pd + if pandas is None: + raise ValueError('The pandas library is not installed, please ' + 'install pandas to use the to_dataframe() ' + 'function.') - iterator = self.result() - column_headers = [field.name for field in iterator.schema] - rows = [row.values() for row in iterator] + query_results = self.result() + column_headers = [field.name for field in query_results.schema] + rows = [row.values() for row in query_results] - return pd.DataFrame(rows, columns=column_headers) + return pandas.DataFrame(rows, columns=column_headers) def __iter__(self): return iter(self.result()) From 5c52dc6a9e68c32c0f252baba4ae82bb8c92622d Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Fri, 10 Nov 2017 10:06:24 -0800 Subject: [PATCH 07/21] adds pandas as extra for installation --- bigquery/setup.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bigquery/setup.py b/bigquery/setup.py index 1dd3a9ff6036..1819b3bc5e5b 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -58,6 +58,10 @@ 'requests >= 2.18.0', ] +EXTRAS_REQUIREMENTS = { + 'pandas': ['pandas >= 0.3.0'], +} + setup( name='google-cloud-bigquery', version='0.28.1.dev1', @@ -69,5 +73,6 @@ ], packages=find_packages(exclude=('tests*',)), install_requires=REQUIREMENTS, + extras_require=EXTRAS_REQUIREMENTS, **SETUP_BASE ) From 484ab9178eee3707c4658657e2b91ba35f83d041 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Fri, 10 Nov 2017 14:26:33 -0800 Subject: [PATCH 08/21] updates docstring to google style --- bigquery/google/cloud/bigquery/job.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 3b3abda2332a..fc6e7a2ef6e7 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -1956,14 +1956,14 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): def to_dataframe(self): """Create a pandas DataFrame from the query results. - :rtype: ``pandas.DataFrame`` - :returns: A ``pandas.DataFrame`` object populated with row data - and column headers from the query results. The column - headers are derived from the destination table's - schema. - - :raises: :exc:`ValueError` if the ``pandas`` library cannot be - imported. + Returns: + A :class:`~pandas.DataFrame` populated with row data and column + headers from the query results. The column headers are derived + from the destination table's schema. + + Raises: + ValueError: If the `pandas` library cannot be imported. + """ if pandas is None: raise ValueError('The pandas library is not installed, please ' From 4db3f4bced5ebdfdaecf09c81029fec90b3c19d1 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Fri, 10 Nov 2017 14:34:23 -0800 Subject: [PATCH 09/21] adds pandas extra to nox environment --- bigquery/nox.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/nox.py b/bigquery/nox.py index fa0936ce619e..ff492f51d745 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -36,7 +36,7 @@ def default(session): """ # Install all test dependencies, then install this package in-place. session.install('mock', 'pytest', 'pytest-cov', *LOCAL_DEPS) - session.install('-e', '.') + session.install('-e', '.[pandas]') # Run py.test against the unit tests. session.run( From a31e79d49f0f52f3502a705cf32520b8d17fbaf6 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Fri, 10 Nov 2017 15:33:09 -0800 Subject: [PATCH 10/21] adds 'no cover' pragma for pandas import errors --- bigquery/google/cloud/bigquery/job.py | 4 ++-- bigquery/tests/unit/test_job.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index fc6e7a2ef6e7..6b4c760cd910 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -21,7 +21,7 @@ from six.moves import http_client try: import pandas -except ImportError: +except ImportError: # pragma: NO COVER pandas = None import google.api_core.future.polling @@ -1965,7 +1965,7 @@ def to_dataframe(self): ValueError: If the `pandas` library cannot be imported. """ - if pandas is None: + if pandas is None: # pragma: NO COVER raise ValueError('The pandas library is not installed, please ' 'install pandas to use the to_dataframe() ' 'function.') diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index fc47f595a02a..282756b9c10f 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -18,7 +18,7 @@ import unittest try: import pandas -except ImportError: +except ImportError: # pragma: NO COVER pandas = None from google.cloud.bigquery.job import ExtractJobConfig, CopyJobConfig From 03b7fd5d3af046d9e0e7fbd1d1302ee3ce6bf097 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Mon, 13 Nov 2017 09:29:40 -0800 Subject: [PATCH 11/21] adds test for when pandas is None --- bigquery/google/cloud/bigquery/job.py | 2 +- bigquery/tests/unit/test_job.py | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 6b4c760cd910..9478b8dedc58 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -1965,7 +1965,7 @@ def to_dataframe(self): ValueError: If the `pandas` library cannot be imported. """ - if pandas is None: # pragma: NO COVER + if pandas is None: raise ValueError('The pandas library is not installed, please ' 'install pandas to use the to_dataframe() ' 'function.') diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 282756b9c10f..b2cf3568017b 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -2786,6 +2786,15 @@ def test_to_dataframe_w_empty_results(self): self.assertEqual(len(df), 0) # verify the number of rows self.assertEqual(list(df), ['name', 'age']) # verify the column names + @mock.patch('google.cloud.bigquery.job.pandas', new=None) + def test_to_dataframe_error_if_pandas_is_none(self): + connection = _Connection({}) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + + with self.assertRaises(ValueError): + df = job.to_dataframe() + def test_iter(self): import types From 0c7bf88a5fd08c9495ca121c68d8a19f1ffe7bcf Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Mon, 13 Nov 2017 10:17:31 -0800 Subject: [PATCH 12/21] fixes lint error --- bigquery/tests/unit/test_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index b2cf3568017b..8181af2a9378 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -2793,7 +2793,7 @@ def test_to_dataframe_error_if_pandas_is_none(self): job = self._make_one(self.JOB_ID, self.QUERY, client) with self.assertRaises(ValueError): - df = job.to_dataframe() + job.to_dataframe() def test_iter(self): import types From 84994a7b15f76e0c0e004c897cc33bb1b7af6f17 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 14 Nov 2017 11:00:34 -0800 Subject: [PATCH 13/21] adds RowIterator class --- bigquery/google/cloud/bigquery/_helpers.py | 2 +- bigquery/google/cloud/bigquery/client.py | 17 +++----- bigquery/google/cloud/bigquery/table.py | 51 ++++++++++++++++++++++ 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index 41b68db02c39..e535642aa43d 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -468,7 +468,7 @@ def _rows_page_start(iterator, page, response): total_rows = response.get('totalRows') if total_rows is not None: total_rows = int(total_rows) - iterator.total_rows = total_rows + iterator._total_rows = total_rows # pylint: enable=unused-argument diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 757fe6bfd229..c033e07dee00 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -34,8 +34,6 @@ from google.cloud.bigquery._helpers import DEFAULT_RETRY from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW from google.cloud.bigquery._helpers import _field_to_index_mapping -from google.cloud.bigquery._helpers import _item_to_row -from google.cloud.bigquery._helpers import _rows_page_start from google.cloud.bigquery._helpers import _snake_to_camel_case from google.cloud.bigquery._http import Connection from google.cloud.bigquery.dataset import Dataset @@ -48,6 +46,7 @@ from google.cloud.bigquery.table import Table from google.cloud.bigquery.table import TableListItem from google.cloud.bigquery.table import TableReference +from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA from google.cloud.bigquery.table import _row_from_mapping @@ -1180,7 +1179,7 @@ def list_rows(self, table, selected_fields=None, max_results=None, :type retry: :class:`google.api_core.retry.Retry` :param retry: (Optional) How to retry the RPC. - :rtype: :class:`~google.api_core.page_iterator.Iterator` + :rtype: :class:`~google.cloud.bigquery.table.RowIterator` :returns: Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s. During each page, the iterator will have the ``total_rows`` attribute @@ -1208,20 +1207,16 @@ def list_rows(self, table, selected_fields=None, max_results=None, if start_index is not None: params['startIndex'] = start_index - iterator = page_iterator.HTTPIterator( + row_iterator = RowIterator( client=self, api_request=functools.partial(self._call_api, retry), path='%s/data' % (table.path,), - item_to_value=_item_to_row, - items_key='rows', page_token=page_token, - next_token='pageToken', max_results=max_results, - page_start=_rows_page_start, extra_params=params) - iterator.schema = schema - iterator._field_to_index = _field_to_index_mapping(schema) - return iterator + row_iterator._schema = schema + row_iterator._field_to_index = _field_to_index_mapping(schema) + return row_iterator def list_partitions(self, table, retry=DEFAULT_RETRY): """List the partitions in a table. diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 79928230f08e..25037a6a1253 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -22,8 +22,12 @@ import six +from google.api_core.page_iterator import HTTPIterator + from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _millis_from_datetime +from google.cloud.bigquery._helpers import _item_to_row +from google.cloud.bigquery._helpers import _rows_page_start from google.cloud.bigquery._helpers import _snake_to_camel_case from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.schema import _build_schema_resource @@ -1023,3 +1027,50 @@ def __repr__(self): key=operator.itemgetter(1)) f2i = '{' + ', '.join('%r: %d' % item for item in items) + '}' return 'Row({}, {})'.format(self._xxx_values, f2i) + + +class RowIterator(HTTPIterator): + """A class for iterating through HTTP/JSON API row list responses. + + Args: + client (google.cloud.bigquery.Client): The API client. + api_request (Callable[google.cloud._http.JSONConnection.api_request]): + The function to use to make API requests. + path (str): The method path to query for the list of items. + page_token (str): A token identifying a page in a result set to start + fetching results from. + max_results (int): The maximum number of results to fetch. + extra_params (dict): Extra query string parameters for the API call. + + .. autoattribute:: pages + """ + + def __init__(self, client, api_request, path, page_token=None, + max_results=None, extra_params=None): + super(RowIterator, self).__init__( + client, api_request, path, item_to_value=_item_to_row, + items_key='rows', page_token=page_token, max_results=max_results, + extra_params=extra_params, page_start=_rows_page_start, + next_token='pageToken') + self._schema = () + self._field_to_index = None + self._total_rows = None + + @property + def schema(self): + """Schema for the table containing the rows + + Returns: + list of :class:`~google.cloud.bigquery.schema.SchemaField`: + fields describing the schema + """ + return list(self._schema) + + @property + def total_rows(self): + """The total number of rows in the table. + + Returns: + int: the row count. + """ + return self._total_rows From 04f76f533dcefa866fa1aef85689868eb94032e4 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 14 Nov 2017 14:37:36 -0800 Subject: [PATCH 14/21] moves to_dataframe() to RowIterator --- bigquery/google/cloud/bigquery/job.py | 27 ----- bigquery/google/cloud/bigquery/table.py | 26 +++++ bigquery/nox.py | 2 +- bigquery/tests/system.py | 20 ++++ bigquery/tests/unit/test_job.py | 71 ------------ bigquery/tests/unit/test_table.py | 145 ++++++++++++++++++++++++ 6 files changed, 192 insertions(+), 99 deletions(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 9478b8dedc58..91301b1ed8d2 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -19,10 +19,6 @@ import six from six.moves import http_client -try: - import pandas -except ImportError: # pragma: NO COVER - pandas = None import google.api_core.future.polling from google.cloud import exceptions @@ -1953,29 +1949,6 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): return self._client.list_rows(dest_table, selected_fields=schema, retry=retry) - def to_dataframe(self): - """Create a pandas DataFrame from the query results. - - Returns: - A :class:`~pandas.DataFrame` populated with row data and column - headers from the query results. The column headers are derived - from the destination table's schema. - - Raises: - ValueError: If the `pandas` library cannot be imported. - - """ - if pandas is None: - raise ValueError('The pandas library is not installed, please ' - 'install pandas to use the to_dataframe() ' - 'function.') - - query_results = self.result() - column_headers = [field.name for field in query_results.schema] - rows = [row.values() for row in query_results] - - return pandas.DataFrame(rows, columns=column_headers) - def __iter__(self): return iter(self.result()) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 25037a6a1253..a5743e954cd8 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -21,6 +21,10 @@ import operator import six +try: + import pandas +except ImportError: # pragma: NO COVER + pandas = None from google.api_core.page_iterator import HTTPIterator @@ -1074,3 +1078,25 @@ def total_rows(self): int: the row count. """ return self._total_rows + + def to_dataframe(self): + """Create a pandas DataFrame from the query results. + + Returns: + A :class:`~pandas.DataFrame` populated with row data and column + headers from the query results. The column headers are derived + from the destination table's schema. + + Raises: + ValueError: If the `pandas` library cannot be imported. + + """ + if pandas is None: + raise ValueError('The pandas library is not installed, please ' + 'install pandas to use the to_dataframe() ' + 'function.') + + column_headers = [field.name for field in self.schema] + rows = [row.values() for row in iter(self)] + + return pandas.DataFrame(rows, columns=column_headers) diff --git a/bigquery/nox.py b/bigquery/nox.py index ff492f51d745..86de5658a803 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -89,7 +89,7 @@ def system(session, py): os.path.join('..', 'storage'), os.path.join('..', 'test_utils'), ) - session.install('-e', '.') + session.install('-e', '.[pandas]') # Run py.test against the system tests. session.run( diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 6a26922f3564..fe376c1f2bf7 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -24,6 +24,10 @@ import uuid import six +try: + import pandas +except ImportError: # pragma: NO COVER + pandas = None from google.api_core.exceptions import PreconditionFailed from google.cloud import bigquery @@ -1244,6 +1248,22 @@ def test_query_iter(self): row_tuples = [r.values() for r in query_job] self.assertEqual(row_tuples, [(1,)]) + @unittest.skipIf(pandas is None, 'Requires `pandas`') + def test_query_results_to_dataframe(self): + PUBLIC = 'bigquery-public-data' + DATASET_ID = 'samples' + TABLE_NAME = 'natality' + LIMIT = 1000 + SQL = 'SELECT year, weight_pounds from `{}.{}.{}` LIMIT {}'.format( + PUBLIC, DATASET_ID, TABLE_NAME, LIMIT) + + df = Config.CLIENT.query(SQL).result().to_dataframe() + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), LIMIT) # verify the number of rows + self.assertEqual( + list(df), ['year', 'weight_pounds']) # verify the column names + def test_query_table_def(self): gs_url = self._write_csv_to_storage( 'bq_external_test' + unique_resource_id(), 'person_ages.csv', diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 8181af2a9378..f6ac61973ef0 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -2724,77 +2724,6 @@ def test_reload_w_alternate_client(self): self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) - @unittest.skipIf(pandas is None, 'Requires `pandas`') - def test_to_dataframe(self): - begun_resource = self._make_resource() - query_resource = { - 'jobComplete': True, - 'jobReference': { - 'projectId': self.PROJECT, - 'jobId': self.JOB_ID, - }, - 'schema': { - 'fields': [ - {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'}, - {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'}, - ], - }, - 'rows': [ - {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]}, - {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]}, - {'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]}, - {'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]}, - ], - } - done_resource = copy.deepcopy(begun_resource) - done_resource['status'] = {'state': 'DONE'} - connection = _Connection( - begun_resource, query_resource, done_resource, query_resource) - client = _make_client(project=self.PROJECT, connection=connection) - job = self._make_one(self.JOB_ID, self.QUERY, client) - df = job.to_dataframe() - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 4) # verify the number of rows - self.assertEqual(list(df), ['name', 'age']) # verify the column names - - @unittest.skipIf(pandas is None, 'Requires `pandas`') - def test_to_dataframe_w_empty_results(self): - begun_resource = self._make_resource() - query_resource = { - 'jobComplete': True, - 'jobReference': { - 'projectId': self.PROJECT, - 'jobId': self.JOB_ID, - }, - 'schema': { - 'fields': [ - {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'}, - {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'}, - ], - }, - } - done_resource = copy.deepcopy(begun_resource) - done_resource['status'] = {'state': 'DONE'} - connection = _Connection( - begun_resource, query_resource, done_resource, query_resource) - client = _make_client(project=self.PROJECT, connection=connection) - job = self._make_one(self.JOB_ID, self.QUERY, client) - df = job.to_dataframe() - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 0) # verify the number of rows - self.assertEqual(list(df), ['name', 'age']) # verify the column names - - @mock.patch('google.cloud.bigquery.job.pandas', new=None) - def test_to_dataframe_error_if_pandas_is_none(self): - connection = _Connection({}) - client = _make_client(project=self.PROJECT, connection=connection) - job = self._make_one(self.JOB_ID, self.QUERY, client) - - with self.assertRaises(ValueError): - job.to_dataframe() - def test_iter(self): import types diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 752239b7276b..fb52de8278f4 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -15,6 +15,11 @@ import unittest import mock +import six +try: + import pandas +except ImportError: # pragma: NO COVER + pandas = None from google.cloud.bigquery.dataset import DatasetReference @@ -864,3 +869,143 @@ def test_row(self): row.z with self.assertRaises(KeyError): row['z'] + + +class TestRowIterator(unittest.TestCase): + + def test_constructor(self): + from google.cloud.bigquery.table import RowIterator + from google.cloud.bigquery._helpers import _item_to_row + from google.cloud.bigquery._helpers import _rows_page_start + + client = mock.sentinel.client + api_request = mock.sentinel.api_request + path = '/foo' + iterator = RowIterator(client, api_request, path) + + self.assertFalse(iterator._started) + self.assertIs(iterator.client, client) + self.assertEqual(iterator.path, path) + self.assertIs(iterator._item_to_value, _item_to_row) + self.assertEqual(iterator._items_key, 'rows') + self.assertIsNone(iterator.max_results) + self.assertEqual(iterator.extra_params, {}) + self.assertEqual(iterator._page_start, _rows_page_start) + # Changing attributes. + self.assertEqual(iterator.page_number, 0) + self.assertIsNone(iterator.next_page_token) + self.assertEqual(iterator.num_results, 0) + + def test_iterate(self): + from google.cloud.bigquery.table import RowIterator + from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery._helpers import _field_to_index_mapping + + schema = [ + SchemaField('name', 'STRING', mode='REQUIRED'), + SchemaField('age', 'INTEGER', mode='REQUIRED') + ] + rows = [ + {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]}, + {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]}, + ] + path = '/foo' + api_request = mock.Mock(return_value={'rows': rows}) + row_iterator = RowIterator( + mock.sentinel.client, api_request, path=path) + row_iterator._schema = schema + row_iterator._field_to_index = _field_to_index_mapping(schema) + self.assertEqual(row_iterator.num_results, 0) + + rows_iter = iter(row_iterator) + + val1 = six.next(rows_iter) + print(val1) + self.assertEqual(val1.name, 'Phred Phlyntstone') + self.assertEqual(row_iterator.num_results, 1) + + val2 = six.next(rows_iter) + self.assertEqual(val2.name, 'Bharney Rhubble') + self.assertEqual(row_iterator.num_results, 2) + + with self.assertRaises(StopIteration): + six.next(rows_iter) + + api_request.assert_called_once_with( + method='GET', path=path, query_params={}) + + @unittest.skipIf(pandas is None, 'Requires `pandas`') + def test_to_dataframe(self): + from google.cloud.bigquery.table import RowIterator + from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery._helpers import _field_to_index_mapping + + schema = [ + SchemaField('name', 'STRING', mode='REQUIRED'), + SchemaField('age', 'INTEGER', mode='REQUIRED') + ] + rows = [ + {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]}, + {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]}, + {'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]}, + {'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]}, + ] + path = '/foo' + api_request = mock.Mock(return_value={'rows': rows}) + row_iterator = RowIterator( + mock.sentinel.client, api_request, path=path) + row_iterator._schema = schema + row_iterator._field_to_index = _field_to_index_mapping(schema) + + df = row_iterator.to_dataframe() + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 4) # verify the number of rows + self.assertEqual(list(df), ['name', 'age']) # verify the column names + + @unittest.skipIf(pandas is None, 'Requires `pandas`') + def test_to_dataframe_w_empty_results(self): + from google.cloud.bigquery.table import RowIterator + from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery._helpers import _field_to_index_mapping + + schema = [ + SchemaField('name', 'STRING', mode='REQUIRED'), + SchemaField('age', 'INTEGER', mode='REQUIRED') + ] + path = '/foo' + api_request = mock.Mock(return_value={'rows': []}) + row_iterator = RowIterator( + mock.sentinel.client, api_request, path=path) + row_iterator._schema = schema + row_iterator._field_to_index = _field_to_index_mapping(schema) + + df = row_iterator.to_dataframe() + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 0) # verify the number of rows + self.assertEqual(list(df), ['name', 'age']) # verify the column names + + @mock.patch('google.cloud.bigquery.table.pandas', new=None) + def test_to_dataframe_error_if_pandas_is_none(self): + from google.cloud.bigquery.table import RowIterator + from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery._helpers import _field_to_index_mapping + + schema = [ + SchemaField('name', 'STRING', mode='REQUIRED'), + SchemaField('age', 'INTEGER', mode='REQUIRED') + ] + rows = [ + {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]}, + {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]}, + ] + path = '/foo' + api_request = mock.Mock(return_value={'rows': rows}) + row_iterator = RowIterator( + mock.sentinel.client, api_request, path=path) + row_iterator._schema = schema + row_iterator._field_to_index = _field_to_index_mapping(schema) + + with self.assertRaises(ValueError): + row_iterator.to_dataframe() From 4fd0cc0daed503e57023443226253fe41545f9dd Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 14 Nov 2017 18:20:42 -0800 Subject: [PATCH 15/21] adds test for pandas handling of basic BigQuery data types --- bigquery/tests/unit/test_table.py | 46 +++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index fb52de8278f4..ce5ddf07d943 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -986,6 +986,52 @@ def test_to_dataframe_w_empty_results(self): self.assertEqual(len(df), 0) # verify the number of rows self.assertEqual(list(df), ['name', 'age']) # verify the column names + @unittest.skipIf(pandas is None, 'Requires `pandas`') + def test_to_dataframe_w_various_types(self): + import datetime + from google.cloud.bigquery.table import RowIterator + from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery._helpers import _field_to_index_mapping + + schema = [ + SchemaField('start_timestamp', 'TIMESTAMP'), + SchemaField('seconds', 'INT64'), + SchemaField('miles', 'FLOAT64'), + SchemaField('payment_type', 'STRING'), + SchemaField('complete', 'BOOL'), + SchemaField('date', 'DATE'), + ] + row_data = [ + [None, None, None, None, None, None], + ['1.4338368E9', '420', '1.1', 'Cash', 'true', '1999-12-01'], + ['1.3878117E9', '2580', '17.7', 'Cash', 'false', '1953-06-14'], + ['1.3855653E9', '2280', '4.4', 'Credit', 'true', '1981-11-04'], + ] + rows = [{'f': [{'v': field} for field in row]} for row in row_data] + path = '/foo' + api_request = mock.Mock(return_value={'rows': rows}) + row_iterator = RowIterator( + mock.sentinel.client, api_request, path=path) + row_iterator._schema = schema + row_iterator._field_to_index = _field_to_index_mapping(schema) + + df = row_iterator.to_dataframe() + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 4) # verify the number of rows + exp_columns = [field.name for field in schema] + self.assertEqual(list(df), exp_columns) # verify the column names + + for index, row in df.iterrows(): + if index == 0: + self.assertTrue(row.isnull().all()) + else: + self.assertIsInstance(row.start_timestamp, pandas.Timestamp) + self.assertIsInstance(row.seconds, float) + self.assertIsInstance(row.payment_type, str) + self.assertIsInstance(row.complete, bool) + self.assertIsInstance(row.date, datetime.date) + @mock.patch('google.cloud.bigquery.table.pandas', new=None) def test_to_dataframe_error_if_pandas_is_none(self): from google.cloud.bigquery.table import RowIterator From 321b56a3420e8a196df286489280db479eadcbaa Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Wed, 15 Nov 2017 13:59:08 -0800 Subject: [PATCH 16/21] moves schema to RowIterator constructor --- bigquery/google/cloud/bigquery/client.py | 4 +--- bigquery/google/cloud/bigquery/table.py | 7 +++--- bigquery/tests/unit/test_table.py | 28 ++++++------------------ 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index c033e07dee00..78a24213f0b3 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -33,7 +33,6 @@ from google.cloud.bigquery._helpers import DEFAULT_RETRY from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW -from google.cloud.bigquery._helpers import _field_to_index_mapping from google.cloud.bigquery._helpers import _snake_to_camel_case from google.cloud.bigquery._http import Connection from google.cloud.bigquery.dataset import Dataset @@ -1211,11 +1210,10 @@ def list_rows(self, table, selected_fields=None, max_results=None, client=self, api_request=functools.partial(self._call_api, retry), path='%s/data' % (table.path,), + schema=schema, page_token=page_token, max_results=max_results, extra_params=params) - row_iterator._schema = schema - row_iterator._field_to_index = _field_to_index_mapping(schema) return row_iterator def list_partitions(self, table, retry=DEFAULT_RETRY): diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index a5743e954cd8..d7e4745fa8c8 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -33,6 +33,7 @@ from google.cloud.bigquery._helpers import _item_to_row from google.cloud.bigquery._helpers import _rows_page_start from google.cloud.bigquery._helpers import _snake_to_camel_case +from google.cloud.bigquery._helpers import _field_to_index_mapping from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.schema import _build_schema_resource from google.cloud.bigquery.schema import _parse_schema_resource @@ -1049,15 +1050,15 @@ class RowIterator(HTTPIterator): .. autoattribute:: pages """ - def __init__(self, client, api_request, path, page_token=None, + def __init__(self, client, api_request, path, schema, page_token=None, max_results=None, extra_params=None): super(RowIterator, self).__init__( client, api_request, path, item_to_value=_item_to_row, items_key='rows', page_token=page_token, max_results=max_results, extra_params=extra_params, page_start=_rows_page_start, next_token='pageToken') - self._schema = () - self._field_to_index = None + self._schema = schema + self._field_to_index = _field_to_index_mapping(schema) self._total_rows = None @property diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index ce5ddf07d943..a7d0b780c563 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -881,7 +881,8 @@ def test_constructor(self): client = mock.sentinel.client api_request = mock.sentinel.api_request path = '/foo' - iterator = RowIterator(client, api_request, path) + schema = [] + iterator = RowIterator(client, api_request, path, schema) self.assertFalse(iterator._started) self.assertIs(iterator.client, client) @@ -899,7 +900,6 @@ def test_constructor(self): def test_iterate(self): from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.table import SchemaField - from google.cloud.bigquery._helpers import _field_to_index_mapping schema = [ SchemaField('name', 'STRING', mode='REQUIRED'), @@ -912,9 +912,7 @@ def test_iterate(self): path = '/foo' api_request = mock.Mock(return_value={'rows': rows}) row_iterator = RowIterator( - mock.sentinel.client, api_request, path=path) - row_iterator._schema = schema - row_iterator._field_to_index = _field_to_index_mapping(schema) + mock.sentinel.client, api_request, path, schema) self.assertEqual(row_iterator.num_results, 0) rows_iter = iter(row_iterator) @@ -938,7 +936,6 @@ def test_iterate(self): def test_to_dataframe(self): from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.table import SchemaField - from google.cloud.bigquery._helpers import _field_to_index_mapping schema = [ SchemaField('name', 'STRING', mode='REQUIRED'), @@ -953,9 +950,7 @@ def test_to_dataframe(self): path = '/foo' api_request = mock.Mock(return_value={'rows': rows}) row_iterator = RowIterator( - mock.sentinel.client, api_request, path=path) - row_iterator._schema = schema - row_iterator._field_to_index = _field_to_index_mapping(schema) + mock.sentinel.client, api_request, path, schema) df = row_iterator.to_dataframe() @@ -967,7 +962,6 @@ def test_to_dataframe(self): def test_to_dataframe_w_empty_results(self): from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.table import SchemaField - from google.cloud.bigquery._helpers import _field_to_index_mapping schema = [ SchemaField('name', 'STRING', mode='REQUIRED'), @@ -976,9 +970,7 @@ def test_to_dataframe_w_empty_results(self): path = '/foo' api_request = mock.Mock(return_value={'rows': []}) row_iterator = RowIterator( - mock.sentinel.client, api_request, path=path) - row_iterator._schema = schema - row_iterator._field_to_index = _field_to_index_mapping(schema) + mock.sentinel.client, api_request, path, schema) df = row_iterator.to_dataframe() @@ -991,7 +983,6 @@ def test_to_dataframe_w_various_types(self): import datetime from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.table import SchemaField - from google.cloud.bigquery._helpers import _field_to_index_mapping schema = [ SchemaField('start_timestamp', 'TIMESTAMP'), @@ -1011,9 +1002,7 @@ def test_to_dataframe_w_various_types(self): path = '/foo' api_request = mock.Mock(return_value={'rows': rows}) row_iterator = RowIterator( - mock.sentinel.client, api_request, path=path) - row_iterator._schema = schema - row_iterator._field_to_index = _field_to_index_mapping(schema) + mock.sentinel.client, api_request, path, schema) df = row_iterator.to_dataframe() @@ -1036,7 +1025,6 @@ def test_to_dataframe_w_various_types(self): def test_to_dataframe_error_if_pandas_is_none(self): from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.table import SchemaField - from google.cloud.bigquery._helpers import _field_to_index_mapping schema = [ SchemaField('name', 'STRING', mode='REQUIRED'), @@ -1049,9 +1037,7 @@ def test_to_dataframe_error_if_pandas_is_none(self): path = '/foo' api_request = mock.Mock(return_value={'rows': rows}) row_iterator = RowIterator( - mock.sentinel.client, api_request, path=path) - row_iterator._schema = schema - row_iterator._field_to_index = _field_to_index_mapping(schema) + mock.sentinel.client, api_request, path, schema) with self.assertRaises(ValueError): row_iterator.to_dataframe() From da52040e2bde67d2a5b15206d8ba3801fbe8cf78 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Fri, 17 Nov 2017 12:04:26 -0800 Subject: [PATCH 17/21] adds tests for column dtypes --- bigquery/tests/unit/test_table.py | 42 ++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index a7d0b780c563..dffa815511f1 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -957,6 +957,8 @@ def test_to_dataframe(self): self.assertIsInstance(df, pandas.DataFrame) self.assertEqual(len(df), 4) # verify the number of rows self.assertEqual(list(df), ['name', 'age']) # verify the column names + self.assertEqual(df.name.dtype.name, 'object') + self.assertEqual(df.age.dtype.name, 'int64') @unittest.skipIf(pandas is None, 'Requires `pandas`') def test_to_dataframe_w_empty_results(self): @@ -979,7 +981,7 @@ def test_to_dataframe_w_empty_results(self): self.assertEqual(list(df), ['name', 'age']) # verify the column names @unittest.skipIf(pandas is None, 'Requires `pandas`') - def test_to_dataframe_w_various_types(self): + def test_to_dataframe_w_various_types_nullable(self): import datetime from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.table import SchemaField @@ -1021,6 +1023,44 @@ def test_to_dataframe_w_various_types(self): self.assertIsInstance(row.complete, bool) self.assertIsInstance(row.date, datetime.date) + @unittest.skipIf(pandas is None, 'Requires `pandas`') + def test_to_dataframe_column_dtypes(self): + from google.cloud.bigquery.table import RowIterator + from google.cloud.bigquery.table import SchemaField + + schema = [ + SchemaField('start_timestamp', 'TIMESTAMP'), + SchemaField('seconds', 'INT64'), + SchemaField('miles', 'FLOAT64'), + SchemaField('payment_type', 'STRING'), + SchemaField('complete', 'BOOL'), + SchemaField('date', 'DATE'), + ] + row_data = [ + ['1.4338368E9', '420', '1.1', 'Cash', 'true', '1999-12-01'], + ['1.3878117E9', '2580', '17.7', 'Cash', 'false', '1953-06-14'], + ['1.3855653E9', '2280', '4.4', 'Credit', 'true', '1981-11-04'], + ] + rows = [{'f': [{'v': field} for field in row]} for row in row_data] + path = '/foo' + api_request = mock.Mock(return_value={'rows': rows}) + row_iterator = RowIterator( + mock.sentinel.client, api_request, path, schema) + + df = row_iterator.to_dataframe() + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 3) # verify the number of rows + exp_columns = [field.name for field in schema] + self.assertEqual(list(df), exp_columns) # verify the column names + + self.assertEqual(df.start_timestamp.dtype.name, 'datetime64[ns, UTC]') + self.assertEqual(df.seconds.dtype.name, 'int64') + self.assertEqual(df.miles.dtype.name, 'float64') + self.assertEqual(df.payment_type.dtype.name, 'object') + self.assertEqual(df.complete.dtype.name, 'bool') + self.assertEqual(df.date.dtype.name, 'object') + @mock.patch('google.cloud.bigquery.table.pandas', new=None) def test_to_dataframe_error_if_pandas_is_none(self): from google.cloud.bigquery.table import RowIterator From 83d9e3c19191235a349e9e8c7e23c3de0d7ce345 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Fri, 17 Nov 2017 14:22:05 -0800 Subject: [PATCH 18/21] adds test for query results to_dataframe() with nested schema --- bigquery/tests/system.py | 49 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index fe376c1f2bf7..62de1ef50d03 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1439,6 +1439,55 @@ def test_create_table_rows_fetch_nested_schema(self): e_favtime = datetime.datetime(*parts[0:6]) self.assertEqual(found[7], e_favtime) + def _fetch_dataframe(self, query): + return Config.CLIENT.query(query).result().to_dataframe() + + def test_nested_table_to_dataframe(self): + SF = bigquery.SchemaField + schema = [ + SF('string_col', 'STRING', mode='NULLABLE'), + SF('record_col', 'RECORD', mode='NULLABLE', fields=[ + SF('nested_string', 'STRING', mode='NULLABLE'), + SF('nested_repeated', 'INTEGER', mode='REPEATED'), + SF('nested_record', 'RECORD', mode='NULLABLE', fields=[ + SF('nested_nested_string', 'STRING', mode='NULLABLE'), + ]), + ]), + ] + record = { + 'nested_string': 'another string value', + 'nested_repeated': [0, 1, 2], + 'nested_record': {'nested_nested_string': 'some deep insight'}, + } + to_insert = [ + ('Some value', record) + ] + table_id = 'test_table' + dataset = self.temp_dataset(_make_dataset_id('nested_df')) + table_arg = Table(dataset.table(table_id), schema=schema) + table = retry_403(Config.CLIENT.create_table)(table_arg) + self.to_delete.insert(0, table) + Config.CLIENT.create_rows(table, to_insert) + QUERY = 'SELECT * from `{}.{}.{}`'.format( + Config.CLIENT.project, dataset.dataset_id, table_id) + + retry = RetryResult(_has_rows, max_tries=8) + df = retry(self._fetch_dataframe)(QUERY) + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 1) # verify the number of rows + exp_columns = ['string_col', 'record_col'] + self.assertEqual(list(df), exp_columns) # verify the column names + row = df.iloc[0] + # verify the row content + self.assertEqual(row['string_col'], 'Some value') + self.assertEqual(row['record_col'], record) + # verify that nested data can be accessed with indices/keys + self.assertEqual(row['record_col']['nested_repeated'][0], 0) + self.assertEqual( + row['record_col']['nested_record']['nested_nested_string'], + 'some deep insight') + def temp_dataset(self, dataset_id): dataset = retry_403(Config.CLIENT.create_dataset)( Dataset(Config.CLIENT.dataset(dataset_id))) From 10fcd7c4d22018c378cc372d7b728ebed176909b Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Fri, 17 Nov 2017 15:50:19 -0800 Subject: [PATCH 19/21] updates system test for to_dataframe to check types --- bigquery/tests/system.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 62de1ef50d03..06a5b28e5f37 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1250,19 +1250,25 @@ def test_query_iter(self): @unittest.skipIf(pandas is None, 'Requires `pandas`') def test_query_results_to_dataframe(self): - PUBLIC = 'bigquery-public-data' - DATASET_ID = 'samples' - TABLE_NAME = 'natality' - LIMIT = 1000 - SQL = 'SELECT year, weight_pounds from `{}.{}.{}` LIMIT {}'.format( - PUBLIC, DATASET_ID, TABLE_NAME, LIMIT) + QUERY = """ + SELECT id, author, time_ts, dead + from `bigquery-public-data.hacker_news.comments` + LIMIT 10 + """ - df = Config.CLIENT.query(SQL).result().to_dataframe() + df = Config.CLIENT.query(QUERY).result().to_dataframe() self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), LIMIT) # verify the number of rows - self.assertEqual( - list(df), ['year', 'weight_pounds']) # verify the column names + self.assertEqual(len(df), 10) # verify the number of rows + column_names = ['id', 'author', 'time_ts', 'dead'] + self.assertEqual(list(df), column_names) # verify the column names + exp_datatypes = {'id': int, 'author': str, + 'time_ts': pandas.Timestamp, 'dead': bool} + for index, row in df.iterrows(): + for col in column_names: + # all the schema fields are nullable, so None is acceptable + if not row[col] is None: + self.assertIsInstance(row[col], exp_datatypes[col]) def test_query_table_def(self): gs_url = self._write_csv_to_storage( From 6762f9567d1d7fc2b2bad5a368d0e25488cae81e Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Fri, 17 Nov 2017 16:32:07 -0800 Subject: [PATCH 20/21] adds to_dataframe() helper to QueryJob --- bigquery/google/cloud/bigquery/job.py | 15 +++++++++++- bigquery/tests/unit/test_job.py | 35 +++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 91301b1ed8d2..9f66e3ec9ea0 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -1929,7 +1929,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): :type retry: :class:`google.api_core.retry.Retry` :param retry: (Optional) How to retry the call that retrieves rows. - :rtype: :class:`~google.api_core.page_iterator.Iterator` + :rtype: :class:`~google.cloud.bigquery.table.RowIterator` :returns: Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s. During each page, the iterator will have the ``total_rows`` @@ -1949,6 +1949,19 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): return self._client.list_rows(dest_table, selected_fields=schema, retry=retry) + def to_dataframe(self): + """Return a pandas DataFrame from a QueryJob + + Returns: + A :class:`~pandas.DataFrame` populated with row data and column + headers from the query results. The column headers are derived + from the destination table's schema. + + Raises: + ValueError: If the `pandas` library cannot be imported. + """ + return self.result().to_dataframe() + def __iter__(self): return iter(self.result()) diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index f6ac61973ef0..2f141a4dc04d 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -2724,6 +2724,41 @@ def test_reload_w_alternate_client(self): self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) + @unittest.skipIf(pandas is None, 'Requires `pandas`') + def test_to_dataframe(self): + begun_resource = self._make_resource() + query_resource = { + 'jobComplete': True, + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_ID, + }, + 'schema': { + 'fields': [ + {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'}, + {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'}, + ], + }, + 'rows': [ + {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]}, + {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]}, + {'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]}, + {'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]}, + ], + } + done_resource = copy.deepcopy(begun_resource) + done_resource['status'] = {'state': 'DONE'} + connection = _Connection( + begun_resource, query_resource, done_resource, query_resource) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + + df = job.to_dataframe() + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 4) # verify the number of rows + self.assertEqual(list(df), ['name', 'age']) # verify the column names + def test_iter(self): import types From 0802ca875e51dafed1e283daf58f61ae75da0473 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Wed, 22 Nov 2017 11:33:39 -0800 Subject: [PATCH 21/21] updates pandas version to latest version that passes unit tests --- bigquery/setup.py | 2 +- bigquery/tests/system.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/bigquery/setup.py b/bigquery/setup.py index 1819b3bc5e5b..8d108c9c4dec 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -59,7 +59,7 @@ ] EXTRAS_REQUIREMENTS = { - 'pandas': ['pandas >= 0.3.0'], + 'pandas': ['pandas >= 0.17.1'], } setup( diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 06a5b28e5f37..629276cf46f2 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1448,6 +1448,7 @@ def test_create_table_rows_fetch_nested_schema(self): def _fetch_dataframe(self, query): return Config.CLIENT.query(query).result().to_dataframe() + @unittest.skipIf(pandas is None, 'Requires `pandas`') def test_nested_table_to_dataframe(self): SF = bigquery.SchemaField schema = [