|
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | 15 | import copy |
| 16 | +import json |
16 | 17 | import unittest |
17 | 18 |
|
18 | 19 | import mock |
|
22 | 23 | import pandas |
23 | 24 | except (ImportError, AttributeError): # pragma: NO COVER |
24 | 25 | pandas = None |
| 26 | +try: |
| 27 | + from google.cloud import bigquery_storage_v1beta1 |
| 28 | +except (ImportError, AttributeError): # pragma: NO COVER |
| 29 | + bigquery_storage_v1beta1 = None |
25 | 30 |
|
26 | 31 |
|
27 | 32 | def _make_credentials(): |
@@ -4543,6 +4548,114 @@ def test_to_dataframe(self): |
4543 | 4548 | self.assertEqual(len(df), 4) # verify the number of rows |
4544 | 4549 | self.assertEqual(list(df), ["name", "age"]) # verify the column names |
4545 | 4550 |
|
@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_ddl_query(self):
    # Some DDL and DML statements leave the destination table without a
    # schema, so the query-results payload carries an empty field list.
    # The test expects to_dataframe() to hand back a zero-length frame.
    job_reference = {"projectId": self.PROJECT, "jobId": self.JOB_ID}
    schemaless_results = {
        "jobComplete": True,
        "jobReference": job_reference,
        "schema": {"fields": []},
    }
    fake_connection = _make_connection(schemaless_results)
    api_client = _make_client(self.PROJECT, connection=fake_connection)
    finished_resource = self._make_resource(ended=True)
    job_class = self._get_target_class()
    finished_job = job_class.from_api_repr(finished_resource, api_client)

    frame = finished_job.to_dataframe()

    self.assertEqual(len(frame), 0)
| 4567 | + |
@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
    bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_bqstorage(self):
    # When a BigQuery Storage client is supplied, to_dataframe() is expected
    # to open exactly one read session under the job's project.
    result_fields = [
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    results_payload = {
        "jobComplete": True,
        "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
        "totalRows": "4",
        "schema": {"fields": result_fields},
    }
    fake_connection = _make_connection(results_payload)
    api_client = _make_client(self.PROJECT, connection=fake_connection)
    finished_resource = self._make_resource(ended=True)
    finished_job = self._get_target_class().from_api_repr(
        finished_resource, api_client
    )

    # Autospec'd storage client whose read session advertises an Avro
    # schema mirroring the two result columns above.
    storage_client = mock.create_autospec(
        bigquery_storage_v1beta1.BigQueryStorageClient
    )
    avro_record = {
        "type": "record",
        "name": "__root__",
        "fields": [
            {"name": "name", "type": ["null", "string"]},
            {"name": "age", "type": ["null", "long"]},
        ],
    }
    read_session = bigquery_storage_v1beta1.types.ReadSession()
    read_session.avro_schema.schema = json.dumps(avro_record)
    storage_client.create_read_session.return_value = read_session

    finished_job.to_dataframe(bqstorage_client=storage_client)

    expected_parent = "projects/{}".format(self.PROJECT)
    storage_client.create_read_session.assert_called_once_with(
        mock.ANY, expected_parent, read_options=mock.ANY
    )
| 4609 | + |
@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_column_dtypes(self):
    # A per-column `dtypes` override is passed for "km" only; the assertions
    # below expect that column to come back as float16 while every other
    # column keeps the dtype listed for it.
    begun_resource = self._make_resource()

    column_types = [
        ("start_timestamp", "TIMESTAMP"),
        ("seconds", "INT64"),
        ("miles", "FLOAT64"),
        ("km", "FLOAT64"),
        ("payment_type", "STRING"),
        ("complete", "BOOL"),
        ("date", "DATE"),
    ]
    raw_rows = [
        ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
        ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
        ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
    ]
    # NOTE(review): "totalRows" is "4" although only three rows are
    # supplied; kept as-is -- confirm whether the mismatch is intentional.
    query_resource = {
        "jobComplete": True,
        "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
        "totalRows": "4",
        "schema": {
            "fields": [
                {"name": col_name, "type": col_type}
                for col_name, col_type in column_types
            ]
        },
        # Each row is wrapped as {"f": [{"v": cell}, ...]}.
        "rows": [{"f": [{"v": cell} for cell in cells]} for cells in raw_rows],
    }

    done_resource = copy.deepcopy(begun_resource)
    done_resource["status"] = {"state": "DONE"}
    # Canned responses, served in this order by the fake connection.
    canned_responses = (
        begun_resource,
        query_resource,
        done_resource,
        query_resource,
    )
    fake_connection = _make_connection(*canned_responses)
    api_client = _make_client(project=self.PROJECT, connection=fake_connection)
    job = self._make_one(self.JOB_ID, self.QUERY, api_client)

    frame = job.to_dataframe(dtypes={"km": "float16"})

    self.assertIsInstance(frame, pandas.DataFrame)
    self.assertEqual(len(frame), 3)  # verify the number of rows
    expected_columns = [col_name for col_name, _ in column_types]
    self.assertEqual(list(frame), expected_columns)  # verify the column names

    expected_dtypes = [
        ("start_timestamp", "datetime64[ns, UTC]"),
        ("seconds", "int64"),
        ("miles", "float64"),
        ("km", "float16"),
        ("payment_type", "object"),
        ("complete", "bool"),
        ("date", "object"),
    ]
    for col_name, dtype_name in expected_dtypes:
        self.assertEqual(frame[col_name].dtype.name, dtype_name)
| 4658 | + |
4546 | 4659 | def test_iter(self): |
4547 | 4660 | import types |
4548 | 4661 |
|
|
0 commit comments