Skip to content

Commit 89eaedb

Browse files
authored
feat(bigquery): allow passing schema as a sequence of dicts (googleapis#9550)
* feat(bigquery): add _to_schema_fields() schema helper * Allow passing schema as dicts _helpers * Allow passing schema as dicts in table.py * Allow passing schema as dicts in job.py * Import SchemaField directly in several tests SchemaField should not be imported from bigquery.table, but directly from where it's defined, so that any changes to the imports in bigquery.table do not cause unnecessary test failures. * Allow passing schema as dicts in pandas helpers * Replace return statement with an else block * Alter the type spec of values in schema field dict * Blacken a few files * Simplify _to_schema_fields() schema helper * Update docstrings for schema parameter
1 parent 5571911 commit 89eaedb

File tree

11 files changed

+516
-75
lines changed

11 files changed

+516
-75
lines changed

bigquery/google/cloud/bigquery/_helpers.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,21 +224,44 @@ def _row_tuple_from_json(row, schema):
224224
225225
Args:
226226
row (Dict): A JSON response row to be converted.
227-
schema (Tuple): A tuple of :class:`~google.cloud.bigquery.schema.SchemaField`.
227+
schema (Sequence[Union[ \
228+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
229+
Mapping[str, Any] \
230+
]]): Specification of the field types in ``row``.
228231
229232
Returns:
230233
Tuple: A tuple of data converted to native types.
231234
"""
235+
from google.cloud.bigquery.schema import _to_schema_fields
236+
237+
schema = _to_schema_fields(schema)
238+
232239
row_data = []
233240
for field, cell in zip(schema, row["f"]):
234241
row_data.append(_field_from_json(cell["v"], field))
235242
return tuple(row_data)
236243

237244

238245
def _rows_from_json(values, schema):
239-
"""Convert JSON row data to rows with appropriate types."""
246+
"""Convert JSON row data to rows with appropriate types.
247+
248+
Args:
249+
values (Sequence[Dict]): The list of responses (JSON rows) to convert.
250+
schema (Sequence[Union[ \
251+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
252+
Mapping[str, Any] \
253+
]]):
254+
The table's schema. If any item is a mapping, its content must be
255+
compatible with
256+
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
257+
258+
Returns:
259+
List[:class:`~google.cloud.bigquery.Row`]
260+
"""
240261
from google.cloud.bigquery import Row
262+
from google.cloud.bigquery.schema import _to_schema_fields
241263

264+
schema = _to_schema_fields(schema)
242265
field_to_index = _field_to_index_mapping(schema)
243266
return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values]
244267

bigquery/google/cloud/bigquery/_pandas_helpers.py

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,10 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
239239
Args:
240240
dataframe (pandas.DataFrame):
241241
DataFrame for which the client determines the BigQuery schema.
242-
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
242+
bq_schema (Sequence[Union[ \
243+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
244+
Mapping[str, Any] \
245+
]]):
243246
A BigQuery schema. Use this argument to override the autodetected
244247
type for some or all of the DataFrame columns.
245248
@@ -249,6 +252,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
249252
any column cannot be determined.
250253
"""
251254
if bq_schema:
255+
bq_schema = schema._to_schema_fields(bq_schema)
252256
for field in bq_schema:
253257
if field.field_type in schema._STRUCT_TYPES:
254258
raise ValueError(
@@ -297,9 +301,12 @@ def dataframe_to_arrow(dataframe, bq_schema):
297301
Args:
298302
dataframe (pandas.DataFrame):
299303
DataFrame to convert to Arrow table.
300-
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
301-
Desired BigQuery schema. Number of columns must match number of
302-
columns in the DataFrame.
304+
bq_schema (Sequence[Union[ \
305+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
306+
Mapping[str, Any] \
307+
]]):
308+
Desired BigQuery schema. The number of columns must match the
309+
number of columns in the DataFrame.
303310
304311
Returns:
305312
pyarrow.Table:
@@ -310,6 +317,8 @@ def dataframe_to_arrow(dataframe, bq_schema):
310317
column_and_index_names = set(
311318
name for name, _ in list_columns_and_indexes(dataframe)
312319
)
320+
321+
bq_schema = schema._to_schema_fields(bq_schema)
313322
bq_field_names = set(field.name for field in bq_schema)
314323

315324
extra_fields = bq_field_names - column_and_index_names
@@ -354,7 +363,10 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
354363
Args:
355364
dataframe (pandas.DataFrame):
356365
DataFrame to convert to Parquet file.
357-
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
366+
bq_schema (Sequence[Union[ \
367+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
368+
Mapping[str, Any] \
369+
]]):
358370
Desired BigQuery schema. Number of columns must match number of
359371
columns in the DataFrame.
360372
filepath (str):
@@ -368,6 +380,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
368380
if pyarrow is None:
369381
raise ValueError("pyarrow is required for BigQuery schema conversion.")
370382

383+
bq_schema = schema._to_schema_fields(bq_schema)
371384
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
372385
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
373386

@@ -388,20 +401,24 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
388401
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)
389402

390403

391-
def download_arrow_tabledata_list(pages, schema):
404+
def download_arrow_tabledata_list(pages, bq_schema):
392405
"""Use tabledata.list to construct an iterable of RecordBatches.
393406
394407
Args:
395408
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
396409
An iterator over the result pages.
397-
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
410+
bq_schema (Sequence[Union[ \
411+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
412+
Mapping[str, Any] \
413+
]]):
398414
A description of the fields in result pages.
399415
Yields:
400416
:class:`pyarrow.RecordBatch`
401417
The next page of records as a ``pyarrow`` record batch.
402418
"""
403-
column_names = bq_to_arrow_schema(schema) or [field.name for field in schema]
404-
arrow_types = [bq_to_arrow_data_type(field) for field in schema]
419+
bq_schema = schema._to_schema_fields(bq_schema)
420+
column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
421+
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]
405422

406423
for page in pages:
407424
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
@@ -422,9 +439,26 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
422439
return pandas.DataFrame(columns, columns=column_names)
423440

424441

425-
def download_dataframe_tabledata_list(pages, schema, dtypes):
426-
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
427-
column_names = [field.name for field in schema]
442+
def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
443+
"""Use (slower, but free) tabledata.list to construct a DataFrame.
444+
445+
Args:
446+
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
447+
An iterator over the result pages.
448+
bq_schema (Sequence[Union[ \
449+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
450+
Mapping[str, Any] \
451+
]]):
452+
A description of the fields in result pages.
453+
dtypes(Mapping[str, numpy.dtype]):
454+
The types of columns in result data to hint construction of the
455+
resulting DataFrame. Not all column types have to be specified.
456+
Yields:
457+
:class:`pandas.DataFrame`
458+
The next page of records as a ``pandas.DataFrame`` record batch.
459+
"""
460+
bq_schema = schema._to_schema_fields(bq_schema)
461+
column_names = [field.name for field in bq_schema]
428462
for page in pages:
429463
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)
430464

bigquery/google/cloud/bigquery/job.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from google.cloud.bigquery.retry import DEFAULT_RETRY
3939
from google.cloud.bigquery.routine import RoutineReference
4040
from google.cloud.bigquery.schema import SchemaField
41+
from google.cloud.bigquery.schema import _to_schema_fields
4142
from google.cloud.bigquery.table import _EmptyRowIterator
4243
from google.cloud.bigquery.table import RangePartitioning
4344
from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1225,8 +1226,10 @@ def range_partitioning(self, value):
12251226

12261227
@property
12271228
def schema(self):
1228-
"""List[google.cloud.bigquery.schema.SchemaField]: Schema of the
1229-
destination table.
1229+
"""Sequence[Union[ \
1230+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
1231+
Mapping[str, Any] \
1232+
]]: Schema of the destination table.
12301233
12311234
See
12321235
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.schema
@@ -1242,8 +1245,8 @@ def schema(self, value):
12421245
self._del_sub_prop("schema")
12431246
return
12441247

1245-
if not all(hasattr(field, "to_api_repr") for field in value):
1246-
raise ValueError("Schema items must be fields")
1248+
value = _to_schema_fields(value)
1249+
12471250
_helpers._set_sub_prop(
12481251
self._properties,
12491252
["load", "schema", "fields"],

bigquery/google/cloud/bigquery/schema.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
"""Schemas for BigQuery tables / queries."""
1616

17+
import collections
18+
1719
from google.cloud.bigquery_v2 import types
1820

1921

@@ -256,3 +258,36 @@ def _build_schema_resource(fields):
256258
Sequence[Dict]: Mappings describing the schema of the supplied fields.
257259
"""
258260
return [field.to_api_repr() for field in fields]
261+
262+
263+
def _to_schema_fields(schema):
264+
"""Coerce `schema` to a list of schema field instances.
265+
266+
Args:
267+
schema(Sequence[Union[ \
268+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
269+
Mapping[str, Any] \
270+
]]):
271+
Table schema to convert. If some items are passed as mappings,
272+
their content must be compatible with
273+
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
274+
275+
Returns:
276+
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]
277+
278+
Raises:
279+
Exception: If ``schema`` is not a sequence, or if any item in the
280+
sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField`
281+
instance or a compatible mapping representation of the field.
282+
"""
283+
for field in schema:
284+
if not isinstance(field, (SchemaField, collections.Mapping)):
285+
raise ValueError(
286+
"Schema items must either be fields or compatible "
287+
"mapping representations."
288+
)
289+
290+
return [
291+
field if isinstance(field, SchemaField) else SchemaField.from_api_repr(field)
292+
for field in schema
293+
]

bigquery/google/cloud/bigquery/table.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@
5151
import google.cloud._helpers
5252
from google.cloud.bigquery import _helpers
5353
from google.cloud.bigquery import _pandas_helpers
54-
from google.cloud.bigquery.schema import SchemaField
5554
from google.cloud.bigquery.schema import _build_schema_resource
5655
from google.cloud.bigquery.schema import _parse_schema_resource
56+
from google.cloud.bigquery.schema import _to_schema_fields
5757
from google.cloud.bigquery.external_config import ExternalConfig
5858
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
5959

@@ -305,8 +305,13 @@ class Table(object):
305305
A pointer to a table. If ``table_ref`` is a string, it must
306306
included a project ID, dataset ID, and table ID, each separated
307307
by ``.``.
308-
schema (List[google.cloud.bigquery.schema.SchemaField]):
309-
The table's schema
308+
schema (Optional[Sequence[Union[ \
309+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
310+
Mapping[str, Any] \
311+
]]]):
312+
The table's schema. If any item is a mapping, its content must be
313+
compatible with
314+
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
310315
"""
311316

312317
_PROPERTY_TO_API_FIELD = {
@@ -369,13 +374,17 @@ def require_partition_filter(self, value):
369374

370375
@property
371376
def schema(self):
372-
"""List[google.cloud.bigquery.schema.SchemaField]: Table's schema.
377+
"""Sequence[Union[ \
378+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
379+
Mapping[str, Any] \
380+
]]:
381+
Table's schema.
373382
374383
Raises:
375-
TypeError: If 'value' is not a sequence
376-
ValueError:
377-
If any item in the sequence is not a
378-
:class:`~google.cloud.bigquery.schema.SchemaField`
384+
Exception:
385+
If ``schema`` is not a sequence, or if any item in the sequence
386+
is not a :class:`~google.cloud.bigquery.schema.SchemaField`
387+
instance or a compatible mapping representation of the field.
379388
"""
380389
prop = self._properties.get("schema")
381390
if not prop:
@@ -387,9 +396,8 @@ def schema(self):
387396
def schema(self, value):
388397
if value is None:
389398
self._properties["schema"] = None
390-
elif not all(isinstance(field, SchemaField) for field in value):
391-
raise ValueError("Schema items must be fields")
392399
else:
400+
value = _to_schema_fields(value)
393401
self._properties["schema"] = {"fields": _build_schema_resource(value)}
394402

395403
@property
@@ -1284,6 +1292,13 @@ class RowIterator(HTTPIterator):
12841292
api_request (Callable[google.cloud._http.JSONConnection.api_request]):
12851293
The function to use to make API requests.
12861294
path (str): The method path to query for the list of items.
1295+
schema (Sequence[Union[ \
1296+
:class:`~google.cloud.bigquery.schema.SchemaField`, \
1297+
Mapping[str, Any] \
1298+
]]):
1299+
The table's schema. If any item is a mapping, its content must be
1300+
compatible with
1301+
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
12871302
page_token (str): A token identifying a page in a result set to start
12881303
fetching results from.
12891304
max_results (int, optional): The maximum number of results to fetch.
@@ -1328,6 +1343,7 @@ def __init__(
13281343
page_start=_rows_page_start,
13291344
next_token="pageToken",
13301345
)
1346+
schema = _to_schema_fields(schema)
13311347
self._field_to_index = _helpers._field_to_index_mapping(schema)
13321348
self._page_size = page_size
13331349
self._preserve_order = False

bigquery/tests/unit/test__helpers.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import decimal
1818
import unittest
1919

20+
import mock
21+
2022

2123
class Test_not_null(unittest.TestCase):
2224
def _call_fut(self, value, field):
@@ -412,7 +414,8 @@ class Test_row_tuple_from_json(unittest.TestCase):
412414
def _call_fut(self, row, schema):
413415
from google.cloud.bigquery._helpers import _row_tuple_from_json
414416

415-
return _row_tuple_from_json(row, schema)
417+
with _field_isinstance_patcher():
418+
return _row_tuple_from_json(row, schema)
416419

417420
def test_w_single_scalar_column(self):
418421
# SELECT 1 AS col
@@ -529,7 +532,8 @@ class Test_rows_from_json(unittest.TestCase):
529532
def _call_fut(self, rows, schema):
530533
from google.cloud.bigquery._helpers import _rows_from_json
531534

532-
return _rows_from_json(rows, schema)
535+
with _field_isinstance_patcher():
536+
return _rows_from_json(rows, schema)
533537

534538
def test_w_record_subfield(self):
535539
from google.cloud.bigquery.table import Row
@@ -1023,3 +1027,23 @@ def __init__(self, mode, name="unknown", field_type="UNKNOWN", fields=()):
10231027
self.name = name
10241028
self.field_type = field_type
10251029
self.fields = fields
1030+
1031+
1032+
def _field_isinstance_patcher():
1033+
"""A patcher thank makes _Field instances seem like SchemaField instances.
1034+
"""
1035+
from google.cloud.bigquery.schema import SchemaField
1036+
1037+
def fake_isinstance(instance, target_class):
1038+
if instance.__class__.__name__ != "_Field":
1039+
return isinstance(instance, target_class) # pragma: NO COVER
1040+
1041+
# pretend that _Field() instances are actually instances of SchemaField
1042+
return target_class is SchemaField or (
1043+
isinstance(target_class, tuple) and SchemaField in target_class
1044+
)
1045+
1046+
patcher = mock.patch(
1047+
"google.cloud.bigquery.schema.isinstance", side_effect=fake_isinstance
1048+
)
1049+
return patcher

0 commit comments

Comments
 (0)