diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py
index 41b68db02c39..e535642aa43d 100644
--- a/bigquery/google/cloud/bigquery/_helpers.py
+++ b/bigquery/google/cloud/bigquery/_helpers.py
@@ -468,7 +468,7 @@ def _rows_page_start(iterator, page, response):
     total_rows = response.get('totalRows')
     if total_rows is not None:
         total_rows = int(total_rows)
-    iterator.total_rows = total_rows
+    iterator._total_rows = total_rows
 
 
 # pylint: enable=unused-argument
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 6f62291e550f..34fc9114ce92 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -33,9 +33,6 @@
 from google.cloud.bigquery._helpers import DEFAULT_RETRY
 from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
-from google.cloud.bigquery._helpers import _field_to_index_mapping
-from google.cloud.bigquery._helpers import _item_to_row
-from google.cloud.bigquery._helpers import _rows_page_start
 from google.cloud.bigquery._helpers import _snake_to_camel_case
 from google.cloud.bigquery._http import Connection
 from google.cloud.bigquery.dataset import Dataset
@@ -48,6 +45,7 @@
 from google.cloud.bigquery.table import Table
 from google.cloud.bigquery.table import TableListItem
 from google.cloud.bigquery.table import TableReference
+from google.cloud.bigquery.table import RowIterator
 from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA
 from google.cloud.bigquery.table import _row_from_mapping
@@ -1189,7 +1187,7 @@ def list_rows(self, table, selected_fields=None, max_results=None,
         :type retry: :class:`google.api_core.retry.Retry`
         :param retry: (Optional) How to retry the RPC.
 
-        :rtype: :class:`~google.api_core.page_iterator.Iterator`
+        :rtype: :class:`~google.cloud.bigquery.table.RowIterator`
         :returns: Iterator of row data
             :class:`~google.cloud.bigquery.table.Row`-s. During each page, the
             iterator will have the ``total_rows`` attribute
@@ -1217,20 +1215,15 @@ def list_rows(self, table, selected_fields=None, max_results=None,
         if start_index is not None:
             params['startIndex'] = start_index
 
-        iterator = page_iterator.HTTPIterator(
+        row_iterator = RowIterator(
             client=self,
             api_request=functools.partial(self._call_api, retry),
             path='%s/data' % (table.path,),
-            item_to_value=_item_to_row,
-            items_key='rows',
+            schema=schema,
             page_token=page_token,
-            next_token='pageToken',
             max_results=max_results,
-            page_start=_rows_page_start,
             extra_params=params)
-        iterator.schema = schema
-        iterator._field_to_index = _field_to_index_mapping(schema)
-        return iterator
+        return row_iterator
 
     def list_partitions(self, table, retry=DEFAULT_RETRY):
         """List the partitions in a table.
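A note beyond the diff itself: list_rows() now returns the purpose-built
RowIterator, which carries the table schema and exposes total_rows as a real
property instead of attributes patched onto a generic HTTPIterator after
construction. A minimal usage sketch, assuming a hypothetical dataset and
table:

    from google.cloud import bigquery

    client = bigquery.Client()
    # 'my_dataset' and 'my_table' are placeholder names for this sketch.
    table = client.get_table(client.dataset('my_dataset').table('my_table'))

    rows = client.list_rows(table, max_results=10)
    for row in rows:
        print(row.values())
    # _rows_page_start() copies 'totalRows' from each page response into the
    # iterator, so total_rows stays None until the first page is fetched.
    print(rows.total_rows)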
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index 91301b1ed8d2..9f66e3ec9ea0 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -1929,7 +1929,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
         :type retry: :class:`google.api_core.retry.Retry`
         :param retry: (Optional) How to retry the call that retrieves rows.
 
-        :rtype: :class:`~google.api_core.page_iterator.Iterator`
+        :rtype: :class:`~google.cloud.bigquery.table.RowIterator`
         :returns: Iterator of row data
             :class:`~google.cloud.bigquery.table.Row`-s. During each page, the
             iterator will have the ``total_rows``
@@ -1949,6 +1949,19 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
         return self._client.list_rows(dest_table, selected_fields=schema,
                                       retry=retry)
 
+    def to_dataframe(self):
+        """Return a pandas DataFrame from a QueryJob.
+
+        Returns:
+            A :class:`~pandas.DataFrame` populated with row data and column
+            headers from the query results. The column headers are derived
+            from the destination table's schema.
+
+        Raises:
+            ValueError: If the `pandas` library cannot be imported.
+        """
+        return self.result().to_dataframe()
+
     def __iter__(self):
         return iter(self.result())
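QueryJob.to_dataframe() is deliberately a thin wrapper: it blocks on result()
and delegates to the RowIterator. A usage sketch against a public dataset
(the query text is illustrative only, not part of the diff):

    from google.cloud import bigquery

    client = bigquery.Client()
    query_job = client.query(
        'SELECT name, SUM(number) AS total '
        'FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'GROUP BY name ORDER BY total DESC LIMIT 10')
    df = query_job.to_dataframe()  # waits for the job, then loads all rows
    print(df.head())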
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 79928230f08e..d7e4745fa8c8 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -21,10 +21,19 @@
 import operator
 
 import six
+try:
+    import pandas
+except ImportError:  # pragma: NO COVER
+    pandas = None
+
+from google.api_core.page_iterator import HTTPIterator
 from google.cloud._helpers import _datetime_from_microseconds
 from google.cloud._helpers import _millis_from_datetime
+from google.cloud.bigquery._helpers import _item_to_row
+from google.cloud.bigquery._helpers import _rows_page_start
 from google.cloud.bigquery._helpers import _snake_to_camel_case
+from google.cloud.bigquery._helpers import _field_to_index_mapping
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.schema import _build_schema_resource
 from google.cloud.bigquery.schema import _parse_schema_resource
@@ -1023,3 +1032,74 @@ def __repr__(self):
                      key=operator.itemgetter(1))
         f2i = '{' + ', '.join('%r: %d' % item for item in items) + '}'
         return 'Row({}, {})'.format(self._xxx_values, f2i)
+
+
+class RowIterator(HTTPIterator):
+    """A class for iterating through HTTP/JSON API row list responses.
+
+    Args:
+        client (google.cloud.bigquery.Client): The API client.
+        api_request (Callable[google.cloud._http.JSONConnection.api_request]):
+            The function to use to make API requests.
+        path (str): The method path to query for the list of items.
+        schema (list of :class:`~google.cloud.bigquery.schema.SchemaField`):
+            The schema of the rows; used to convert and index values.
+        page_token (str): A token identifying a page in a result set to start
+            fetching results from.
+        max_results (int): The maximum number of results to fetch.
+        extra_params (dict): Extra query string parameters for the API call.
+
+    .. autoattribute:: pages
+    """
+
+    def __init__(self, client, api_request, path, schema, page_token=None,
+                 max_results=None, extra_params=None):
+        super(RowIterator, self).__init__(
+            client, api_request, path, item_to_value=_item_to_row,
+            items_key='rows', page_token=page_token, max_results=max_results,
+            extra_params=extra_params, page_start=_rows_page_start,
+            next_token='pageToken')
+        self._schema = schema
+        self._field_to_index = _field_to_index_mapping(schema)
+        self._total_rows = None
+
+    @property
+    def schema(self):
+        """Schema for the table containing the rows.
+
+        Returns:
+            list of :class:`~google.cloud.bigquery.schema.SchemaField`:
+                fields describing the schema
+        """
+        return list(self._schema)
+
+    @property
+    def total_rows(self):
+        """The total number of rows in the table or query results.
+
+        Returns:
+            int: the row count.
+        """
+        return self._total_rows
+
+    def to_dataframe(self):
+        """Create a pandas DataFrame from all rows in the iterator.
+
+        Returns:
+            A :class:`~pandas.DataFrame` populated with row data and column
+            headers from the rows. The column headers are derived from the
+            iterator's schema.
+
+        Raises:
+            ValueError: If the `pandas` library cannot be imported.
+
+        """
+        if pandas is None:
+            raise ValueError('The pandas library is not installed, please '
+                             'install pandas to use the to_dataframe() '
+                             'function.')
+
+        column_headers = [field.name for field in self.schema]
+        rows = [row.values() for row in iter(self)]
+
+        return pandas.DataFrame(rows, columns=column_headers)
diff --git a/bigquery/nox.py b/bigquery/nox.py
index fa0936ce619e..86de5658a803 100644
--- a/bigquery/nox.py
+++ b/bigquery/nox.py
@@ -36,7 +36,7 @@ def default(session):
     """
     # Install all test dependencies, then install this package in-place.
     session.install('mock', 'pytest', 'pytest-cov', *LOCAL_DEPS)
-    session.install('-e', '.')
+    session.install('-e', '.[pandas]')
 
     # Run py.test against the unit tests.
     session.run(
@@ -89,7 +89,7 @@ def system(session, py):
         os.path.join('..', 'storage'),
         os.path.join('..', 'test_utils'),
     )
-    session.install('-e', '.')
+    session.install('-e', '.[pandas]')
 
     # Run py.test against the system tests.
     session.run(
diff --git a/bigquery/setup.py b/bigquery/setup.py
index 1dd3a9ff6036..8d108c9c4dec 100644
--- a/bigquery/setup.py
+++ b/bigquery/setup.py
@@ -58,6 +58,10 @@
     'requests >= 2.18.0',
 ]
 
+EXTRAS_REQUIREMENTS = {
+    'pandas': ['pandas >= 0.17.1'],
+}
+
 setup(
     name='google-cloud-bigquery',
     version='0.28.1.dev1',
@@ -69,5 +73,6 @@
     ],
     packages=find_packages(exclude=('tests*',)),
     install_requires=REQUIREMENTS,
+    extras_require=EXTRAS_REQUIREMENTS,
     **SETUP_BASE
 )
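pandas is wired up as an optional extra rather than a hard dependency: users
opt in with `pip install google-cloud-bigquery[pandas]`, and the nox sessions
above install the same extra so CI exercises the new code paths. The import
guard shared by table.py and the test modules defers the failure from import
time to call time. A minimal standalone sketch of that pattern (the function
name here is illustrative, not part of the library):

    try:
        import pandas
    except ImportError:
        pandas = None  # the module stays importable without the extra


    def rows_to_dataframe(rows, columns):
        # Fail only when the pandas-dependent feature is actually used.
        if pandas is None:
            raise ValueError('The pandas library is not installed.')
        return pandas.DataFrame(rows, columns=columns)


    print(rows_to_dataframe([('Ada', 36), ('Grace', 45)], ['name', 'age']))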
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 6a26922f3564..629276cf46f2 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -24,6 +24,10 @@
 import uuid
 
 import six
+try:
+    import pandas
+except ImportError:  # pragma: NO COVER
+    pandas = None
 
 from google.api_core.exceptions import PreconditionFailed
 from google.cloud import bigquery
@@ -1244,6 +1248,28 @@ def test_query_iter(self):
         row_tuples = [r.values() for r in query_job]
         self.assertEqual(row_tuples, [(1,)])
 
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_query_results_to_dataframe(self):
+        QUERY = """
+            SELECT id, author, time_ts, dead
+            FROM `bigquery-public-data.hacker_news.comments`
+            LIMIT 10
+        """
+
+        df = Config.CLIENT.query(QUERY).result().to_dataframe()
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 10)  # verify the number of rows
+        column_names = ['id', 'author', 'time_ts', 'dead']
+        self.assertEqual(list(df), column_names)  # verify the column names
+        exp_datatypes = {'id': int, 'author': six.text_type,
+                         'time_ts': pandas.Timestamp, 'dead': bool}
+        for index, row in df.iterrows():
+            for col in column_names:
+                # all the schema fields are nullable, so None is acceptable
+                if row[col] is not None:
+                    self.assertIsInstance(row[col], exp_datatypes[col])
+
     def test_query_table_def(self):
         gs_url = self._write_csv_to_storage(
             'bq_external_test' + unique_resource_id(), 'person_ages.csv',
@@ -1419,6 +1445,56 @@ def test_create_table_rows_fetch_nested_schema(self):
             e_favtime = datetime.datetime(*parts[0:6])
             self.assertEqual(found[7], e_favtime)
 
+    def _fetch_dataframe(self, query):
+        return Config.CLIENT.query(query).result().to_dataframe()
+
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_nested_table_to_dataframe(self):
+        SF = bigquery.SchemaField
+        schema = [
+            SF('string_col', 'STRING', mode='NULLABLE'),
+            SF('record_col', 'RECORD', mode='NULLABLE', fields=[
+                SF('nested_string', 'STRING', mode='NULLABLE'),
+                SF('nested_repeated', 'INTEGER', mode='REPEATED'),
+                SF('nested_record', 'RECORD', mode='NULLABLE', fields=[
+                    SF('nested_nested_string', 'STRING', mode='NULLABLE'),
+                ]),
+            ]),
+        ]
+        record = {
+            'nested_string': 'another string value',
+            'nested_repeated': [0, 1, 2],
+            'nested_record': {'nested_nested_string': 'some deep insight'},
+        }
+        to_insert = [
+            ('Some value', record)
+        ]
+        table_id = 'test_table'
+        dataset = self.temp_dataset(_make_dataset_id('nested_df'))
+        table_arg = Table(dataset.table(table_id), schema=schema)
+        table = retry_403(Config.CLIENT.create_table)(table_arg)
+        self.to_delete.insert(0, table)
+        Config.CLIENT.create_rows(table, to_insert)
+        QUERY = 'SELECT * FROM `{}.{}.{}`'.format(
+            Config.CLIENT.project, dataset.dataset_id, table_id)
+
+        retry = RetryResult(_has_rows, max_tries=8)
+        df = retry(self._fetch_dataframe)(QUERY)
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 1)  # verify the number of rows
+        exp_columns = ['string_col', 'record_col']
+        self.assertEqual(list(df), exp_columns)  # verify the column names
+        row = df.iloc[0]
+        # verify the row content
+        self.assertEqual(row['string_col'], 'Some value')
+        self.assertEqual(row['record_col'], record)
+        # verify that nested data can be accessed with indices/keys
+        self.assertEqual(row['record_col']['nested_repeated'][0], 0)
+        self.assertEqual(
+            row['record_col']['nested_record']['nested_nested_string'],
+            'some deep insight')
+
     def temp_dataset(self, dataset_id):
         dataset = retry_403(Config.CLIENT.create_dataset)(
             Dataset(Config.CLIENT.dataset(dataset_id)))
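The nested-table system test above demonstrates that RECORD columns arrive in
the DataFrame as plain Python dicts and REPEATED fields as lists, one object
per cell; flattening is left to pandas itself. A hedged follow-on sketch,
reusing the record layout from that test:

    # Not part of the diff: flatten a RECORD column with pandas'
    # json_normalize (importable from pandas.io.json in the 0.17+ range
    # targeted by setup.py).
    import pandas
    from pandas.io.json import json_normalize

    record = {
        'nested_string': 'another string value',
        'nested_repeated': [0, 1, 2],
        'nested_record': {'nested_nested_string': 'some deep insight'},
    }
    df = pandas.DataFrame({'record_col': [record]})
    flat = json_normalize(df['record_col'].tolist())
    # 'flat' now has the columns 'nested_string', 'nested_repeated', and
    # 'nested_record.nested_nested_string'.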
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index 470835533181..2f141a4dc04d 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -16,6 +16,10 @@
 from six.moves import http_client
 import unittest
 
+try:
+    import pandas
+except ImportError:  # pragma: NO COVER
+    pandas = None
 
 from google.cloud.bigquery.job import ExtractJobConfig, CopyJobConfig
 from google.cloud.bigquery.job import LoadJobConfig
@@ -2720,6 +2724,41 @@ def test_reload_w_alternate_client(self):
         self.assertEqual(req['path'], PATH)
         self._verifyResourceProperties(job, RESOURCE)
 
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_to_dataframe(self):
+        begun_resource = self._make_resource()
+        query_resource = {
+            'jobComplete': True,
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_ID,
+            },
+            'schema': {
+                'fields': [
+                    {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+                    {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+                ],
+            },
+            'rows': [
+                {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
+                {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
+                {'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]},
+                {'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]},
+            ],
+        }
+        done_resource = copy.deepcopy(begun_resource)
+        done_resource['status'] = {'state': 'DONE'}
+        connection = _Connection(
+            begun_resource, query_resource, done_resource, query_resource)
+        client = _make_client(project=self.PROJECT, connection=connection)
+        job = self._make_one(self.JOB_ID, self.QUERY, client)
+
+        df = job.to_dataframe()
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 4)  # verify the number of rows
+        self.assertEqual(list(df), ['name', 'age'])  # verify the column names
+
     def test_iter(self):
         import types
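A note on the fixtures used in this test and throughout test_table.py below:
the {'f': [{'v': ...}]} dicts mirror the row encoding of BigQuery's
tabledata.list and jobs.getQueryResults responses, in which each row is a
list 'f' of cells, each cell is a {'v': value} dict, and scalar values arrive
as strings to be coerced according to the schema. For example:

    # One encoded row for the (name STRING, age INTEGER) schema used in
    # these tests; the iterator coerces the string '32' to the integer 32
    # while building each Row.
    json_row = {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]}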
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index 752239b7276b..dffa815511f1 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -15,6 +15,11 @@
 import unittest
 
 import mock
+import six
+try:
+    import pandas
+except ImportError:  # pragma: NO COVER
+    pandas = None
 
 from google.cloud.bigquery.dataset import DatasetReference
 
@@ -864,3 +869,214 @@ def test_row(self):
             row.z
         with self.assertRaises(KeyError):
             row['z']
+
+
+class TestRowIterator(unittest.TestCase):
+
+    def test_constructor(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery._helpers import _item_to_row
+        from google.cloud.bigquery._helpers import _rows_page_start
+
+        client = mock.sentinel.client
+        api_request = mock.sentinel.api_request
+        path = '/foo'
+        schema = []
+        iterator = RowIterator(client, api_request, path, schema)
+
+        self.assertFalse(iterator._started)
+        self.assertIs(iterator.client, client)
+        self.assertEqual(iterator.path, path)
+        self.assertIs(iterator._item_to_value, _item_to_row)
+        self.assertEqual(iterator._items_key, 'rows')
+        self.assertIsNone(iterator.max_results)
+        self.assertEqual(iterator.extra_params, {})
+        self.assertEqual(iterator._page_start, _rows_page_start)
+        # Changing attributes.
+        self.assertEqual(iterator.page_number, 0)
+        self.assertIsNone(iterator.next_page_token)
+        self.assertEqual(iterator.num_results, 0)
+
+    def test_iterate(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField('name', 'STRING', mode='REQUIRED'),
+            SchemaField('age', 'INTEGER', mode='REQUIRED')
+        ]
+        rows = [
+            {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
+            {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
+        ]
+        path = '/foo'
+        api_request = mock.Mock(return_value={'rows': rows})
+        row_iterator = RowIterator(
+            mock.sentinel.client, api_request, path, schema)
+        self.assertEqual(row_iterator.num_results, 0)
+
+        rows_iter = iter(row_iterator)
+
+        val1 = six.next(rows_iter)
+        self.assertEqual(val1.name, 'Phred Phlyntstone')
+        self.assertEqual(row_iterator.num_results, 1)
+
+        val2 = six.next(rows_iter)
+        self.assertEqual(val2.name, 'Bharney Rhubble')
+        self.assertEqual(row_iterator.num_results, 2)
+
+        with self.assertRaises(StopIteration):
+            six.next(rows_iter)
+
+        api_request.assert_called_once_with(
+            method='GET', path=path, query_params={})
+
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_to_dataframe(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField('name', 'STRING', mode='REQUIRED'),
+            SchemaField('age', 'INTEGER', mode='REQUIRED')
+        ]
+        rows = [
+            {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
+            {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
+            {'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]},
+            {'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]},
+        ]
+        path = '/foo'
+        api_request = mock.Mock(return_value={'rows': rows})
+        row_iterator = RowIterator(
+            mock.sentinel.client, api_request, path, schema)
+
+        df = row_iterator.to_dataframe()
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 4)  # verify the number of rows
+        self.assertEqual(list(df), ['name', 'age'])  # verify the column names
+        self.assertEqual(df.name.dtype.name, 'object')
+        self.assertEqual(df.age.dtype.name, 'int64')
+
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_to_dataframe_w_empty_results(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField('name', 'STRING', mode='REQUIRED'),
+            SchemaField('age', 'INTEGER', mode='REQUIRED')
+        ]
+        path = '/foo'
+        api_request = mock.Mock(return_value={'rows': []})
+        row_iterator = RowIterator(
+            mock.sentinel.client, api_request, path, schema)
+
+        df = row_iterator.to_dataframe()
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 0)  # verify the number of rows
+        self.assertEqual(list(df), ['name', 'age'])  # verify the column names
+
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_to_dataframe_w_various_types_nullable(self):
+        import datetime
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField('start_timestamp', 'TIMESTAMP'),
+            SchemaField('seconds', 'INT64'),
+            SchemaField('miles', 'FLOAT64'),
+            SchemaField('payment_type', 'STRING'),
+            SchemaField('complete', 'BOOL'),
+            SchemaField('date', 'DATE'),
+        ]
+        row_data = [
+            [None, None, None, None, None, None],
+            ['1.4338368E9', '420', '1.1', 'Cash', 'true', '1999-12-01'],
+            ['1.3878117E9', '2580', '17.7', 'Cash', 'false', '1953-06-14'],
+            ['1.3855653E9', '2280', '4.4', 'Credit', 'true', '1981-11-04'],
+        ]
+        rows = [{'f': [{'v': field} for field in row]} for row in row_data]
+        path = '/foo'
+        api_request = mock.Mock(return_value={'rows': rows})
+        row_iterator = RowIterator(
+            mock.sentinel.client, api_request, path, schema)
+
+        df = row_iterator.to_dataframe()
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 4)  # verify the number of rows
+        exp_columns = [field.name for field in schema]
+        self.assertEqual(list(df), exp_columns)  # verify the column names
+
+        for index, row in df.iterrows():
+            if index == 0:
+                self.assertTrue(row.isnull().all())
+            else:
+                self.assertIsInstance(row.start_timestamp, pandas.Timestamp)
+                self.assertIsInstance(row.seconds, float)
+                self.assertIsInstance(row.payment_type, six.text_type)
+                self.assertIsInstance(row.complete, bool)
+                self.assertIsInstance(row.date, datetime.date)
+
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_to_dataframe_column_dtypes(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField('start_timestamp', 'TIMESTAMP'),
+            SchemaField('seconds', 'INT64'),
+            SchemaField('miles', 'FLOAT64'),
+            SchemaField('payment_type', 'STRING'),
+            SchemaField('complete', 'BOOL'),
+            SchemaField('date', 'DATE'),
+        ]
+        row_data = [
+            ['1.4338368E9', '420', '1.1', 'Cash', 'true', '1999-12-01'],
+            ['1.3878117E9', '2580', '17.7', 'Cash', 'false', '1953-06-14'],
+            ['1.3855653E9', '2280', '4.4', 'Credit', 'true', '1981-11-04'],
+        ]
+        rows = [{'f': [{'v': field} for field in row]} for row in row_data]
+        path = '/foo'
+        api_request = mock.Mock(return_value={'rows': rows})
+        row_iterator = RowIterator(
+            mock.sentinel.client, api_request, path, schema)
+
+        df = row_iterator.to_dataframe()
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 3)  # verify the number of rows
+        exp_columns = [field.name for field in schema]
+        self.assertEqual(list(df), exp_columns)  # verify the column names
+
+        self.assertEqual(df.start_timestamp.dtype.name, 'datetime64[ns, UTC]')
+        self.assertEqual(df.seconds.dtype.name, 'int64')
+        self.assertEqual(df.miles.dtype.name, 'float64')
+        self.assertEqual(df.payment_type.dtype.name, 'object')
+        self.assertEqual(df.complete.dtype.name, 'bool')
+        self.assertEqual(df.date.dtype.name, 'object')
+
+    @mock.patch('google.cloud.bigquery.table.pandas', new=None)
+    def test_to_dataframe_error_if_pandas_is_none(self):
+        from google.cloud.bigquery.table import RowIterator
+        from google.cloud.bigquery.table import SchemaField
+
+        schema = [
+            SchemaField('name', 'STRING', mode='REQUIRED'),
+            SchemaField('age', 'INTEGER', mode='REQUIRED')
+        ]
+        rows = [
+            {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
+            {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
+        ]
+        path = '/foo'
+        api_request = mock.Mock(return_value={'rows': rows})
+        row_iterator = RowIterator(
+            mock.sentinel.client, api_request, path, schema)
+
+        with self.assertRaises(ValueError):
+            row_iterator.to_dataframe()
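Taken together, test_to_dataframe_w_various_types_nullable and
test_to_dataframe_column_dtypes pin down a pandas behavior that the new
method inherits: an INT64 column keeps the int64 dtype only while it contains
no NULLs; once a NULL appears, pandas upcasts the column to float64 and
stores NaN, which is why the nullable test asserts float for 'seconds' while
the dtypes test asserts 'int64'. A standalone illustration:

    import pandas

    print(pandas.Series([420, 2580, 2280]).dtype)        # int64
    print(pandas.Series([None, 420, 2580, 2280]).dtype)  # float64; None -> NaN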