Skip to content

Commit ca2b8d6

Browse files
authored
Add to_bqstorage to convert from Table[Reference] to a google-cloud-bigquery-storage reference (googleapis#6840)
* Add to_bqstorage to convert from Table[Reference] to a google-cloud-bigquery-storage reference. This makes it easier to use the new BigQuery Storage API (currently in Alpha) in combination with the BigQuery API. * Remove logic for partition filter and snapshot selector. * Remove unused selected_fields argument.
1 parent 2d83592 commit ca2b8d6

File tree

6 files changed

+121
-14
lines changed

6 files changed

+121
-14
lines changed

bigquery/docs/snippets.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,7 +1314,7 @@ def test_load_table_from_file(client, to_delete):
13141314

13151315

13161316
def test_load_table_from_uri_avro(client, to_delete, capsys):
1317-
dataset_id = 'load_table_from_uri_avro_{}'.format(_millis())
1317+
dataset_id = "load_table_from_uri_avro_{}".format(_millis())
13181318
dataset = bigquery.Dataset(client.dataset(dataset_id))
13191319
client.create_dataset(dataset)
13201320
to_delete.append(dataset)
@@ -1327,23 +1327,22 @@ def test_load_table_from_uri_avro(client, to_delete, capsys):
13271327
dataset_ref = client.dataset(dataset_id)
13281328
job_config = bigquery.LoadJobConfig()
13291329
job_config.source_format = bigquery.SourceFormat.AVRO
1330-
uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.avro'
1330+
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro"
13311331

13321332
load_job = client.load_table_from_uri(
1333-
uri,
1334-
dataset_ref.table('us_states'),
1335-
job_config=job_config) # API request
1336-
print('Starting job {}'.format(load_job.job_id))
1333+
uri, dataset_ref.table("us_states"), job_config=job_config
1334+
) # API request
1335+
print("Starting job {}".format(load_job.job_id))
13371336

13381337
load_job.result() # Waits for table load to complete.
1339-
print('Job finished.')
1338+
print("Job finished.")
13401339

1341-
destination_table = client.get_table(dataset_ref.table('us_states'))
1342-
print('Loaded {} rows.'.format(destination_table.num_rows))
1340+
destination_table = client.get_table(dataset_ref.table("us_states"))
1341+
print("Loaded {} rows.".format(destination_table.num_rows))
13431342
# [END bigquery_load_table_gcs_avro]
13441343

13451344
out, _ = capsys.readouterr()
1346-
assert 'Loaded 50 rows.' in out
1345+
assert "Loaded 50 rows." in out
13471346

13481347

13491348
def test_load_table_from_uri_csv(client, to_delete, capsys):

bigquery/google/cloud/bigquery/table.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,38 @@ def to_api_repr(self):
279279
"tableId": self._table_id,
280280
}
281281

282+
def to_bqstorage(self):
283+
"""Construct a BigQuery Storage API representation of this table.
284+
285+
If the ``table_id`` contains a partition identifier (e.g.
286+
``my_table$201812``) or a snapshot identifier (e.g.
287+
``my_table@1234567890``), it is ignored. Use
288+
:class:`google.cloud.bigquery_storage_v1beta1.types.TableReadOptions`
289+
to filter rows by partition. Use
290+
:class:`google.cloud.bigquery_storage_v1beta1.types.TableModifiers`
291+
to select a specific snapshot to read from.
292+
293+
Returns:
294+
google.cloud.bigquery_storage_v1beta1.types.TableReference:
295+
A reference to this table in the BigQuery Storage API.
296+
"""
297+
from google.cloud import bigquery_storage_v1beta1
298+
299+
table_ref = bigquery_storage_v1beta1.types.TableReference()
300+
table_ref.project_id = self._project
301+
table_ref.dataset_id = self._dataset_id
302+
table_id = self._table_id
303+
304+
if "@" in table_id:
305+
table_id = table_id.split("@")[0]
306+
307+
if "$" in table_id:
308+
table_id = table_id.split("$")[0]
309+
310+
table_ref.table_id = table_id
311+
312+
return table_ref
313+
282314
def _key(self):
283315
"""A tuple key that uniquely describes this field.
284316
@@ -820,6 +852,15 @@ def to_api_repr(self):
820852
"""
821853
return copy.deepcopy(self._properties)
822854

855+
def to_bqstorage(self):
856+
"""Construct a BigQuery Storage API representation of this table.
857+
858+
Returns:
859+
google.cloud.bigquery_storage_v1beta1.types.TableReference:
860+
A reference to this table in the BigQuery Storage API.
861+
"""
862+
return self.reference.to_bqstorage()
863+
823864
def _build_resource(self, filter_fields):
824865
"""Generate a resource for ``update``."""
825866
partial = {}
@@ -971,6 +1012,41 @@ def friendly_name(self):
9711012

9721013
view_use_legacy_sql = property(_view_use_legacy_sql_getter)
9731014

1015+
@classmethod
1016+
def from_string(cls, full_table_id):
1017+
"""Construct a table from fully-qualified table ID.
1018+
1019+
Args:
1020+
full_table_id (str):
1021+
A fully-qualified table ID in standard SQL format. Must
1022+
include a project ID, dataset ID, and table ID, each
1023+
separated by ``.``.
1024+
1025+
Returns:
1026+
Table: Table parsed from ``full_table_id``.
1027+
1028+
Examples:
1029+
>>> Table.from_string('my-project.mydataset.mytable')
1030+
Table(TableRef...(D...('my-project', 'mydataset'), 'mytable'))
1031+
1032+
Raises:
1033+
ValueError:
1034+
If ``full_table_id`` is not a fully-qualified table ID in
1035+
standard SQL format.
1036+
"""
1037+
return cls(
1038+
{"tableReference": TableReference.from_string(full_table_id).to_api_repr()}
1039+
)
1040+
1041+
def to_bqstorage(self):
1042+
"""Construct a BigQuery Storage API representation of this table.
1043+
1044+
Returns:
1045+
google.cloud.bigquery_storage_v1beta1.types.TableReference:
1046+
A reference to this table in the BigQuery Storage API.
1047+
"""
1048+
return self.reference.to_bqstorage()
1049+
9741050

9751051
def _row_from_mapping(mapping, schema):
9761052
"""Convert a mapping to a row tuple using the schema.

bigquery/noxfile.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121

2222
LOCAL_DEPS = (
23-
os.path.join('..', 'api_core'),
23+
os.path.join('..', 'api_core[grpc]'),
2424
os.path.join('..', 'core'),
2525
)
2626

@@ -40,9 +40,9 @@ def default(session):
4040

4141
# Pyarrow does not support Python 3.7
4242
if session.python == '3.7':
43-
dev_install = '.[pandas]'
43+
dev_install = '.[bqstorage, pandas]'
4444
else:
45-
dev_install = '.[pandas, pyarrow]'
45+
dev_install = '.[bqstorage, pandas, pyarrow]'
4646
session.install('-e', dev_install)
4747

4848
# IPython does not support Python 2 after version 5.x

bigquery/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
'google-resumable-media >= 0.3.1',
3535
]
3636
extras = {
37+
'bqstorage': 'google-cloud-bigquery-storage<=2.0.0dev',
3738
'pandas': 'pandas>=0.17.1',
3839
# Exclude PyArrow dependency from Windows Python 2.7.
3940
'pyarrow: platform_system != "Windows" or python_version >= "3.4"':

bigquery/tests/unit/test_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -800,7 +800,7 @@ def test_result_default_wo_state(self, result):
800800
begin.assert_called_once_with(retry=DEFAULT_RETRY)
801801
result.assert_called_once_with(timeout=None)
802802

803-
@mock.patch('google.api_core.future.polling.PollingFuture.result')
803+
@mock.patch("google.api_core.future.polling.PollingFuture.result")
804804
def test_result_w_retry_wo_state(self, result):
805805
client = _make_client(project=self.PROJECT)
806806
job = self._make_one(self.JOB_ID, client)

bigquery/tests/unit/test_table.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,17 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import itertools
1516
import unittest
1617

1718
import mock
19+
import pytest
1820
import six
1921

22+
try:
23+
from google.cloud import bigquery_storage_v1beta1
24+
except ImportError: # pragma: NO COVER
25+
bigquery_storage_v1beta1 = None
2026
try:
2127
import pandas
2228
except (ImportError, AttributeError): # pragma: NO COVER
@@ -1688,3 +1694,28 @@ def test_set_expiration_w_none(self):
16881694
time_partitioning = self._make_one()
16891695
time_partitioning.expiration_ms = None
16901696
assert time_partitioning._properties["expirationMs"] is None
1697+
1698+
1699+
@pytest.mark.skipif(
1700+
bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
1701+
)
1702+
def test_table_reference_to_bqstorage():
1703+
from google.cloud.bigquery import table as mut
1704+
1705+
# Can't use parametrized pytest because bigquery_storage_v1beta1 may not be
1706+
# available.
1707+
expected = bigquery_storage_v1beta1.types.TableReference(
1708+
project_id="my-project", dataset_id="my_dataset", table_id="my_table"
1709+
)
1710+
cases = (
1711+
"my-project.my_dataset.my_table",
1712+
"my-project.my_dataset.my_table$20181225",
1713+
"my-project.my_dataset.my_table@1234567890",
1714+
"my-project.my_dataset.my_table$20181225@1234567890",
1715+
)
1716+
1717+
classes = (mut.TableReference, mut.Table, mut.TableListItem)
1718+
1719+
for case, cls in itertools.product(cases, classes):
1720+
got = cls.from_string(case).to_bqstorage()
1721+
assert got == expected

0 commit comments

Comments
 (0)