Skip to content

Commit f5a12db

Browse files
committed
Add retry and client options for object store(only s3)
Add partitioning functionality for write operations
1 parent 61f981b commit f5a12db

6 files changed

Lines changed: 212 additions & 23 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,5 @@ name = "datafusion_python"
6161
crate-type = ["cdylib", "rlib"]
6262

6363
[profile.release]
64-
lto = true
65-
codegen-units = 1
64+
lto = "thin"
65+
#codegen-units = 1

python/datafusion/dataframe.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
TYPE_CHECKING,
2727
Any,
2828
Iterable,
29+
List,
2930
Literal,
3031
Optional,
3132
Union,
@@ -56,6 +57,7 @@
5657

5758
from enum import Enum
5859

60+
from datafusion._internal import InsertOp
5961

6062
# excerpt from deltalake
6163
# https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163
@@ -875,6 +877,7 @@ def except_all(self, other: DataFrame) -> DataFrame:
875877
"""
876878
return DataFrame(self.df.except_all(other.df))
877879

880+
@overload
878881
def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None:
879882
"""Execute the :py:class:`DataFrame` and write the results to a CSV file.
880883
@@ -883,6 +886,19 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
883886
with_header: If true, output the CSV header row.
884887
"""
885888
self.df.write_csv(str(path), with_header)
889+
890+
@overload
891+
def write_csv(self, path: str | pathlib.Path, with_header: bool = False, insert_operation: InsertOp = InsertOp.Append, single_file_output: bool = False, partition_by: Optional[List[str]] = None,) -> None:
892+
"""Execute the :py:class:`DataFrame` and write the results to a CSV file.
893+
894+
Args:
895+
path: Path of the CSV file to write.
896+
with_header: If true, output the CSV header row.
897+
insert_operation: The operation to perform on the CSV file(Append, Overwrite, Replace).
898+
single_file_output: If true, write the CSV file as a single file.
899+
partition_by: The columns to partition the CSV file by.
900+
"""
901+
self.df.write_csv(str(path), with_header, insert_operation, single_file_output, partition_by or [])
886902

887903
@overload
888904
def write_parquet(
@@ -911,8 +927,11 @@ def write_parquet(
911927
def write_parquet(
912928
self,
913929
path: str | pathlib.Path,
914-
compression: Union[str, Compression, ParquetWriterOptions] = Compression.ZSTD,
930+
compression: Union[str, Compression] = Compression.ZSTD,
915931
compression_level: int | None = None,
932+
insert_operation: InsertOp = InsertOp.Append,
933+
single_file_output: bool = False,
934+
partition_by: Optional[List[str]] = None,
916935
) -> None:
917936
"""Execute the :py:class:`DataFrame` and write the results to a Parquet file.
918937
@@ -931,12 +950,16 @@ def write_parquet(
931950
compression_level: Compression level to use. For ZSTD, the
932951
recommended range is 1 to 22, with the default being 4. Higher levels
933952
provide better compression but slower speed.
953+
insert_operation: The operation to perform on the Parquet file(Append, Overwrite, Replace).
954+
single_file_output: If true, write the Parquet file as a single file.
955+
partition_by: The columns to partition the Parquet file by.
934956
"""
957+
935958
if isinstance(compression, ParquetWriterOptions):
936959
if compression_level is not None:
937960
msg = "compression_level should be None when using ParquetWriterOptions"
938961
raise ValueError(msg)
939-
self.write_parquet_with_options(path, compression)
962+
self.write_parquet_with_options(path, compression, insert_operation, single_file_output, partition_by or [])
940963
return
941964

942965
if isinstance(compression, str):
@@ -948,10 +971,14 @@ def write_parquet(
948971
):
949972
compression_level = compression.get_default_level()
950973

951-
self.df.write_parquet(str(path), compression.value, compression_level)
974+
self.df.write_parquet(str(path), compression.value, compression_level, insert_operation, single_file_output, partition_by or [])
952975

953976
def write_parquet_with_options(
954-
self, path: str | pathlib.Path, options: ParquetWriterOptions
977+
self, path: str | pathlib.Path,
978+
options: ParquetWriterOptions,
979+
insert_operation: InsertOp = InsertOp.Append,
980+
single_file_output: bool = False,
981+
partition_by: Optional[List[str]] = None,
955982
) -> None:
956983
"""Execute the :py:class:`DataFrame` and write the results to a Parquet file.
957984
@@ -1000,15 +1027,21 @@ def write_parquet_with_options(
10001027
str(path),
10011028
options_internal,
10021029
column_specific_options_internal,
1030+
insert_operation,
1031+
single_file_output,
1032+
partition_by,
10031033
)
10041034

1005-
def write_json(self, path: str | pathlib.Path) -> None:
1035+
def write_json(self, path: str | pathlib.Path, insert_operation: InsertOp = InsertOp.Append, single_file_output: bool = False, partition_by: Optional[List[str]] = None) -> None:
10061036
"""Execute the :py:class:`DataFrame` and write the results to a JSON file.
10071037
10081038
Args:
10091039
path: Path of the JSON file to write.
1040+
insert_operation: The operation to perform on the JSON file(Append, Overwrite, Replace).
1041+
single_file_output: If true, write the JSON file as a single file.
1042+
partition_by: The columns to partition the JSON file by.
10101043
"""
1011-
self.df.write_json(str(path))
1044+
self.df.write_json(str(path), insert_operation, single_file_output, partition_by or [])
10121045

10131046
def to_arrow_table(self) -> pa.Table:
10141047
"""Execute the :py:class:`DataFrame` and convert it into an Arrow Table.

python/datafusion/object_store.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,7 @@
2424
MicrosoftAzure = object_store.MicrosoftAzure
2525
Http = object_store.Http
2626

27-
__all__ = ["AmazonS3", "GoogleCloud", "Http", "LocalFileSystem", "MicrosoftAzure"]
27+
RetryConfig = object_store.RetryConfig
28+
ClientOptions = object_store.ClientOptions
29+
30+
__all__ = ["AmazonS3", "GoogleCloud", "Http", "LocalFileSystem", "MicrosoftAzure", "RetryConfig", "ClientOptions"]

src/dataframe.rs

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
3434
use datafusion::datasource::TableProvider;
3535
use datafusion::error::DataFusionError;
3636
use datafusion::execution::SendableRecordBatchStream;
37+
use datafusion::logical_expr::dml::InsertOp;
3738
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
3839
use datafusion::prelude::*;
3940
use datafusion_ffi::table_provider::FFI_TableProvider;
@@ -58,6 +59,27 @@ use crate::{
5859
expr::{sort_expr::PySortExpr, PyExpr},
5960
};
6061

62+
#[derive(Clone, Copy, PartialEq)]
63+
#[pyclass(name = "InsertOp", module = "datafusion", eq, eq_int)]
64+
pub enum PyInsertOp {
65+
#[pyo3(name = "Append")]
66+
Append,
67+
#[pyo3(name = "Overwrite")]
68+
Overwrite,
69+
#[pyo3(name = "Replace")]
70+
Replace,
71+
}
72+
73+
impl From<PyInsertOp> for InsertOp {
74+
fn from(op: PyInsertOp) -> Self {
75+
match op {
76+
PyInsertOp::Append => InsertOp::Append,
77+
PyInsertOp::Overwrite => InsertOp::Overwrite,
78+
PyInsertOp::Replace => InsertOp::Replace,
79+
}
80+
}
81+
}
82+
6183
// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
6284
// - we have not decided on the table_provider approach yet
6385
// this is an interim implementation
@@ -743,33 +765,42 @@ impl PyDataFrame {
743765
}
744766

745767
/// Write a `DataFrame` to a CSV file.
746-
fn write_csv(&self, path: &str, with_header: bool, py: Python) -> PyDataFusionResult<()> {
768+
fn write_csv(&self, path: &str, with_header: bool, insert_operation: PyInsertOp, single_file_output: bool, partition_by: Vec<String>, py: Python) -> PyDataFusionResult<()> {
747769
let csv_options = CsvOptions {
748770
has_header: Some(with_header),
749771
..Default::default()
750772
};
751-
wait_for_future(
773+
let _ = wait_for_future(
752774
py,
753775
self.df.as_ref().clone().write_csv(
754776
path,
755-
DataFrameWriteOptions::new(),
777+
DataFrameWriteOptions::new()
778+
.with_insert_operation(insert_operation.into())
779+
.with_single_file_output(single_file_output)
780+
.with_partition_by(partition_by),
756781
Some(csv_options),
757782
),
758-
)??;
783+
)?;
759784
Ok(())
760785
}
761786

762787
/// Write a `DataFrame` to a Parquet file.
763788
#[pyo3(signature = (
764789
path,
765790
compression="zstd",
766-
compression_level=None
791+
compression_level=None,
792+
insert_operation=PyInsertOp::Append,
793+
single_file_output=false,
794+
partition_by=vec![],
767795
))]
768796
fn write_parquet(
769797
&self,
770798
path: &str,
771799
compression: &str,
772800
compression_level: Option<u32>,
801+
insert_operation: PyInsertOp,
802+
single_file_output: bool,
803+
partition_by: Vec<String>,
773804
py: Python,
774805
) -> PyDataFusionResult<()> {
775806
fn verify_compression_level(cl: Option<u32>) -> Result<u32, PyErr> {
@@ -813,19 +844,33 @@ impl PyDataFrame {
813844
py,
814845
self.df.as_ref().clone().write_parquet(
815846
path,
816-
DataFrameWriteOptions::new(),
847+
DataFrameWriteOptions::new()
848+
.with_insert_operation(insert_operation.into())
849+
.with_single_file_output(single_file_output)
850+
.with_partition_by(partition_by),
817851
Option::from(options),
818852
),
819853
)??;
820854
Ok(())
821855
}
822856

823857
/// Write a `DataFrame` to a Parquet file, using advanced options.
858+
#[pyo3(signature = (
859+
path,
860+
options,
861+
column_specific_options,
862+
insert_operation=PyInsertOp::Append,
863+
single_file_output=false,
864+
partition_by=vec![],
865+
))]
824866
fn write_parquet_with_options(
825867
&self,
826868
path: &str,
827869
options: PyParquetWriterOptions,
828870
column_specific_options: HashMap<String, PyParquetColumnOptions>,
871+
insert_operation: PyInsertOp,
872+
single_file_output: bool,
873+
partition_by: Vec<String>,
829874
py: Python,
830875
) -> PyDataFusionResult<()> {
831876
let table_options = TableParquetOptions {
@@ -841,22 +886,29 @@ impl PyDataFrame {
841886
py,
842887
self.df.as_ref().clone().write_parquet(
843888
path,
844-
DataFrameWriteOptions::new(),
889+
DataFrameWriteOptions::new()
890+
.with_insert_operation(insert_operation.into())
891+
.with_single_file_output(single_file_output)
892+
.with_partition_by(partition_by),
845893
Option::from(table_options),
846894
),
847895
)??;
848896
Ok(())
849897
}
850898

851899
/// Executes a query and writes the results to a partitioned JSON file.
852-
fn write_json(&self, path: &str, py: Python) -> PyDataFusionResult<()> {
853-
wait_for_future(
900+
fn write_json(&self, path: &str, insert_operation: PyInsertOp, single_file_output: bool, partition_by: Vec<String>, py: Python) -> PyDataFusionResult<()> {
901+
let _ = wait_for_future(
854902
py,
855903
self.df
856904
.as_ref()
857905
.clone()
858-
.write_json(path, DataFrameWriteOptions::new(), None),
859-
)??;
906+
.write_json(path, DataFrameWriteOptions::new()
907+
.with_insert_operation(insert_operation.into())
908+
.with_single_file_output(single_file_output)
909+
.with_partition_by(partition_by),
910+
None),
911+
)?;
860912
Ok(())
861913
}
862914

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
9797
m.add_class::<physical_plan::PyExecutionPlan>()?;
9898
m.add_class::<record_batch::PyRecordBatch>()?;
9999
m.add_class::<record_batch::PyRecordBatchStream>()?;
100+
m.add_class::<dataframe::PyInsertOp>()?;
100101

101102
let catalog = PyModule::new(py, "catalog")?;
102103
catalog::init_module(&catalog)?;

0 commit comments

Comments
 (0)