From 5abe271e25d144f4b2a029a5921f54b0a0d88785 Mon Sep 17 00:00:00 2001
From: HemangChothani
Date: Wed, 24 Jun 2020 17:33:32 +0530
Subject: [PATCH 1/3] feat(bigquery): add support and tests for struct fields

---
 google/cloud/bigquery/_pandas_helpers.py | 16 ++++---
 tests/system.py                          | 44 ++++++++++++++++++
 tests/unit/test_client.py                | 58 ++++++++++++++++++++----
 3 files changed, 101 insertions(+), 17 deletions(-)

diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py
index f5f9d4a99..1121d794f 100644
--- a/google/cloud/bigquery/_pandas_helpers.py
+++ b/google/cloud/bigquery/_pandas_helpers.py
@@ -18,6 +18,7 @@
 import functools
 import logging
 import warnings
+import six
 
 from six.moves import queue
 
@@ -286,13 +287,14 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     """
     if bq_schema:
         bq_schema = schema._to_schema_fields(bq_schema)
-        for field in bq_schema:
-            if field.field_type in schema._STRUCT_TYPES:
-                raise ValueError(
-                    "Uploading dataframes with struct (record) column types "
-                    "is not supported. See: "
-                    "https://github.com/googleapis/google-cloud-python/issues/8191"
-                )
+        if six.PY2:
+            for field in bq_schema:
+                if field.field_type in schema._STRUCT_TYPES:
+                    raise ValueError(
+                        "Uploading dataframes with struct (record) column types "
+                        "is not supported. See: "
+                        "https://github.com/googleapis/google-cloud-python/issues/8191"
+                    )
         bq_schema_index = {field.name: field for field in bq_schema}
         bq_schema_unused = set(bq_schema_index.keys())
     else:
diff --git a/tests/system.py b/tests/system.py
index 14d3f49a1..f0d241be0 100644
--- a/tests/system.py
+++ b/tests/system.py
@@ -130,6 +130,8 @@
 PANDAS_MINIMUM_VERSION = pkg_resources.parse_version("1.0.0")
 PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version
+PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0")
+PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution("pyarrow").parsed_version
 
 
 def _has_rows(result):
@@ -1074,6 +1076,48 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
         self.assertEqual(tuple(table.schema), table_schema)
         self.assertEqual(table.num_rows, 3)
 
+    @unittest.skipIf(
+        pyarrow is None or PYARROW_INSTALLED_VERSION < PYARROW_MINIMUM_VERSION,
+        "Only `pyarrow version >=0.17.0` is supported",
+    )
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_load_table_from_dataframe_w_struct_datatype(self):
+        """Test that a DataFrame with struct datatype can be uploaded if a
+        BigQuery schema is specified.
+
+        https://github.com/googleapis/python-bigquery/issues/21
+        """
+        dataset_id = _make_dataset_id("bq_load_test")
+        self.temp_dataset(dataset_id)
+        table_id = "{}.{}.load_table_from_dataframe_w_struct_datatype".format(
+            Config.CLIENT.project, dataset_id
+        )
+        table_schema = [
+            bigquery.SchemaField(
+                "bar",
+                "RECORD",
+                fields=[
+                    bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"),
+                    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
+                ],
+                mode="REQUIRED",
+            ),
+        ]
+        table = retry_403(Config.CLIENT.create_table)(
+            Table(table_id, schema=table_schema)
+        )
+        self.to_delete.insert(0, table)
+
+        df_data = [{"id": 1, "age": 21}, {"id": 2, "age": 22}, {"id": 2, "age": 23}]
+        dataframe = pandas.DataFrame(data={"bar": df_data}, columns=["bar"])
+
+        load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
+        load_job.result()
+
+        table = Config.CLIENT.get_table(table_id)
+        self.assertEqual(table.schema, table_schema)
+        self.assertEqual(table.num_rows, 3)
+
     def test_load_table_from_json_basic_use(self):
         table_schema = (
             bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 0e083d43f..c9b3e9751 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -7093,19 +7093,22 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(se
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
-    def test_load_table_from_dataframe_struct_fields_error(self):
+    def test_load_table_from_dataframe_struct_fields(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
         from google.cloud.bigquery import job
         from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
 
-        records = [{"float_column": 3.14, "struct_column": [{"foo": 1}, {"bar": -1}]}]
-        dataframe = pandas.DataFrame(data=records)
+        records = [(3.14, {"foo": 1, "bar": 1})]
+        dataframe = pandas.DataFrame(
+            data=records, columns=["float_column", "struct_column"]
+        )
 
         schema = [
             SchemaField("float_column", "FLOAT"),
             SchemaField(
-                "agg_col",
+                "struct_column",
                 "RECORD",
                 fields=[SchemaField("foo", "INTEGER"), SchemaField("bar", "INTEGER")],
             ),
@@ -7116,14 +7119,49 @@ def test_load_table_from_dataframe_struct_fields_error(self):
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
         )
 
-        with pytest.raises(ValueError) as exc_info, load_patch:
-            client.load_table_from_dataframe(
-                dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
-            )
+        if six.PY2:
+            with pytest.raises(ValueError) as exc_info, load_patch:
+                client.load_table_from_dataframe(
+                    dataframe,
+                    self.TABLE_REF,
+                    job_config=job_config,
+                    location=self.LOCATION,
+                )
+
+            err_msg = str(exc_info.value)
+            assert "struct" in err_msg
+            assert "not support" in err_msg
+
+        else:
+            get_table_patch = mock.patch(
+                "google.cloud.bigquery.client.Client.get_table",
+                autospec=True,
+                side_effect=google.api_core.exceptions.NotFound("Table not found"),
+            )
+            with load_patch as load_table_from_file, get_table_patch:
+                client.load_table_from_dataframe(
+                    dataframe,
+                    self.TABLE_REF,
+                    job_config=job_config,
+                    location=self.LOCATION,
+                )
+
+            load_table_from_file.assert_called_once_with(
+                client,
+                mock.ANY,
+                self.TABLE_REF,
+                num_retries=_DEFAULT_NUM_RETRIES,
+                rewind=True,
+                job_id=mock.ANY,
+                job_id_prefix=None,
+                location=self.LOCATION,
+                project=None,
+                job_config=mock.ANY,
+            )
 
-        err_msg = str(exc_info.value)
-        assert "struct" in err_msg
-        assert "not support" in err_msg
+            sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+            assert sent_config.source_format == job.SourceFormat.PARQUET
+            assert sent_config.schema == schema
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")

From 6185db4f08839a6c59f8a1c87ed25ae09ab83781 Mon Sep 17 00:00:00 2001
From: HemangChothani
Date: Wed, 22 Jul 2020 12:29:09 +0530
Subject: [PATCH 2/3] feat(bigquery): bump pyarrow version for Python 3

---
 google/cloud/bigquery/_pandas_helpers.py | 1 -
 setup.py                                 | 6 ++----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py
index 551d9fc06..c82edf167 100644
--- a/google/cloud/bigquery/_pandas_helpers.py
+++ b/google/cloud/bigquery/_pandas_helpers.py
@@ -20,7 +20,6 @@
 import warnings
 import six
 
-import six
 from six.moves import queue
 
 try:
diff --git a/setup.py b/setup.py
index 497853be0..ed2301cfa 100644
--- a/setup.py
+++ b/setup.py
@@ -47,10 +47,8 @@
     ],
     "pandas": ["pandas>=0.17.1"],
     # Exclude PyArrow dependency from Windows Python 2.7.
-    'pyarrow: platform_system != "Windows" or python_version >= "3.4"': [
-        # Bad Linux release for 0.14.0.
-        # https://issues.apache.org/jira/browse/ARROW-5868
-        "pyarrow>=0.4.1, != 0.14.0"
+    'pyarrow: platform_system != "Windows" or python_version >= "3.5"': [
+        "pyarrow>=0.17.0"
     ],
     "tqdm": ["tqdm >= 4.0.0, <5.0.0dev"],
     "fastparquet": [

From 59786e946adb90b62e42df7014ec7f46385518af Mon Sep 17 00:00:00 2001
From: HemangChothani
Date: Thu, 30 Jul 2020 17:17:13 +0530
Subject: [PATCH 3/3] feat(bigquery): address review nits (import order, error message)

---
 google/cloud/bigquery/_pandas_helpers.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py
index c82edf167..953b7d0fe 100644
--- a/google/cloud/bigquery/_pandas_helpers.py
+++ b/google/cloud/bigquery/_pandas_helpers.py
@@ -18,8 +18,8 @@
 import functools
 import logging
 import warnings
-import six
 
+import six
 from six.moves import queue
 
 try:
@@ -292,8 +292,8 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
             if field.field_type in schema._STRUCT_TYPES:
                 raise ValueError(
                     "Uploading dataframes with struct (record) column types "
-                    "is not supported. See: "
-                    "https://github.com/googleapis/google-cloud-python/issues/8191"
+                    "is not supported under Python2. See: "
+                    "https://github.com/googleapis/python-bigquery/issues/21"
                 )
         bq_schema_index = {field.name: field for field in bq_schema}
         bq_schema_unused = set(bq_schema_index.keys())