Skip to content

Commit a9f701a

Browse files
authored
Merge pull request #2058 from dwmclary/add_partition_support
Add partition support to BigQuery
2 parents a54a0d4 + 8b450e7 commit a9f701a

File tree

2 files changed

+229
-0
lines changed

2 files changed

+229
-0
lines changed

gcloud/bigquery/table.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,56 @@ def table_type(self):
230230
"""
231231
return self._properties.get('type')
232232

233+
@property
def partitioning_type(self):
    """Time partitioning type configured for the table.

    :rtype: str, or ``NoneType``
    :returns: the ``type`` entry of the ``timePartitioning`` property,
              or ``None`` when no such entry is present.
    """
    time_partitioning = self._properties.get('timePartitioning', {})
    return time_partitioning.get('type')
240+
241+
@partitioning_type.setter
def partitioning_type(self, value):
    """Update the partitioning type of the table.

    :type value: str, or ``NoneType``
    :param value: partitioning type; only "DAY" (compared
                  case-insensitively) is currently supported. ``None``
                  removes the ``timePartitioning`` property altogether.

    :raises: ValueError if ``value`` is neither "DAY" nor ``None``.
    """
    if value is None:
        # Remove the whole entry (including any ``expirationMs``) so the
        # getter reports ``None`` again and no empty ``timePartitioning``
        # dict is left behind in the properties.
        self._properties.pop('timePartitioning', None)
        return

    if not (isinstance(value, six.string_types) and
            value.upper() == "DAY"):
        raise ValueError("value must be one of ['DAY', None]")

    time_partitioning = self._properties.setdefault('timePartitioning', {})
    # Store the canonical upper-case spelling regardless of input case.
    time_partitioning['type'] = value.upper()
255+
256+
@property
def partition_expiration(self):
    """Expiration time in ms for a partition.

    :rtype: int, or ``NoneType``
    :returns: the ``expirationMs`` entry of the ``timePartitioning``
              property, or ``None`` when either is absent.
    """
    if "timePartitioning" not in self._properties:
        return None
    return self._properties["timePartitioning"].get("expirationMs")
267+
268+
@partition_expiration.setter
def partition_expiration(self, value):
    """Update the expiration time in ms for a partition.

    Setting an expiration on a table with no partitioning type yet
    also sets the type to "DAY".

    :type value: int, or ``NoneType``
    :param value: partition expiration time in ms; ``None`` removes a
                  previously configured expiration.

    :raises: ValueError if ``value`` is neither an integer nor ``None``.
    """
    if value is None:
        # Clearing: drop only the expiration, keep the partitioning type.
        self._properties.get(
            'timePartitioning', {}).pop('expirationMs', None)
        return

    # ``six.integer_types`` also covers ``long`` on Python 2.
    if not isinstance(value, six.integer_types):
        raise ValueError(
            "must be an integer representing milliseconds, or None")

    time_partitioning = self._properties.setdefault('timePartitioning', {})
    # An expiration without a type would never be sent, so make sure a
    # type is present even when the entry already existed but was empty.
    time_partitioning.setdefault('type', "DAY")
    time_partitioning['expirationMs'] = value
282+
233283
@property
234284
def description(self):
235285
"""Description of the table.
@@ -348,6 +398,22 @@ def view_query(self):
348398
"""Delete SQL query defining the table as a view."""
349399
self._properties.pop('view', None)
350400

401+
def list_partitions(self, client=None):
    """List the partitions in a table.

    Runs a synchronous query against the table's
    ``$__PARTITIONS_SUMMARY__`` decorator and collects the first column
    (``partition_id``) of every returned row.

    :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
    :param client: the client to use. It is passed to
                   ``_require_client``, which picks the client actually
                   used (presumably falling back to the one bound to
                   the dataset -- confirm against that helper).

    :rtype: list
    :returns: a list of time partitions
    """
    bound_client = self._require_client(client)
    sql = ('SELECT partition_id from [%s.%s$__PARTITIONS_SUMMARY__]' %
           (self.dataset_name, self.name))
    query = bound_client.run_sync_query(sql)
    query.run()

    partition_ids = []
    for row in query.rows:
        partition_ids.append(row[0])
    return partition_ids
416+
351417
@classmethod
352418
def from_api_repr(cls, resource, dataset):
353419
"""Factory: construct a table given its API representation
@@ -423,6 +489,9 @@ def _build_resource(self):
423489
if self.location is not None:
424490
resource['location'] = self.location
425491

492+
if self.partitioning_type is not None:
493+
resource['timePartitioning'] = self._properties['timePartitioning']
494+
426495
if self.view_query is not None:
427496
view = resource['view'] = {}
428497
view['query'] = self.view_query

gcloud/bigquery/test_table.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,154 @@ def test_create_w_bound_client(self):
524524
self.assertEqual(req['data'], SENT)
525525
self._verifyResourceProperties(table, RESOURCE)
526526

527+
def test_create_w_partition_no_expire(self):
    """``create()`` POSTs ``timePartitioning`` when only the type is set."""
    from gcloud.bigquery.table import SchemaField
    fields = [
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    expected_path = '/projects/%s/datasets/%s/tables' % (
        self.PROJECT, self.DS_NAME)
    resource = self._makeResource()
    connection = _Connection(resource)
    dataset = _Dataset(
        _Client(project=self.PROJECT, connection=connection))
    table = self._makeOne(self.TABLE_NAME, dataset, schema=fields)

    # The table starts unpartitioned; the setter switches it to "DAY".
    self.assertEqual(table.partitioning_type, None)
    table.partitioning_type = "DAY"
    self.assertEqual(table.partitioning_type, "DAY")
    table.create()

    requests = connection._requested
    self.assertEqual(len(requests), 1)
    request = requests[0]
    self.assertEqual(request['method'], 'POST')
    self.assertEqual(request['path'], expected_path)
    expected_payload = {
        'tableReference': {
            'projectId': self.PROJECT,
            'datasetId': self.DS_NAME,
            'tableId': self.TABLE_NAME,
        },
        'timePartitioning': {'type': 'DAY'},
        'schema': {
            'fields': [
                {'name': 'full_name', 'type': 'STRING',
                 'mode': 'REQUIRED'},
                {'name': 'age', 'type': 'INTEGER', 'mode': 'REQUIRED'},
            ],
        },
    }
    self.assertEqual(request['data'], expected_payload)
    self._verifyResourceProperties(table, resource)
560+
561+
def test_create_w_partition_and_expire(self):
    """``create()`` POSTs both partition type and expiration."""
    from gcloud.bigquery.table import SchemaField
    fields = [
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    expected_path = '/projects/%s/datasets/%s/tables' % (
        self.PROJECT, self.DS_NAME)
    resource = self._makeResource()
    connection = _Connection(resource)
    dataset = _Dataset(
        _Client(project=self.PROJECT, connection=connection))
    table = self._makeOne(self.TABLE_NAME, dataset, schema=fields)

    # Setting only the expiration is expected to imply the "DAY" type.
    self.assertEqual(table.partition_expiration, None)
    table.partition_expiration = 100
    self.assertEqual(table.partitioning_type, "DAY")
    self.assertEqual(table.partition_expiration, 100)
    table.create()

    requests = connection._requested
    self.assertEqual(len(requests), 1)
    request = requests[0]
    self.assertEqual(request['method'], 'POST')
    self.assertEqual(request['path'], expected_path)
    expected_payload = {
        'tableReference': {
            'projectId': self.PROJECT,
            'datasetId': self.DS_NAME,
            'tableId': self.TABLE_NAME,
        },
        'timePartitioning': {'type': 'DAY', 'expirationMs': 100},
        'schema': {
            'fields': [
                {'name': 'full_name', 'type': 'STRING',
                 'mode': 'REQUIRED'},
                {'name': 'age', 'type': 'INTEGER', 'mode': 'REQUIRED'},
            ],
        },
    }
    self.assertEqual(request['data'], expected_payload)
    self._verifyResourceProperties(table, resource)
594+
595+
def test_partition_type_setter_none_type(self):
    """Assigning ``None`` on a fresh table leaves the type ``None``."""
    from gcloud.bigquery.table import SchemaField
    fields = [
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    connection = _Connection(self._makeResource())
    dataset = _Dataset(
        _Client(project=self.PROJECT, connection=connection))
    table = self._makeOne(self.TABLE_NAME, dataset, schema=fields)

    self.assertEqual(table.partitioning_type, None)
    table.partitioning_type = None
    self.assertEqual(table.partitioning_type, None)
608+
609+
def test_partition_type_setter_bad_type(self):
    """A non-string partitioning type is rejected with ValueError."""
    from gcloud.bigquery.table import SchemaField
    fields = [
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    connection = _Connection(self._makeResource())
    dataset = _Dataset(
        _Client(project=self.PROJECT, connection=connection))
    table = self._makeOne(self.TABLE_NAME, dataset, schema=fields)

    with self.assertRaises(ValueError):
        table.partitioning_type = 123
621+
622+
def test_partition_type_setter_unknown_value(self):
    """A string other than "DAY" is rejected with ValueError."""
    from gcloud.bigquery.table import SchemaField
    fields = [
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    connection = _Connection(self._makeResource())
    dataset = _Dataset(
        _Client(project=self.PROJECT, connection=connection))
    table = self._makeOne(self.TABLE_NAME, dataset, schema=fields)

    with self.assertRaises(ValueError):
        table.partitioning_type = "HASH"
634+
635+
def test_partition_experiation_bad_type(self):
    """A non-integer partition expiration is rejected with ValueError."""
    from gcloud.bigquery.table import SchemaField
    fields = [
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    connection = _Connection(self._makeResource())
    dataset = _Dataset(
        _Client(project=self.PROJECT, connection=connection))
    table = self._makeOne(self.TABLE_NAME, dataset, schema=fields)

    with self.assertRaises(ValueError):
        table.partition_expiration = "NEVER"
647+
648+
def test_partition_expiration(self):
    """Setting an expiration stores it and implies the "DAY" type."""
    from gcloud.bigquery.table import SchemaField
    fields = [
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    connection = _Connection(self._makeResource())
    dataset = _Dataset(
        _Client(project=self.PROJECT, connection=connection))
    table = self._makeOne(self.TABLE_NAME, dataset, schema=fields)

    self.assertEqual(table.partition_expiration, None)
    table.partition_expiration = 100
    self.assertEqual(table.partitioning_type, "DAY")
    self.assertEqual(table.partition_expiration, 100)
662+
663+
def test_list_partitions(self):
    """``list_partitions`` returns the first column of each query row."""
    from gcloud.bigquery.table import SchemaField
    fields = [
        SchemaField('full_name', 'STRING', mode='REQUIRED'),
        SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    connection = _Connection(self._makeResource())
    dataset = _Dataset(
        _Client(project=self.PROJECT, connection=connection))
    table = self._makeOne(self.TABLE_NAME, dataset, schema=fields)

    partitions = table.list_partitions()
    self.assertEqual(partitions, [20160804, 20160805])
674+
527675
def test_create_w_alternate_client(self):
528676
import datetime
529677
from gcloud._helpers import UTC
@@ -1669,6 +1817,18 @@ def __init__(self, project='project', connection=None):
16691817
def job_from_resource(self, resource):  # pylint: disable=unused-argument
    """Return this stub's ``_job`` attribute, ignoring ``resource``."""
    canned_job = self._job
    return canned_job
16711819

1820+
def run_sync_query(self, q=None):  # pylint: disable=unused-argument
    """Return a new ``_Query`` stub; the query text ``q`` is ignored."""
    stub_query = _Query(self)
    return stub_query
1822+
1823+
1824+
class _Query(object):
    """Minimal query stand-in whose ``run`` fills in two canned rows."""

    def __init__(self, client=None):  # pylint: disable=unused-argument
        # No rows are available until ``run`` has been called.
        self.rows = []

    def run(self):
        """Replace ``rows`` with two ``(partition_id, None)`` tuples."""
        self.rows = [(day, None) for day in (20160804, 20160805)]
1831+
16721832

16731833
class _Dataset(object):
16741834

0 commit comments

Comments
 (0)