Skip to content

Commit fe57c5a

Browse files
authored
BigQuery: Adds load_table_from_dataframe() and snippet (#5387)
* Adds load_table_from_dataframe() and snippet
* Adds an index to the DataFrame in the bigquery_load_table_dataframe sample
1 parent 4e98d3b commit fe57c5a

5 files changed

Lines changed: 180 additions & 5 deletions

File tree

bigquery/google/cloud/bigquery/client.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -773,8 +773,8 @@ def load_table_from_file(
773773
job_config=None):
774774
"""Upload the contents of this table from a file-like object.
775775
776-
Like load_table_from_uri, this creates, starts and returns
777-
a ``LoadJob``.
776+
Similar to :meth:`load_table_from_uri`, this method creates, starts and
777+
returns a :class:`~google.cloud.bigquery.job.LoadJob`.
778778
779779
Arguments:
780780
file_obj (file): A file handle opened in binary mode for reading.
@@ -833,6 +833,63 @@ def load_table_from_file(
833833
raise exceptions.from_http_response(exc.response)
834834
return self.job_from_resource(response.json())
835835

836+
def load_table_from_dataframe(self, dataframe, destination,
                              num_retries=_DEFAULT_NUM_RETRIES,
                              job_id=None, job_id_prefix=None,
                              location=None, project=None,
                              job_config=None):
    """Upload the contents of a table from a pandas DataFrame.

    Similar to :meth:`load_table_from_uri`, this method creates, starts and
    returns a :class:`~google.cloud.bigquery.job.LoadJob`.

    Arguments:
        dataframe (pandas.DataFrame):
            A :class:`~pandas.DataFrame` containing the data to load.
        destination (google.cloud.bigquery.table.TableReference):
            The destination table to use for loading the data. If it is an
            existing table, the schema of the :class:`~pandas.DataFrame`
            must match the schema of the destination table. If the table
            does not yet exist, the schema is inferred from the
            :class:`~pandas.DataFrame`.

    Keyword Arguments:
        num_retries (int, optional): Number of upload retries.
        job_id (str, optional): Name of the job.
        job_id_prefix (str, optional):
            The user-provided prefix for a randomly generated job ID.
            This parameter will be ignored if a ``job_id`` is also given.
        location (str):
            Location where to run the job. Must match the location of the
            destination table.
        project (str, optional):
            Project ID of the project of where to run the job. Defaults
            to the client's project.
        job_config (google.cloud.bigquery.job.LoadJobConfig, optional):
            Extra configuration options for the job.

    Returns:
        google.cloud.bigquery.job.LoadJob: A new load job.

    Raises:
        ImportError:
            If a usable parquet engine cannot be found. This method
            requires one of :mod:`pyarrow` or :mod:`fastparquet` to be
            installed.
    """
    # Serialize the frame in memory; ``to_parquet`` is what raises
    # ImportError when neither pyarrow nor fastparquet is available.
    parquet_buffer = six.BytesIO()
    dataframe.to_parquet(parquet_buffer)

    # The payload is always parquet, so force the source format on the
    # (possibly caller-supplied) job configuration.
    if job_config is None:
        job_config = job.LoadJobConfig()
    job_config.source_format = job.SourceFormat.PARQUET

    # ``rewind=True`` seeks the buffer back to position zero so the whole
    # serialized frame is uploaded.
    return self.load_table_from_file(
        parquet_buffer, destination, num_retries=num_retries, rewind=True,
        job_id=job_id, job_id_prefix=job_id_prefix, location=location,
        project=project, job_config=job_config)
892+
836893
def _do_resumable_upload(self, stream, metadata, num_retries):
837894
"""Perform a resumable upload.
838895

bigquery/nox.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def default(session):
4141
if session.interpreter == 'python3.4':
4242
session.install('-e', '.')
4343
else:
44-
session.install('-e', '.[pandas]')
44+
session.install('-e', '.[pandas, pyarrow]')
4545

4646
# IPython does not support Python 2 after version 5.x
4747
if session.interpreter == 'python2.7':
@@ -142,7 +142,7 @@ def snippets(session, py):
142142
os.path.join('..', 'storage'),
143143
os.path.join('..', 'test_utils'),
144144
)
145-
session.install('-e', '.[pandas]')
145+
session.install('-e', '.[pandas, pyarrow]')
146146

147147
# Run py.test against the system tests.
148148
session.run(

bigquery/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
]
3636
extras = {
3737
'pandas': 'pandas>=0.17.1',
38+
'pyarrow': 'pyarrow>=0.4.1',
3839
}
3940

4041

bigquery/tests/unit/test_client.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@
2323
import six
2424
from six.moves import http_client
2525
import pytest
26+
try:
27+
import pandas
28+
except (ImportError, AttributeError): # pragma: NO COVER
29+
pandas = None
30+
try:
31+
import pyarrow
32+
except (ImportError, AttributeError): # pragma: NO COVER
33+
pyarrow = None
2634

2735
from google.cloud.bigquery.dataset import DatasetReference
2836

@@ -3484,6 +3492,68 @@ def test_load_table_from_file_bad_mode(self):
34843492
with pytest.raises(ValueError):
34853493
client.load_table_from_file(file_obj, self.TABLE_REF)
34863494

3495+
@unittest.skipIf(pandas is None, 'Requires `pandas`')
@unittest.skipIf(pyarrow is None, 'Requires `pyarrow`')
def test_load_table_from_dataframe(self):
    """The dataframe is serialized to parquet bytes and forwarded to
    ``load_table_from_file`` with a PARQUET-format job config.
    """
    from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
    from google.cloud.bigquery import job

    client = self._make_client()
    dataframe = pandas.DataFrame([
        {'name': 'Monty', 'age': 100},
        {'name': 'Python', 'age': 60},
    ])

    load_patch = mock.patch(
        'google.cloud.bigquery.client.Client.load_table_from_file',
        autospec=True)
    with load_patch as load_table_from_file:
        client.load_table_from_dataframe(dataframe, self.TABLE_REF)

    # Exactly one hand-off, with all keyword defaults passed through.
    load_table_from_file.assert_called_once_with(
        client, mock.ANY, self.TABLE_REF, num_retries=_DEFAULT_NUM_RETRIES,
        rewind=True, job_id=None, job_id_prefix=None, location=None,
        project=None, job_config=mock.ANY)

    call_args, call_kwargs = load_table_from_file.call_args

    # The second positional argument (after the autospec'd ``self``) is
    # the in-memory parquet payload; it must be non-empty bytes.
    payload = call_args[1].getvalue()
    assert isinstance(payload, bytes)
    assert len(payload) > 0

    # A config requesting the parquet source format must be generated.
    forwarded_config = call_kwargs['job_config']
    assert forwarded_config.source_format == job.SourceFormat.PARQUET
3526+
3527+
@unittest.skipIf(pandas is None, 'Requires `pandas`')
@unittest.skipIf(pyarrow is None, 'Requires `pyarrow`')
def test_load_table_from_dataframe_w_custom_job_config(self):
    """A caller-supplied LoadJobConfig is reused (not replaced), with its
    source format forced to PARQUET.
    """
    from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
    from google.cloud.bigquery import job

    client = self._make_client()
    dataframe = pandas.DataFrame([
        {'name': 'Monty', 'age': 100},
        {'name': 'Python', 'age': 60},
    ])
    job_config = job.LoadJobConfig()

    load_patch = mock.patch(
        'google.cloud.bigquery.client.Client.load_table_from_file',
        autospec=True)
    with load_patch as load_table_from_file:
        client.load_table_from_dataframe(
            dataframe, self.TABLE_REF, job_config=job_config)

    load_table_from_file.assert_called_once_with(
        client, mock.ANY, self.TABLE_REF, num_retries=_DEFAULT_NUM_RETRIES,
        rewind=True, job_id=None, job_id_prefix=None, location=None,
        project=None, job_config=mock.ANY)

    # The very object the caller passed must be forwarded, mutated to
    # request the parquet source format.
    forwarded_config = load_table_from_file.call_args[1]['job_config']
    assert forwarded_config is job_config
    assert forwarded_config.source_format == job.SourceFormat.PARQUET
3556+
34873557
# Low-level tests
34883558

34893559
@classmethod

docs/bigquery/snippets.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,12 @@
3131
import six
3232
try:
3333
import pandas
34-
except ImportError:
34+
except (ImportError, AttributeError):
3535
pandas = None
36+
try:
37+
import pyarrow
38+
except (ImportError, AttributeError):
39+
pyarrow = None
3640

3741
from google.cloud import bigquery
3842

@@ -2073,5 +2077,48 @@ def test_list_rows_as_dataframe(client):
20732077
assert len(df) == table.num_rows # verify the number of rows
20742078

20752079

2080+
@pytest.mark.skipif(pandas is None, reason='Requires `pandas`')
@pytest.mark.skipif(pyarrow is None, reason='Requires `pyarrow`')
def test_load_table_from_dataframe(client, to_delete):
    """System test doubling as the doc snippet for loading a table from a
    pandas DataFrame.
    """
    dataset_id = 'load_table_dataframe_dataset_{}'.format(_millis())
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    client.create_dataset(dataset)
    to_delete.append(dataset)

    # [START bigquery_load_table_dataframe]
    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    table_ref = client.dataset(dataset_id).table('monty_python')
    records = [
        {'title': 'The Meaning of Life', 'release_year': 1983},
        {'title': 'Monty Python and the Holy Grail', 'release_year': 1975},
        {'title': 'Life of Brian', 'release_year': 1979},
        {
            'title': 'And Now for Something Completely Different',
            'release_year': 1971
        },
    ]
    # Optionally set explicit indices.
    # If indices are not specified, a column will be created for the default
    # indices created by pandas.
    index = ['Q24980', 'Q25043', 'Q24953', 'Q16403']
    dataframe = pandas.DataFrame(
        records, index=pandas.Index(index, name='wikidata_id'))

    job = client.load_table_from_dataframe(dataframe, table_ref, location='US')

    job.result()  # Waits for table load to complete.

    assert job.state == 'DONE'
    table = client.get_table(table_ref)
    assert table.num_rows == 4
    # [END bigquery_load_table_dataframe]

    # The named index should have surfaced as a regular column.
    column_names = [field.name for field in table.schema]
    assert sorted(column_names) == ['release_year', 'title', 'wikidata_id']
2121+
2122+
20762123
if __name__ == '__main__':
20772124
pytest.main()

0 commit comments

Comments
 (0)