# Copyright 2020 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Table format configurations (Iceberg, Delta Lake, Hudi) and (de)serialization helpers."""

import json
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Dict, Optional, Type, Union

if TYPE_CHECKING:
    from feast.protos.feast.core.DataFormat_pb2 import TableFormat as TableFormatProto


class TableFormatType(Enum):
    """Enum for supported table formats"""

    DELTA = "delta"
    ICEBERG = "iceberg"
    HUDI = "hudi"


class TableFormat(ABC):
    """
    Abstract base class for table formats.

    Table formats encapsulate metadata and configuration specific to different
    table storage formats like Iceberg, Delta Lake, Hudi, etc. They provide a
    unified interface for configuring table-specific properties that are used
    when reading from or writing to these advanced table formats.

    This base class defines the contract that all table format implementations
    must follow, including serialization/deserialization capabilities and
    property management.

    Attributes:
        format_type (TableFormatType): The type of table format (iceberg, delta, hudi).
        properties (Dict[str, str]): Dictionary of format-specific properties. This is
            a private copy of the mapping passed to the constructor, so mutating it
            never affects the caller's dict (and vice versa).

    Examples:
        Table formats are typically used with data sources to specify
        advanced table metadata and reading options:

        >>> from feast.table_format import IcebergFormat
        >>> iceberg_format = IcebergFormat(
        ...     catalog="my_catalog",
        ...     namespace="my_namespace"
        ... )
        >>> iceberg_format.set_property("snapshot-id", "123456789")
    """

    def __init__(
        self, format_type: TableFormatType, properties: Optional[Dict[str, str]] = None
    ):
        self.format_type = format_type
        # Copy the incoming mapping: subclass constructors write derived keys
        # (e.g. "iceberg.catalog") into self.properties, and those writes must
        # not leak into a dict the caller -- or a from_dict() input -- still owns.
        self.properties: Dict[str, str] = dict(properties) if properties else {}

    @abstractmethod
    def to_dict(self) -> Dict:
        """Convert table format to dictionary representation"""
        pass

    @classmethod
    @abstractmethod
    def from_dict(cls, data: Dict) -> "TableFormat":
        """Create table format from dictionary representation"""
        pass

    def get_property(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Get a table format property, or ``default`` when ``key`` is not set."""
        return self.properties.get(key, default)

    def set_property(self, key: str, value: str) -> None:
        """Set (or overwrite) a table format property."""
        self.properties[key] = value


class IcebergFormat(TableFormat):
    """
    Apache Iceberg table format configuration.

    Iceberg is an open table format for huge analytic datasets. This class provides
    configuration for Iceberg-specific properties including catalog configuration,
    namespace settings, and table-level properties for reading and writing Iceberg tables.

    Args:
        catalog (Optional[str]): Name of the Iceberg catalog to use. The catalog manages
            table metadata and provides access to tables. When set, it is also stored
            under the ``iceberg.catalog`` property.
        namespace (Optional[str]): Namespace (schema/database) within the catalog where
            the table is located. When set, it is also stored under the
            ``iceberg.namespace`` property.
        properties (Optional[Dict[str, str]]): Properties for configuring Iceberg
            catalog and table operations (e.g., warehouse location, snapshot-id,
            as-of-timestamp, file format, compression, partitioning). The mapping is
            copied, never mutated.

    Attributes:
        catalog (Optional[str]): The Iceberg catalog name.
        namespace (Optional[str]): The namespace within the catalog.
        properties (Dict[str, str]): Iceberg configuration properties.

    Examples:
        Basic Iceberg configuration:

        >>> iceberg_format = IcebergFormat(
        ...     catalog="my_catalog",
        ...     namespace="my_database"
        ... )

        Advanced configuration with properties:

        >>> iceberg_format = IcebergFormat(
        ...     catalog="spark_catalog",
        ...     namespace="lakehouse",
        ...     properties={
        ...         "warehouse": "s3://my-bucket/warehouse",
        ...         "catalog-impl": "org.apache.iceberg.spark.SparkCatalog",
        ...         "format-version": "2",
        ...         "write.parquet.compression-codec": "snappy"
        ...     }
        ... )

        Reading from a specific snapshot:

        >>> iceberg_format = IcebergFormat(catalog="my_catalog", namespace="db")
        >>> iceberg_format.set_property("snapshot-id", "123456789")

        Time travel queries:

        >>> iceberg_format.set_property("as-of-timestamp", "1648684800000")
    """

    def __init__(
        self,
        catalog: Optional[str] = None,
        namespace: Optional[str] = None,
        properties: Optional[Dict[str, str]] = None,
    ):
        super().__init__(TableFormatType.ICEBERG, properties)
        self.catalog = catalog
        self.namespace = namespace

        # Add catalog and namespace to properties if provided
        if catalog:
            self.properties["iceberg.catalog"] = catalog
        if namespace:
            self.properties["iceberg.namespace"] = namespace

    def to_dict(self) -> Dict:
        """Serialize to a plain dict; ``properties`` is a copy, not the live mapping."""
        return {
            "format_type": self.format_type.value,
            "catalog": self.catalog,
            "namespace": self.namespace,
            "properties": dict(self.properties),
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "IcebergFormat":
        """Build from a dict produced by :meth:`to_dict` (missing keys default to None/empty)."""
        return cls(
            catalog=data.get("catalog"),
            namespace=data.get("namespace"),
            properties=data.get("properties", {}),
        )

    def to_proto(self) -> "TableFormatProto":
        """Convert to protobuf TableFormat message"""
        from feast.protos.feast.core.DataFormat_pb2 import (
            TableFormat as TableFormatProto,
        )

        # Proto3 string fields cannot be null, so None is encoded as "".
        iceberg_proto = TableFormatProto.IcebergFormat(
            catalog=self.catalog or "",
            namespace=self.namespace or "",
            properties=self.properties,
        )
        return TableFormatProto(iceberg_format=iceberg_proto)

    @classmethod
    def from_proto(cls, proto: "TableFormatProto") -> "IcebergFormat":
        """Create from protobuf TableFormat message"""
        iceberg_proto = proto.iceberg_format
        # Empty strings are mapped back to None, mirroring to_proto().
        return cls(
            catalog=iceberg_proto.catalog if iceberg_proto.catalog else None,
            namespace=iceberg_proto.namespace if iceberg_proto.namespace else None,
            properties=dict(iceberg_proto.properties),
        )


class DeltaFormat(TableFormat):
    """
    Delta Lake table format configuration.

    Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark
    and big data workloads. This class provides configuration for Delta-specific properties
    including table properties, checkpoint locations, and versioning options.

    Args:
        checkpoint_location (Optional[str]): Location for storing Delta transaction logs
            and checkpoints. Required for streaming operations. When set, it is also
            stored under the ``delta.checkpointLocation`` property.
        properties (Optional[Dict[str, str]]): Properties for configuring Delta table
            behavior (e.g., auto-optimize, vacuum settings, data skipping). The mapping
            is copied, never mutated.

    Attributes:
        checkpoint_location (Optional[str]): Path to checkpoint storage location.
        properties (Dict[str, str]): Delta table configuration properties.

    Examples:
        Basic Delta configuration:

        >>> delta_format = DeltaFormat()

        Configuration with table properties:

        >>> delta_format = DeltaFormat(
        ...     properties={
        ...         "delta.autoOptimize.optimizeWrite": "true",
        ...         "delta.autoOptimize.autoCompact": "true",
        ...         "delta.tuneFileSizesForRewrites": "true"
        ...     }
        ... )

        Streaming configuration with checkpoint:

        >>> delta_format = DeltaFormat(
        ...     checkpoint_location="s3://my-bucket/checkpoints/my_table"
        ... )

        Time travel - reading specific version:

        >>> delta_format = DeltaFormat()
        >>> delta_format.set_property("versionAsOf", "5")

        Time travel - reading at specific timestamp:

        >>> delta_format.set_property("timestampAsOf", "2023-01-01 00:00:00")
    """

    def __init__(
        self,
        checkpoint_location: Optional[str] = None,
        properties: Optional[Dict[str, str]] = None,
    ):
        super().__init__(TableFormatType.DELTA, properties)
        self.checkpoint_location = checkpoint_location

        # Add checkpoint location to properties if provided
        if checkpoint_location:
            self.properties["delta.checkpointLocation"] = checkpoint_location

    def to_dict(self) -> Dict:
        """Serialize to a plain dict; ``properties`` is a copy, not the live mapping."""
        return {
            "format_type": self.format_type.value,
            "checkpoint_location": self.checkpoint_location,
            "properties": dict(self.properties),
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "DeltaFormat":
        """Build from a dict produced by :meth:`to_dict` (missing keys default to None/empty)."""
        return cls(
            checkpoint_location=data.get("checkpoint_location"),
            properties=data.get("properties", {}),
        )

    def to_proto(self) -> "TableFormatProto":
        """Convert to protobuf TableFormat message"""
        from feast.protos.feast.core.DataFormat_pb2 import (
            TableFormat as TableFormatProto,
        )

        # Proto3 string fields cannot be null, so None is encoded as "".
        delta_proto = TableFormatProto.DeltaFormat(
            checkpoint_location=self.checkpoint_location or "",
            properties=self.properties,
        )
        return TableFormatProto(delta_format=delta_proto)

    @classmethod
    def from_proto(cls, proto: "TableFormatProto") -> "DeltaFormat":
        """Create from protobuf TableFormat message"""
        delta_proto = proto.delta_format
        # Empty strings are mapped back to None, mirroring to_proto().
        return cls(
            checkpoint_location=delta_proto.checkpoint_location
            if delta_proto.checkpoint_location
            else None,
            properties=dict(delta_proto.properties),
        )


class HudiFormat(TableFormat):
    """
    Apache Hudi table format configuration.

    Apache Hudi is a data management framework used to simplify incremental data processing
    and data pipeline development. This class provides configuration for Hudi-specific
    properties including table type, record keys, and write operations.

    Args:
        table_type (Optional[str]): Type of Hudi table. Options are:
            - "COPY_ON_WRITE": Stores data in columnar format (Parquet) and rewrites entire files
            - "MERGE_ON_READ": Stores data using combination of columnar and row-based formats
            When set, it is also stored under ``hoodie.datasource.write.table.type``.
        record_key (Optional[str]): Field(s) that uniquely identify a record. Can be a single
            field or comma-separated list for composite keys. When set, it is also stored
            under ``hoodie.datasource.write.recordkey.field``.
        precombine_field (Optional[str]): Field used to determine the latest version of a record
            when multiple updates exist (usually a timestamp or version field). When set, it
            is also stored under ``hoodie.datasource.write.precombine.field``.
        properties (Optional[Dict[str, str]]): Additional Hudi table properties for
            configuring compaction, indexing, and other Hudi features. The mapping is
            copied, never mutated.

    Attributes:
        table_type (Optional[str]): The Hudi table type (COPY_ON_WRITE or MERGE_ON_READ).
        record_key (Optional[str]): The record key field(s).
        precombine_field (Optional[str]): The field used for record deduplication.
        properties (Dict[str, str]): Additional Hudi configuration properties.

    Examples:
        Basic Hudi configuration:

        >>> hudi_format = HudiFormat(
        ...     table_type="COPY_ON_WRITE",
        ...     record_key="user_id",
        ...     precombine_field="timestamp"
        ... )

        Configuration with composite record key:

        >>> hudi_format = HudiFormat(
        ...     table_type="MERGE_ON_READ",
        ...     record_key="user_id,event_type",
        ...     precombine_field="event_timestamp"
        ... )

        Advanced configuration with table properties:

        >>> hudi_format = HudiFormat(
        ...     table_type="COPY_ON_WRITE",
        ...     record_key="id",
        ...     precombine_field="updated_at",
        ...     properties={
        ...         "hoodie.compaction.strategy": "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy",
        ...         "hoodie.index.type": "BLOOM",
        ...         "hoodie.bloom.index.parallelism": "100"
        ...     }
        ... )

        Reading incremental data:

        >>> hudi_format = HudiFormat(table_type="COPY_ON_WRITE")
        >>> hudi_format.set_property("hoodie.datasource.query.type", "incremental")
        >>> hudi_format.set_property("hoodie.datasource.read.begin.instanttime", "20230101000000")
    """

    def __init__(
        self,
        table_type: Optional[str] = None,  # COPY_ON_WRITE or MERGE_ON_READ
        record_key: Optional[str] = None,
        precombine_field: Optional[str] = None,
        properties: Optional[Dict[str, str]] = None,
    ):
        super().__init__(TableFormatType.HUDI, properties)
        self.table_type = table_type
        self.record_key = record_key
        self.precombine_field = precombine_field

        # Add Hudi-specific properties if provided
        if table_type:
            self.properties["hoodie.datasource.write.table.type"] = table_type
        if record_key:
            self.properties["hoodie.datasource.write.recordkey.field"] = record_key
        if precombine_field:
            self.properties["hoodie.datasource.write.precombine.field"] = (
                precombine_field
            )

    def to_dict(self) -> Dict:
        """Serialize to a plain dict; ``properties`` is a copy, not the live mapping."""
        return {
            "format_type": self.format_type.value,
            "table_type": self.table_type,
            "record_key": self.record_key,
            "precombine_field": self.precombine_field,
            "properties": dict(self.properties),
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "HudiFormat":
        """Build from a dict produced by :meth:`to_dict` (missing keys default to None/empty)."""
        return cls(
            table_type=data.get("table_type"),
            record_key=data.get("record_key"),
            precombine_field=data.get("precombine_field"),
            properties=data.get("properties", {}),
        )

    def to_proto(self) -> "TableFormatProto":
        """Convert to protobuf TableFormat message"""
        from feast.protos.feast.core.DataFormat_pb2 import (
            TableFormat as TableFormatProto,
        )

        # Proto3 string fields cannot be null, so None is encoded as "".
        hudi_proto = TableFormatProto.HudiFormat(
            table_type=self.table_type or "",
            record_key=self.record_key or "",
            precombine_field=self.precombine_field or "",
            properties=self.properties,
        )
        return TableFormatProto(hudi_format=hudi_proto)

    @classmethod
    def from_proto(cls, proto: "TableFormatProto") -> "HudiFormat":
        """Create from protobuf TableFormat message"""
        hudi_proto = proto.hudi_format
        # Empty strings are mapped back to None, mirroring to_proto().
        return cls(
            table_type=hudi_proto.table_type if hudi_proto.table_type else None,
            record_key=hudi_proto.record_key if hudi_proto.record_key else None,
            precombine_field=hudi_proto.precombine_field
            if hudi_proto.precombine_field
            else None,
            properties=dict(hudi_proto.properties),
        )


# Single source of truth for "format type -> implementing class", shared by the
# factory and the dict deserializer so the two can never drift apart.
_TABLE_FORMAT_CLASSES: Dict[TableFormatType, Type[TableFormat]] = {
    TableFormatType.ICEBERG: IcebergFormat,
    TableFormatType.DELTA: DeltaFormat,
    TableFormatType.HUDI: HudiFormat,
}


def _resolve_table_format_class(
    format_type: Union[TableFormatType, str],
) -> Type[TableFormat]:
    """Return the TableFormat subclass for an enum member or its string value.

    Raises:
        ValueError: If ``format_type`` is not a supported table format.
    """
    try:
        # Enum lookup accepts both a member and its value ("iceberg", ...).
        resolved = TableFormatType(format_type)
    except ValueError:
        raise ValueError(f"Unknown table format type: {format_type}") from None
    return _TABLE_FORMAT_CLASSES[resolved]


def create_table_format(
    format_type: Union[TableFormatType, str], **kwargs
) -> TableFormat:
    """
    Factory function to create appropriate TableFormat instance based on type.

    This is a convenience function that creates the correct TableFormat subclass
    based on the provided format type, passing through any additional keyword
    arguments to the constructor.

    Args:
        format_type (Union[TableFormatType, str]): The type of table format to create,
            either a TableFormatType member or its string value ('iceberg', 'delta',
            or 'hudi').
        **kwargs: Additional keyword arguments passed to the format constructor.

    Returns:
        TableFormat: An instance of the appropriate TableFormat subclass.

    Raises:
        ValueError: If an unsupported format_type is provided.

    Examples:
        Create an Iceberg format:

        >>> iceberg_format = create_table_format(
        ...     TableFormatType.ICEBERG,
        ...     catalog="my_catalog",
        ...     namespace="my_db"
        ... )

        Create a Delta format:

        >>> delta_format = create_table_format(
        ...     TableFormatType.DELTA,
        ...     checkpoint_location="s3://bucket/checkpoints"
        ... )
    """
    format_cls = _resolve_table_format_class(format_type)
    return format_cls(**kwargs)


def table_format_from_dict(data: Dict) -> TableFormat:
    """
    Create TableFormat instance from dictionary representation.

    This function deserializes a dictionary (typically from JSON or protobuf)
    back into the appropriate TableFormat instance. The dictionary must contain
    a 'format_type' field that indicates which format class to instantiate.
    The input dictionary is not modified.

    Args:
        data (Dict): Dictionary containing table format configuration. Must include
            'format_type' field with value 'iceberg', 'delta', or 'hudi'.

    Returns:
        TableFormat: An instance of the appropriate TableFormat subclass.

    Raises:
        ValueError: If format_type is not recognized.
        KeyError: If format_type field is missing from data.

    Examples:
        Deserialize an Iceberg format:

        >>> data = {
        ...     "format_type": "iceberg",
        ...     "catalog": "my_catalog",
        ...     "namespace": "my_db"
        ... }
        >>> iceberg_format = table_format_from_dict(data)
    """
    if "format_type" not in data:
        raise KeyError("Missing 'format_type' field in data")

    format_cls = _resolve_table_format_class(data["format_type"])
    return format_cls.from_dict(data)


def table_format_from_json(json_str: str) -> TableFormat:
    """
    Create TableFormat instance from JSON string.

    This is a convenience function that parses a JSON string and creates
    the appropriate TableFormat instance. Useful for loading table format
    configurations from files or network requests.

    Args:
        json_str (str): JSON string containing table format configuration.

    Returns:
        TableFormat: An instance of the appropriate TableFormat subclass.

    Raises:
        json.JSONDecodeError: If the JSON string is invalid.
        ValueError: If format_type is not recognized.
        KeyError: If format_type field is missing.

    Examples:
        Load from JSON string:

        >>> json_config = '{"format_type": "delta", "checkpoint_location": "s3://bucket/checkpoints"}'
        >>> delta_format = table_format_from_json(json_config)
    """
    data = json.loads(json_str)
    return table_format_from_dict(data)


def table_format_from_proto(proto: "TableFormatProto") -> TableFormat:
    """
    Create TableFormat instance from protobuf TableFormat message.

    Args:
        proto: TableFormat protobuf message

    Returns:
        TableFormat: An instance of the appropriate TableFormat subclass.

    Raises:
        ValueError: If the proto doesn't contain a recognized format.
    """
    # The populated member of the "format" oneof selects the implementing class.
    which_format = proto.WhichOneof("format")

    if which_format == "iceberg_format":
        return IcebergFormat.from_proto(proto)
    elif which_format == "delta_format":
        return DeltaFormat.from_proto(proto)
    elif which_format == "hudi_format":
        return HudiFormat.from_proto(proto)
    else:
        raise ValueError(f"Unknown table format in proto: {which_format}")