# Copyright 2022 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Union

import pyarrow

from feast.value_type import ValueType

# Maps each PrimitiveFeastType member name to the name of the ValueType member
# it converts to. Array/Set derive their ValueType names from these by
# appending "_LIST" / "_SET".
PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = {
    "INVALID": "UNKNOWN",
    "BYTES": "BYTES",
    "PDF_BYTES": "PDF_BYTES",
    "IMAGE_BYTES": "IMAGE_BYTES",
    "UUID": "UUID",
    "TIME_UUID": "TIME_UUID",
    "DECIMAL": "DECIMAL",
    "STRING": "STRING",
    "INT32": "INT32",
    "INT64": "INT64",
    "FLOAT64": "DOUBLE",
    "FLOAT32": "FLOAT",
    "BOOL": "BOOL",
    "UNIX_TIMESTAMP": "UNIX_TIMESTAMP",
    "MAP": "MAP",
    "JSON": "JSON",
}


def _utc_now() -> datetime:
    """Returns the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)


class ComplexFeastType(ABC):
    """
    A ComplexFeastType represents a structured type that is recognized by Feast.
    """

    def __init__(self):
        """Creates a ComplexFeastType object."""

    @abstractmethod
    def to_value_type(self) -> ValueType:
        """
        Converts a ComplexFeastType object to the corresponding ValueType enum.
        """
        raise NotImplementedError

    def __hash__(self):
        """Hashes on the value of the corresponding ValueType enum member."""
        return hash(self.to_value_type().value)

    def __eq__(self, other):
        """Two complex types are equal when they map to the same ValueType."""
        if isinstance(other, ComplexFeastType):
            return self.to_value_type() == other.to_value_type()
        else:
            return False


class PrimitiveFeastType(Enum):
    """
    A PrimitiveFeastType represents a primitive type in Feast.

    Note that these values must match the values in /feast/protos/types/Value.proto.
    """

    INVALID = 0
    BYTES = 1
    STRING = 2
    INT32 = 3
    INT64 = 4
    FLOAT64 = 5
    FLOAT32 = 6
    BOOL = 7
    UNIX_TIMESTAMP = 8
    PDF_BYTES = 9
    IMAGE_BYTES = 10
    MAP = 11
    JSON = 12
    UUID = 13
    TIME_UUID = 14
    DECIMAL = 15

    def to_value_type(self) -> ValueType:
        """
        Converts a PrimitiveFeastType object to the corresponding ValueType enum.
        """
        value_type_name = PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES[self.name]
        return ValueType[value_type_name]

    def __str__(self):
        """Returns the display name from PRIMITIVE_FEAST_TYPES_TO_STRING."""
        # The lookup table is defined further down in this module; it is only
        # resolved when the method is called, i.e. after the module has loaded.
        return PRIMITIVE_FEAST_TYPES_TO_STRING[self.name]

    def __eq__(self, other):
        """Primitive types are equal when both are members with the same value."""
        if isinstance(other, PrimitiveFeastType):
            return self.value == other.value
        else:
            return False

    def __hash__(self):
        """Hashes on the display name from PRIMITIVE_FEAST_TYPES_TO_STRING."""
        return hash((PRIMITIVE_FEAST_TYPES_TO_STRING[self.name]))


# Module-level aliases for the primitive enum members.
Invalid = PrimitiveFeastType.INVALID
Bytes = PrimitiveFeastType.BYTES
PdfBytes = PrimitiveFeastType.PDF_BYTES
ImageBytes = PrimitiveFeastType.IMAGE_BYTES
String = PrimitiveFeastType.STRING
Bool = PrimitiveFeastType.BOOL
Int32 = PrimitiveFeastType.INT32
Int64 = PrimitiveFeastType.INT64
Float32 = PrimitiveFeastType.FLOAT32
Float64 = PrimitiveFeastType.FLOAT64
UnixTimestamp = PrimitiveFeastType.UNIX_TIMESTAMP
Map = PrimitiveFeastType.MAP
Json = PrimitiveFeastType.JSON
Uuid = PrimitiveFeastType.UUID
TimeUuid = PrimitiveFeastType.TIME_UUID
Decimal = PrimitiveFeastType.DECIMAL

# Primitive types accepted as the base type of an Array (Set accepts the same
# list minus Map).
SUPPORTED_BASE_TYPES = [
    Invalid,
    String,
    Bytes,
    PdfBytes,
    ImageBytes,
    Bool,
    Int32,
    Int64,
    Float32,
    Float64,
    UnixTimestamp,
    Map,
    Json,
    Uuid,
    TimeUuid,
    Decimal,
]

# Maps each PrimitiveFeastType member name to its display name; used by
# PrimitiveFeastType.__str__ and PrimitiveFeastType.__hash__.
PRIMITIVE_FEAST_TYPES_TO_STRING = {
    "INVALID": "Invalid",
    "STRING": "String",
    "BYTES": "Bytes",
    "PDF_BYTES": "PdfBytes",
    "IMAGE_BYTES": "ImageBytes",
    "BOOL": "Bool",
    "INT32": "Int32",
    "INT64": "Int64",
    "FLOAT32": "Float32",
    "FLOAT64": "Float64",
    "UNIX_TIMESTAMP": "UnixTimestamp",
    "MAP": "Map",
    "JSON": "Json",
    "UUID": "Uuid",
    "TIME_UUID": "TimeUuid",
    "DECIMAL": "Decimal",
}


class Array(ComplexFeastType):
    """
    An Array represents a list of types.

    Attributes:
        base_type: The base type of the array.
    """

    base_type: Union[PrimitiveFeastType, ComplexFeastType]

    def __init__(self, base_type: Union[PrimitiveFeastType, "ComplexFeastType"]):
        """
        Creates an Array of the given base type.

        Args:
            base_type: A Struct, Array or Set instance, or one of the
                primitives listed in SUPPORTED_BASE_TYPES.

        Raises:
            ValueError: The base type is not supported.
        """
        # Allow Struct, Array, and Set as base types for nested collections
        if (
            not isinstance(base_type, (Struct, Array, Set))
            and base_type not in SUPPORTED_BASE_TYPES
        ):
            raise ValueError(
                f"Type {type(base_type)} is currently not supported as a base type for Array."
            )

        self.base_type = base_type

    def to_value_type(self) -> ValueType:
        """
        Converts this Array to the corresponding ValueType enum.

        Arrays of Struct map to STRUCT_LIST, arrays of Array/Set map to
        VALUE_LIST, and arrays of primitives map to "<primitive>_LIST".
        """
        if isinstance(self.base_type, Struct):
            return ValueType.STRUCT_LIST
        if isinstance(self.base_type, (Array, Set)):
            return ValueType.VALUE_LIST
        assert isinstance(self.base_type, PrimitiveFeastType)
        value_type_name = PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES[self.base_type.name]
        value_type_list_name = value_type_name + "_LIST"
        return ValueType[value_type_list_name]

    def __eq__(self, other):
        """Arrays are equal when their base types are equal."""
        if isinstance(other, Array):
            return self.base_type == other.base_type
        return False

    def __hash__(self):
        """Hashes on the "Array" tag combined with the base type's hash."""
        return hash(("Array", hash(self.base_type)))

    def __str__(self):
        return f"Array({self.base_type})"


class Set(ComplexFeastType):
    """
    A Set represents a set of unique values of a given type.

    Attributes:
        base_type: The base type of the set.
    """

    base_type: Union[PrimitiveFeastType, ComplexFeastType]

    def __init__(self, base_type: Union[PrimitiveFeastType, ComplexFeastType]):
        """
        Creates a Set of the given base type.

        Args:
            base_type: An Array or Set instance, or one of the primitives
                listed in SUPPORTED_BASE_TYPES other than Map.

        Raises:
            ValueError: The base type is not supported.
        """
        # Allow Array and Set as base types for nested collections
        if not isinstance(base_type, (Array, Set)):
            # Sets do not support MAP as a base type. Checked directly rather
            # than by rebuilding a filtered copy of SUPPORTED_BASE_TYPES on
            # every construction.
            if base_type not in SUPPORTED_BASE_TYPES or base_type in (Map,):
                raise ValueError(
                    f"Type {type(base_type)} is currently not supported as a base type for Set."
                )

        self.base_type = base_type

    def to_value_type(self) -> ValueType:
        """
        Converts this Set to the corresponding ValueType enum.

        Sets of Array/Set map to VALUE_SET; sets of primitives map to
        "<primitive>_SET".
        """
        if isinstance(self.base_type, (Array, Set)):
            return ValueType.VALUE_SET
        assert isinstance(self.base_type, PrimitiveFeastType)
        value_type_name = PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES[self.base_type.name]
        value_type_set_name = value_type_name + "_SET"
        return ValueType[value_type_set_name]

    def __eq__(self, other):
        """Sets are equal when their base types are equal."""
        if isinstance(other, Set):
            return self.base_type == other.base_type
        return False

    def __hash__(self):
        """Hashes on the "Set" tag combined with the base type's hash."""
        return hash(("Set", hash(self.base_type)))

    def __str__(self):
        return f"Set({self.base_type})"


class Struct(ComplexFeastType):
    """
    A Struct represents a structured type with named, typed fields.

    Attributes:
        fields: A dictionary mapping field names to their FeastTypes.
    """

    fields: Dict[str, Union[PrimitiveFeastType, "ComplexFeastType"]]

    def __init__(
        self, fields: Dict[str, Union[PrimitiveFeastType, "ComplexFeastType"]]
    ):
        """
        Creates a Struct from a mapping of field names to Feast types.

        Args:
            fields: The field definitions; must be non-empty. The dictionary
                is stored as given (not copied).

        Raises:
            ValueError: No fields were provided.
        """
        if not fields:
            raise ValueError("Struct must have at least one field.")
        self.fields = fields

    def to_value_type(self) -> ValueType:
        """Returns ValueType.STRUCT regardless of the field definitions."""
        return ValueType.STRUCT

    def to_pyarrow_type(self) -> pyarrow.DataType:
        """
        Builds a PyArrow struct type with one field per Struct field.

        Raises:
            ValueError: A field type could not be converted to a PyArrow type.
        """
        pa_fields = []
        for name, feast_type in self.fields.items():
            pa_type = from_feast_to_pyarrow_type(feast_type)
            pa_fields.append(pyarrow.field(name, pa_type))
        return pyarrow.struct(pa_fields)

    def __str__(self):
        field_strs = ", ".join(
            f"{name}: {ftype}" for name, ftype in self.fields.items()
        )
        return f"Struct({{{field_strs}}})"

    def __eq__(self, other):
        """Structs are equal when their field dictionaries are equal."""
        if isinstance(other, Struct):
            return self.fields == other.fields
        return False

    def __hash__(self):
        """Hashes on the "Struct" tag plus the (name, type hash) pairs."""
        # Items are sorted so the hash does not depend on insertion order,
        # matching the order-insensitive dict comparison in __eq__.
        return hash(
            (
                "Struct",
                tuple((k, hash(v)) for k, v in sorted(self.fields.items())),
            )
        )


FeastType = Union[ComplexFeastType, PrimitiveFeastType]

# Maps ValueType members to their Feast type. STRUCT, STRUCT_LIST, VALUE_LIST
# and VALUE_SET are intentionally absent; from_value_type / from_feast_type
# handle them explicitly.
VALUE_TYPES_TO_FEAST_TYPES: Dict["ValueType", FeastType] = {
    ValueType.UNKNOWN: Invalid,
    ValueType.BYTES: Bytes,
    ValueType.PDF_BYTES: PdfBytes,
    ValueType.IMAGE_BYTES: ImageBytes,
    ValueType.STRING: String,
    ValueType.INT32: Int32,
    ValueType.INT64: Int64,
    ValueType.DOUBLE: Float64,
    ValueType.FLOAT: Float32,
    ValueType.BOOL: Bool,
    ValueType.UNIX_TIMESTAMP: UnixTimestamp,
    ValueType.BYTES_LIST: Array(Bytes),
    ValueType.STRING_LIST: Array(String),
    ValueType.INT32_LIST: Array(Int32),
    ValueType.INT64_LIST: Array(Int64),
    ValueType.DOUBLE_LIST: Array(Float64),
    ValueType.FLOAT_LIST: Array(Float32),
    ValueType.BOOL_LIST: Array(Bool),
    ValueType.UNIX_TIMESTAMP_LIST: Array(UnixTimestamp),
    ValueType.MAP: Map,
    ValueType.MAP_LIST: Array(Map),
    ValueType.JSON: Json,
    ValueType.JSON_LIST: Array(Json),
    ValueType.BYTES_SET: Set(Bytes),
    ValueType.STRING_SET: Set(String),
    ValueType.INT32_SET: Set(Int32),
    ValueType.INT64_SET: Set(Int64),
    ValueType.DOUBLE_SET: Set(Float64),
    ValueType.FLOAT_SET: Set(Float32),
    ValueType.BOOL_SET: Set(Bool),
    ValueType.UNIX_TIMESTAMP_SET: Set(UnixTimestamp),
    ValueType.UUID: Uuid,
    ValueType.TIME_UUID: TimeUuid,
    ValueType.UUID_LIST: Array(Uuid),
    ValueType.TIME_UUID_LIST: Array(TimeUuid),
    ValueType.UUID_SET: Set(Uuid),
    ValueType.TIME_UUID_SET: Set(TimeUuid),
    ValueType.DECIMAL: Decimal,
    ValueType.DECIMAL_LIST: Array(Decimal),
    ValueType.DECIMAL_SET: Set(Decimal),
}

# Maps primitive Feast types to PyArrow types. Primitives not listed here
# cause from_feast_to_pyarrow_type to raise ValueError.
FEAST_TYPES_TO_PYARROW_TYPES = {
    String: pyarrow.string(),
    Bool: pyarrow.bool_(),
    Int32: pyarrow.int32(),
    Int64: pyarrow.int64(),
    Float32: pyarrow.float32(),
    Float64: pyarrow.float64(),
    # Note: datetime only supports microseconds https://github.com/python/cpython/blob/3.8/Lib/datetime.py#L1559
    UnixTimestamp: pyarrow.timestamp("us", tz=_utc_now().tzname()),
    Map: pyarrow.map_(pyarrow.string(), pyarrow.string()),
    Json: pyarrow.large_string(),
    Uuid: pyarrow.string(),
    TimeUuid: pyarrow.string(),
    Decimal: pyarrow.string(),
}

FEAST_VECTOR_TYPES: List[Union[ValueType, PrimitiveFeastType, ComplexFeastType]] = [
    ValueType.BYTES_LIST,
    ValueType.INT32_LIST,
    ValueType.INT64_LIST,
    ValueType.FLOAT_LIST,
    ValueType.BOOL_LIST,
    ValueType.MAP_LIST,
]
# Extend the list with the Feast-type equivalent of each ValueType above, so
# membership checks work with either representation.
for k in VALUE_TYPES_TO_FEAST_TYPES:
    if k in FEAST_VECTOR_TYPES:
        FEAST_VECTOR_TYPES.append(VALUE_TYPES_TO_FEAST_TYPES[k])


def from_feast_to_pyarrow_type(feast_type: FeastType) -> pyarrow.DataType:
    """
    Converts a Feast type to a PyArrow type.

    Struct becomes a PyArrow struct; Array and Set both become a PyArrow list
    of the converted base type; primitives are looked up in
    FEAST_TYPES_TO_PYARROW_TYPES.

    Args:
        feast_type: The Feast type to be converted.

    Raises:
        ValueError: The conversion could not be performed.
    """
    # NOTE: kept as an assert to preserve the exception type; under `python -O`
    # it is skipped and an unsupported input falls through to the ValueError.
    assert isinstance(feast_type, (ComplexFeastType, PrimitiveFeastType)), (
        f"Expected FeastType, got {type(feast_type)}"
    )
    if isinstance(feast_type, Struct):
        return feast_type.to_pyarrow_type()
    if isinstance(feast_type, PrimitiveFeastType):
        if feast_type in FEAST_TYPES_TO_PYARROW_TYPES:
            return FEAST_TYPES_TO_PYARROW_TYPES[feast_type]
    elif isinstance(feast_type, Array):
        base_type = feast_type.base_type
        if isinstance(base_type, Struct):
            return pyarrow.list_(base_type.to_pyarrow_type())
        if isinstance(base_type, (Array, Set)):
            # Nested collections are converted recursively.
            return pyarrow.list_(from_feast_to_pyarrow_type(base_type))
        if isinstance(base_type, PrimitiveFeastType):
            if base_type == Map:
                return pyarrow.list_(pyarrow.map_(pyarrow.string(), pyarrow.string()))
            if base_type in FEAST_TYPES_TO_PYARROW_TYPES:
                return pyarrow.list_(FEAST_TYPES_TO_PYARROW_TYPES[base_type])
    elif isinstance(feast_type, Set):
        base_type = feast_type.base_type
        if isinstance(base_type, (Array, Set)):
            # Nested collections are converted recursively.
            return pyarrow.list_(from_feast_to_pyarrow_type(base_type))
        if isinstance(base_type, PrimitiveFeastType):
            if base_type in FEAST_TYPES_TO_PYARROW_TYPES:
                return pyarrow.list_(FEAST_TYPES_TO_PYARROW_TYPES[base_type])

    raise ValueError(f"Could not convert Feast type {feast_type} to PyArrow type.")


def from_value_type(
    value_type: ValueType,
) -> FeastType:
    """
    Converts a ValueType enum to a Feast type.

    Args:
        value_type: The ValueType to be converted.

    Raises:
        ValueError: The conversion could not be performed.
    """
    if value_type in VALUE_TYPES_TO_FEAST_TYPES:
        return VALUE_TYPES_TO_FEAST_TYPES[value_type]

    # Struct types cannot be looked up from the dict because they require
    # field definitions. Return a default placeholder Struct that can be
    # enriched later from Field tags / schema metadata.
    if value_type == ValueType.STRUCT:
        return Struct({"_value": String})
    if value_type == ValueType.STRUCT_LIST:
        return Array(Struct({"_value": String}))

    # Nested collection types use placeholder inner types.
    # Real inner type is restored from Field tags during deserialization.
    if value_type == ValueType.VALUE_LIST:
        return Array(Array(String))
    if value_type == ValueType.VALUE_SET:
        return Set(Array(String))

    raise ValueError(f"Could not convert value type {value_type} to FeastType.")


def from_feast_type(
    feast_type: FeastType,
) -> ValueType:
    """
    Converts a Feast type to a ValueType enum.

    Args:
        feast_type: The Feast type to be converted.

    Returns:
        The corresponding ValueType enum.

    Raises:
        ValueError: The conversion could not be performed.
    """
    # Handle Struct types directly since they are not in the dict
    if isinstance(feast_type, Struct):
        return ValueType.STRUCT
    if isinstance(feast_type, Array) and isinstance(feast_type.base_type, Struct):
        return ValueType.STRUCT_LIST

    # Handle nested collection types
    if isinstance(feast_type, Array) and isinstance(feast_type.base_type, (Array, Set)):
        return ValueType.VALUE_LIST
    if isinstance(feast_type, Set) and isinstance(feast_type.base_type, (Array, Set)):
        return ValueType.VALUE_SET

    # Reverse lookup in one pass: return the first ValueType (in insertion
    # order) whose Feast type equals the argument, without materialising
    # intermediate key/value lists.
    for value_type, candidate in VALUE_TYPES_TO_FEAST_TYPES.items():
        if candidate == feast_type:
            return value_type

    raise ValueError(f"Could not convert feast type {feast_type} to ValueType.")