Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #6300 - Fix nested schema parsing in insert_rows #7022

Merged
merged 3 commits into from
Jan 2, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Shared helper functions for BigQuery API classes."""

import base64
import copy
import datetime
import decimal

Expand Down Expand Up @@ -329,6 +330,112 @@ def _time_to_json(value):
_SCALAR_VALUE_TO_JSON_PARAM["TIMESTAMP"] = _timestamp_to_json_parameter


def _scalar_field_to_json(field, row_value):
    """Maps a field and value to a JSON-safe value.

    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The SchemaField to use for type conversion and field name.
        row_value (any):
            Value to be converted, based on the field's type.

    Returns:
        any:
            A JSON-serializable object.
    """
    converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
    # Field types with no registered converter (e.g. STRING) pass through.
    return row_value if converter is None else converter(row_value)


def _repeated_field_to_json(field, row_value):
    """Convert a repeated/array field to its JSON representation.

    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The SchemaField to use for type conversion and field name. The
            field mode must equal ``REPEATED``.
        row_value (Sequence[any]):
            A sequence of values to convert to JSON-serializable values.

    Returns:
        List[any]:
            A list of JSON-serializable objects.
    """
    # Clone the field with the REPEATED mode stripped so each element can be
    # processed as if it were a stand-alone, top-level value.
    item_field = copy.deepcopy(field)
    item_field._mode = "NULLABLE"
    return [_field_to_json(item_field, item) for item in row_value]


def _record_field_to_json(fields, row_value):
    """Convert a record/struct field to its JSON representation.

    Args:
        fields ( \
            Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
        ):
            The subfields of the record, used for type conversion and field
            names. The parent field's type must equal ``RECORD``.
        row_value (Union[Tuple[Any], Mapping[str, Any]]):
            A tuple or dictionary to convert to JSON-serializable values.

    Returns:
        Mapping[str, any]:
            A JSON-serializable dictionary.
    """
    record = {}
    isdict = isinstance(row_value, dict)

    for subindex, subfield in enumerate(fields):
        subname = subfield.name
        # Mapping rows are keyed by field name; sequence rows are positional.
        subvalue = row_value[subname] if isdict else row_value[subindex]
        record[subname] = _field_to_json(subfield, subvalue)
    return record


def _field_to_json(field, row_value):
    """Convert a field into JSON-serializable values.

    Dispatches on the field's mode and type: ``REPEATED`` fields are handled
    as lists, ``RECORD`` fields as dictionaries, everything else as scalars.

    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The SchemaField to use for type conversion and field name.

        row_value (Union[ \
            Sequence[list], \
            any, \
        ]):
            Row data to be inserted. If the SchemaField's mode is
            REPEATED, assume this is a list. If not, the type
            is inferred from the SchemaField's field_type.

    Returns:
        any:
            A JSON-serializable object.
    """
    if row_value is None:
        return None

    # REPEATED takes precedence: an array of records is still an array.
    if field.mode == "REPEATED":
        return _repeated_field_to_json(field, row_value)

    if field.field_type != "RECORD":
        return _scalar_field_to_json(field, row_value)

    return _record_field_to_json(field.fields, row_value)


def _snake_to_camel_case(value):
"""Convert snake case string to camel case."""
words = value.split("_")
Expand Down
18 changes: 2 additions & 16 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from google.cloud import exceptions
from google.cloud.client import ClientWithProject

from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery.dataset import Dataset
Expand All @@ -51,7 +51,6 @@
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA
from google.cloud.bigquery.table import _row_from_mapping


_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB
Expand Down Expand Up @@ -1495,20 +1494,7 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):
else:
raise TypeError("table should be Table or TableReference")

json_rows = []

for index, row in enumerate(rows):
if isinstance(row, dict):
row = _row_from_mapping(row, schema)
json_row = {}

for field, value in zip(schema, row):
converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
if converter is not None: # STRING doesn't need converting
value = converter(value)
json_row[field.name] = value

json_rows.append(json_row)
json_rows = [_record_field_to_json(schema, row) for row in rows]

return self.insert_rows_json(table, json_rows, **kwargs)

Expand Down
115 changes: 95 additions & 20 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import six
from six.moves import http_client
import pytest
import pytz

try:
import pandas
Expand Down Expand Up @@ -3482,20 +3483,76 @@ def test_insert_rows_w_repeated_fields(self):
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
conn = client._connection = _make_connection({})
full_name = SchemaField("color", "STRING", mode="REPEATED")
index = SchemaField("index", "INTEGER", "REPEATED")
score = SchemaField("score", "FLOAT", "REPEATED")
struct = SchemaField("struct", "RECORD", mode="REPEATED", fields=[index, score])
table = Table(self.TABLE_REF, schema=[full_name, struct])
ROWS = [(["red", "green"], [{"index": [1, 2], "score": [3.1415, 1.414]}])]

def _row_data(row):
return {"color": row[0], "struct": row[1]}
color = SchemaField("color", "STRING", mode="REPEATED")
items = SchemaField("items", "INTEGER", mode="REPEATED")
score = SchemaField("score", "INTEGER")
times = SchemaField("times", "TIMESTAMP", mode="REPEATED")
distances = SchemaField("distances", "FLOAT", mode="REPEATED")
structs = SchemaField(
"structs", "RECORD", mode="REPEATED", fields=[score, times, distances]
)
table = Table(self.TABLE_REF, schema=[color, items, structs])
ROWS = [
(
["red", "green"],
[1, 2],
[
(
12,
[
datetime.datetime(2018, 12, 1, 12, 0, 0, tzinfo=pytz.utc),
datetime.datetime(2018, 12, 1, 13, 0, 0, tzinfo=pytz.utc),
],
[1.25, 2.5],
),
{
"score": 13,
"times": [
datetime.datetime(2018, 12, 2, 12, 0, 0, tzinfo=pytz.utc),
datetime.datetime(2018, 12, 2, 13, 0, 0, tzinfo=pytz.utc),
],
"distances": [-1.25, -2.5],
},
],
),
{"color": None, "items": [], "structs": [(None, [], [3.5])]},
]

SENT = {
"rows": [
{"json": _row_data(row), "insertId": str(i)}
for i, row in enumerate(ROWS)
{
"json": {
"color": ["red", "green"],
"items": ["1", "2"],
"structs": [
{
"score": "12",
"times": [
1543665600.0, # 2018-12-01 12:00 UTC
1543669200.0, # 2018-12-01 13:00 UTC
],
"distances": [1.25, 2.5],
},
{
"score": "13",
"times": [
1543752000.0, # 2018-12-02 12:00 UTC
1543755600.0, # 2018-12-02 13:00 UTC
],
"distances": [-1.25, -2.5],
},
],
},
"insertId": "0",
},
{
"json": {
"color": None,
"items": [],
"structs": [{"score": None, "times": [], "distances": [3.5]}],
},
"insertId": "1",
},
]
}

Expand Down Expand Up @@ -3531,20 +3588,38 @@ def test_insert_rows_w_record_schema(self):
"Phred Phlyntstone",
{"area_code": "800", "local_number": "555-1212", "rank": 1},
),
(
"Bharney Rhubble",
{"area_code": "877", "local_number": "768-5309", "rank": 2},
),
("Bharney Rhubble", ("877", "768-5309", 2)),
("Wylma Phlyntstone", None),
]

def _row_data(row):
return {"full_name": row[0], "phone": row[1]}

SENT = {
"rows": [
{"json": _row_data(row), "insertId": str(i)}
for i, row in enumerate(ROWS)
{
"json": {
"full_name": "Phred Phlyntstone",
"phone": {
"area_code": "800",
"local_number": "555-1212",
"rank": "1",
},
},
"insertId": "0",
},
{
"json": {
"full_name": "Bharney Rhubble",
"phone": {
"area_code": "877",
"local_number": "768-5309",
"rank": "2",
},
},
"insertId": "1",
},
{
"json": {"full_name": "Wylma Phlyntstone", "phone": None},
"insertId": "2",
},
]
}

Expand Down