diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index 266bfc2c666c..98eadb0a2f8e 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -224,11 +224,18 @@ def _row_tuple_from_json(row, schema): Args: row (Dict): A JSON response row to be converted. - schema (Tuple): A tuple of :class:`~google.cloud.bigquery.schema.SchemaField`. + schema (Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): Specification of the field types in ``row``. Returns: Tuple: A tuple of data converted to native types. """ + from google.cloud.bigquery.schema import _to_schema_fields + + schema = _to_schema_fields(schema) + row_data = [] for field, cell in zip(schema, row["f"]): row_data.append(_field_from_json(cell["v"], field)) @@ -236,9 +243,25 @@ def _row_tuple_from_json(row, schema): def _rows_from_json(values, schema): - """Convert JSON row data to rows with appropriate types.""" + """Convert JSON row data to rows with appropriate types. + + Args: + values (Sequence[Dict]): The list of responses (JSON rows) to convert. + schema (Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): + The table's schema. If any item is a mapping, its content must be + compatible with + :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`. + + Returns: + List[:class:`~google.cloud.bigquery.Row`] + """ from google.cloud.bigquery import Row + from google.cloud.bigquery.schema import _to_schema_fields + schema = _to_schema_fields(schema) field_to_index = _field_to_index_mapping(schema) return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values] diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index c7edf2ae51f5..aeb18c2d213d 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -239,7 +239,10 @@ def dataframe_to_bq_schema(dataframe, bq_schema): Args: dataframe (pandas.DataFrame): DataFrame for which the client determines the BigQuery schema. - bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]): + bq_schema (Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): A BigQuery schema. Use this argument to override the autodetected type for some or all of the DataFrame columns. @@ -249,6 +252,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema): any column cannot be determined. """ if bq_schema: + bq_schema = schema._to_schema_fields(bq_schema) for field in bq_schema: if field.field_type in schema._STRUCT_TYPES: raise ValueError( @@ -297,9 +301,12 @@ def dataframe_to_arrow(dataframe, bq_schema): Args: dataframe (pandas.DataFrame): DataFrame to convert to Arrow table. - bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]): - Desired BigQuery schema. Number of columns must match number of - columns in the DataFrame. + bq_schema (Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): + Desired BigQuery schema. The number of columns must match the + number of columns in the DataFrame. 
     Returns:
         pyarrow.Table:
@@ -310,6 +317,8 @@
     column_and_index_names = set(
         name for name, _ in list_columns_and_indexes(dataframe)
     )
+
+    bq_schema = schema._to_schema_fields(bq_schema)
     bq_field_names = set(field.name for field in bq_schema)
 
     extra_fields = bq_field_names - column_and_index_names
@@ -354,7 +363,10 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
 
     Args:
         dataframe (pandas.DataFrame):
             DataFrame to convert to Parquet file.
-        bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+        bq_schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]):
             Desired BigQuery schema. Number of columns must match number of
             columns in the DataFrame.
         filepath (str):
@@ -368,6 +380,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
     if pyarrow is None:
         raise ValueError("pyarrow is required for BigQuery schema conversion.")
 
+    bq_schema = schema._to_schema_fields(bq_schema)
     arrow_table = dataframe_to_arrow(dataframe, bq_schema)
     pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
 
@@ -388,20 +401,24 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
     return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)
 
 
-def download_arrow_tabledata_list(pages, schema):
+def download_arrow_tabledata_list(pages, bq_schema):
     """Use tabledata.list to construct an iterable of RecordBatches.
 
     Args:
         pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
             An iterator over the result pages.
-        schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+        bq_schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]):
             A description of the fields in result pages.
 
     Yields:
         :class:`pyarrow.RecordBatch`
             The next page of records as a ``pyarrow`` record batch.
     """
-    column_names = bq_to_arrow_schema(schema) or [field.name for field in schema]
-    arrow_types = [bq_to_arrow_data_type(field) for field in schema]
+    bq_schema = schema._to_schema_fields(bq_schema)
+    column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
+    arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]
 
     for page in pages:
         yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
@@ -422,9 +439,26 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
     return pandas.DataFrame(columns, columns=column_names)
 
 
-def download_dataframe_tabledata_list(pages, schema, dtypes):
-    """Use (slower, but free) tabledata.list to construct a DataFrame."""
-    column_names = [field.name for field in schema]
+def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
+    """Use (slower, but free) tabledata.list to construct a DataFrame.
+
+    Args:
+        pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
+            An iterator over the result pages.
+        bq_schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]):
+            A description of the fields in result pages.
+        dtypes (Mapping[str, numpy.dtype]):
+            The types of columns in result data to hint construction of the
+            resulting DataFrame. Not all column types have to be specified.
+    Yields:
+        :class:`pandas.DataFrame`
+            The next page of records as a ``pandas.DataFrame``.
+ """ + bq_schema = schema._to_schema_fields(bq_schema) + column_names = [field.name for field in bq_schema] for page in pages: yield _tabledata_list_page_to_dataframe(page, column_names, dtypes) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 96724c9f805b..a8d797f4bef5 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -38,6 +38,7 @@ from google.cloud.bigquery.retry import DEFAULT_RETRY from google.cloud.bigquery.routine import RoutineReference from google.cloud.bigquery.schema import SchemaField +from google.cloud.bigquery.schema import _to_schema_fields from google.cloud.bigquery.table import _EmptyRowIterator from google.cloud.bigquery.table import RangePartitioning from google.cloud.bigquery.table import _table_arg_to_table_ref @@ -1225,8 +1226,10 @@ def range_partitioning(self, value): @property def schema(self): - """List[google.cloud.bigquery.schema.SchemaField]: Schema of the - destination table. + """Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]: Schema of the destination table. See https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.schema @@ -1242,8 +1245,8 @@ def schema(self, value): self._del_sub_prop("schema") return - if not all(hasattr(field, "to_api_repr") for field in value): - raise ValueError("Schema items must be fields") + value = _to_schema_fields(value) + _helpers._set_sub_prop( self._properties, ["load", "schema", "fields"], diff --git a/bigquery/google/cloud/bigquery/schema.py b/bigquery/google/cloud/bigquery/schema.py index cb94133abdad..d766cb542608 100644 --- a/bigquery/google/cloud/bigquery/schema.py +++ b/bigquery/google/cloud/bigquery/schema.py @@ -14,6 +14,8 @@ """Schemas for BigQuery tables / queries.""" +import collections + from google.cloud.bigquery_v2 import types @@ -256,3 +258,36 @@ def _build_schema_resource(fields): Sequence[Dict]: Mappings describing the schema of the supplied fields. """ return [field.to_api_repr() for field in fields] + + +def _to_schema_fields(schema): + """Coerce `schema` to a list of schema field instances. + + Args: + schema(Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): + Table schema to convert. If some items are passed as mappings, + their content must be compatible with + :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`. + + Returns: + Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`] + + Raises: + Exception: If ``schema`` is not a sequence, or if any item in the + sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField` + instance or a compatible mapping representation of the field. + """ + for field in schema: + if not isinstance(field, (SchemaField, collections.Mapping)): + raise ValueError( + "Schema items must either be fields or compatible " + "mapping representations." 
+ ) + + return [ + field if isinstance(field, SchemaField) else SchemaField.from_api_repr(field) + for field in schema + ] diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 7e36c582c42b..2f2ee50cc89e 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -51,9 +51,9 @@ import google.cloud._helpers from google.cloud.bigquery import _helpers from google.cloud.bigquery import _pandas_helpers -from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.schema import _build_schema_resource from google.cloud.bigquery.schema import _parse_schema_resource +from google.cloud.bigquery.schema import _to_schema_fields from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration @@ -305,8 +305,13 @@ class Table(object): A pointer to a table. If ``table_ref`` is a string, it must included a project ID, dataset ID, and table ID, each separated by ``.``. - schema (List[google.cloud.bigquery.schema.SchemaField]): - The table's schema + schema (Optional[Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]]): + The table's schema. If any item is a mapping, its content must be + compatible with + :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`. """ _PROPERTY_TO_API_FIELD = { @@ -369,13 +374,17 @@ def require_partition_filter(self, value): @property def schema(self): - """List[google.cloud.bigquery.schema.SchemaField]: Table's schema. + """Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]: + Table's schema. Raises: - TypeError: If 'value' is not a sequence - ValueError: - If any item in the sequence is not a - :class:`~google.cloud.bigquery.schema.SchemaField` + Exception: + If ``schema`` is not a sequence, or if any item in the sequence + is not a :class:`~google.cloud.bigquery.schema.SchemaField` + instance or a compatible mapping representation of the field. """ prop = self._properties.get("schema") if not prop: @@ -387,9 +396,8 @@ def schema(self): def schema(self, value): if value is None: self._properties["schema"] = None - elif not all(isinstance(field, SchemaField) for field in value): - raise ValueError("Schema items must be fields") else: + value = _to_schema_fields(value) self._properties["schema"] = {"fields": _build_schema_resource(value)} @property @@ -1284,6 +1292,13 @@ class RowIterator(HTTPIterator): api_request (Callable[google.cloud._http.JSONConnection.api_request]): The function to use to make API requests. path (str): The method path to query for the list of items. + schema (Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): + The table's schema. If any item is a mapping, its content must be + compatible with + :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`. page_token (str): A token identifying a page in a result set to start fetching results from. max_results (int, optional): The maximum number of results to fetch. 
@@ -1328,6 +1343,7 @@ def __init__(
             page_start=_rows_page_start,
             next_token="pageToken",
         )
+        schema = _to_schema_fields(schema)
         self._field_to_index = _helpers._field_to_index_mapping(schema)
         self._page_size = page_size
         self._preserve_order = False
diff --git a/bigquery/samples/query_external_sheets_permanent_table.py b/bigquery/samples/query_external_sheets_permanent_table.py
index fd4f7577f1b1..ce9b1c928782 100644
--- a/bigquery/samples/query_external_sheets_permanent_table.py
+++ b/bigquery/samples/query_external_sheets_permanent_table.py
@@ -52,8 +52,8 @@ def query_external_sheets_permanent_table(dataset_id):
     external_config.source_uris = [sheet_url]
     external_config.options.skip_leading_rows = 1  # Optionally skip header row.
     external_config.options.range = (
-        "us-states!A20:B49"
-    )  # Optionally set range of the sheet to query from.
+        "us-states!A20:B49"  # Optionally set range of the sheet to query from.
+    )
     table.external_data_configuration = external_config
 
     # Create a permanent table linked to the Sheets file.
diff --git a/bigquery/samples/query_external_sheets_temporary_table.py b/bigquery/samples/query_external_sheets_temporary_table.py
index 9f17e91a46cc..e89b6efab362 100644
--- a/bigquery/samples/query_external_sheets_temporary_table.py
+++ b/bigquery/samples/query_external_sheets_temporary_table.py
@@ -49,8 +49,8 @@ def query_external_sheets_temporary_table():
     ]
     external_config.options.skip_leading_rows = 1  # Optionally skip header row.
     external_config.options.range = (
-        "us-states!A20:B49"
-    )  # Optionally set range of the sheet to query from.
+        "us-states!A20:B49"  # Optionally set range of the sheet to query from.
+    )
     table_id = "us_states"
     job_config = bigquery.QueryJobConfig()
     job_config.table_definitions = {table_id: external_config}
diff --git a/bigquery/tests/unit/test__helpers.py b/bigquery/tests/unit/test__helpers.py
index 3884695d83af..6d92b4de73ba 100644
--- a/bigquery/tests/unit/test__helpers.py
+++ b/bigquery/tests/unit/test__helpers.py
@@ -17,6 +17,8 @@
 import decimal
 import unittest
 
+import mock
+
 
 class Test_not_null(unittest.TestCase):
     def _call_fut(self, value, field):
@@ -412,7 +414,8 @@ class Test_row_tuple_from_json(unittest.TestCase):
     def _call_fut(self, row, schema):
         from google.cloud.bigquery._helpers import _row_tuple_from_json
 
-        return _row_tuple_from_json(row, schema)
+        with _field_isinstance_patcher():
+            return _row_tuple_from_json(row, schema)
 
     def test_w_single_scalar_column(self):
         # SELECT 1 AS col
@@ -529,7 +532,8 @@ class Test_rows_from_json(unittest.TestCase):
     def _call_fut(self, rows, schema):
         from google.cloud.bigquery._helpers import _rows_from_json
 
-        return _rows_from_json(rows, schema)
+        with _field_isinstance_patcher():
+            return _rows_from_json(rows, schema)
 
     def test_w_record_subfield(self):
         from google.cloud.bigquery.table import Row
@@ -1023,3 +1027,23 @@ def __init__(self, mode, name="unknown", field_type="UNKNOWN", fields=()):
         self.name = name
         self.field_type = field_type
         self.fields = fields
+
+
+def _field_isinstance_patcher():
+    """A patcher that makes _Field instances seem like SchemaField instances.
+ """ + from google.cloud.bigquery.schema import SchemaField + + def fake_isinstance(instance, target_class): + if instance.__class__.__name__ != "_Field": + return isinstance(instance, target_class) # pragma: NO COVER + + # pretend that _Field() instances are actually instances of SchemaField + return target_class is SchemaField or ( + isinstance(target_class, tuple) and SchemaField in target_class + ) + + patcher = mock.patch( + "google.cloud.bigquery.schema.isinstance", side_effect=fake_isinstance + ) + return patcher diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index 46fb59180740..56ac62820841 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -619,7 +619,7 @@ def test_list_columns_and_indexes_without_named_index(module_under_test): @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test_list_columns_and_indexes_with_named_index_same_as_column_name( - module_under_test + module_under_test, ): df_data = collections.OrderedDict( [ @@ -701,6 +701,32 @@ def test_list_columns_and_indexes_with_multiindex(module_under_test): assert columns_and_indexes == expected +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_dataframe_to_bq_schema_dict_sequence(module_under_test): + df_data = collections.OrderedDict( + [ + ("str_column", [u"hello", u"world"]), + ("int_column", [42, 8]), + ("bool_column", [True, False]), + ] + ) + dataframe = pandas.DataFrame(df_data) + + dict_schema = [ + {"name": "str_column", "type": "STRING", "mode": "NULLABLE"}, + {"name": "bool_column", "type": "BOOL", "mode": "REQUIRED"}, + ] + + returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, dict_schema) + + expected_schema = ( + schema.SchemaField("str_column", "STRING", "NULLABLE"), + schema.SchemaField("int_column", "INTEGER", "NULLABLE"), + schema.SchemaField("bool_column", "BOOL", "REQUIRED"), + ) + assert returned_schema == expected_schema + + @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") def test_dataframe_to_arrow_with_multiindex(module_under_test): @@ -856,6 +882,28 @@ def test_dataframe_to_arrow_with_unknown_type(module_under_test): assert arrow_schema[3].name == "field03" +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_dataframe_to_arrow_dict_sequence_schema(module_under_test): + dict_schema = [ + {"name": "field01", "type": "STRING", "mode": "REQUIRED"}, + {"name": "field02", "type": "BOOL", "mode": "NULLABLE"}, + ] + + dataframe = pandas.DataFrame( + {"field01": [u"hello", u"world"], "field02": [True, False]} + ) + + arrow_table = module_under_test.dataframe_to_arrow(dataframe, dict_schema) + arrow_schema = arrow_table.schema + + expected_fields = [ + pyarrow.field("field01", "string", nullable=False), + pyarrow.field("field02", "bool", nullable=True), + ] + assert list(arrow_schema) == expected_fields + + @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test_dataframe_to_parquet_without_pyarrow(module_under_test, monkeypatch): monkeypatch.setattr(module_under_test, "pyarrow", None) @@ -908,6 +956,36 @@ def test_dataframe_to_parquet_compression_method(module_under_test): assert call_args.kwargs.get("compression") == "ZSTD" +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires 
`pyarrow`") +def test_dataframe_to_parquet_dict_sequence_schema(module_under_test): + dict_schema = [ + {"name": "field01", "type": "STRING", "mode": "REQUIRED"}, + {"name": "field02", "type": "BOOL", "mode": "NULLABLE"}, + ] + + dataframe = pandas.DataFrame( + {"field01": [u"hello", u"world"], "field02": [True, False]} + ) + + write_table_patch = mock.patch.object( + module_under_test.pyarrow.parquet, "write_table", autospec=True + ) + to_arrow_patch = mock.patch.object( + module_under_test, "dataframe_to_arrow", autospec=True + ) + + with write_table_patch, to_arrow_patch as fake_to_arrow: + module_under_test.dataframe_to_parquet(dataframe, dict_schema, None) + + expected_schema_arg = [ + schema.SchemaField("field01", "STRING", mode="REQUIRED"), + schema.SchemaField("field02", "BOOL", mode="NULLABLE"), + ] + schema_arg = fake_to_arrow.call_args.args[1] + assert schema_arg == expected_schema_arg + + @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") def test_download_arrow_tabledata_list_unknown_field_type(module_under_test): fake_page = api_core.page_iterator.Page( @@ -977,3 +1055,62 @@ def test_download_arrow_tabledata_list_known_field_type(module_under_test): col = result.columns[1] assert type(col) is pyarrow.lib.StringArray assert list(col) == ["2.2", "22.22", "222.222"] + + +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_download_arrow_tabledata_list_dict_sequence_schema(module_under_test): + fake_page = api_core.page_iterator.Page( + parent=mock.Mock(), + items=[{"page_data": "foo"}], + item_to_value=api_core.page_iterator._item_to_value_identity, + ) + fake_page._columns = [[1, 10, 100], ["2.2", "22.22", "222.222"]] + pages = [fake_page] + + dict_schema = [ + {"name": "population_size", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "non_alien_field", "type": "STRING", "mode": "NULLABLE"}, + ] + + results_gen = module_under_test.download_arrow_tabledata_list(pages, dict_schema) + result = next(results_gen) + + assert len(result.columns) == 2 + col = result.columns[0] + assert type(col) is pyarrow.lib.Int64Array + assert list(col) == [1, 10, 100] + col = result.columns[1] + assert type(col) is pyarrow.lib.StringArray + assert list(col) == ["2.2", "22.22", "222.222"] + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_download_dataframe_tabledata_list_dict_sequence_schema(module_under_test): + fake_page = api_core.page_iterator.Page( + parent=mock.Mock(), + items=[{"page_data": "foo"}], + item_to_value=api_core.page_iterator._item_to_value_identity, + ) + fake_page._columns = [[1, 10, 100], ["2.2", "22.22", "222.222"]] + pages = [fake_page] + + dict_schema = [ + {"name": "population_size", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "non_alien_field", "type": "STRING", "mode": "NULLABLE"}, + ] + + results_gen = module_under_test.download_dataframe_tabledata_list( + pages, dict_schema, dtypes={} + ) + result = next(results_gen) + + expected_result = pandas.DataFrame( + collections.OrderedDict( + [ + ("population_size", [1, 10, 100]), + ("non_alien_field", ["2.2", "22.22", "222.222"]), + ] + ) + ) + assert result.equals(expected_result) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 91b9bc642187..657667c8cc62 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -1138,7 +1138,8 @@ def test_create_table_w_day_partition_and_expire(self): 
self.assertEqual(got.table_id, self.TABLE_ID) def test_create_table_w_schema_and_query(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table path = "projects/%s/datasets/%s/tables" % (self.PROJECT, self.DS_ID) query = "SELECT * from %s:%s" % (self.DS_ID, self.TABLE_ID) @@ -1753,7 +1754,8 @@ def test_update_routine(self): self.assertEqual(req[1]["headers"]["If-Match"], "im-an-etag") def test_update_table(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table path = "projects/%s/datasets/%s/tables/%s" % ( self.PROJECT, @@ -1896,7 +1898,8 @@ def test_update_table_w_query(self): import datetime from google.cloud._helpers import UTC from google.cloud._helpers import _millis - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table path = "projects/%s/datasets/%s/tables/%s" % ( self.PROJECT, @@ -4173,7 +4176,7 @@ def test_insert_rows_w_schema(self): from google.cloud._helpers import UTC from google.cloud._helpers import _datetime_to_rfc3339 from google.cloud._helpers import _microseconds_from_datetime - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField WHEN_TS = 1437767599.006 WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(tzinfo=UTC) @@ -4229,7 +4232,8 @@ def test_insert_rows_w_list_of_dictionaries(self): from google.cloud._helpers import UTC from google.cloud._helpers import _datetime_to_rfc3339 from google.cloud._helpers import _microseconds_from_datetime - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table WHEN_TS = 1437767599.006 WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(tzinfo=UTC) @@ -4290,8 +4294,8 @@ def _row_data(row): ) def test_insert_rows_w_list_of_Rows(self): + from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table - from google.cloud.bigquery.table import SchemaField from google.cloud.bigquery.table import Row PATH = "projects/%s/datasets/%s/tables/%s/insertAll" % ( @@ -4335,7 +4339,8 @@ def _row_data(row): ) def test_insert_rows_w_skip_invalid_and_ignore_unknown(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table PATH = "projects/%s/datasets/%s/tables/%s/insertAll" % ( self.PROJECT, @@ -4411,7 +4416,8 @@ def _row_data(row): ) def test_insert_rows_w_repeated_fields(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table PATH = "projects/%s/datasets/%s/tables/%s/insertAll" % ( self.PROJECT, @@ -4504,7 +4510,7 @@ def test_insert_rows_w_repeated_fields(self): ) def test_insert_rows_w_record_schema(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField PATH = "projects/%s/datasets/%s/tables/%s/insertAll" % ( self.PROJECT, @@ -4599,6 +4605,7 @@ def test_insert_rows_errors(self): def test_insert_rows_w_numeric(self): from google.cloud.bigquery import table + from google.cloud.bigquery.schema import SchemaField project = 
"PROJECT" ds_id = "DS_ID" @@ -4608,10 +4615,7 @@ def test_insert_rows_w_numeric(self): client = self._make_one(project=project, credentials=creds, _http=http) conn = client._connection = make_connection({}) table_ref = DatasetReference(project, ds_id).table(table_id) - schema = [ - table.SchemaField("account", "STRING"), - table.SchemaField("balance", "NUMERIC"), - ] + schema = [SchemaField("account", "STRING"), SchemaField("balance", "NUMERIC")] insert_table = table.Table(table_ref, schema=schema) rows = [ ("Savings", decimal.Decimal("23.47")), @@ -4643,7 +4647,7 @@ def test_insert_rows_w_numeric(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_insert_rows_from_dataframe(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format( @@ -4719,7 +4723,7 @@ def test_insert_rows_from_dataframe(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_insert_rows_from_dataframe_many_columns(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format( @@ -4766,8 +4770,9 @@ def test_insert_rows_from_dataframe_many_columns(self): assert actual_calls[0] == expected_call def test_insert_rows_json(self): - from google.cloud.bigquery.table import Table, SchemaField from google.cloud.bigquery.dataset import DatasetReference + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table PROJECT = "PROJECT" DS_ID = "DS_ID" @@ -4878,8 +4883,8 @@ def test_list_partitions_with_string_id(self): def test_list_rows(self): import datetime from google.cloud._helpers import UTC + from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table - from google.cloud.bigquery.table import SchemaField from google.cloud.bigquery.table import Row PATH = "projects/%s/datasets/%s/tables/%s/data" % ( @@ -4979,7 +4984,8 @@ def test_list_rows_empty_table(self): self.assertEqual(rows.total_rows, 0) def test_list_rows_query_params(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table creds = _make_credentials() http = object() @@ -5001,7 +5007,7 @@ def test_list_rows_query_params(self): self.assertEqual(req[1]["query_params"], test[1], "for kwargs %s" % test[0]) def test_list_rows_repeated_fields(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField PATH = "projects/%s/datasets/%s/tables/%s/data" % ( self.PROJECT, @@ -5061,7 +5067,8 @@ def test_list_rows_repeated_fields(self): ) def test_list_rows_w_record_schema(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table PATH = "projects/%s/datasets/%s/tables/%s/data" % ( self.PROJECT, diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 5f3d3ee965b8..a2aeb5efbc4a 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -843,7 +843,7 @@ def test__set_future_result_w_done_wo_result_set_w_error(self): set_exception.assert_called_once() args, kw = set_exception.call_args - exception, = args + 
(exception,) = args self.assertIsInstance(exception, NotFound) self.assertEqual(exception.message, "testing") self.assertEqual(kw, {}) @@ -1532,7 +1532,7 @@ def test_schema_hit(self): self.assertEqual(all_props, SchemaField.from_api_repr(all_props_repr)) self.assertEqual(minimal, SchemaField.from_api_repr(minimal_repr)) - def test_schema_setter(self): + def test_schema_setter_fields(self): from google.cloud.bigquery.schema import SchemaField config = self._get_target_class()() @@ -1555,6 +1555,42 @@ def test_schema_setter(self): config._properties["load"]["schema"], {"fields": [full_name_repr, age_repr]} ) + def test_schema_setter_valid_mappings_list(self): + config = self._get_target_class()() + + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INTEGER", "mode": "REQUIRED"}, + ] + config.schema = schema + + full_name_repr = { + "name": "full_name", + "type": "STRING", + "mode": "REQUIRED", + "description": None, + } + age_repr = { + "name": "age", + "type": "INTEGER", + "mode": "REQUIRED", + "description": None, + } + self.assertEqual( + config._properties["load"]["schema"], {"fields": [full_name_repr, age_repr]} + ) + + def test_schema_setter_invalid_mappings_list(self): + config = self._get_target_class()() + + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "typeoo": "INTEGER", "mode": "REQUIRED"}, + ] + + with self.assertRaises(Exception): + config.schema = schema + def test_schema_setter_unsetting_schema(self): from google.cloud.bigquery.schema import SchemaField diff --git a/bigquery/tests/unit/test_schema.py b/bigquery/tests/unit/test_schema.py index 862d8a823e62..fc8a41c68c46 100644 --- a/bigquery/tests/unit/test_schema.py +++ b/bigquery/tests/unit/test_schema.py @@ -568,3 +568,69 @@ def test_w_subfields(self): ], }, ) + + +class Test_to_schema_fields(unittest.TestCase): + @staticmethod + def _call_fut(schema): + from google.cloud.bigquery.schema import _to_schema_fields + + return _to_schema_fields(schema) + + def test_invalid_type(self): + schema = [ + ("full_name", "STRING", "REQUIRED"), + ("address", "STRING", "REQUIRED"), + ] + with self.assertRaises(ValueError): + self._call_fut(schema) + + def test_schema_fields_sequence(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField("age", "INT64", mode="NULLABLE"), + ] + result = self._call_fut(schema) + self.assertEqual(result, schema) + + def test_invalid_mapping_representation(self): + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "address", "typeooo": "STRING", "mode": "REQUIRED"}, + ] + with self.assertRaises(Exception): + self._call_fut(schema) + + def test_valid_mapping_representation(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + { + "name": "residence", + "type": "STRUCT", + "mode": "NULLABLE", + "fields": [ + {"name": "foo", "type": "DATE", "mode": "NULLABLE"}, + {"name": "bar", "type": "BYTES", "mode": "REQUIRED"}, + ], + }, + ] + + expected_schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField( + "residence", + "STRUCT", + mode="NULLABLE", + fields=[ + SchemaField("foo", "DATE", mode="NULLABLE"), + SchemaField("bar", "BYTES", mode="REQUIRED"), + ], + ), + ] + + result = self._call_fut(schema) + self.assertEqual(result, expected_schema) diff --git 
a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index b04a4491e6ca..97a7b4ae745e 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -450,7 +450,7 @@ def test_ctor(self): self.assertIsNone(table.clustering_fields) def test_ctor_w_schema(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField dataset = DatasetReference(self.PROJECT, self.DS_ID) table_ref = dataset.table(self.TABLE_NAME) @@ -556,7 +556,7 @@ def test_num_rows_getter(self): with self.assertRaises(ValueError): getattr(table, "num_rows") - def test_schema_setter_non_list(self): + def test_schema_setter_non_sequence(self): dataset = DatasetReference(self.PROJECT, self.DS_ID) table_ref = dataset.table(self.TABLE_NAME) table = self._make_one(table_ref) @@ -564,7 +564,7 @@ def test_schema_setter_non_list(self): table.schema = object() def test_schema_setter_invalid_field(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField dataset = DatasetReference(self.PROJECT, self.DS_ID) table_ref = dataset.table(self.TABLE_NAME) @@ -573,8 +573,8 @@ def test_schema_setter_invalid_field(self): with self.assertRaises(ValueError): table.schema = [full_name, object()] - def test_schema_setter(self): - from google.cloud.bigquery.table import SchemaField + def test_schema_setter_valid_fields(self): + from google.cloud.bigquery.schema import SchemaField dataset = DatasetReference(self.PROJECT, self.DS_ID) table_ref = dataset.table(self.TABLE_NAME) @@ -584,6 +584,48 @@ def test_schema_setter(self): table.schema = [full_name, age] self.assertEqual(table.schema, [full_name, age]) + def test_schema_setter_invalid_mapping_representation(self): + dataset = DatasetReference(self.PROJECT, self.DS_ID) + table_ref = dataset.table(self.TABLE_NAME) + table = self._make_one(table_ref) + full_name = {"name": "full_name", "type": "STRING", "mode": "REQUIRED"} + invalid_field = {"name": "full_name", "typeooo": "STRING", "mode": "REQUIRED"} + with self.assertRaises(Exception): + table.schema = [full_name, invalid_field] + + def test_schema_setter_valid_mapping_representation(self): + from google.cloud.bigquery.schema import SchemaField + + dataset = DatasetReference(self.PROJECT, self.DS_ID) + table_ref = dataset.table(self.TABLE_NAME) + table = self._make_one(table_ref) + full_name = {"name": "full_name", "type": "STRING", "mode": "REQUIRED"} + job_status = { + "name": "is_employed", + "type": "STRUCT", + "mode": "NULLABLE", + "fields": [ + {"name": "foo", "type": "DATE", "mode": "NULLABLE"}, + {"name": "bar", "type": "BYTES", "mode": "REQUIRED"}, + ], + } + + table.schema = [full_name, job_status] + + expected_schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField( + "is_employed", + "STRUCT", + mode="NULLABLE", + fields=[ + SchemaField("foo", "DATE", mode="NULLABLE"), + SchemaField("bar", "BYTES", mode="REQUIRED"), + ], + ), + ] + self.assertEqual(table.schema, expected_schema) + def test_props_set_by_server(self): import datetime from google.cloud._helpers import UTC @@ -1145,7 +1187,8 @@ def test__row_from_mapping_wo_schema(self): self.assertEqual(exc.exception.args, (_TABLE_HAS_NO_SCHEMA,)) def test__row_from_mapping_w_invalid_schema(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table MAPPING = { "full_name": "Phred Phlyntstone", @@ -1167,7 
+1210,8 @@ def test__row_from_mapping_w_invalid_schema(self): self.assertIn("Unknown field mode: BOGUS", str(exc.exception)) def test__row_from_mapping_w_schema(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table MAPPING = { "full_name": "Phred Phlyntstone", @@ -1497,8 +1541,24 @@ def test_constructor_with_table(self): self.assertIs(iterator._table, table) self.assertEqual(iterator.total_rows, 100) + def test_constructor_with_dict_schema(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ] + + iterator = self._make_one(schema=schema) + + expected_schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField("age", "INT64", mode="NULLABLE"), + ] + self.assertEqual(iterator.schema, expected_schema) + def test_iterate(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1529,7 +1589,7 @@ def test_iterate(self): api_request.assert_called_once_with(method="GET", path=path, query_params={}) def test_page_size(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1555,7 +1615,7 @@ def test_page_size(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1637,7 +1697,7 @@ def test_to_arrow(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow_w_nulls(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [SchemaField("name", "STRING"), SchemaField("age", "INTEGER")] rows = [ @@ -1670,7 +1730,7 @@ def test_to_arrow_w_nulls(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow_w_unknown_type(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1708,7 +1768,7 @@ def test_to_arrow_w_unknown_type(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow_w_empty_table(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1870,7 +1930,7 @@ def test_to_arrow_w_bqstorage_no_streams(self): @mock.patch("tqdm.tqdm_notebook") @mock.patch("tqdm.tqdm") def test_to_arrow_progress_bar(self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1913,7 +1973,7 @@ def test_to_arrow_w_pyarrow_none(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1945,7 +2005,7 @@ def test_to_dataframe(self): def test_to_dataframe_progress_bar( 
self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock ): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1978,7 +2038,7 @@ def test_to_dataframe_progress_bar( @unittest.skipIf(pandas is None, "Requires `pandas`") @mock.patch("google.cloud.bigquery.table.tqdm", new=None) def test_to_dataframe_no_tqdm_no_progress_bar(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2003,7 +2063,7 @@ def test_to_dataframe_no_tqdm_no_progress_bar(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @mock.patch("google.cloud.bigquery.table.tqdm", new=None) def test_to_dataframe_no_tqdm(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2036,7 +2096,7 @@ def test_to_dataframe_no_tqdm(self): @mock.patch("tqdm.tqdm_notebook", new=None) # will raise TypeError on call @mock.patch("tqdm.tqdm", new=None) # will raise TypeError on call def test_to_dataframe_tqdm_error(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2066,7 +2126,7 @@ def test_to_dataframe_tqdm_error(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_w_empty_results(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2101,7 +2161,7 @@ def test_to_dataframe_logs_tabledata_list(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_w_various_types_nullable(self): import datetime - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("start_timestamp", "TIMESTAMP"), @@ -2141,7 +2201,7 @@ def test_to_dataframe_w_various_types_nullable(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_column_dtypes(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("start_timestamp", "TIMESTAMP"), @@ -2179,7 +2239,7 @@ def test_to_dataframe_column_dtypes(self): @mock.patch("google.cloud.bigquery.table.pandas", new=None) def test_to_dataframe_error_if_pandas_is_none(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2198,7 +2258,7 @@ def test_to_dataframe_error_if_pandas_is_none(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_max_results_w_bqstorage_warning(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"),
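
Taken together, these changes let callers pass plain mappings anywhere a schema is
expected; _to_schema_fields() coerces them via SchemaField.from_api_repr(). A minimal
usage sketch of what the patch enables (the project, dataset, and table names below
are illustrative, not taken from the diff):

    from google.cloud import bigquery

    # Plain dicts in the REST-API representation can now stand in for
    # SchemaField instances.
    dict_schema = [
        {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
    ]

    # ...when defining a table...
    table = bigquery.Table("my-project.my_dataset.people", schema=dict_schema)

    # ...or when configuring a load job. Both setters now route through
    # schema._to_schema_fields(), so reading the property back should yield
    # SchemaField instances.
    job_config = bigquery.LoadJobConfig()
    job_config.schema = dict_schema
    assert all(isinstance(f, bigquery.SchemaField) for f in job_config.schema)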