# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """Options for reading various file formats.""" from __future__ import annotations import warnings from typing import TYPE_CHECKING import pyarrow as pa from datafusion.expr import sort_list_to_raw_sort_list if TYPE_CHECKING: from datafusion.expr import SortExpr from ._internal import options __all__ = ["CsvReadOptions"] DEFAULT_MAX_INFER_SCHEMA = 1000 class CsvReadOptions: """Options for reading CSV files. This class provides a builder pattern for configuring CSV reading options. All methods starting with ``with_`` return ``self`` to allow method chaining. """ def __init__( self, *, has_header: bool = True, delimiter: str = ",", quote: str = '"', terminator: str | None = None, escape: str | None = None, comment: str | None = None, newlines_in_values: bool = False, schema: pa.Schema | None = None, schema_infer_max_records: int = DEFAULT_MAX_INFER_SCHEMA, file_extension: str = ".csv", table_partition_cols: list[tuple[str, pa.DataType]] | None = None, file_compression_type: str = "", file_sort_order: list[list[SortExpr]] | None = None, null_regex: str | None = None, truncated_rows: bool = False, ) -> None: """Initialize CsvReadOptions. Args: has_header: Does the CSV file have a header row? If schema inference is run on a file with no headers, default column names are created. delimiter: Column delimiter character. Must be a single ASCII character. quote: Quote character for fields containing delimiters or newlines. Must be a single ASCII character. terminator: Optional line terminator character. If ``None``, uses CRLF. Must be a single ASCII character. escape: Optional escape character for quotes. Must be a single ASCII character. comment: If specified, lines beginning with this character are ignored. Must be a single ASCII character. newlines_in_values: Whether newlines in quoted values are supported. Parsing newlines in quoted values may be affected by execution behavior such as parallel file scanning. Setting this to ``True`` ensures that newlines in values are parsed successfully, which may reduce performance. schema: Optional PyArrow schema representing the CSV files. If ``None``, the CSV reader will try to infer it based on data in the file. schema_infer_max_records: Maximum number of rows to read from CSV files for schema inference if needed. file_extension: File extension; only files with this extension are selected for data input. table_partition_cols: Partition columns as a list of tuples of (column_name, data_type). file_compression_type: File compression type. Supported values are ``"gzip"``, ``"bz2"``, ``"xz"``, ``"zstd"``, or empty string for uncompressed. file_sort_order: Optional sort order of the files as a list of sort expressions per file. null_regex: Optional regex pattern to match null values in the CSV. truncated_rows: Whether to allow truncated rows when parsing. By default this is ``False`` and will error if the CSV rows have different lengths. When set to ``True``, it will allow records with less than the expected number of columns and fill the missing columns with nulls. If the record's schema is not nullable, it will still return an error. """ validate_single_character("delimiter", delimiter) validate_single_character("quote", quote) validate_single_character("terminator", terminator) validate_single_character("escape", escape) validate_single_character("comment", comment) self.has_header = has_header self.delimiter = delimiter self.quote = quote self.terminator = terminator self.escape = escape self.comment = comment self.newlines_in_values = newlines_in_values self.schema = schema self.schema_infer_max_records = schema_infer_max_records self.file_extension = file_extension self.table_partition_cols = table_partition_cols or [] self.file_compression_type = file_compression_type self.file_sort_order = file_sort_order or [] self.null_regex = null_regex self.truncated_rows = truncated_rows def with_has_header(self, has_header: bool) -> CsvReadOptions: """Configure whether the CSV has a header row.""" self.has_header = has_header return self def with_delimiter(self, delimiter: str) -> CsvReadOptions: """Configure the column delimiter.""" self.delimiter = delimiter return self def with_quote(self, quote: str) -> CsvReadOptions: """Configure the quote character.""" self.quote = quote return self def with_terminator(self, terminator: str | None) -> CsvReadOptions: """Configure the line terminator character.""" self.terminator = terminator return self def with_escape(self, escape: str | None) -> CsvReadOptions: """Configure the escape character.""" self.escape = escape return self def with_comment(self, comment: str | None) -> CsvReadOptions: """Configure the comment character.""" self.comment = comment return self def with_newlines_in_values(self, newlines_in_values: bool) -> CsvReadOptions: """Configure whether newlines in values are supported.""" self.newlines_in_values = newlines_in_values return self def with_schema(self, schema: pa.Schema | None) -> CsvReadOptions: """Configure the schema.""" self.schema = schema return self def with_schema_infer_max_records( self, schema_infer_max_records: int ) -> CsvReadOptions: """Configure maximum records for schema inference.""" self.schema_infer_max_records = schema_infer_max_records return self def with_file_extension(self, file_extension: str) -> CsvReadOptions: """Configure the file extension filter.""" self.file_extension = file_extension return self def with_table_partition_cols( self, table_partition_cols: list[tuple[str, pa.DataType]] ) -> CsvReadOptions: """Configure table partition columns.""" self.table_partition_cols = table_partition_cols return self def with_file_compression_type(self, file_compression_type: str) -> CsvReadOptions: """Configure file compression type.""" self.file_compression_type = file_compression_type return self def with_file_sort_order( self, file_sort_order: list[list[SortExpr]] ) -> CsvReadOptions: """Configure file sort order.""" self.file_sort_order = file_sort_order return self def with_null_regex(self, null_regex: str | None) -> CsvReadOptions: """Configure null value regex pattern.""" self.null_regex = null_regex return self def with_truncated_rows(self, truncated_rows: bool) -> CsvReadOptions: """Configure whether to allow truncated rows.""" self.truncated_rows = truncated_rows return self def to_inner(self) -> options.CsvReadOptions: """Convert this object into the underlying Rust structure. This is intended for internal use only. """ file_sort_order = ( [] if self.file_sort_order is None else [ sort_list_to_raw_sort_list(sort_list) for sort_list in self.file_sort_order ] ) return options.CsvReadOptions( has_header=self.has_header, delimiter=ord(self.delimiter[0]) if self.delimiter else ord(","), quote=ord(self.quote[0]) if self.quote else ord('"'), terminator=ord(self.terminator[0]) if self.terminator else None, escape=ord(self.escape[0]) if self.escape else None, comment=ord(self.comment[0]) if self.comment else None, newlines_in_values=self.newlines_in_values, schema=self.schema, schema_infer_max_records=self.schema_infer_max_records, file_extension=self.file_extension, table_partition_cols=_convert_table_partition_cols( self.table_partition_cols ), file_compression_type=self.file_compression_type or "", file_sort_order=file_sort_order, null_regex=self.null_regex, truncated_rows=self.truncated_rows, ) def validate_single_character(name: str, value: str | None) -> None: if value is not None and len(value) != 1: message = f"{name} must be a single character" raise ValueError(message) def _convert_table_partition_cols( table_partition_cols: list[tuple[str, str | pa.DataType]], ) -> list[tuple[str, pa.DataType]]: warn = False converted_table_partition_cols = [] for col, data_type in table_partition_cols: if isinstance(data_type, str): warn = True if data_type == "string": converted_data_type = pa.string() elif data_type == "int": converted_data_type = pa.int32() else: message = ( f"Unsupported literal data type '{data_type}' for partition " "column. Supported types are 'string' and 'int'" ) raise ValueError(message) else: converted_data_type = data_type converted_table_partition_cols.append((col, converted_data_type)) if warn: message = ( "using literals for table_partition_cols data types is deprecated," "use pyarrow types instead" ) warnings.warn( message, category=DeprecationWarning, stacklevel=2, ) return converted_table_partition_cols