Skip to content

Commit

Permalink
Fix nested schema parsing in insert_rows (#7022)
Browse files Browse the repository at this point in the history
Before this change, insert_rows did not convert record/struct or repeated/array types to the correct JSON. Now the new _helpers._field_to_json function will recursively convert values into the correct JSON for the provided schema.
  • Loading branch information
adamf authored and tswast committed Jan 2, 2019
1 parent 2fb75da commit e633ffd
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 36 deletions.
107 changes: 107 additions & 0 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Shared helper functions for BigQuery API classes."""

import base64
import copy
import datetime
import decimal

Expand Down Expand Up @@ -329,6 +330,112 @@ def _time_to_json(value):
_SCALAR_VALUE_TO_JSON_PARAM["TIMESTAMP"] = _timestamp_to_json_parameter


def _scalar_field_to_json(field, row_value):
    """Convert a scalar value to its JSON-safe representation.
    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The schema field whose ``field_type`` selects the converter.
        row_value (any):
            The raw value to convert, based on the field's type.
    Returns:
        any:
            A JSON-serializable object.
    """
    try:
        converter = _SCALAR_VALUE_TO_JSON_ROW[field.field_type]
    except KeyError:
        # Types with no registered converter (e.g. STRING) pass through.
        return row_value
    return converter(row_value)


def _repeated_field_to_json(field, row_value):
    """Convert a repeated/array field to its JSON representation.
    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The SchemaField to use for type conversion and field name. The
            field mode must equal ``REPEATED``.
        row_value (Sequence[any]):
            A sequence of values to convert to JSON-serializable values.
    Returns:
        List[any]:
            A list of JSON-serializable objects.
    """
    # Clone the field and drop the REPEATED mode so that each item can be
    # converted as if it were a top-level (NULLABLE) field.
    item_field = copy.deepcopy(field)
    item_field._mode = "NULLABLE"
    return [_field_to_json(item_field, item) for item in row_value]


def _record_field_to_json(fields, row_value):
    """Convert a record/struct field to its JSON representation.
    Args:
        fields ( \
            Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
        ):
            The :class:`~google.cloud.bigquery.schema.SchemaField`s of the
            record's subfields to use for type conversion and field names.
        row_value (Union[Tuple[Any], Mapping[str, Any]]):
            A tuple or dictionary to convert to JSON-serializable values.
    Returns:
        Mapping[str, any]:
            A JSON-serializable dictionary.
    """
    is_mapping = isinstance(row_value, dict)

    record = {}
    for index, subfield in enumerate(fields):
        key = subfield.name
        # Mappings are looked up by field name; sequences by position.
        subvalue = row_value[key] if is_mapping else row_value[index]
        record[key] = _field_to_json(subfield, subvalue)
    return record


def _field_to_json(field, row_value):
    """Convert a field into JSON-serializable values.
    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The SchemaField to use for type conversion and field name.
        row_value (Union[ \
            Sequence[list], \
            any, \
        ]):
            Row data to be inserted. If the SchemaField's mode is
            REPEATED, assume this is a list. If not, the type
            is inferred from the SchemaField's field_type.
    Returns:
        any:
            A JSON-serializable object.
    """
    if row_value is None:
        return None

    if field.mode == "REPEATED":
        return _repeated_field_to_json(field, row_value)

    # Legacy SQL calls this type RECORD; standard SQL calls it STRUCT.
    # Accept both spellings so schemas defined either way are converted
    # recursively instead of falling through to the scalar path.
    if field.field_type in ("RECORD", "STRUCT"):
        return _record_field_to_json(field.fields, row_value)

    return _scalar_field_to_json(field, row_value)


def _snake_to_camel_case(value):
"""Convert snake case string to camel case."""
words = value.split("_")
Expand Down
18 changes: 2 additions & 16 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from google.cloud import exceptions
from google.cloud.client import ClientWithProject

from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery.dataset import Dataset
Expand All @@ -51,7 +51,6 @@
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA
from google.cloud.bigquery.table import _row_from_mapping


_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB
Expand Down Expand Up @@ -1495,20 +1494,7 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):
else:
raise TypeError("table should be Table or TableReference")

json_rows = []

for index, row in enumerate(rows):
if isinstance(row, dict):
row = _row_from_mapping(row, schema)
json_row = {}

for field, value in zip(schema, row):
converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
if converter is not None: # STRING doesn't need converting
value = converter(value)
json_row[field.name] = value

json_rows.append(json_row)
json_rows = [_record_field_to_json(schema, row) for row in rows]

return self.insert_rows_json(table, json_rows, **kwargs)

Expand Down
115 changes: 95 additions & 20 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import six
from six.moves import http_client
import pytest
import pytz

try:
import pandas
Expand Down Expand Up @@ -3482,20 +3483,76 @@ def test_insert_rows_w_repeated_fields(self):
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
conn = client._connection = _make_connection({})
full_name = SchemaField("color", "STRING", mode="REPEATED")
index = SchemaField("index", "INTEGER", "REPEATED")
score = SchemaField("score", "FLOAT", "REPEATED")
struct = SchemaField("struct", "RECORD", mode="REPEATED", fields=[index, score])
table = Table(self.TABLE_REF, schema=[full_name, struct])
ROWS = [(["red", "green"], [{"index": [1, 2], "score": [3.1415, 1.414]}])]

def _row_data(row):
return {"color": row[0], "struct": row[1]}
color = SchemaField("color", "STRING", mode="REPEATED")
items = SchemaField("items", "INTEGER", mode="REPEATED")
score = SchemaField("score", "INTEGER")
times = SchemaField("times", "TIMESTAMP", mode="REPEATED")
distances = SchemaField("distances", "FLOAT", mode="REPEATED")
structs = SchemaField(
"structs", "RECORD", mode="REPEATED", fields=[score, times, distances]
)
table = Table(self.TABLE_REF, schema=[color, items, structs])
ROWS = [
(
["red", "green"],
[1, 2],
[
(
12,
[
datetime.datetime(2018, 12, 1, 12, 0, 0, tzinfo=pytz.utc),
datetime.datetime(2018, 12, 1, 13, 0, 0, tzinfo=pytz.utc),
],
[1.25, 2.5],
),
{
"score": 13,
"times": [
datetime.datetime(2018, 12, 2, 12, 0, 0, tzinfo=pytz.utc),
datetime.datetime(2018, 12, 2, 13, 0, 0, tzinfo=pytz.utc),
],
"distances": [-1.25, -2.5],
},
],
),
{"color": None, "items": [], "structs": [(None, [], [3.5])]},
]

SENT = {
"rows": [
{"json": _row_data(row), "insertId": str(i)}
for i, row in enumerate(ROWS)
{
"json": {
"color": ["red", "green"],
"items": ["1", "2"],
"structs": [
{
"score": "12",
"times": [
1543665600.0, # 2018-12-01 12:00 UTC
1543669200.0, # 2018-12-01 13:00 UTC
],
"distances": [1.25, 2.5],
},
{
"score": "13",
"times": [
1543752000.0, # 2018-12-02 12:00 UTC
1543755600.0, # 2018-12-02 13:00 UTC
],
"distances": [-1.25, -2.5],
},
],
},
"insertId": "0",
},
{
"json": {
"color": None,
"items": [],
"structs": [{"score": None, "times": [], "distances": [3.5]}],
},
"insertId": "1",
},
]
}

Expand Down Expand Up @@ -3531,20 +3588,38 @@ def test_insert_rows_w_record_schema(self):
"Phred Phlyntstone",
{"area_code": "800", "local_number": "555-1212", "rank": 1},
),
(
"Bharney Rhubble",
{"area_code": "877", "local_number": "768-5309", "rank": 2},
),
("Bharney Rhubble", ("877", "768-5309", 2)),
("Wylma Phlyntstone", None),
]

def _row_data(row):
return {"full_name": row[0], "phone": row[1]}

SENT = {
"rows": [
{"json": _row_data(row), "insertId": str(i)}
for i, row in enumerate(ROWS)
{
"json": {
"full_name": "Phred Phlyntstone",
"phone": {
"area_code": "800",
"local_number": "555-1212",
"rank": "1",
},
},
"insertId": "0",
},
{
"json": {
"full_name": "Bharney Rhubble",
"phone": {
"area_code": "877",
"local_number": "768-5309",
"rank": "2",
},
},
"insertId": "1",
},
{
"json": {"full_name": "Wylma Phlyntstone", "phone": None},
"insertId": "2",
},
]
}

Expand Down

0 comments on commit e633ffd

Please sign in to comment.