From f197b600c6b2fb25523f8b8cea945ffde2adb5d2 Mon Sep 17 00:00:00 2001 From: Adam Fletcher Date: Mon, 24 Dec 2018 12:50:08 -0500 Subject: [PATCH 1/3] Fix nested schema parsing in insert_rows --- bigquery/google/cloud/bigquery/client.py | 97 +++++++++++++++++++++++- 1 file changed, 93 insertions(+), 4 deletions(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index dbc68119530f..004afa7e5744 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1440,6 +1440,97 @@ def query( return query_job + def _convert_to_safe_json_dict(self, field, row_value): + """Maps a field and value to a JSON-safe value. + + Args: + field ( \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + ): + The SchemaField to use for type conversion and field name. + + row_value (Union[ \ + Sequence[list], \ + any, \ + ]): + Row value(s) to be converted. If the value is + a list, converted all elements in the list and return + a dictionary whose value is that converted list. + + Returns: + dict: + A dict of a field name to value(s), encoded for JSON. + """ + if not isinstance(row_value, list): + converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type) + value = {} + if converter is not None: # STRING doesn't need converting + value[field.name] = converter(row_value) + else: + value[field.name] = row_value + return value + + values = [] + for v in row_value: + converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type) + if converter is not None: # STRING doesn't need converting + values.append(converter(v)) + else: + values.append(v) + return {field.name: values} + + def _handle_nested_schema(self, field, row_value): + """Traverses a row to map all field values to JSON-safe equivalents. + + Args: + field ( \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + ): + The SchemaField to use for type conversion and field name. + + row_value (Union[ \ + Sequence[list], \ + any, \ + ]): + Row data to be inserted. If the SchemaField's mode is + REPEATED, assume this is a list. If not, the type + is inferred from the SchemaField's field_type. + + Returns: + Sequence[dict]: + A (potentially nested) dict of field names to value(s), encoded for JSON. + """ + is_repeated = False + is_record = False + if field.mode is not None and field.mode == "REPEATED": + is_repeated = True + if field.field_type == "RECORD": + is_record = True + + if not is_repeated and not is_record: + return self._convert_to_safe_json_dict(field, row_value) + + # if it is repeated, but not a record, loop thru the values, and return + if is_repeated and not is_record: + return self._convert_to_safe_json_dict(field, row_value) + + # if it is a record, but not repeated, just parse the subfields: + if is_record and not is_repeated: + record = {} + for f in field.fields: + d = self._handle_nested_schema(f, row_value[f.name]) + record[f.name] = d[f.name] + return record + + values = [] + if is_record and is_repeated: + for record in row_value: + # remove the REPEATED, but keep the fields + # this will get processed as a RECORD + f = SchemaField(field.name, field.field_type, fields=field.fields) + values.append(self._handle_nested_schema(f, record)) + return {field.name: values} + def insert_rows(self, table, rows, selected_fields=None, **kwargs): """Insert rows into a table via the streaming API. @@ -1503,10 +1594,8 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs): json_row = {} for field, value in zip(schema, row): - converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type) - if converter is not None: # STRING doesn't need converting - value = converter(value) - json_row[field.name] = value + value = self._handle_nested_schema(field, value) + json_row[field.name] = value[field.name] json_rows.append(json_row) From 3c55241d496642493cbd15253f7475c46963309b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 28 Dec 2018 09:07:37 -0800 Subject: [PATCH 2/3] Move x_to_json methods for insert_rows to _helpers. --- bigquery/google/cloud/bigquery/_helpers.py | 107 +++++++++++++++++++ bigquery/google/cloud/bigquery/client.py | 107 +------------------ bigquery/tests/unit/test_client.py | 115 +++++++++++++++++---- 3 files changed, 204 insertions(+), 125 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index 6990fb3eaa69..b30b0845202e 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -15,6 +15,7 @@ """Shared helper functions for BigQuery API classes.""" import base64 +import copy import datetime import decimal @@ -329,6 +330,112 @@ def _time_to_json(value): _SCALAR_VALUE_TO_JSON_PARAM["TIMESTAMP"] = _timestamp_to_json_parameter +def _scalar_field_to_json(field, row_value): + """Maps a field and value to a JSON-safe value. + + Args: + field ( \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + ): + The SchemaField to use for type conversion and field name. + row_value (any): + Value to be converted, based on the field's type. + + Returns: + any: + A JSON-serializable object. + """ + converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type) + if converter is None: # STRING doesn't need converting + return row_value + return converter(row_value) + + +def _repeated_field_to_json(field, row_value): + """Convert a repeated/array field to its JSON representation. + + Args: + field ( \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + ): + The SchemaField to use for type conversion and field name. The + field mode must equal ``REPEATED``. + row_value (Sequence[any]): + A sequence of values to convert to JSON-serializable values. + + Returns: + List[any]: + A list of JSON-serializable objects. + """ + # Remove the REPEATED, but keep the other fields. This allows us to process + # each item as if it were a top-level field. + item_field = copy.deepcopy(field) + item_field._mode = "NULLABLE" + values = [] + for item in row_value: + values.append(_field_to_json(item_field, item)) + return values + + +def _record_field_to_json(fields, row_value): + """Convert a record/struct field to its JSON representation. + + Args: + field ( \ + Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \ + ): + The SchemaField to use for type conversion and field name. The + field type must equal ``RECORD``. + row_value (Union[Tuple[Any], Mapping[str, Any]): + A tuple or dictionary to convert to JSON-serializable values. + + Returns: + Mapping[str, any]: + A JSON-serializable dictionary. + """ + record = {} + isdict = isinstance(row_value, dict) + + for subindex, subfield in enumerate(fields): + subname = subfield.name + subvalue = row_value[subname] if isdict else row_value[subindex] + record[subname] = _field_to_json(subfield, subvalue) + return record + + +def _field_to_json(field, row_value): + """Convert a field into JSON-serializable values. + + Args: + field ( \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + ): + The SchemaField to use for type conversion and field name. + + row_value (Union[ \ + Sequence[list], \ + any, \ + ]): + Row data to be inserted. If the SchemaField's mode is + REPEATED, assume this is a list. If not, the type + is inferred from the SchemaField's field_type. + + Returns: + any: + A JSON-serializable object. + """ + if row_value is None: + return None + + if field.mode == "REPEATED": + return _repeated_field_to_json(field, row_value) + + if field.field_type == "RECORD": + return _record_field_to_json(field.fields, row_value) + + return _scalar_field_to_json(field, row_value) + + def _snake_to_camel_case(value): """Convert snake case string to camel case.""" words = value.split("_") diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 004afa7e5744..96f1310c3f99 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -37,7 +37,7 @@ from google.cloud import exceptions from google.cloud.client import ClientWithProject -from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW +from google.cloud.bigquery._helpers import _record_field_to_json from google.cloud.bigquery._helpers import _str_or_none from google.cloud.bigquery._http import Connection from google.cloud.bigquery.dataset import Dataset @@ -51,7 +51,6 @@ from google.cloud.bigquery.table import TableReference from google.cloud.bigquery.table import RowIterator from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA -from google.cloud.bigquery.table import _row_from_mapping _DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB @@ -1440,97 +1439,6 @@ def query( return query_job - def _convert_to_safe_json_dict(self, field, row_value): - """Maps a field and value to a JSON-safe value. - - Args: - field ( \ - :class:`~google.cloud.bigquery.schema.SchemaField`, \ - ): - The SchemaField to use for type conversion and field name. - - row_value (Union[ \ - Sequence[list], \ - any, \ - ]): - Row value(s) to be converted. If the value is - a list, converted all elements in the list and return - a dictionary whose value is that converted list. - - Returns: - dict: - A dict of a field name to value(s), encoded for JSON. - """ - if not isinstance(row_value, list): - converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type) - value = {} - if converter is not None: # STRING doesn't need converting - value[field.name] = converter(row_value) - else: - value[field.name] = row_value - return value - - values = [] - for v in row_value: - converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type) - if converter is not None: # STRING doesn't need converting - values.append(converter(v)) - else: - values.append(v) - return {field.name: values} - - def _handle_nested_schema(self, field, row_value): - """Traverses a row to map all field values to JSON-safe equivalents. - - Args: - field ( \ - :class:`~google.cloud.bigquery.schema.SchemaField`, \ - ): - The SchemaField to use for type conversion and field name. - - row_value (Union[ \ - Sequence[list], \ - any, \ - ]): - Row data to be inserted. If the SchemaField's mode is - REPEATED, assume this is a list. If not, the type - is inferred from the SchemaField's field_type. - - Returns: - Sequence[dict]: - A (potentially nested) dict of field names to value(s), encoded for JSON. - """ - is_repeated = False - is_record = False - if field.mode is not None and field.mode == "REPEATED": - is_repeated = True - if field.field_type == "RECORD": - is_record = True - - if not is_repeated and not is_record: - return self._convert_to_safe_json_dict(field, row_value) - - # if it is repeated, but not a record, loop thru the values, and return - if is_repeated and not is_record: - return self._convert_to_safe_json_dict(field, row_value) - - # if it is a record, but not repeated, just parse the subfields: - if is_record and not is_repeated: - record = {} - for f in field.fields: - d = self._handle_nested_schema(f, row_value[f.name]) - record[f.name] = d[f.name] - return record - - values = [] - if is_record and is_repeated: - for record in row_value: - # remove the REPEATED, but keep the fields - # this will get processed as a RECORD - f = SchemaField(field.name, field.field_type, fields=field.fields) - values.append(self._handle_nested_schema(f, record)) - return {field.name: values} - def insert_rows(self, table, rows, selected_fields=None, **kwargs): """Insert rows into a table via the streaming API. @@ -1586,18 +1494,7 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs): else: raise TypeError("table should be Table or TableReference") - json_rows = [] - - for index, row in enumerate(rows): - if isinstance(row, dict): - row = _row_from_mapping(row, schema) - json_row = {} - - for field, value in zip(schema, row): - value = self._handle_nested_schema(field, value) - json_row[field.name] = value[field.name] - - json_rows.append(json_row) + json_rows = [_record_field_to_json(schema, row) for row in rows] return self.insert_rows_json(table, json_rows, **kwargs) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 0fc14b160a9c..c3d90ed640fb 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -25,6 +25,7 @@ import six from six.moves import http_client import pytest +import pytz try: import pandas @@ -3482,20 +3483,76 @@ def test_insert_rows_w_repeated_fields(self): http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) conn = client._connection = _make_connection({}) - full_name = SchemaField("color", "STRING", mode="REPEATED") - index = SchemaField("index", "INTEGER", "REPEATED") - score = SchemaField("score", "FLOAT", "REPEATED") - struct = SchemaField("struct", "RECORD", mode="REPEATED", fields=[index, score]) - table = Table(self.TABLE_REF, schema=[full_name, struct]) - ROWS = [(["red", "green"], [{"index": [1, 2], "score": [3.1415, 1.414]}])] - - def _row_data(row): - return {"color": row[0], "struct": row[1]} + color = SchemaField("color", "STRING", mode="REPEATED") + items = SchemaField("items", "INTEGER", mode="REPEATED") + score = SchemaField("score", "INTEGER") + times = SchemaField("times", "TIMESTAMP", mode="REPEATED") + distances = SchemaField("distances", "FLOAT", mode="REPEATED") + structs = SchemaField( + "structs", "RECORD", mode="REPEATED", fields=[score, times, distances] + ) + table = Table(self.TABLE_REF, schema=[color, items, structs]) + ROWS = [ + ( + ["red", "green"], + [1, 2], + [ + ( + 12, + [ + datetime.datetime(2018, 12, 1, 12, 0, 0, tzinfo=pytz.utc), + datetime.datetime(2018, 12, 1, 13, 0, 0, tzinfo=pytz.utc), + ], + [1.25, 2.5], + ), + { + "score": 13, + "times": [ + datetime.datetime(2018, 12, 2, 12, 0, 0, tzinfo=pytz.utc), + datetime.datetime(2018, 12, 2, 13, 0, 0, tzinfo=pytz.utc), + ], + "distances": [-1.25, -2.5], + }, + ], + ), + {"color": None, "items": [], "structs": [(None, [], [3.5])]}, + ] SENT = { "rows": [ - {"json": _row_data(row), "insertId": str(i)} - for i, row in enumerate(ROWS) + { + "json": { + "color": ["red", "green"], + "items": ["1", "2"], + "structs": [ + { + "score": "12", + "times": [ + 1543665600.0, # 2018-12-01 12:00 UTC + 1543669200.0, # 2018-12-01 13:00 UTC + ], + "distances": [1.25, 2.5], + }, + { + "score": "13", + "times": [ + 1543752000.0, # 2018-12-02 12:00 UTC + 1543755600.0, # 2018-12-02 13:00 UTC + ], + "distances": [-1.25, -2.5], + }, + ], + }, + "insertId": "0", + }, + { + "json": { + "color": None, + "items": [], + "structs": [{"score": None, "times": [], "distances": [3.5]}], + }, + "insertId": "1", + }, ] } @@ -3531,20 +3588,38 @@ def test_insert_rows_w_record_schema(self): "Phred Phlyntstone", {"area_code": "800", "local_number": "555-1212", "rank": 1}, ), - ( - "Bharney Rhubble", - {"area_code": "877", "local_number": "768-5309", "rank": 2}, - ), + ("Bharney Rhubble", ("877", "768-5309", 2)), ("Wylma Phlyntstone", None), ] - def _row_data(row): - return {"full_name": row[0], "phone": row[1]} - SENT = { "rows": [ - {"json": _row_data(row), "insertId": str(i)} - for i, row in enumerate(ROWS) + { + "json": { + "full_name": "Phred Phlyntstone", + "phone": { + "area_code": "800", + "local_number": "555-1212", + "rank": "1", + }, + }, + "insertId": "0", + }, + { + "json": { + "full_name": "Bharney Rhubble", + "phone": { + "area_code": "877", + "local_number": "768-5309", + "rank": "2", + }, + }, + "insertId": "1", + }, + { + "json": {"full_name": "Wylma Phlyntstone", "phone": None}, + "insertId": "2", + }, ] } From cb13c07dc5d0b4a326ae44c259e3853b9186f0d6 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 28 Dec 2018 09:15:23 -0800 Subject: [PATCH 3/3] Fix _record_field_to_json docstring --- bigquery/google/cloud/bigquery/_helpers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index b30b0845202e..10753cfc998b 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -381,11 +381,11 @@ def _record_field_to_json(fields, row_value): """Convert a record/struct field to its JSON representation. Args: - field ( \ + fields ( \ Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \ ): - The SchemaField to use for type conversion and field name. The - field type must equal ``RECORD``. + The :class:`~google.cloud.bigquery.schema.SchemaField`s of the + record's subfields to use for type conversion and field names. row_value (Union[Tuple[Any], Mapping[str, Any]): A tuple or dictionary to convert to JSON-serializable values.