Skip to content

Commit c5f5f12

Browse files
tseaver authored and tswast committed
Add 'Client.get_job' API wrapper. (#3804)
* Allow assigning 'None' to '_TypedProperty' properties.
* Ensure that configuration properties are copied when (re)loading jobs.
1 parent a05ae17 commit c5f5f12

File tree

7 files changed

+457
-41
lines changed

7 files changed

+457
-41
lines changed

bigquery/.coveragerc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ exclude_lines =
99
pragma: NO COVER
1010
# Ignore debug-only repr
1111
def __repr__
12+
# Ignore abstract methods
13+
raise NotImplementedError

bigquery/google/cloud/bigquery/_helpers.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,8 @@ def _validate(self, value):
316316
317317
:raises: ValueError on a type mismatch.
318318
"""
319+
if value is None:
320+
return
319321
if not isinstance(value, self.property_type):
320322
raise ValueError('Required type: %s' % (self.property_type,))
321323

@@ -413,6 +415,14 @@ def __init__(self, name, type_, value):
413415
self.type_ = type_
414416
self.value = value
415417

418+
def __eq__(self, other):
419+
if not isinstance(other, ScalarQueryParameter):
420+
return NotImplemented
421+
return(
422+
self.name == other.name and
423+
self.type_ == other.type_ and
424+
self.value == other.value)
425+
416426
@classmethod
417427
def positional(cls, type_, value):
418428
"""Factory for positional paramater.
@@ -515,6 +525,14 @@ def __init__(self, name, array_type, values):
515525
self.array_type = array_type
516526
self.values = values
517527

528+
def __eq__(self, other):
529+
if not isinstance(other, ArrayQueryParameter):
530+
return NotImplemented
531+
return(
532+
self.name == other.name and
533+
self.array_type == other.array_type and
534+
self.values == other.values)
535+
518536
@classmethod
519537
def positional(cls, array_type, values):
520538
"""Factory for positional parameters.
@@ -657,6 +675,14 @@ def __init__(self, name, *sub_params):
657675
types[sub.name] = sub.type_
658676
values[sub.name] = sub.value
659677

678+
def __eq__(self, other):
679+
if not isinstance(other, StructQueryParameter):
680+
return NotImplemented
681+
return(
682+
self.name == other.name and
683+
self.struct_types == other.struct_types and
684+
self.struct_values == other.struct_values)
685+
660686
@classmethod
661687
def positional(cls, *sub_params):
662688
"""Factory for positional parameters.
@@ -770,6 +796,18 @@ def __repr__(self):
770796
return 'StructQueryParameter{}'.format(self._key())
771797

772798

799+
def _query_param_from_api_repr(resource):
800+
"""Helper: construct concrete query parameter from JSON resource."""
801+
qp_type = resource['parameterType']
802+
if 'arrayType' in qp_type:
803+
klass = ArrayQueryParameter
804+
elif 'structTypes' in qp_type:
805+
klass = StructQueryParameter
806+
else:
807+
klass = ScalarQueryParameter
808+
return klass.from_api_repr(resource)
809+
810+
773811
class QueryParametersProperty(object):
774812
"""Custom property type, holding query parameter instances."""
775813

bigquery/google/cloud/bigquery/client.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,35 @@ def job_from_resource(self, resource):
222222
return QueryJob.from_api_repr(resource, self)
223223
raise ValueError('Cannot parse job resource')
224224

225+
def get_job(self, job_id, project=None):
226+
"""Fetch a job for the project associated with this client.
227+
228+
See
229+
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
230+
231+
:type job_id: str
232+
:param job_id: Name of the job.
233+
234+
:type project: str
235+
:param project:
236+
project ID owning the job (defaults to the client's project)
237+
238+
:rtype: :class:`~google.cloud.bigquery.job._AsyncJob`
239+
:returns:
240+
Concrete job instance, based on the resource returned by the API.
241+
"""
242+
extra_params = {'projection': 'full'}
243+
244+
if project is None:
245+
project = self.project
246+
247+
path = '/projects/{}/jobs/{}'.format(project, job_id)
248+
249+
resource = self._connection.api_request(
250+
method='GET', path=path, query_params=extra_params)
251+
252+
return self.job_from_resource(resource)
253+
225254
def list_jobs(self, max_results=None, page_token=None, all_users=None,
226255
state_filter=None):
227256
"""List jobs for the project associated with this client.
@@ -272,14 +301,14 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
272301
max_results=max_results,
273302
extra_params=extra_params)
274303

275-
def load_table_from_storage(self, job_name, destination, *source_uris):
304+
def load_table_from_storage(self, job_id, destination, *source_uris):
276305
"""Construct a job for loading data into a table from CloudStorage.
277306
278307
See
279308
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load
280309
281-
:type job_name: str
282-
:param job_name: Name of the job.
310+
:type job_id: str
311+
:param job_id: Name of the job.
283312
284313
:type destination: :class:`google.cloud.bigquery.table.Table`
285314
:param destination: Table into which data is to be loaded.
@@ -291,16 +320,16 @@ def load_table_from_storage(self, job_name, destination, *source_uris):
291320
:rtype: :class:`google.cloud.bigquery.job.LoadJob`
292321
:returns: a new ``LoadJob`` instance
293322
"""
294-
return LoadJob(job_name, destination, source_uris, client=self)
323+
return LoadJob(job_id, destination, source_uris, client=self)
295324

296-
def copy_table(self, job_name, destination, *sources):
325+
def copy_table(self, job_id, destination, *sources):
297326
"""Construct a job for copying one or more tables into another table.
298327
299328
See
300329
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy
301330
302-
:type job_name: str
303-
:param job_name: Name of the job.
331+
:type job_id: str
332+
:param job_id: Name of the job.
304333
305334
:type destination: :class:`google.cloud.bigquery.table.Table`
306335
:param destination: Table into which data is to be copied.
@@ -311,16 +340,16 @@ def copy_table(self, job_name, destination, *sources):
311340
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
312341
:returns: a new ``CopyJob`` instance
313342
"""
314-
return CopyJob(job_name, destination, sources, client=self)
343+
return CopyJob(job_id, destination, sources, client=self)
315344

316-
def extract_table_to_storage(self, job_name, source, *destination_uris):
345+
def extract_table_to_storage(self, job_id, source, *destination_uris):
317346
"""Construct a job for extracting a table into Cloud Storage files.
318347
319348
See
320349
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract
321350
322-
:type job_name: str
323-
:param job_name: Name of the job.
351+
:type job_id: str
352+
:param job_id: Name of the job.
324353
325354
:type source: :class:`google.cloud.bigquery.table.Table`
326355
:param source: table to be extracted.
@@ -333,17 +362,17 @@ def extract_table_to_storage(self, job_name, source, *destination_uris):
333362
:rtype: :class:`google.cloud.bigquery.job.ExtractJob`
334363
:returns: a new ``ExtractJob`` instance
335364
"""
336-
return ExtractJob(job_name, source, destination_uris, client=self)
365+
return ExtractJob(job_id, source, destination_uris, client=self)
337366

338-
def run_async_query(self, job_name, query,
367+
def run_async_query(self, job_id, query,
339368
udf_resources=(), query_parameters=()):
340369
"""Construct a job for running a SQL query asynchronously.
341370
342371
See
343372
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query
344373
345-
:type job_name: str
346-
:param job_name: Name of the job.
374+
:type job_id: str
375+
:param job_id: Name of the job.
347376
348377
:type query: str
349378
:param query: SQL query to be executed
@@ -362,7 +391,7 @@ def run_async_query(self, job_name, query,
362391
:rtype: :class:`google.cloud.bigquery.job.QueryJob`
363392
:returns: a new ``QueryJob`` instance
364393
"""
365-
return QueryJob(job_name, query, client=self,
394+
return QueryJob(job_id, query, client=self,
366395
udf_resources=udf_resources,
367396
query_parameters=query_parameters)
368397

bigquery/google/cloud/bigquery/job.py

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
from google.cloud.bigquery._helpers import QueryParametersProperty
3333
from google.cloud.bigquery._helpers import ScalarQueryParameter
3434
from google.cloud.bigquery._helpers import StructQueryParameter
35+
from google.cloud.bigquery._helpers import UDFResource
3536
from google.cloud.bigquery._helpers import UDFResourcesProperty
3637
from google.cloud.bigquery._helpers import _EnumProperty
38+
from google.cloud.bigquery._helpers import _query_param_from_api_repr
3739
from google.cloud.bigquery._helpers import _TypedProperty
3840

3941
_DONE_STATE = 'DONE'
@@ -61,6 +63,22 @@
6163
}
6264

6365

66+
def _bool_or_none(value):
67+
"""Helper: deserialize boolean value from JSON string."""
68+
if isinstance(value, bool):
69+
return value
70+
if value is not None:
71+
return value.lower() in ['t', 'true', '1']
72+
73+
74+
def _int_or_none(value):
75+
"""Helper: deserialize int value from JSON string."""
76+
if isinstance(value, int):
77+
return value
78+
if value is not None:
79+
return int(value)
80+
81+
6482
def _error_result_to_exception(error_result):
6583
"""Maps BigQuery error reasons to an exception.
6684
@@ -311,6 +329,10 @@ def _scrub_local_properties(self, cleaned):
311329
"""Helper: handle subclass properties in cleaned."""
312330
pass
313331

332+
def _copy_configuration_properties(self, configuration):
333+
"""Helper: assign subclass configuration properties in cleaned."""
334+
raise NotImplementedError("Abstract")
335+
314336
def _set_properties(self, api_response):
315337
"""Update properties from resource in body of ``api_response``
316338
@@ -330,6 +352,8 @@ def _set_properties(self, api_response):
330352

331353
self._properties.clear()
332354
self._properties.update(cleaned)
355+
configuration = cleaned['configuration'][self._JOB_TYPE]
356+
self._copy_configuration_properties(configuration)
333357

334358
# For Future interface
335359
self._set_future_result()
@@ -731,7 +755,7 @@ def _populate_config_resource(self, configuration):
731755
if self.quote_character is not None:
732756
configuration['quote'] = self.quote_character
733757
if self.skip_leading_rows is not None:
734-
configuration['skipLeadingRows'] = self.skip_leading_rows
758+
configuration['skipLeadingRows'] = str(self.skip_leading_rows)
735759
if self.source_format is not None:
736760
configuration['sourceFormat'] = self.source_format
737761
if self.write_disposition is not None:
@@ -769,6 +793,28 @@ def _scrub_local_properties(self, cleaned):
769793
schema = cleaned.pop('schema', {'fields': ()})
770794
self.schema = _parse_schema_resource(schema)
771795

796+
def _copy_configuration_properties(self, configuration):
797+
"""Helper: assign subclass configuration properties in cleaned."""
798+
self.allow_jagged_rows = _bool_or_none(
799+
configuration.get('allowJaggedRows'))
800+
self.allow_quoted_newlines = _bool_or_none(
801+
configuration.get('allowQuotedNewlines'))
802+
self.autodetect = _bool_or_none(
803+
configuration.get('autodetect'))
804+
self.create_disposition = configuration.get('createDisposition')
805+
self.encoding = configuration.get('encoding')
806+
self.field_delimiter = configuration.get('fieldDelimiter')
807+
self.ignore_unknown_values = _bool_or_none(
808+
configuration.get('ignoreUnknownValues'))
809+
self.max_bad_records = _int_or_none(
810+
configuration.get('maxBadRecords'))
811+
self.null_marker = configuration.get('nullMarker')
812+
self.quote_character = configuration.get('quote')
813+
self.skip_leading_rows = _int_or_none(
814+
configuration.get('skipLeadingRows'))
815+
self.source_format = configuration.get('sourceFormat')
816+
self.write_disposition = configuration.get('writeDisposition')
817+
772818
@classmethod
773819
def from_api_repr(cls, resource, client):
774820
"""Factory: construct a job given its API representation
@@ -879,6 +925,11 @@ def _build_resource(self):
879925

880926
return resource
881927

928+
def _copy_configuration_properties(self, configuration):
929+
"""Helper: assign subclass configuration properties in cleaned."""
930+
self.create_disposition = configuration.get('createDisposition')
931+
self.write_disposition = configuration.get('writeDisposition')
932+
882933
@classmethod
883934
def from_api_repr(cls, resource, client):
884935
"""Factory: construct a job given its API representation
@@ -1012,6 +1063,14 @@ def _build_resource(self):
10121063

10131064
return resource
10141065

1066+
def _copy_configuration_properties(self, configuration):
1067+
"""Helper: assign subclass configuration properties in cleaned."""
1068+
self.compression = configuration.get('compression')
1069+
self.destination_format = configuration.get('destinationFormat')
1070+
self.field_delimiter = configuration.get('fieldDelimiter')
1071+
self.print_header = _bool_or_none(
1072+
configuration.get('printHeader'))
1073+
10151074
@classmethod
10161075
def from_api_repr(cls, resource, client):
10171076
"""Factory: construct a job given its API representation
@@ -1208,7 +1267,8 @@ def _populate_config_resource(self, configuration):
12081267
if self.maximum_billing_tier is not None:
12091268
configuration['maximumBillingTier'] = self.maximum_billing_tier
12101269
if self.maximum_bytes_billed is not None:
1211-
configuration['maximumBytesBilled'] = self.maximum_bytes_billed
1270+
configuration['maximumBytesBilled'] = str(
1271+
self.maximum_bytes_billed)
12121272
if len(self._udf_resources) > 0:
12131273
configuration[self._UDF_KEY] = [
12141274
{udf_resource.udf_type: udf_resource.value}
@@ -1258,6 +1318,25 @@ def _scrub_local_properties(self, cleaned):
12581318
configuration = cleaned['configuration']['query']
12591319

12601320
self.query = configuration['query']
1321+
1322+
def _copy_configuration_properties(self, configuration):
1323+
"""Helper: assign subclass configuration properties in cleaned."""
1324+
self.allow_large_results = _bool_or_none(
1325+
configuration.get('allowLargeResults'))
1326+
self.flatten_results = _bool_or_none(
1327+
configuration.get('flattenResults'))
1328+
self.use_query_cache = _bool_or_none(
1329+
configuration.get('useQueryCache'))
1330+
self.use_legacy_sql = _bool_or_none(
1331+
configuration.get('useLegacySql'))
1332+
1333+
self.create_disposition = configuration.get('createDisposition')
1334+
self.priority = configuration.get('priority')
1335+
self.write_disposition = configuration.get('writeDisposition')
1336+
self.maximum_billing_tier = configuration.get('maximumBillingTier')
1337+
self.maximum_bytes_billed = _int_or_none(
1338+
configuration.get('maximumBytesBilled'))
1339+
12611340
dest_remote = configuration.get('destinationTable')
12621341

12631342
if dest_remote is None:
@@ -1266,9 +1345,30 @@ def _scrub_local_properties(self, cleaned):
12661345
else:
12671346
dest_local = self._destination_table_resource()
12681347
if dest_remote != dest_local:
1269-
dataset = self._client.dataset(dest_remote['datasetId'])
1348+
project = dest_remote['projectId']
1349+
dataset = self._client.dataset(
1350+
dest_remote['datasetId'], project=project)
12701351
self.destination = dataset.table(dest_remote['tableId'])
12711352

1353+
def_ds = configuration.get('defaultDataset')
1354+
if def_ds is None:
1355+
if self.default_dataset is not None:
1356+
del self.default_dataset
1357+
else:
1358+
project = def_ds['projectId']
1359+
self.default_dataset = self._client.dataset(def_ds['datasetId'])
1360+
1361+
udf_resources = []
1362+
for udf_mapping in configuration.get(self._UDF_KEY, ()):
1363+
key_val, = udf_mapping.items()
1364+
udf_resources.append(UDFResource(key_val[0], key_val[1]))
1365+
self._udf_resources = udf_resources
1366+
1367+
self._query_parameters = [
1368+
_query_param_from_api_repr(mapping)
1369+
for mapping in configuration.get(self._QUERY_PARAMETERS_KEY, ())
1370+
]
1371+
12721372
@classmethod
12731373
def from_api_repr(cls, resource, client):
12741374
"""Factory: construct a job given its API representation

0 commit comments

Comments (0)