Skip to content

Commit 92d76f8

Browse files
tswast
authored and tseaver committed
Add args from 'RowIterator.to_dataframe()' to 'QueryJob.to_dataframe()'. (googleapis#7241)
1 parent 20e40e6 commit 92d76f8

3 files changed

Lines changed: 155 additions & 3 deletions

File tree

bigquery/google/cloud/bigquery/job.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2778,9 +2778,34 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
27782778
dest_table = Table(dest_table_ref, schema=schema)
27792779
return self._client.list_rows(dest_table, retry=retry)
27802780

2781-
def to_dataframe(self):
2781+
def to_dataframe(self, bqstorage_client=None, dtypes=None):
27822782
"""Return a pandas DataFrame from a QueryJob
27832783
2784+
Args:
2785+
bqstorage_client ( \
2786+
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
2787+
):
2788+
**Alpha Feature** Optional. A BigQuery Storage API client. If
2789+
supplied, use the faster BigQuery Storage API to fetch rows
2790+
from BigQuery. This API is a billable API.
2791+
2792+
This method requires the ``fastavro`` and
2793+
``google-cloud-bigquery-storage`` libraries.
2794+
2795+
Reading from a specific partition or snapshot is not
2796+
currently supported by this method.
2797+
2798+
**Caution**: There is a known issue reading small anonymous
2799+
query result tables with the BQ Storage API. Write your query
2800+
results to a destination table to work around this issue.
2801+
dtypes ( \
2802+
Map[str, Union[str, pandas.Series.dtype]] \
2803+
):
2804+
Optional. A dictionary of column names pandas ``dtype``s. The
2805+
provided ``dtype`` is used when constructing the series for
2806+
the column specified. Otherwise, the default pandas behavior
2807+
is used.
2808+
27842809
Returns:
27852810
A :class:`~pandas.DataFrame` populated with row data and column
27862811
headers from the query results. The column headers are derived
@@ -2789,7 +2814,9 @@ def to_dataframe(self):
27892814
Raises:
27902815
ValueError: If the `pandas` library cannot be imported.
27912816
"""
2792-
return self.result().to_dataframe()
2817+
return self.result().to_dataframe(
2818+
bqstorage_client=bqstorage_client, dtypes=dtypes
2819+
)
27932820

27942821
def __iter__(self):
27952822
return iter(self.result())

bigquery/google/cloud/bigquery/table.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1441,7 +1441,19 @@ class _EmptyRowIterator(object):
14411441
pages = ()
14421442
total_rows = 0
14431443

1444-
def to_dataframe(self):
1444+
def to_dataframe(self, bqstorage_client=None, dtypes=None):
1445+
"""Create an empty dataframe.
1446+
1447+
Args:
1448+
bqstorage_client (Any):
1449+
Ignored. Added for compatibility with RowIterator.
1450+
dtypes (Any):
1451+
Ignored. Added for compatibility with RowIterator.
1452+
1453+
Returns:
1454+
pandas.DataFrame:
1455+
An empty :class:`~pandas.DataFrame`.
1456+
"""
14451457
if pandas is None:
14461458
raise ValueError(_NO_PANDAS_ERROR)
14471459
return pandas.DataFrame()

bigquery/tests/unit/test_job.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import copy
16+
import json
1617
import unittest
1718

1819
import mock
@@ -22,6 +23,10 @@
2223
import pandas
2324
except (ImportError, AttributeError): # pragma: NO COVER
2425
pandas = None
26+
try:
27+
from google.cloud import bigquery_storage_v1beta1
28+
except (ImportError, AttributeError): # pragma: NO COVER
29+
bigquery_storage_v1beta1 = None
2530

2631

2732
def _make_credentials():
@@ -4543,6 +4548,114 @@ def test_to_dataframe(self):
45434548
self.assertEqual(len(df), 4) # verify the number of rows
45444549
self.assertEqual(list(df), ["name", "age"]) # verify the column names
45454550

4551+
@unittest.skipIf(pandas is None, "Requires `pandas`")
4552+
def test_to_dataframe_ddl_query(self):
4553+
# Destination table may have no schema for some DDL and DML queries.
4554+
query_resource = {
4555+
"jobComplete": True,
4556+
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
4557+
"schema": {"fields": []},
4558+
}
4559+
connection = _make_connection(query_resource)
4560+
client = _make_client(self.PROJECT, connection=connection)
4561+
resource = self._make_resource(ended=True)
4562+
job = self._get_target_class().from_api_repr(resource, client)
4563+
4564+
df = job.to_dataframe()
4565+
4566+
self.assertEqual(len(df), 0)
4567+
4568+
@unittest.skipIf(pandas is None, "Requires `pandas`")
4569+
@unittest.skipIf(
4570+
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
4571+
)
4572+
def test_to_dataframe_bqstorage(self):
4573+
query_resource = {
4574+
"jobComplete": True,
4575+
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
4576+
"totalRows": "4",
4577+
"schema": {
4578+
"fields": [
4579+
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
4580+
{"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
4581+
]
4582+
},
4583+
}
4584+
connection = _make_connection(query_resource)
4585+
client = _make_client(self.PROJECT, connection=connection)
4586+
resource = self._make_resource(ended=True)
4587+
job = self._get_target_class().from_api_repr(resource, client)
4588+
bqstorage_client = mock.create_autospec(
4589+
bigquery_storage_v1beta1.BigQueryStorageClient
4590+
)
4591+
session = bigquery_storage_v1beta1.types.ReadSession()
4592+
session.avro_schema.schema = json.dumps(
4593+
{
4594+
"type": "record",
4595+
"name": "__root__",
4596+
"fields": [
4597+
{"name": "name", "type": ["null", "string"]},
4598+
{"name": "age", "type": ["null", "long"]},
4599+
],
4600+
}
4601+
)
4602+
bqstorage_client.create_read_session.return_value = session
4603+
4604+
job.to_dataframe(bqstorage_client=bqstorage_client)
4605+
4606+
bqstorage_client.create_read_session.assert_called_once_with(
4607+
mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
4608+
)
4609+
4610+
@unittest.skipIf(pandas is None, "Requires `pandas`")
4611+
def test_to_dataframe_column_dtypes(self):
4612+
begun_resource = self._make_resource()
4613+
query_resource = {
4614+
"jobComplete": True,
4615+
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
4616+
"totalRows": "4",
4617+
"schema": {
4618+
"fields": [
4619+
{"name": "start_timestamp", "type": "TIMESTAMP"},
4620+
{"name": "seconds", "type": "INT64"},
4621+
{"name": "miles", "type": "FLOAT64"},
4622+
{"name": "km", "type": "FLOAT64"},
4623+
{"name": "payment_type", "type": "STRING"},
4624+
{"name": "complete", "type": "BOOL"},
4625+
{"name": "date", "type": "DATE"},
4626+
]
4627+
},
4628+
}
4629+
row_data = [
4630+
["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
4631+
["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
4632+
["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
4633+
]
4634+
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
4635+
query_resource["rows"] = rows
4636+
done_resource = copy.deepcopy(begun_resource)
4637+
done_resource["status"] = {"state": "DONE"}
4638+
connection = _make_connection(
4639+
begun_resource, query_resource, done_resource, query_resource
4640+
)
4641+
client = _make_client(project=self.PROJECT, connection=connection)
4642+
job = self._make_one(self.JOB_ID, self.QUERY, client)
4643+
4644+
df = job.to_dataframe(dtypes={"km": "float16"})
4645+
4646+
self.assertIsInstance(df, pandas.DataFrame)
4647+
self.assertEqual(len(df), 3) # verify the number of rows
4648+
exp_columns = [field["name"] for field in query_resource["schema"]["fields"]]
4649+
self.assertEqual(list(df), exp_columns) # verify the column names
4650+
4651+
self.assertEqual(df.start_timestamp.dtype.name, "datetime64[ns, UTC]")
4652+
self.assertEqual(df.seconds.dtype.name, "int64")
4653+
self.assertEqual(df.miles.dtype.name, "float64")
4654+
self.assertEqual(df.km.dtype.name, "float16")
4655+
self.assertEqual(df.payment_type.dtype.name, "object")
4656+
self.assertEqual(df.complete.dtype.name, "bool")
4657+
self.assertEqual(df.date.dtype.name, "object")
4658+
45464659
def test_iter(self):
45474660
import types
45484661

0 commit comments

Comments
 (0)