5 files changed: +45 -26 lines changed
Original file line number Diff line number Diff line change
3030import pytest
3131import six
3232
33+ try :
34+ import fastparquet
35+ except (ImportError , AttributeError ):
36+ fastparquet = None
3337try :
3438 import pandas
3539except (ImportError , AttributeError ):
@@ -3108,8 +3112,15 @@ def test_list_rows_as_dataframe(client):
31083112
31093113
31103114@pytest .mark .skipif (pandas is None , reason = "Requires `pandas`" )
3111- @pytest .mark .skipif (pyarrow is None , reason = "Requires `pyarrow`" )
3112- def test_load_table_from_dataframe (client , to_delete ):
3115+ @pytest .mark .parametrize ("parquet_engine" , ["pyarrow" , "fastparquet" ])
3116+ def test_load_table_from_dataframe (client , to_delete , parquet_engine ):
3117+ if parquet_engine == "pyarrow" and pyarrow is None :
3118+ pytest .skip ("Requires `pyarrow`" )
3119+ if parquet_engine == "fastparquet" and fastparquet is None :
3120+ pytest .skip ("Requires `fastparquet`" )
3121+
3122+ pandas .set_option ("io.parquet.engine" , parquet_engine )
3123+
31133124 dataset_id = "load_table_from_dataframe_{}" .format (_millis ())
31143125 dataset = bigquery .Dataset (client .dataset (dataset_id ))
31153126 client .create_dataset (dataset )
Original file line number Diff line number Diff line change
2424import functools
2525import gzip
2626import os
27+ import tempfile
2728import uuid
2829
2930import six
@@ -1124,10 +1125,10 @@ def load_table_from_dataframe(
11241125 Raises:
11251126 ImportError:
11261127 If a usable parquet engine cannot be found. This method
1127- requires :mod:`pyarrow` to be installed.
1128+ requires :mod:`pyarrow` or :mod:`fastparquet` to be
1129+ installed.
11281130 """
1129- buffer = six .BytesIO ()
1130- dataframe .to_parquet (buffer )
1131+ job_id = _make_job_id (job_id , job_id_prefix )
11311132
11321133 if job_config is None :
11331134 job_config = job .LoadJobConfig ()
@@ -1136,17 +1137,27 @@ def load_table_from_dataframe(
11361137 if location is None :
11371138 location = self .location
11381139
1139- return self .load_table_from_file (
1140- buffer ,
1141- destination ,
1142- num_retries = num_retries ,
1143- rewind = True ,
1144- job_id = job_id ,
1145- job_id_prefix = job_id_prefix ,
1146- location = location ,
1147- project = project ,
1148- job_config = job_config ,
1149- )
1140+ tmpfd , tmppath = tempfile .mkstemp (suffix = "_job_{}.parquet" .format (job_id [:8 ]))
1141+ os .close (tmpfd )
1142+
1143+ try :
1144+ dataframe .to_parquet (tmppath )
1145+
1146+ with open (tmppath , "rb" ) as parquet_file :
1147+ return self .load_table_from_file (
1148+ parquet_file ,
1149+ destination ,
1150+ num_retries = num_retries ,
1151+ rewind = True ,
1152+ job_id = job_id ,
1153+ job_id_prefix = job_id_prefix ,
1154+ location = location ,
1155+ project = project ,
1156+ job_config = job_config ,
1157+ )
1158+
1159+ finally :
1160+ os .remove (tmppath )
11501161
11511162 def _do_resumable_upload (self , stream , metadata , num_retries ):
11521163 """Perform a resumable upload.
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@ def snippets(session):
124124 session .install ('-e' , local_dep )
125125 session .install ('-e' , os .path .join ('..' , 'storage' ))
126126 session .install ('-e' , os .path .join ('..' , 'test_utils' ))
127- session .install ('-e' , '.[pandas, pyarrow]' )
127+ session .install ('-e' , '.[pandas, pyarrow, fastparquet ]' )
128128
129129 # Run py.test against the snippets tests.
130130 session .run (
Original file line number Diff line number Diff line change
3939 # Exclude PyArrow dependency from Windows Python 2.7.
4040 'pyarrow: platform_system != "Windows" or python_version >= "3.4"' :
4141 'pyarrow>=0.4.1' ,
42+ 'fastparquet' : ['fastparquet' , 'python-snappy' ],
4243}
4344
4445
Original file line number Diff line number Diff line change
@@ -4658,17 +4658,15 @@ def test_load_table_from_dataframe(self):
46584658 self .TABLE_REF ,
46594659 num_retries = _DEFAULT_NUM_RETRIES ,
46604660 rewind = True ,
4661- job_id = None ,
4661+ job_id = mock . ANY ,
46624662 job_id_prefix = None ,
46634663 location = None ,
46644664 project = None ,
46654665 job_config = mock .ANY ,
46664666 )
46674667
46684668 sent_file = load_table_from_file .mock_calls [0 ][1 ][1 ]
4669- sent_bytes = sent_file .getvalue ()
4670- assert isinstance (sent_bytes , bytes )
4671- assert len (sent_bytes ) > 0
4669+ assert sent_file .closed
46724670
46734671 sent_config = load_table_from_file .mock_calls [0 ][2 ]["job_config" ]
46744672 assert sent_config .source_format == job .SourceFormat .PARQUET
@@ -4695,17 +4693,15 @@ def test_load_table_from_dataframe_w_client_location(self):
46954693 self .TABLE_REF ,
46964694 num_retries = _DEFAULT_NUM_RETRIES ,
46974695 rewind = True ,
4698- job_id = None ,
4696+ job_id = mock . ANY ,
46994697 job_id_prefix = None ,
47004698 location = self .LOCATION ,
47014699 project = None ,
47024700 job_config = mock .ANY ,
47034701 )
47044702
47054703 sent_file = load_table_from_file .mock_calls [0 ][1 ][1 ]
4706- sent_bytes = sent_file .getvalue ()
4707- assert isinstance (sent_bytes , bytes )
4708- assert len (sent_bytes ) > 0
4704+ assert sent_file .closed
47094705
47104706 sent_config = load_table_from_file .mock_calls [0 ][2 ]["job_config" ]
47114707 assert sent_config .source_format == job .SourceFormat .PARQUET
@@ -4735,7 +4731,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
47354731 self .TABLE_REF ,
47364732 num_retries = _DEFAULT_NUM_RETRIES ,
47374733 rewind = True ,
4738- job_id = None ,
4734+ job_id = mock . ANY ,
47394735 job_id_prefix = None ,
47404736 location = self .LOCATION ,
47414737 project = None ,
You can’t perform that action at this time.
0 commit comments