Skip to content

Commit e633ffd

Browse files
adamftswast
authored and committed
Fix nested schema parsing in insert_rows (googleapis#7022)
Before this change, insert_rows did not convert record/struct or repeated/array types to the correct JSON. Now the new _helpers._field_to_json function will recursively convert values into the correct JSON for the provided schema.
1 parent 2fb75da commit e633ffd

File tree

3 files changed

+204
-36
lines changed

3 files changed

+204
-36
lines changed

bigquery/google/cloud/bigquery/_helpers.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Shared helper functions for BigQuery API classes."""
1616

1717
import base64
18+
import copy
1819
import datetime
1920
import decimal
2021

@@ -329,6 +330,112 @@ def _time_to_json(value):
329330
_SCALAR_VALUE_TO_JSON_PARAM["TIMESTAMP"] = _timestamp_to_json_parameter
330331

331332

333+
def _scalar_field_to_json(field, row_value):
    """Map a schema field and a single value to a JSON-safe value.

    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The SchemaField whose ``field_type`` selects the converter.
        row_value (any):
            Value to be converted, based on the field's type.

    Returns:
        any:
            A JSON-serializable object.
    """
    convert = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
    # Types with no registered converter (e.g. STRING) pass through untouched.
    return row_value if convert is None else convert(row_value)
352+
353+
354+
def _repeated_field_to_json(field, row_value):
    """Convert a repeated/array field to its JSON representation.

    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The SchemaField to use for type conversion and field name. The
            field mode must equal ``REPEATED``.
        row_value (Sequence[any]):
            A sequence of values to convert to JSON-serializable values.

    Returns:
        List[any]:
            A list of JSON-serializable objects.
    """
    # Clone the field with mode NULLABLE so each element is converted as if
    # it were a standalone top-level field of the same type.
    element_field = copy.deepcopy(field)
    element_field._mode = "NULLABLE"
    return [_field_to_json(element_field, element) for element in row_value]
378+
379+
380+
def _record_field_to_json(fields, row_value):
    """Convert a record/struct field to its JSON representation.

    Args:
        fields ( \
            Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
        ):
            The :class:`~google.cloud.bigquery.schema.SchemaField`\\ s of the
            record's subfields to use for type conversion and field names.
        row_value (Union[Tuple[Any], Mapping[str, Any]]):
            A tuple or dictionary to convert to JSON-serializable values.

    Returns:
        Mapping[str, any]:
            A JSON-serializable dictionary.
    """
    # Mappings are read by subfield name; sequences by subfield position.
    keyed = isinstance(row_value, dict)
    return {
        subfield.name: _field_to_json(
            subfield, row_value[subfield.name] if keyed else row_value[index]
        )
        for index, subfield in enumerate(fields)
    }
404+
405+
406+
def _field_to_json(field, row_value):
    """Convert a field into JSON-serializable values.

    Args:
        field ( \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
        ):
            The SchemaField to use for type conversion and field name.

        row_value (Union[ \
            Sequence[list], \
            any, \
        ]):
            Row data to be inserted. If the SchemaField's mode is
            REPEATED, assume this is a list. If not, the type
            is inferred from the SchemaField's field_type.

    Returns:
        any:
            A JSON-serializable object.
    """
    if row_value is None:
        return None

    # REPEATED is checked before RECORD: an array of structs is treated as
    # an array first, and each element is then converted recursively.
    if field.mode == "REPEATED":
        converted = _repeated_field_to_json(field, row_value)
    elif field.field_type == "RECORD":
        converted = _record_field_to_json(field.fields, row_value)
    else:
        converted = _scalar_field_to_json(field, row_value)
    return converted
437+
438+
332439
def _snake_to_camel_case(value):
333440
"""Convert snake case string to camel case."""
334441
words = value.split("_")

bigquery/google/cloud/bigquery/client.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from google.cloud import exceptions
3838
from google.cloud.client import ClientWithProject
3939

40-
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
40+
from google.cloud.bigquery._helpers import _record_field_to_json
4141
from google.cloud.bigquery._helpers import _str_or_none
4242
from google.cloud.bigquery._http import Connection
4343
from google.cloud.bigquery.dataset import Dataset
@@ -51,7 +51,6 @@
5151
from google.cloud.bigquery.table import TableReference
5252
from google.cloud.bigquery.table import RowIterator
5353
from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA
54-
from google.cloud.bigquery.table import _row_from_mapping
5554

5655

5756
_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB
@@ -1495,20 +1494,7 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):
14951494
else:
14961495
raise TypeError("table should be Table or TableReference")
14971496

1498-
json_rows = []
1499-
1500-
for index, row in enumerate(rows):
1501-
if isinstance(row, dict):
1502-
row = _row_from_mapping(row, schema)
1503-
json_row = {}
1504-
1505-
for field, value in zip(schema, row):
1506-
converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
1507-
if converter is not None: # STRING doesn't need converting
1508-
value = converter(value)
1509-
json_row[field.name] = value
1510-
1511-
json_rows.append(json_row)
1497+
json_rows = [_record_field_to_json(schema, row) for row in rows]
15121498

15131499
return self.insert_rows_json(table, json_rows, **kwargs)
15141500

bigquery/tests/unit/test_client.py

Lines changed: 95 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import six
2626
from six.moves import http_client
2727
import pytest
28+
import pytz
2829

2930
try:
3031
import pandas
@@ -3482,20 +3483,76 @@ def test_insert_rows_w_repeated_fields(self):
34823483
http = object()
34833484
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
34843485
conn = client._connection = _make_connection({})
3485-
full_name = SchemaField("color", "STRING", mode="REPEATED")
3486-
index = SchemaField("index", "INTEGER", "REPEATED")
3487-
score = SchemaField("score", "FLOAT", "REPEATED")
3488-
struct = SchemaField("struct", "RECORD", mode="REPEATED", fields=[index, score])
3489-
table = Table(self.TABLE_REF, schema=[full_name, struct])
3490-
ROWS = [(["red", "green"], [{"index": [1, 2], "score": [3.1415, 1.414]}])]
3491-
3492-
def _row_data(row):
3493-
return {"color": row[0], "struct": row[1]}
3486+
color = SchemaField("color", "STRING", mode="REPEATED")
3487+
items = SchemaField("items", "INTEGER", mode="REPEATED")
3488+
score = SchemaField("score", "INTEGER")
3489+
times = SchemaField("times", "TIMESTAMP", mode="REPEATED")
3490+
distances = SchemaField("distances", "FLOAT", mode="REPEATED")
3491+
structs = SchemaField(
3492+
"structs", "RECORD", mode="REPEATED", fields=[score, times, distances]
3493+
)
3494+
table = Table(self.TABLE_REF, schema=[color, items, structs])
3495+
ROWS = [
3496+
(
3497+
["red", "green"],
3498+
[1, 2],
3499+
[
3500+
(
3501+
12,
3502+
[
3503+
datetime.datetime(2018, 12, 1, 12, 0, 0, tzinfo=pytz.utc),
3504+
datetime.datetime(2018, 12, 1, 13, 0, 0, tzinfo=pytz.utc),
3505+
],
3506+
[1.25, 2.5],
3507+
),
3508+
{
3509+
"score": 13,
3510+
"times": [
3511+
datetime.datetime(2018, 12, 2, 12, 0, 0, tzinfo=pytz.utc),
3512+
datetime.datetime(2018, 12, 2, 13, 0, 0, tzinfo=pytz.utc),
3513+
],
3514+
"distances": [-1.25, -2.5],
3515+
},
3516+
],
3517+
),
3518+
{"color": None, "items": [], "structs": [(None, [], [3.5])]},
3519+
]
34943520

34953521
SENT = {
34963522
"rows": [
3497-
{"json": _row_data(row), "insertId": str(i)}
3498-
for i, row in enumerate(ROWS)
3523+
{
3524+
"json": {
3525+
"color": ["red", "green"],
3526+
"items": ["1", "2"],
3527+
"structs": [
3528+
{
3529+
"score": "12",
3530+
"times": [
3531+
1543665600.0, # 2018-12-01 12:00 UTC
3532+
1543669200.0, # 2018-12-01 13:00 UTC
3533+
],
3534+
"distances": [1.25, 2.5],
3535+
},
3536+
{
3537+
"score": "13",
3538+
"times": [
3539+
1543752000.0, # 2018-12-02 12:00 UTC
3540+
1543755600.0, # 2018-12-02 13:00 UTC
3541+
],
3542+
"distances": [-1.25, -2.5],
3543+
},
3544+
],
3545+
},
3546+
"insertId": "0",
3547+
},
3548+
{
3549+
"json": {
3550+
"color": None,
3551+
"items": [],
3552+
"structs": [{"score": None, "times": [], "distances": [3.5]}],
3553+
},
3554+
"insertId": "1",
3555+
},
34993556
]
35003557
}
35013558

@@ -3531,20 +3588,38 @@ def test_insert_rows_w_record_schema(self):
35313588
"Phred Phlyntstone",
35323589
{"area_code": "800", "local_number": "555-1212", "rank": 1},
35333590
),
3534-
(
3535-
"Bharney Rhubble",
3536-
{"area_code": "877", "local_number": "768-5309", "rank": 2},
3537-
),
3591+
("Bharney Rhubble", ("877", "768-5309", 2)),
35383592
("Wylma Phlyntstone", None),
35393593
]
35403594

3541-
def _row_data(row):
3542-
return {"full_name": row[0], "phone": row[1]}
3543-
35443595
SENT = {
35453596
"rows": [
3546-
{"json": _row_data(row), "insertId": str(i)}
3547-
for i, row in enumerate(ROWS)
3597+
{
3598+
"json": {
3599+
"full_name": "Phred Phlyntstone",
3600+
"phone": {
3601+
"area_code": "800",
3602+
"local_number": "555-1212",
3603+
"rank": "1",
3604+
},
3605+
},
3606+
"insertId": "0",
3607+
},
3608+
{
3609+
"json": {
3610+
"full_name": "Bharney Rhubble",
3611+
"phone": {
3612+
"area_code": "877",
3613+
"local_number": "768-5309",
3614+
"rank": "2",
3615+
},
3616+
},
3617+
"insertId": "1",
3618+
},
3619+
{
3620+
"json": {"full_name": "Wylma Phlyntstone", "phone": None},
3621+
"insertId": "2",
3622+
},
35483623
]
35493624
}
35503625

0 commit comments

Comments
 (0)