From e633ffdb96acebbd8c87d1f52edeb2432f6ac7e0 Mon Sep 17 00:00:00 2001
From: Adam Fletcher
Date: Wed, 2 Jan 2019 07:04:12 -0800
Subject: [PATCH] Fix nested schema parsing in insert_rows (#7022)

Before this change, insert_rows did not convert values of record/struct
or repeated/array types to the correct JSON. Now the new
_helpers._field_to_json function recursively converts each value into
the correct JSON for the provided schema.
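
For example, with a schema containing a repeated RECORD field, a row
like the one below is now converted field by field (the shapes mirror
the updated tests; the client and table objects here are hypothetical):

    from google.cloud.bigquery import SchemaField

    schema = [
        SchemaField("color", "STRING", mode="REPEATED"),
        SchemaField(
            "struct",
            "RECORD",
            mode="REPEATED",
            fields=[
                SchemaField("index", "INTEGER", mode="REPEATED"),
                SchemaField("score", "FLOAT", mode="REPEATED"),
            ],
        ),
    ]
    row = (["red", "green"], [{"index": [1, 2], "score": [3.1415, 1.414]}])
    # client.insert_rows(table, [row], selected_fields=schema) now sends:
    # {"color": ["red", "green"],
    #  "struct": [{"index": ["1", "2"], "score": [3.1415, 1.414]}]}

Previously the nested values were passed through unconverted, e.g. the
INTEGER items above were not coerced to their JSON string form.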
---
 bigquery/google/cloud/bigquery/_helpers.py | 107 +++++++++++++++++++
 bigquery/google/cloud/bigquery/client.py   |  18 +---
 bigquery/tests/unit/test_client.py         | 115 +++++++++++++++++----
 3 files changed, 204 insertions(+), 36 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py
index 6990fb3eaa69..10753cfc998b 100644
--- a/bigquery/google/cloud/bigquery/_helpers.py
+++ b/bigquery/google/cloud/bigquery/_helpers.py
@@ -15,6 +15,7 @@
 """Shared helper functions for BigQuery API classes."""
 
 import base64
+import copy
 import datetime
 import decimal
 
@@ -329,6 +330,112 @@ def _time_to_json(value):
 _SCALAR_VALUE_TO_JSON_PARAM["TIMESTAMP"] = _timestamp_to_json_parameter
 
 
+def _scalar_field_to_json(field, row_value):
+    """Maps a field and value to a JSON-safe value.
+
+    Args:
+        field ( \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+        ):
+            The SchemaField to use for type conversion and field name.
+        row_value (any):
+            Value to be converted, based on the field's type.
+
+    Returns:
+        any:
+            A JSON-serializable object.
+    """
+    converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
+    if converter is None:  # STRING doesn't need converting
+        return row_value
+    return converter(row_value)
+
+
+def _repeated_field_to_json(field, row_value):
+    """Convert a repeated/array field to its JSON representation.
+
+    Args:
+        field ( \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+        ):
+            The SchemaField to use for type conversion and field name. The
+            field mode must equal ``REPEATED``.
+        row_value (Sequence[any]):
+            A sequence of values to convert to JSON-serializable values.
+
+    Returns:
+        List[any]:
+            A list of JSON-serializable objects.
+    """
+    # Remove the REPEATED, but keep the other fields. This allows us to
+    # process each item as if it were a top-level field.
+    item_field = copy.deepcopy(field)
+    item_field._mode = "NULLABLE"
+    values = []
+    for item in row_value:
+        values.append(_field_to_json(item_field, item))
+    return values
+
+
+def _record_field_to_json(fields, row_value):
+    """Convert a record/struct field to its JSON representation.
+
+    Args:
+        fields ( \
+            Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
+        ):
+            The :class:`~google.cloud.bigquery.schema.SchemaField`s of the
+            record's subfields to use for type conversion and field names.
+        row_value (Union[Tuple[Any], Mapping[str, Any]]):
+            A tuple or dictionary to convert to JSON-serializable values.
+
+    Returns:
+        Mapping[str, any]:
+            A JSON-serializable dictionary.
+    """
+    record = {}
+    isdict = isinstance(row_value, dict)
+
+    for subindex, subfield in enumerate(fields):
+        subname = subfield.name
+        subvalue = row_value[subname] if isdict else row_value[subindex]
+        record[subname] = _field_to_json(subfield, subvalue)
+    return record
+
+
+def _field_to_json(field, row_value):
+    """Convert a field into JSON-serializable values.
+
+    Args:
+        field ( \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+        ):
+            The SchemaField to use for type conversion and field name.
+        row_value (Union[ \
+            Sequence[any], \
+            any, \
+        ]):
+            Row data to be inserted. If the SchemaField's mode is
+            REPEATED, assume this is a list. If not, the type
+            is inferred from the SchemaField's field_type.
+
+    Returns:
+        any:
+            A JSON-serializable object.
+    """
+    if row_value is None:
+        return None
+
+    if field.mode == "REPEATED":
+        return _repeated_field_to_json(field, row_value)
+
+    if field.field_type == "RECORD":
+        return _record_field_to_json(field.fields, row_value)
+
+    return _scalar_field_to_json(field, row_value)
+
+
 def _snake_to_camel_case(value):
     """Convert snake case string to camel case."""
     words = value.split("_")
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index dbc68119530f..96f1310c3f99 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -37,7 +37,7 @@
 from google.cloud import exceptions
 from google.cloud.client import ClientWithProject
 
-from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
+from google.cloud.bigquery._helpers import _record_field_to_json
 from google.cloud.bigquery._helpers import _str_or_none
 from google.cloud.bigquery._http import Connection
 from google.cloud.bigquery.dataset import Dataset
@@ -51,7 +51,6 @@
 from google.cloud.bigquery.table import TableReference
 from google.cloud.bigquery.table import RowIterator
 from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA
-from google.cloud.bigquery.table import _row_from_mapping
 
 _DEFAULT_CHUNKSIZE = 1048576  # 1024 * 1024 B = 1 MB
 
@@ -1495,20 +1494,7 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):
         else:
             raise TypeError("table should be Table or TableReference")
 
-        json_rows = []
-
-        for index, row in enumerate(rows):
-            if isinstance(row, dict):
-                row = _row_from_mapping(row, schema)
-            json_row = {}
-
-            for field, value in zip(schema, row):
-                converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
-                if converter is not None:  # STRING doesn't need converting
-                    value = converter(value)
-                json_row[field.name] = value
-
-            json_rows.append(json_row)
+        json_rows = [_record_field_to_json(schema, row) for row in rows]
 
         return self.insert_rows_json(table, json_rows, **kwargs)
 
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 0fc14b160a9c..c3d90ed640fb 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -25,6 +25,7 @@
 import six
 from six.moves import http_client
 import pytest
+import pytz
 
 try:
     import pandas
@@ -3482,20 +3483,76 @@ def test_insert_rows_w_repeated_fields(self):
         http = object()
         client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
         conn = client._connection = _make_connection({})
-        full_name = SchemaField("color", "STRING", mode="REPEATED")
-        index = SchemaField("index", "INTEGER", "REPEATED")
-        score = SchemaField("score", "FLOAT", "REPEATED")
-        struct = SchemaField("struct", "RECORD", mode="REPEATED", fields=[index, score])
-        table = Table(self.TABLE_REF, schema=[full_name, struct])
-        ROWS = [(["red", "green"], [{"index": [1, 2], "score": [3.1415, 1.414]}])]
-
-        def _row_data(row):
-            return {"color": row[0], "struct": row[1]}
+        color = SchemaField("color", "STRING", mode="REPEATED")
+        items = SchemaField("items", "INTEGER", mode="REPEATED")
+        score = SchemaField("score", "INTEGER")
+        times = SchemaField("times", "TIMESTAMP", mode="REPEATED")
+        distances = SchemaField("distances", "FLOAT", mode="REPEATED")
+        structs = SchemaField(
+            "structs",
"RECORD", mode="REPEATED", fields=[score, times, distances] + ) + table = Table(self.TABLE_REF, schema=[color, items, structs]) + ROWS = [ + ( + ["red", "green"], + [1, 2], + [ + ( + 12, + [ + datetime.datetime(2018, 12, 1, 12, 0, 0, tzinfo=pytz.utc), + datetime.datetime(2018, 12, 1, 13, 0, 0, tzinfo=pytz.utc), + ], + [1.25, 2.5], + ), + { + "score": 13, + "times": [ + datetime.datetime(2018, 12, 2, 12, 0, 0, tzinfo=pytz.utc), + datetime.datetime(2018, 12, 2, 13, 0, 0, tzinfo=pytz.utc), + ], + "distances": [-1.25, -2.5], + }, + ], + ), + {"color": None, "items": [], "structs": [(None, [], [3.5])]}, + ] SENT = { "rows": [ - {"json": _row_data(row), "insertId": str(i)} - for i, row in enumerate(ROWS) + { + "json": { + "color": ["red", "green"], + "items": ["1", "2"], + "structs": [ + { + "score": "12", + "times": [ + 1543665600.0, # 2018-12-01 12:00 UTC + 1543669200.0, # 2018-12-01 13:00 UTC + ], + "distances": [1.25, 2.5], + }, + { + "score": "13", + "times": [ + 1543752000.0, # 2018-12-02 12:00 UTC + 1543755600.0, # 2018-12-02 13:00 UTC + ], + "distances": [-1.25, -2.5], + }, + ], + }, + "insertId": "0", + }, + { + "json": { + "color": None, + "items": [], + "structs": [{"score": None, "times": [], "distances": [3.5]}], + }, + "insertId": "1", + }, ] } @@ -3531,20 +3588,38 @@ def test_insert_rows_w_record_schema(self): "Phred Phlyntstone", {"area_code": "800", "local_number": "555-1212", "rank": 1}, ), - ( - "Bharney Rhubble", - {"area_code": "877", "local_number": "768-5309", "rank": 2}, - ), + ("Bharney Rhubble", ("877", "768-5309", 2)), ("Wylma Phlyntstone", None), ] - def _row_data(row): - return {"full_name": row[0], "phone": row[1]} - SENT = { "rows": [ - {"json": _row_data(row), "insertId": str(i)} - for i, row in enumerate(ROWS) + { + "json": { + "full_name": "Phred Phlyntstone", + "phone": { + "area_code": "800", + "local_number": "555-1212", + "rank": "1", + }, + }, + "insertId": "0", + }, + { + "json": { + "full_name": "Bharney Rhubble", + "phone": { + "area_code": "877", + "local_number": "768-5309", + "rank": "2", + }, + }, + "insertId": "1", + }, + { + "json": {"full_name": "Wylma Phlyntstone", "phone": None}, + "insertId": "2", + }, ] }