@@ -34,6 +34,7 @@ use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
3434use datafusion:: datasource:: TableProvider ;
3535use datafusion:: error:: DataFusionError ;
3636use datafusion:: execution:: SendableRecordBatchStream ;
37+ use datafusion:: logical_expr:: dml:: InsertOp ;
3738use datafusion:: parquet:: basic:: { BrotliLevel , Compression , GzipLevel , ZstdLevel } ;
3839use datafusion:: prelude:: * ;
3940use datafusion_ffi:: table_provider:: FFI_TableProvider ;
@@ -58,6 +59,27 @@ use crate::{
5859 expr:: { sort_expr:: PySortExpr , PyExpr } ,
5960} ;
6061
62+ #[ derive( Clone , Copy , PartialEq ) ]
63+ #[ pyclass( name = "InsertOp" , module = "datafusion" , eq, eq_int) ]
64+ pub enum PyInsertOp {
65+ #[ pyo3( name = "Append" ) ]
66+ Append ,
67+ #[ pyo3( name = "Overwrite" ) ]
68+ Overwrite ,
69+ #[ pyo3( name = "Replace" ) ]
70+ Replace ,
71+ }
72+
73+ impl From < PyInsertOp > for InsertOp {
74+ fn from ( op : PyInsertOp ) -> Self {
75+ match op {
76+ PyInsertOp :: Append => InsertOp :: Append ,
77+ PyInsertOp :: Overwrite => InsertOp :: Overwrite ,
78+ PyInsertOp :: Replace => InsertOp :: Replace ,
79+ }
80+ }
81+ }
82+
6183// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
6284// - we have not decided on the table_provider approach yet
6385// this is an interim implementation
@@ -743,33 +765,42 @@ impl PyDataFrame {
743765 }
744766
745767 /// Write a `DataFrame` to a CSV file.
746- fn write_csv ( & self , path : & str , with_header : bool , py : Python ) -> PyDataFusionResult < ( ) > {
768+ fn write_csv ( & self , path : & str , with_header : bool , insert_operation : PyInsertOp , single_file_output : bool , partition_by : Vec < String > , py : Python ) -> PyDataFusionResult < ( ) > {
747769 let csv_options = CsvOptions {
748770 has_header : Some ( with_header) ,
749771 ..Default :: default ( )
750772 } ;
751- wait_for_future (
773+ let _ = wait_for_future (
752774 py,
753775 self . df . as_ref ( ) . clone ( ) . write_csv (
754776 path,
755- DataFrameWriteOptions :: new ( ) ,
777+ DataFrameWriteOptions :: new ( )
778+ . with_insert_operation ( insert_operation. into ( ) )
779+ . with_single_file_output ( single_file_output)
780+ . with_partition_by ( partition_by) ,
756781 Some ( csv_options) ,
757782 ) ,
758- ) ?? ;
783+ ) ?;
759784 Ok ( ( ) )
760785 }
761786
762787 /// Write a `DataFrame` to a Parquet file.
763788 #[ pyo3( signature = (
764789 path,
765790 compression="zstd" ,
766- compression_level=None
791+ compression_level=None ,
792+ insert_operation=PyInsertOp :: Append ,
793+ single_file_output=false ,
794+ partition_by=vec![ ] ,
767795 ) ) ]
768796 fn write_parquet (
769797 & self ,
770798 path : & str ,
771799 compression : & str ,
772800 compression_level : Option < u32 > ,
801+ insert_operation : PyInsertOp ,
802+ single_file_output : bool ,
803+ partition_by : Vec < String > ,
773804 py : Python ,
774805 ) -> PyDataFusionResult < ( ) > {
775806 fn verify_compression_level ( cl : Option < u32 > ) -> Result < u32 , PyErr > {
@@ -813,19 +844,33 @@ impl PyDataFrame {
813844 py,
814845 self . df . as_ref ( ) . clone ( ) . write_parquet (
815846 path,
816- DataFrameWriteOptions :: new ( ) ,
847+ DataFrameWriteOptions :: new ( )
848+ . with_insert_operation ( insert_operation. into ( ) )
849+ . with_single_file_output ( single_file_output)
850+ . with_partition_by ( partition_by) ,
817851 Option :: from ( options) ,
818852 ) ,
819853 ) ??;
820854 Ok ( ( ) )
821855 }
822856
823857 /// Write a `DataFrame` to a Parquet file, using advanced options.
858+ #[ pyo3( signature = (
859+ path,
860+ options,
861+ column_specific_options,
862+ insert_operation=PyInsertOp :: Append ,
863+ single_file_output=false ,
864+ partition_by=vec![ ] ,
865+ ) ) ]
824866 fn write_parquet_with_options (
825867 & self ,
826868 path : & str ,
827869 options : PyParquetWriterOptions ,
828870 column_specific_options : HashMap < String , PyParquetColumnOptions > ,
871+ insert_operation : PyInsertOp ,
872+ single_file_output : bool ,
873+ partition_by : Vec < String > ,
829874 py : Python ,
830875 ) -> PyDataFusionResult < ( ) > {
831876 let table_options = TableParquetOptions {
@@ -841,22 +886,29 @@ impl PyDataFrame {
841886 py,
842887 self . df . as_ref ( ) . clone ( ) . write_parquet (
843888 path,
844- DataFrameWriteOptions :: new ( ) ,
889+ DataFrameWriteOptions :: new ( )
890+ . with_insert_operation ( insert_operation. into ( ) )
891+ . with_single_file_output ( single_file_output)
892+ . with_partition_by ( partition_by) ,
845893 Option :: from ( table_options) ,
846894 ) ,
847895 ) ??;
848896 Ok ( ( ) )
849897 }
850898
851899 /// Executes a query and writes the results to a partitioned JSON file.
852- fn write_json ( & self , path : & str , py : Python ) -> PyDataFusionResult < ( ) > {
853- wait_for_future (
900+ fn write_json ( & self , path : & str , insert_operation : PyInsertOp , single_file_output : bool , partition_by : Vec < String > , py : Python ) -> PyDataFusionResult < ( ) > {
901+ let _ = wait_for_future (
854902 py,
855903 self . df
856904 . as_ref ( )
857905 . clone ( )
858- . write_json ( path, DataFrameWriteOptions :: new ( ) , None ) ,
859- ) ??;
906+ . write_json ( path, DataFrameWriteOptions :: new ( )
907+ . with_insert_operation ( insert_operation. into ( ) )
908+ . with_single_file_output ( single_file_output)
909+ . with_partition_by ( partition_by) ,
910+ None ) ,
911+ ) ?;
860912 Ok ( ( ) )
861913 }
862914
0 commit comments