Skip to content

Commit a92d69f

Browse files
authored
Merge pull request #2321 from tseaver/2083-bigquery-async_query_job-query_results
Add 'QueryJob.results' method
2 parents 9239c3c + ef7b499 commit a92d69f

File tree

7 files changed

+237
-131
lines changed

7 files changed

+237
-131
lines changed

docs/bigquery-usage.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,19 @@ Poll until the job is complete:
284284
>>> job.ended
285285
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)
286286

287+
Retrieve the results:
288+
289+
.. doctest::
290+
291+
>>> results = job.results()
292+
>>> rows, total_count, token = results.fetch_data() # API request
293+
>>> while True:
294+
... do_something_with(rows)
295+
... if token is None:
296+
... break
297+
... rows, total_count, token = results.fetch_data(
298+
... page_token=token) # API request
299+
287300

288301
Inserting data (asynchronous)
289302
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

google/cloud/bigquery/_helpers.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,55 @@ def _validate(self, value):
172172
"""
173173
if value not in self.ALLOWED:
174174
raise ValueError('Pass one of: %s' ', '.join(self.ALLOWED))
175+
176+
177+
class UDFResource(object):
    """Describe a single user-defined function (UDF) resource.

    :type udf_type: str
    :param udf_type: the type of the resource ('inlineCode' or 'resourceUri')

    :type value: str
    :param value: the inline code or resource URI.

    See
    https://cloud.google.com/bigquery/user-defined-functions#api
    """
    def __init__(self, udf_type, value):
        self.udf_type = udf_type
        self.value = value

    def __eq__(self, other):
        """Compare two resources by ``udf_type`` and ``value``.

        :rtype: bool or ``NotImplemented``
        :returns: ``NotImplemented`` when ``other`` is not a
                  :class:`UDFResource` (so ``==`` evaluates to ``False``
                  instead of raising ``AttributeError``); otherwise whether
                  both attributes match.
        """
        if not isinstance(other, UDFResource):
            return NotImplemented
        return (
            self.udf_type == other.udf_type and
            self.value == other.value)

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`.

        Defined explicitly because Python 2 does not derive ``!=`` from
        ``__eq__``.
        """
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal
196+
197+
198+
class UDFResourcesProperty(object):
    """Custom property type, holding :class:`UDFResource` instances.

    The values are stored on the owning instance as a tuple in its
    ``_udf_resources`` attribute.
    """

    def __get__(self, instance, owner):
        """Descriptor protocol: accessor

        :rtype: :class:`UDFResourcesProperty` or list
        :returns: the descriptor itself when accessed on the class,
                  otherwise a new list copied from the instance's
                  ``_udf_resources``.
        """
        if instance is None:
            return self
        return list(instance._udf_resources)

    def __set__(self, instance, value):
        """Descriptor protocol: mutator

        :type value: iterable of :class:`UDFResource`
        :param value: the resources to store.

        :raises: :class:`ValueError` if any item is not a
                 :class:`UDFResource`; the stored value is left untouched.
        """
        # Materialize once, before validating: a one-shot iterable (e.g. a
        # generator) would otherwise be consumed by the check and then be
        # stored as an empty tuple.
        resources = tuple(value)
        if not all(isinstance(u, UDFResource) for u in resources):
            raise ValueError("udf items must be UDFResource")
        instance._udf_resources = resources
212+
213+
214+
def _build_udf_resources(resources):
215+
"""
216+
:type resources: sequence of :class:`UDFResource`
217+
:param resources: fields to be appended.
218+
219+
:rtype: mapping
220+
:returns: a mapping describing userDefinedFunctionResources for the query.
221+
"""
222+
udfs = []
223+
for resource in resources:
224+
udf = {resource.udf_type: resource.value}
225+
udfs.append(udf)
226+
return udfs

google/cloud/bigquery/job.py

Lines changed: 12 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,62 +23,10 @@
2323
from google.cloud.bigquery.table import Table
2424
from google.cloud.bigquery.table import _build_schema_resource
2525
from google.cloud.bigquery.table import _parse_schema_resource
26+
from google.cloud.bigquery._helpers import UDFResourcesProperty
2627
from google.cloud.bigquery._helpers import _EnumProperty
2728
from google.cloud.bigquery._helpers import _TypedProperty
28-
29-
30-
class UDFResource(object):
31-
"""Describe a single user-defined function (UDF) resource.
32-
:type udf_type: str
33-
:param udf_type: the type of the resource ('inlineCode' or 'resourceUri')
34-
35-
:type value: str
36-
:param value: the inline code or resource URI.
37-
38-
See
39-
https://cloud.google.com/bigquery/user-defined-functions#api
40-
"""
41-
def __init__(self, udf_type, value):
42-
self.udf_type = udf_type
43-
self.value = value
44-
45-
def __eq__(self, other):
46-
return(
47-
self.udf_type == other.udf_type and
48-
self.value == other.value)
49-
50-
51-
def _build_udf_resources(resources):
52-
"""
53-
:type resources: sequence of :class:`UDFResource`
54-
:param resources: fields to be appended.
55-
56-
:rtype: mapping
57-
:returns: a mapping describing userDefinedFunctionResources for the query.
58-
"""
59-
udfs = []
60-
for resource in resources:
61-
udf = {resource.udf_type: resource.value}
62-
udfs.append(udf)
63-
return udfs
64-
65-
66-
class UDFResourcesProperty(object):
67-
"""Custom property type for :class:`QueryJob`.
68-
69-
Also used by :class:`~google.cloud.bigquery.query.Query`.
70-
"""
71-
def __get__(self, instance, owner):
72-
"""Descriptor protocol: accessor"""
73-
if instance is None:
74-
return self
75-
return list(instance._udf_resources)
76-
77-
def __set__(self, instance, value):
78-
"""Descriptor protocol: mutator"""
79-
if not all(isinstance(u, UDFResource) for u in value):
80-
raise ValueError("udf items must be UDFResource")
81-
instance._udf_resources = tuple(value)
29+
from google.cloud.bigquery._helpers import _build_udf_resources
8230

8331

8432
class Compression(_EnumProperty):
@@ -957,7 +905,7 @@ class QueryJob(_AsyncJob):
957905
958906
:type udf_resources: tuple
959907
:param udf_resources: An iterable of
960-
:class:`google.cloud.bigquery.job.UDFResource`
908+
:class:`google.cloud.bigquery._helpers.UDFResource`
961909
(empty by default)
962910
"""
963911
_JOB_TYPE = 'query'
@@ -1130,3 +1078,12 @@ def from_api_repr(cls, resource, client):
11301078
job = cls(name, query, client=client)
11311079
job._set_properties(resource)
11321080
return job
1081+
1082+
def results(self):
    """Build the ``QueryResults`` object wrapping this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: the instance created by ``QueryResults.from_query_job``
              for this job.
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid a circular import with the query
    # module -- confirm.
    from google.cloud.bigquery.query import QueryResults
    bound_results = QueryResults.from_query_job(self)
    return bound_results

google/cloud/bigquery/query.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
from google.cloud.bigquery._helpers import _rows_from_json
2121
from google.cloud.bigquery.dataset import Dataset
2222
from google.cloud.bigquery.job import QueryJob
23-
from google.cloud.bigquery.job import UDFResourcesProperty
24-
from google.cloud.bigquery.job import _build_udf_resources
2523
from google.cloud.bigquery.table import _parse_schema_resource
24+
from google.cloud.bigquery._helpers import _build_udf_resources
25+
from google.cloud.bigquery._helpers import UDFResourcesProperty
2626

2727

2828
class _SyncQueryConfiguration(object):
@@ -65,6 +65,26 @@ def __init__(self, query, client, udf_resources=()):
6565
self.udf_resources = udf_resources
6666
self._job = None
6767

68+
@classmethod
def from_query_job(cls, job):
    """Factory: build an instance wrapping an already-created job.

    :type job: :class:`~google.cloud.bigquery.job.QueryJob`
    :param job: existing job

    :rtype: :class:`QueryResults`
    :returns: a new instance built from the job's query, client and UDF
              resources, with the job stored as ``_job``.
    """
    bound = cls(job.query, job._client, job.udf_resources)
    bound._job = job
    # Carry over each optional setting, skipping those that are None on
    # the job.
    for option in ('default_dataset', 'use_query_cache', 'use_legacy_sql'):
        setting = getattr(job, option)
        if setting is not None:
            setattr(bound, option, setting)
    return bound
87+
6888
@property
6989
def project(self):
7090
"""Project bound to the job.
@@ -307,6 +327,9 @@ def run(self, client=None):
307327
:param client: the client to use. If not passed, falls back to the
308328
``client`` stored on the current dataset.
309329
"""
330+
if self._job is not None:
331+
raise ValueError("Query job is already running.")
332+
310333
client = self._require_client(client)
311334
path = '/projects/%s/queries' % (self.project,)
312335
api_response = client.connection.api_request(

unit_tests/bigquery/test__helpers.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,76 @@ def __init__(self):
386386
self.assertEqual(wrapper._configuration._attr, None)
387387

388388

389+
class Test_UDFResourcesProperty(unittest.TestCase):
    """Unit tests for the ``UDFResourcesProperty`` descriptor."""

    def _getTargetClass(self):
        from google.cloud.bigquery._helpers import UDFResourcesProperty
        return UDFResourcesProperty

    def _makeOne(self, *args, **kw):
        return self._getTargetClass()(*args, **kw)

    def _descriptor_and_klass(self):
        """Return a descriptor and a throwaway class that exposes it."""
        descriptor = self._makeOne()

        class _Test(object):
            _udf_resources = ()
            udf_resources = descriptor

        return descriptor, _Test

    def test_class_getter(self):
        descriptor, klass = self._descriptor_and_klass()
        # Class-level access must hand back the descriptor itself.
        self.assertIs(klass.udf_resources, descriptor)

    def test_instance_getter_empty(self):
        _, klass = self._descriptor_and_klass()
        instance = klass()
        self.assertEqual(instance.udf_resources, [])

    def test_instance_getter_w_non_empty_list(self):
        from google.cloud.bigquery._helpers import UDFResource
        RESOURCE_URI = 'gs://some-bucket/js/lib.js'
        udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
        _, klass = self._descriptor_and_klass()
        instance = klass()
        instance._udf_resources = tuple(udf_resources)

        self.assertEqual(instance.udf_resources, udf_resources)

    def test_instance_setter_w_empty_list(self):
        from google.cloud.bigquery._helpers import UDFResource
        RESOURCE_URI = 'gs://some-bucket/js/lib.js'
        udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
        _, klass = self._descriptor_and_klass()
        instance = klass()
        # Seed the backing attribute with a tuple, the same form the
        # descriptor's setter stores.
        instance._udf_resources = tuple(udf_resources)

        instance.udf_resources = []

        self.assertEqual(instance.udf_resources, [])

    def test_instance_setter_w_valid_udf(self):
        from google.cloud.bigquery._helpers import UDFResource
        RESOURCE_URI = 'gs://some-bucket/js/lib.js'
        udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
        _, klass = self._descriptor_and_klass()
        instance = klass()

        instance.udf_resources = udf_resources

        self.assertEqual(instance.udf_resources, udf_resources)

    def test_instance_setter_w_bad_udfs(self):
        _, klass = self._descriptor_and_klass()
        instance = klass()

        with self.assertRaises(ValueError):
            instance.udf_resources = ["foo"]

        # A rejected assignment must leave the stored value unchanged.
        self.assertEqual(instance.udf_resources, [])
457+
458+
389459
class _Field(object):
390460

391461
def __init__(self, mode, name='unknown', field_type='UNKNOWN', fields=()):

unit_tests/bigquery/test_job.py

Lines changed: 9 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -15,76 +15,6 @@
1515
import unittest
1616

1717

18-
class Test_UDFResourcesProperty(unittest.TestCase):
19-
20-
def _getTargetClass(self):
21-
from google.cloud.bigquery.job import UDFResourcesProperty
22-
return UDFResourcesProperty
23-
24-
def _makeOne(self, *args, **kw):
25-
return self._getTargetClass()(*args, **kw)
26-
27-
def _descriptor_and_klass(self):
28-
descriptor = self._makeOne()
29-
30-
class _Test(object):
31-
_udf_resources = ()
32-
udf_resources = descriptor
33-
34-
return descriptor, _Test
35-
36-
def test_class_getter(self):
37-
descriptor, klass = self._descriptor_and_klass()
38-
self.assertTrue(klass.udf_resources is descriptor)
39-
40-
def test_instance_getter_empty(self):
41-
_, klass = self._descriptor_and_klass()
42-
instance = klass()
43-
self.assertEqual(instance.udf_resources, [])
44-
45-
def test_instance_getter_w_non_empty_list(self):
46-
from google.cloud.bigquery.job import UDFResource
47-
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
48-
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
49-
_, klass = self._descriptor_and_klass()
50-
instance = klass()
51-
instance._udf_resources = tuple(udf_resources)
52-
53-
self.assertEqual(instance.udf_resources, udf_resources)
54-
55-
def test_instance_setter_w_empty_list(self):
56-
from google.cloud.bigquery.job import UDFResource
57-
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
58-
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
59-
_, klass = self._descriptor_and_klass()
60-
instance = klass()
61-
instance._udf_resources = udf_resources
62-
63-
instance.udf_resources = []
64-
65-
self.assertEqual(instance.udf_resources, [])
66-
67-
def test_instance_setter_w_valid_udf(self):
68-
from google.cloud.bigquery.job import UDFResource
69-
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
70-
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
71-
_, klass = self._descriptor_and_klass()
72-
instance = klass()
73-
74-
instance.udf_resources = udf_resources
75-
76-
self.assertEqual(instance.udf_resources, udf_resources)
77-
78-
def test_instance_setter_w_bad_udfs(self):
79-
_, klass = self._descriptor_and_klass()
80-
instance = klass()
81-
82-
with self.assertRaises(ValueError):
83-
instance.udf_resources = ["foo"]
84-
85-
self.assertEqual(instance.udf_resources, [])
86-
87-
8818
class _Base(object):
8919
PROJECT = 'project'
9020
SOURCE1 = 'http://example.com/source1.csv'
@@ -1466,6 +1396,14 @@ def test_from_api_repr_w_properties(self):
14661396
self.assertTrue(dataset._client is client)
14671397
self._verifyResourceProperties(dataset, RESOURCE)
14681398

1399+
def test_results(self):
    """``results()`` returns a ``QueryResults`` bound to the same job."""
    from google.cloud.bigquery.query import QueryResults
    client = _Client(self.PROJECT)
    job = self._makeOne(self.JOB_NAME, self.QUERY, client)
    results = job.results()
    self.assertIsInstance(results, QueryResults)
    # assertIs reports both objects on failure, unlike assertTrue(a is b).
    self.assertIs(results._job, job)
1406+
14691407
def test_begin_w_bound_client(self):
14701408
PATH = 'projects/%s/jobs' % self.PROJECT
14711409
RESOURCE = self._makeResource()
@@ -1568,7 +1506,7 @@ def test_begin_w_alternate_client(self):
15681506
self._verifyResourceProperties(job, RESOURCE)
15691507

15701508
def test_begin_w_bound_client_and_udf(self):
1571-
from google.cloud.bigquery.job import UDFResource
1509+
from google.cloud.bigquery._helpers import UDFResource
15721510
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
15731511
PATH = 'projects/%s/jobs' % self.PROJECT
15741512
RESOURCE = self._makeResource()

0 commit comments

Comments
 (0)