Skip to content

Commit 675dbd8

Browse files
authored
Use temporary file in load_table_from_dataframe (googleapis#7545)
* Use temporary file in load_table_from_dataframe. This fixes a bug where `load_table_from_dataframe` could not be used with the `fastparquet` library. It should also use less memory when uploading large dataframes. * Add tests using the fastparquet engine.
1 parent c2b5dfc commit 675dbd8

File tree

5 files changed

+45
-26
lines changed

5 files changed

+45
-26
lines changed

bigquery/docs/snippets.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
import pytest
3131
import six
3232

33+
try:
34+
import fastparquet
35+
except (ImportError, AttributeError):
36+
fastparquet = None
3337
try:
3438
import pandas
3539
except (ImportError, AttributeError):
@@ -3108,8 +3112,15 @@ def test_list_rows_as_dataframe(client):
31083112

31093113

31103114
@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
3111-
@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
3112-
def test_load_table_from_dataframe(client, to_delete):
3115+
@pytest.mark.parametrize("parquet_engine", ["pyarrow", "fastparquet"])
3116+
def test_load_table_from_dataframe(client, to_delete, parquet_engine):
3117+
if parquet_engine == "pyarrow" and pyarrow is None:
3118+
pytest.skip("Requires `pyarrow`")
3119+
if parquet_engine == "fastparquet" and fastparquet is None:
3120+
pytest.skip("Requires `fastparquet`")
3121+
3122+
pandas.set_option("io.parquet.engine", parquet_engine)
3123+
31133124
dataset_id = "load_table_from_dataframe_{}".format(_millis())
31143125
dataset = bigquery.Dataset(client.dataset(dataset_id))
31153126
client.create_dataset(dataset)

bigquery/google/cloud/bigquery/client.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import functools
2525
import gzip
2626
import os
27+
import tempfile
2728
import uuid
2829

2930
import six
@@ -1124,10 +1125,10 @@ def load_table_from_dataframe(
11241125
Raises:
11251126
ImportError:
11261127
If a usable parquet engine cannot be found. This method
1127-
requires :mod:`pyarrow` to be installed.
1128+
requires :mod:`pyarrow` or :mod:`fastparquet` to be
1129+
installed.
11281130
"""
1129-
buffer = six.BytesIO()
1130-
dataframe.to_parquet(buffer)
1131+
job_id = _make_job_id(job_id, job_id_prefix)
11311132

11321133
if job_config is None:
11331134
job_config = job.LoadJobConfig()
@@ -1136,17 +1137,27 @@ def load_table_from_dataframe(
11361137
if location is None:
11371138
location = self.location
11381139

1139-
return self.load_table_from_file(
1140-
buffer,
1141-
destination,
1142-
num_retries=num_retries,
1143-
rewind=True,
1144-
job_id=job_id,
1145-
job_id_prefix=job_id_prefix,
1146-
location=location,
1147-
project=project,
1148-
job_config=job_config,
1149-
)
1140+
tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
1141+
os.close(tmpfd)
1142+
1143+
try:
1144+
dataframe.to_parquet(tmppath)
1145+
1146+
with open(tmppath, "rb") as parquet_file:
1147+
return self.load_table_from_file(
1148+
parquet_file,
1149+
destination,
1150+
num_retries=num_retries,
1151+
rewind=True,
1152+
job_id=job_id,
1153+
job_id_prefix=job_id_prefix,
1154+
location=location,
1155+
project=project,
1156+
job_config=job_config,
1157+
)
1158+
1159+
finally:
1160+
os.remove(tmppath)
11501161

11511162
def _do_resumable_upload(self, stream, metadata, num_retries):
11521163
"""Perform a resumable upload.

bigquery/noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def snippets(session):
124124
session.install('-e', local_dep)
125125
session.install('-e', os.path.join('..', 'storage'))
126126
session.install('-e', os.path.join('..', 'test_utils'))
127-
session.install('-e', '.[pandas, pyarrow]')
127+
session.install('-e', '.[pandas, pyarrow, fastparquet]')
128128

129129
# Run py.test against the snippets tests.
130130
session.run(

bigquery/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
# Exclude PyArrow dependency from Windows Python 2.7.
4040
'pyarrow: platform_system != "Windows" or python_version >= "3.4"':
4141
'pyarrow>=0.4.1',
42+
'fastparquet': ['fastparquet', 'python-snappy'],
4243
}
4344

4445

bigquery/tests/unit/test_client.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4658,17 +4658,15 @@ def test_load_table_from_dataframe(self):
46584658
self.TABLE_REF,
46594659
num_retries=_DEFAULT_NUM_RETRIES,
46604660
rewind=True,
4661-
job_id=None,
4661+
job_id=mock.ANY,
46624662
job_id_prefix=None,
46634663
location=None,
46644664
project=None,
46654665
job_config=mock.ANY,
46664666
)
46674667

46684668
sent_file = load_table_from_file.mock_calls[0][1][1]
4669-
sent_bytes = sent_file.getvalue()
4670-
assert isinstance(sent_bytes, bytes)
4671-
assert len(sent_bytes) > 0
4669+
assert sent_file.closed
46724670

46734671
sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
46744672
assert sent_config.source_format == job.SourceFormat.PARQUET
@@ -4695,17 +4693,15 @@ def test_load_table_from_dataframe_w_client_location(self):
46954693
self.TABLE_REF,
46964694
num_retries=_DEFAULT_NUM_RETRIES,
46974695
rewind=True,
4698-
job_id=None,
4696+
job_id=mock.ANY,
46994697
job_id_prefix=None,
47004698
location=self.LOCATION,
47014699
project=None,
47024700
job_config=mock.ANY,
47034701
)
47044702

47054703
sent_file = load_table_from_file.mock_calls[0][1][1]
4706-
sent_bytes = sent_file.getvalue()
4707-
assert isinstance(sent_bytes, bytes)
4708-
assert len(sent_bytes) > 0
4704+
assert sent_file.closed
47094705

47104706
sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
47114707
assert sent_config.source_format == job.SourceFormat.PARQUET
@@ -4735,7 +4731,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
47354731
self.TABLE_REF,
47364732
num_retries=_DEFAULT_NUM_RETRIES,
47374733
rewind=True,
4738-
job_id=None,
4734+
job_id=mock.ANY,
47394735
job_id_prefix=None,
47404736
location=self.LOCATION,
47414737
project=None,

0 commit comments

Comments
 (0)