Skip to content

Commit b94a326

Browse files
WillianFuks authored and dhermes committed
Added support for schema auto-detection feature in LoadTableFromStorageJob (googleapis#3648)
1 parent 597657e commit b94a326

File tree

3 files changed

+194
-14
lines changed

3 files changed

+194
-14
lines changed

bigquery/google/cloud/bigquery/job.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,20 @@ def _error_result_to_exception(error_result):
8080
status_code, error_result.get('message', ''), errors=[error_result])
8181

8282

83+
class AutoDetectSchema(_TypedProperty):
84+
"""Typed Property for ``autodetect`` properties.
85+
86+
:raises ValueError: on ``set`` operation if ``instance.schema``
87+
is already defined.
88+
"""
89+
def __set__(self, instance, value):
90+
self._validate(value)
91+
if instance.schema:
92+
raise ValueError('A schema should not be already defined '
93+
'when using schema auto-detection')
94+
setattr(instance._configuration, self._backing_name, value)
95+
96+
8397
class Compression(_EnumProperty):
8498
"""Pseudo-enum for ``compression`` properties."""
8599
GZIP = 'GZIP'
@@ -505,6 +519,7 @@ class _LoadConfiguration(object):
505519
"""
506520
_allow_jagged_rows = None
507521
_allow_quoted_newlines = None
522+
_autodetect = None
508523
_create_disposition = None
509524
_encoding = None
510525
_field_delimiter = None
@@ -544,9 +559,10 @@ def __init__(self, name, destination, source_uris, client, schema=()):
544559
super(LoadTableFromStorageJob, self).__init__(name, client)
545560
self.destination = destination
546561
self.source_uris = source_uris
547-
# Let the @property do validation.
548-
self.schema = schema
549562
self._configuration = _LoadConfiguration()
563+
# Let the @property do validation. This must occur after all other
564+
# attributes have been set.
565+
self.schema = schema
550566

551567
@property
552568
def schema(self):
@@ -564,12 +580,20 @@ def schema(self, value):
564580
:type value: list of :class:`SchemaField`
565581
:param value: fields describing the schema
566582
567-
:raises: TypeError if 'value' is not a sequence, or ValueError if
568-
any item in the sequence is not a SchemaField
583+
:raises TypeError: If ``value`` is not a sequence.
584+
:raises ValueError: If any item in the sequence is not
585+
a ``SchemaField``.
569586
"""
570-
if not all(isinstance(field, SchemaField) for field in value):
571-
raise ValueError('Schema items must be fields')
572-
self._schema = tuple(value)
587+
if not value:
588+
self._schema = ()
589+
else:
590+
if not all(isinstance(field, SchemaField) for field in value):
591+
raise ValueError('Schema items must be fields')
592+
if self.autodetect:
593+
raise ValueError(
594+
'Schema can not be set if `autodetect` property is True')
595+
596+
self._schema = tuple(value)
573597

574598
@property
575599
def input_file_bytes(self):
@@ -625,6 +649,11 @@ def output_rows(self):
625649
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.allowQuotedNewlines
626650
"""
627651

652+
autodetect = AutoDetectSchema('autodetect', bool)
653+
"""See
654+
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect
655+
"""
656+
628657
create_disposition = CreateDisposition('create_disposition')
629658
"""See
630659
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition
@@ -676,6 +705,8 @@ def _populate_config_resource(self, configuration):
676705
configuration['allowJaggedRows'] = self.allow_jagged_rows
677706
if self.allow_quoted_newlines is not None:
678707
configuration['allowQuotedNewlines'] = self.allow_quoted_newlines
708+
if self.autodetect is not None:
709+
configuration['autodetect'] = self.autodetect
679710
if self.create_disposition is not None:
680711
configuration['createDisposition'] = self.create_disposition
681712
if self.encoding is not None:

bigquery/tests/system.py

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import base64
16+
import csv
1617
import datetime
1718
import json
1819
import operator
@@ -21,6 +22,8 @@
2122
import unittest
2223
import uuid
2324

25+
import six
26+
2427
from google.cloud import bigquery
2528
from google.cloud._helpers import UTC
2629
from google.cloud.bigquery import dbapi
@@ -290,8 +293,6 @@ def test_update_table(self):
290293

291294
@staticmethod
292295
def _fetch_single_page(table):
293-
import six
294-
295296
iterator = table.fetch_data()
296297
page = six.next(iterator.pages)
297298
return list(page)
@@ -341,7 +342,6 @@ def test_insert_data_then_dump_table(self):
341342
sorted(ROWS, key=by_age))
342343

343344
def test_load_table_from_local_file_then_dump_table(self):
344-
import csv
345345
from google.cloud._testing import _NamedTemporaryFile
346346

347347
ROWS = [
@@ -432,7 +432,6 @@ def test_load_table_from_local_avro_file_then_dump_table(self):
432432
sorted(ROWS, key=by_wavelength))
433433

434434
def test_load_table_from_storage_then_dump_table(self):
435-
import csv
436435
from google.cloud._testing import _NamedTemporaryFile
437436
from google.cloud.storage import Client as StorageClient
438437

@@ -448,11 +447,11 @@ def test_load_table_from_storage_then_dump_table(self):
448447
]
449448
TABLE_NAME = 'test_table'
450449

451-
s_client = StorageClient()
450+
storage_client = StorageClient()
452451

453452
# In the **very** rare case the bucket name is reserved, this
454453
# fails with a ConnectionError.
455-
bucket = s_client.create_bucket(BUCKET_NAME)
454+
bucket = storage_client.create_bucket(BUCKET_NAME)
456455
self.to_delete.append(bucket)
457456

458457
blob = bucket.blob(BLOB_NAME)
@@ -501,6 +500,75 @@ def test_load_table_from_storage_then_dump_table(self):
501500
self.assertEqual(sorted(rows, key=by_age),
502501
sorted(ROWS, key=by_age))
503502

503+
def test_load_table_from_storage_w_autodetect_schema(self):
504+
from google.cloud._testing import _NamedTemporaryFile
505+
from google.cloud.storage import Client as StorageClient
506+
from google.cloud.bigquery import SchemaField
507+
508+
local_id = unique_resource_id()
509+
bucket_name = 'bq_load_test' + local_id
510+
blob_name = 'person_ages.csv'
511+
gs_url = 'gs://{}/{}'.format(bucket_name, blob_name)
512+
rows = [
513+
('Phred Phlyntstone', 32),
514+
('Bharney Rhubble', 33),
515+
('Wylma Phlyntstone', 29),
516+
('Bhettye Rhubble', 27),
517+
] * 100 # BigQuery internally uses the first 100 rows to detect schema
518+
table_name = 'test_table'
519+
520+
storage_client = StorageClient()
521+
522+
# In the **very** rare case the bucket name is reserved, this
523+
# fails with a ConnectionError.
524+
bucket = storage_client.create_bucket(bucket_name)
525+
self.to_delete.append(bucket)
526+
527+
blob = bucket.blob(blob_name)
528+
529+
with _NamedTemporaryFile() as temp:
530+
with open(temp.name, 'w') as csv_write:
531+
writer = csv.writer(csv_write)
532+
writer.writerow(('Full Name', 'Age'))
533+
writer.writerows(rows)
534+
535+
with open(temp.name, 'rb') as csv_read:
536+
blob.upload_from_file(csv_read, content_type='text/csv')
537+
538+
self.to_delete.insert(0, blob)
539+
540+
dataset = Config.CLIENT.dataset(
541+
_make_dataset_name('load_gcs_then_dump'))
542+
543+
retry_403(dataset.create)()
544+
self.to_delete.append(dataset)
545+
546+
table = dataset.table(table_name)
547+
self.to_delete.insert(0, table)
548+
549+
job = Config.CLIENT.load_table_from_storage(
550+
'bq_load_storage_test_' + local_id, table, gs_url)
551+
job.autodetect = True
552+
553+
job.begin()
554+
555+
# Allow for 90 seconds of "warm up" before rows visible. See
556+
# https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
557+
# 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
558+
retry = RetryInstanceState(_job_done, max_tries=8)
559+
retry(job.reload)()
560+
561+
table.reload()
562+
field_name = SchemaField(
563+
u'Full_Name', u'string', u'NULLABLE', None, ())
564+
field_age = SchemaField(u'Age', u'integer', u'NULLABLE', None, ())
565+
self.assertEqual(table.schema, [field_name, field_age])
566+
567+
actual_rows = self._fetch_single_page(table)
568+
by_age = operator.itemgetter(1)
569+
self.assertEqual(
570+
sorted(actual_rows, key=by_age), sorted(rows, key=by_age))
571+
504572
def test_job_cancel(self):
505573
DATASET_NAME = _make_dataset_name('job_cancel')
506574
JOB_NAME = 'fetch_' + DATASET_NAME
@@ -674,7 +742,6 @@ def test_dbapi_w_standard_sql_types(self):
674742
self.assertIsNone(row)
675743

676744
def _load_table_for_dml(self, rows, dataset_name, table_name):
677-
import csv
678745
from google.cloud._testing import _NamedTemporaryFile
679746

680747
dataset = Config.CLIENT.dataset(dataset_name)

bigquery/tests/unit/test_job.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ def _verifyBooleanConfigProperties(self, job, config):
189189
config['allowQuotedNewlines'])
190190
else:
191191
self.assertIsNone(job.allow_quoted_newlines)
192+
if 'autodetect' in config:
193+
self.assertEqual(
194+
job.autodetect, config['autodetect'])
195+
else:
196+
self.assertIsNone(job.autodetect)
192197
if 'ignoreUnknownValues' in config:
193198
self.assertEqual(job.ignore_unknown_values,
194199
config['ignoreUnknownValues'])
@@ -277,6 +282,7 @@ def test_ctor(self):
277282
# set/read from resource['configuration']['load']
278283
self.assertIsNone(job.allow_jagged_rows)
279284
self.assertIsNone(job.allow_quoted_newlines)
285+
self.assertIsNone(job.autodetect)
280286
self.assertIsNone(job.create_disposition)
281287
self.assertIsNone(job.encoding)
282288
self.assertIsNone(job.field_delimiter)
@@ -326,6 +332,41 @@ def test_schema_setter(self):
326332
job.schema = [full_name, age]
327333
self.assertEqual(job.schema, [full_name, age])
328334

335+
def test_schema_setter_w_autodetect(self):
336+
from google.cloud.bigquery.schema import SchemaField
337+
338+
client = _Client(self.PROJECT)
339+
table = _Table()
340+
full_name = SchemaField('full_name', 'STRING')
341+
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
342+
job.autodetect = False
343+
job.schema = [full_name]
344+
self.assertEqual(job.schema, [full_name])
345+
346+
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
347+
job.autodetect = True
348+
with self.assertRaises(ValueError):
349+
job.schema = [full_name]
350+
351+
def test_autodetect_setter_w_schema(self):
352+
from google.cloud.bigquery.schema import SchemaField
353+
354+
client = _Client(self.PROJECT)
355+
table = _Table()
356+
full_name = SchemaField('full_name', 'STRING')
357+
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
358+
359+
job.autodetect = True
360+
job.schema = []
361+
self.assertEqual(job.schema, [])
362+
363+
job.autodetect = False
364+
job.schema = [full_name]
365+
self.assertEqual(job.autodetect, False)
366+
367+
with self.assertRaises(ValueError):
368+
job.autodetect = True
369+
329370
def test_props_set_by_server(self):
330371
import datetime
331372
from google.cloud._helpers import UTC
@@ -491,6 +532,47 @@ def test_begin_w_bound_client(self):
491532
self.assertEqual(req['data'], SENT)
492533
self._verifyResourceProperties(job, RESOURCE)
493534

535+
def test_begin_w_autodetect(self):
536+
path = '/projects/{}/jobs'.format(self.PROJECT)
537+
resource = self._makeResource()
538+
resource['configuration']['load']['autodetect'] = True
539+
# Ensure None for missing server-set props
540+
del resource['statistics']['creationTime']
541+
del resource['etag']
542+
del resource['selfLink']
543+
del resource['user_email']
544+
conn = _Connection(resource)
545+
client = _Client(project=self.PROJECT, connection=conn)
546+
table = _Table()
547+
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
548+
job.autodetect = True
549+
job.begin()
550+
551+
sent = {
552+
'jobReference': {
553+
'projectId': self.PROJECT,
554+
'jobId': self.JOB_NAME,
555+
},
556+
'configuration': {
557+
'load': {
558+
'sourceUris': [self.SOURCE1],
559+
'destinationTable': {
560+
'projectId': self.PROJECT,
561+
'datasetId': self.DS_NAME,
562+
'tableId': self.TABLE_NAME,
563+
},
564+
'autodetect': True
565+
},
566+
},
567+
}
568+
expected_request = {
569+
'method': 'POST',
570+
'path': path,
571+
'data': sent,
572+
}
573+
self.assertEqual(conn._requested, [expected_request])
574+
self._verifyResourceProperties(job, resource)
575+
494576
def test_begin_w_alternate_client(self):
495577
from google.cloud.bigquery.schema import SchemaField
496578

0 commit comments

Comments
 (0)