# Copyright 2019 The Feast Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import decimal import json import logging import uuid as uuid_module from collections import defaultdict from datetime import datetime, timezone from typing import ( TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, Union, cast, ) import numpy as np import pandas as pd from google.protobuf.timestamp_pb2 import Timestamp from feast.protos.feast.types.Value_pb2 import ( BoolList, BoolSet, BytesList, BytesSet, DoubleList, DoubleSet, FloatList, FloatSet, Int32List, Int32Set, Int64List, Int64Set, Map, MapList, RepeatedValue, StringList, StringSet, ) from feast.protos.feast.types.Value_pb2 import Value as ProtoValue from feast.value_type import ListType, SetType, ValueType if TYPE_CHECKING: import pyarrow # null timestamps get converted to -9223372036854775808 NULL_TIMESTAMP_INT_VALUE: int = np.datetime64("NaT").astype(int) logger = logging.getLogger(__name__) def feast_value_type_to_python_type( field_value_proto: ProtoValue, feature_type: Optional[ValueType] = None, ) -> Any: """ Converts field value Proto to Dict and returns each field's Feast Value Type value in their respective Python value. Args: field_value_proto: Field value Proto Returns: Python native type representation/version of the given field_value_proto """ val_attr = field_value_proto.WhichOneof("val") if val_attr is None: return None val = getattr(field_value_proto, val_attr) # Handle JSON types — stored as strings but returned as parsed Python objects if val_attr == "json_val": try: return json.loads(val) except (json.JSONDecodeError, TypeError): return val elif val_attr == "json_list_val": result = [] for v in val.val: if isinstance(v, str): try: result.append(json.loads(v)) except (json.JSONDecodeError, TypeError): result.append(v) else: result.append(v) return result # Handle nested collection types (list_val, set_val) if val_attr in ("list_val", "set_val"): return _handle_nested_collection_value(val) # Handle Struct types — stored using Map proto, returned as dicts if val_attr == "struct_val": return _handle_map_value(val) elif val_attr == "struct_list_val": return _handle_map_list_value(val) # Handle Map and MapList types FIRST (before generic list processing) if val_attr == "map_val": return _handle_map_value(val) elif val_attr == "map_list_val": return _handle_map_list_value(val) # If it's a _LIST or _SET type extract the values. if hasattr(val, "val"): val = list(val.val) # Convert UNIX_TIMESTAMP values to `datetime` if val_attr == "unix_timestamp_list_val": val = [ ( datetime.fromtimestamp(v, tz=timezone.utc) if v != NULL_TIMESTAMP_INT_VALUE else None ) for v in val ] elif val_attr == "unix_timestamp_set_val": val = set( [ ( datetime.fromtimestamp(v, tz=timezone.utc) if v != NULL_TIMESTAMP_INT_VALUE else None ) for v in val ] ) elif val_attr == "unix_timestamp_val": val = ( datetime.fromtimestamp(val, tz=timezone.utc) if val != NULL_TIMESTAMP_INT_VALUE else None ) # Convert _SET types to Python sets elif val_attr.endswith("_set_val") and val_attr != "unix_timestamp_set_val": val = set(val) # Convert UUID values to uuid.UUID objects if val_attr in ("uuid_val", "time_uuid_val"): return uuid_module.UUID(val) if isinstance(val, str) else val if val_attr in ("uuid_list_val", "time_uuid_list_val"): return [uuid_module.UUID(v) if isinstance(v, str) else v for v in val] if val_attr in ("uuid_set_val", "time_uuid_set_val"): return {uuid_module.UUID(v) if isinstance(v, str) else v for v in val} # Convert DECIMAL values to decimal.Decimal objects if val_attr == "decimal_val": return decimal.Decimal(val) if isinstance(val, str) else val if val_attr == "decimal_list_val": return [decimal.Decimal(v) if isinstance(v, str) else v for v in val] if val_attr == "decimal_set_val": return {decimal.Decimal(v) if isinstance(v, str) else v for v in val} # Backward compatibility: handle UUIDs stored as string_val/string_list_val with feature_type hint if feature_type in (ValueType.UUID, ValueType.TIME_UUID) and isinstance(val, str): return uuid_module.UUID(val) if feature_type in (ValueType.UUID_LIST, ValueType.TIME_UUID_LIST) and isinstance( val, list ): return [uuid_module.UUID(v) if isinstance(v, str) else v for v in val] if feature_type in (ValueType.UUID_SET, ValueType.TIME_UUID_SET) and isinstance( val, set ): return {uuid_module.UUID(v) if isinstance(v, str) else v for v in val} return val def _handle_map_value(map_message) -> Dict[str, Any]: """Handle Map proto message containing map val.""" result = {} for key, value in map_message.val.items(): # Recursively handle the Value message result[key] = feast_value_type_to_python_type(value) return result def _handle_map_list_value(map_list_message) -> List[Dict[str, Any]]: """Handle MapList proto message containing repeated Map val.""" result = [] for map_item in map_list_message.val: # Handle each Map in the list processed_map = _handle_map_value(map_item) result.append(processed_map) return result def _handle_nested_collection_value(repeated_value) -> List[Any]: """Handle nested collection proto (RepeatedValue containing Values). Each inner Value is itself a list/set proto. We recursively convert each inner Value to a Python list/set via feast_value_type_to_python_type. """ result = [] for inner_value in repeated_value.val: result.append(feast_value_type_to_python_type(inner_value)) return result def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: value_type_to_pandas_type: Dict[ValueType, str] = { ValueType.FLOAT: "float", ValueType.INT32: "int", ValueType.INT64: "int", ValueType.STRING: "str", ValueType.DOUBLE: "float", ValueType.BYTES: "bytes", ValueType.BOOL: "bool", ValueType.UNIX_TIMESTAMP: "datetime64[ns]", ValueType.UUID: "str", ValueType.TIME_UUID: "str", ValueType.DECIMAL: "object", } if ( value_type.name in ("MAP", "JSON", "STRUCT", "VALUE_LIST", "VALUE_SET") or value_type.name.endswith("_LIST") or value_type.name.endswith("_SET") ): return "object" if value_type in value_type_to_pandas_type: return value_type_to_pandas_type[value_type] raise TypeError( f"Casting to pandas type for type {value_type} failed. " f"Type {value_type} not found" ) def python_type_to_feast_value_type( name: str, value: Optional[Any] = None, recurse: bool = True, type_name: Optional[str] = None, ) -> ValueType: """ Finds the equivalent Feast Value Type for a Python value. Both native and Pandas types are supported. This function will recursively look for nested types when arrays are detected. All types must be homogenous. Args: name: Name of the value or field value: Value that will be inspected recurse: Whether to recursively look for nested types in arrays Returns: Feast Value Type """ type_name = (type_name or type(value).__name__).lower() type_map = { "int": ValueType.INT64, "str": ValueType.STRING, "string": ValueType.STRING, # pandas.StringDtype "float": ValueType.DOUBLE, "bytes": ValueType.BYTES, "float64": ValueType.DOUBLE, "float32": ValueType.FLOAT, "int64": ValueType.INT64, "uint64": ValueType.INT64, "int32": ValueType.INT32, "uint32": ValueType.INT32, "int16": ValueType.INT32, "uint16": ValueType.INT32, "uint8": ValueType.INT32, "int8": ValueType.INT32, "bool_": ValueType.BOOL, # np.bool_ "bool": ValueType.BOOL, "boolean": ValueType.BOOL, "timedelta": ValueType.UNIX_TIMESTAMP, "timestamp": ValueType.UNIX_TIMESTAMP, "datetime": ValueType.UNIX_TIMESTAMP, "datetime64[ns]": ValueType.UNIX_TIMESTAMP, "datetime64[ns, tz]": ValueType.UNIX_TIMESTAMP, # special dtype of pandas "datetime64[ns, utc]": ValueType.UNIX_TIMESTAMP, "date": ValueType.UNIX_TIMESTAMP, "category": ValueType.STRING, "uuid": ValueType.UUID, "decimal": ValueType.DECIMAL, } if type_name in type_map: return type_map[type_name] # Handle pandas "object" dtype by inspecting the actual value if type_name == "object" and value is not None: # Check the actual type of the value actual_type = type(value).__name__.lower() if actual_type == "str": return ValueType.STRING # Check if it's a dictionary (could be a Map) elif actual_type == "dict": return ValueType.MAP # If it's a different type wrapped in object, try to infer from the value elif actual_type in type_map: return type_map[actual_type] if isinstance(value, np.ndarray) and str(value.dtype) in type_map: item_type = type_map[str(value.dtype)] return ValueType[item_type.name + "_LIST"] if isinstance(value, (list, np.ndarray)): # Check if it's a list of maps if value and isinstance(value[0], dict): return ValueType.MAP_LIST # if the value's type is "ndarray" and we couldn't infer from "value.dtype" # this is most probably array of "object", # so we need to iterate over objects and try to infer type of each item if not recurse: raise ValueError( f"Value type for field {name} is {type(value)} but " f"recursion is not allowed. Nested collection types cannot be " f"inferred automatically; use an explicit Field dtype instead " f"(e.g., dtype=Array(Array(Int32)))." ) # This is the final type which we infer from the list common_item_value_type = None for item in value: if isinstance(item, ProtoValue): current_item_value_type: ValueType = _proto_value_to_value_type(item) else: # Get the type from the current item, only one level deep current_item_value_type = python_type_to_feast_value_type( name=name, value=item, recurse=False ) # Validate whether the type stays consistent if ( common_item_value_type and not common_item_value_type == current_item_value_type ): raise ValueError( f"List value type for field {name} is inconsistent. " f"{common_item_value_type} different from " f"{current_item_value_type}." ) common_item_value_type = current_item_value_type if common_item_value_type is None: return ValueType.UNKNOWN return ValueType[common_item_value_type.name + "_LIST"] # Check if it's a set (Set type) if isinstance(value, set): if not recurse: raise ValueError( f"Value type for field {name} is {type(value)} but " f"recursion is not allowed. Set types can only be one level " f"deep." ) # Infer the type from set elements common_set_item_type = None for item in value: if isinstance(item, ProtoValue): current_set_item_type: ValueType = _proto_value_to_value_type(item) else: # Get the type from the current item, only one level deep current_set_item_type = python_type_to_feast_value_type( name=name, value=item, recurse=False ) # Validate whether the type stays consistent if ( common_set_item_type and not common_set_item_type == current_set_item_type ): raise ValueError( f"Set value type for field {name} is inconsistent. " f"{common_set_item_type} different from " f"{current_set_item_type}." ) common_set_item_type = current_set_item_type if common_set_item_type is None: return ValueType.UNKNOWN return ValueType[common_set_item_type.name + "_SET"] # Check if it's a dictionary (Map type) if isinstance(value, dict): return ValueType.MAP raise ValueError( f"Value with native type {type_name} cannot be converted into Feast value type" ) def python_values_to_feast_value_type( name: str, values: Any, recurse: bool = True ) -> ValueType: inferred_dtype = ValueType.UNKNOWN for row in values: current_dtype = python_type_to_feast_value_type( name, value=row, recurse=recurse ) if inferred_dtype is ValueType.UNKNOWN: inferred_dtype = current_dtype else: if current_dtype != inferred_dtype and current_dtype not in ( ValueType.UNKNOWN, ValueType.NULL, ): raise TypeError( f"Input entity {name} has mixed types, {current_dtype} and {inferred_dtype}. That is not allowed. " ) if inferred_dtype in (ValueType.UNKNOWN, ValueType.NULL): raise ValueError( f"field {name} cannot have all null values for type inference." ) return inferred_dtype def _convert_value_type_str_to_value_type(type_str: str) -> ValueType: type_map = { "UNKNOWN": ValueType.UNKNOWN, "BYTES": ValueType.BYTES, "STRING": ValueType.STRING, "INT32": ValueType.INT32, "INT64": ValueType.INT64, "DOUBLE": ValueType.DOUBLE, "FLOAT": ValueType.FLOAT, "FLOAT32": ValueType.FLOAT, "BOOL": ValueType.BOOL, "NULL": ValueType.NULL, "UNIX_TIMESTAMP": ValueType.UNIX_TIMESTAMP, "BYTES_LIST": ValueType.BYTES_LIST, "STRING_LIST": ValueType.STRING_LIST, "INT32_LIST ": ValueType.INT32_LIST, "INT64_LIST": ValueType.INT64_LIST, "DOUBLE_LIST": ValueType.DOUBLE_LIST, "FLOAT_LIST": ValueType.FLOAT_LIST, "BOOL_LIST": ValueType.BOOL_LIST, "UNIX_TIMESTAMP_LIST": ValueType.UNIX_TIMESTAMP_LIST, "MAP": ValueType.MAP, "MAP_LIST": ValueType.MAP_LIST, "JSON": ValueType.JSON, "JSON_LIST": ValueType.JSON_LIST, "STRUCT": ValueType.STRUCT, "STRUCT_LIST": ValueType.STRUCT_LIST, "BYTES_SET": ValueType.BYTES_SET, "STRING_SET": ValueType.STRING_SET, "INT32_SET": ValueType.INT32_SET, "INT64_SET": ValueType.INT64_SET, "DOUBLE_SET": ValueType.DOUBLE_SET, "FLOAT_SET": ValueType.FLOAT_SET, "BOOL_SET": ValueType.BOOL_SET, "UNIX_TIMESTAMP_SET": ValueType.UNIX_TIMESTAMP_SET, "UUID": ValueType.UUID, "TIME_UUID": ValueType.TIME_UUID, "UUID_LIST": ValueType.UUID_LIST, "TIME_UUID_LIST": ValueType.TIME_UUID_LIST, "UUID_SET": ValueType.UUID_SET, "TIME_UUID_SET": ValueType.TIME_UUID_SET, "VALUE_LIST": ValueType.VALUE_LIST, "VALUE_SET": ValueType.VALUE_SET, "DECIMAL": ValueType.DECIMAL, "DECIMAL_LIST": ValueType.DECIMAL_LIST, "DECIMAL_SET": ValueType.DECIMAL_SET, } return type_map.get(type_str, ValueType.STRING) def _type_err(item, dtype): raise TypeError(f'Value "{item}" is of type {type(item)} not of type {dtype}') PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: Dict[ ValueType, Tuple[ListType, str, List[Type]] ] = { ValueType.FLOAT_LIST: ( FloatList, "float_list_val", [np.float32, np.float64, float], ), ValueType.DOUBLE_LIST: ( DoubleList, "double_list_val", [np.float64, np.float32, float], ), ValueType.INT32_LIST: (Int32List, "int32_list_val", [np.int64, np.int32, int]), ValueType.INT64_LIST: (Int64List, "int64_list_val", [np.int64, np.int32, int]), ValueType.UNIX_TIMESTAMP_LIST: ( Int64List, "int64_list_val", [np.datetime64, np.int64, np.int32, int, datetime, Timestamp], ), ValueType.STRING_LIST: (StringList, "string_list_val", [np.str_, str]), ValueType.BOOL_LIST: (BoolList, "bool_list_val", [np.bool_, bool]), ValueType.BYTES_LIST: (BytesList, "bytes_list_val", [np.bytes_, bytes]), ValueType.UUID_LIST: ( StringList, "uuid_list_val", [np.str_, str, uuid_module.UUID], ), ValueType.TIME_UUID_LIST: ( StringList, "time_uuid_list_val", [np.str_, str, uuid_module.UUID], ), ValueType.DECIMAL_LIST: ( StringList, "decimal_list_val", [np.str_, str, decimal.Decimal], ), } PYTHON_SET_VALUE_TYPE_TO_PROTO_VALUE: Dict[ ValueType, Tuple[SetType, str, List[Type]] ] = { ValueType.FLOAT_SET: ( FloatSet, "float_set_val", [np.float32, np.float64, float], ), ValueType.DOUBLE_SET: ( DoubleSet, "double_set_val", [np.float64, np.float32, float], ), ValueType.INT32_SET: (Int32Set, "int32_set_val", [np.int64, np.int32, int]), ValueType.INT64_SET: (Int64Set, "int64_set_val", [np.int64, np.int32, int]), ValueType.UNIX_TIMESTAMP_SET: ( Int64Set, "unix_timestamp_set_val", [np.datetime64, np.int64, np.int32, int, datetime, Timestamp], ), ValueType.STRING_SET: (StringSet, "string_set_val", [np.str_, str]), ValueType.BOOL_SET: (BoolSet, "bool_set_val", [np.bool_, bool]), ValueType.BYTES_SET: (BytesSet, "bytes_set_val", [np.bytes_, bytes]), ValueType.UUID_SET: (StringSet, "uuid_set_val", [np.str_, str, uuid_module.UUID]), ValueType.TIME_UUID_SET: ( StringSet, "time_uuid_set_val", [np.str_, str, uuid_module.UUID], ), ValueType.DECIMAL_SET: ( StringSet, "decimal_set_val", [np.str_, str, decimal.Decimal], ), } PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: Dict[ ValueType, Tuple[str, Any, Optional[Set[Type]]] ] = { ValueType.INT32: ("int32_val", lambda x: int(x), None), ValueType.INT64: ( "int64_val", lambda x: ( int(x.timestamp()) if isinstance(x, pd._libs.tslibs.timestamps.Timestamp) else int(x) ), None, ), ValueType.FLOAT: ("float_val", lambda x: float(x), None), ValueType.DOUBLE: ( "double_val", lambda x: x, {float, np.float64, int, np.int_, decimal.Decimal}, ), ValueType.STRING: ("string_val", lambda x: str(x), None), ValueType.BYTES: ("bytes_val", lambda x: x, {bytes}), ValueType.IMAGE_BYTES: ("bytes_val", lambda x: x, {bytes}), ValueType.BOOL: ("bool_val", lambda x: x, {bool, np.bool_, int, np.int_}), ValueType.UUID: ("uuid_val", lambda x: str(x), {str, uuid_module.UUID}), ValueType.TIME_UUID: ("time_uuid_val", lambda x: str(x), {str, uuid_module.UUID}), ValueType.DECIMAL: ("decimal_val", lambda x: str(x), {decimal.Decimal, str}), } def _python_datetime_to_int_timestamp( values: Sequence[Any], ) -> Sequence[Union[int, np.int_]]: # Fast path for Numpy array. if isinstance(values, np.ndarray) and isinstance(values.dtype, np.datetime64): if values.ndim != 1: raise ValueError("Only 1 dimensional arrays are supported.") return cast(Sequence[np.int_], values.astype("datetime64[s]").astype(np.int_)) int_timestamps = [] for value in values: if isinstance(value, datetime): int_timestamps.append(int(value.timestamp())) elif isinstance(value, Timestamp): int_timestamps.append(int(value.ToSeconds())) elif isinstance(value, np.datetime64): int_timestamps.append(value.astype("datetime64[s]").astype(np.int_)) # type: ignore[attr-defined] elif isinstance(value, type(np.nan)): int_timestamps.append(NULL_TIMESTAMP_INT_VALUE) else: int_timestamps.append(int(value)) return int_timestamps def _convert_timestamp_collection_to_proto( values: List[Any], proto_field: str, proto_type: type, ) -> List[ProtoValue]: """Convert timestamp collection values (list or set) to proto. Args: values: List of timestamp collections to convert. proto_field: The proto field name (e.g., 'unix_timestamp_list_val'). proto_type: The proto type class (e.g., Int64List). Returns: List of ProtoValue with converted timestamps. """ result = [] for value in values: if value is not None: result.append( ProtoValue( **{ proto_field: proto_type( val=_python_datetime_to_int_timestamp(value) ) } # type: ignore ) ) else: result.append(ProtoValue()) return result def _convert_bool_collection_to_proto( values: List[Any], proto_field: str, proto_type: type, ) -> List[ProtoValue]: """Convert boolean collection values (list or set) to proto. ProtoValue does not support direct conversion of np.bool_, so we need to explicitly convert each element to Python bool. Args: values: List of boolean collections to convert. proto_field: The proto field name (e.g., 'bool_list_val'). proto_type: The proto type class (e.g., BoolList). Returns: List of ProtoValue with converted booleans. """ result = [] for value in values: if value is not None: result.append( ProtoValue(**{proto_field: proto_type(val=[bool(e) for e in value])}) # type: ignore ) else: result.append(ProtoValue()) return result def _validate_collection_item_types( sample: Any, valid_types: List[Type], feast_value_type: ValueType, ) -> None: """Validate that collection items match expected types. Args: sample: A sample collection value to check. valid_types: List of valid Python types for items. feast_value_type: The Feast value type for error messages. Raises: TypeError: If any item in sample is not a valid type. """ if sample is None: return if all(type(item) in valid_types for item in sample): return # to_numpy() upcasts INT32/INT64 with NULL to Float64 automatically int_collection_types = [ ValueType.INT32_LIST, ValueType.INT64_LIST, ValueType.INT32_SET, ValueType.INT64_SET, ] for item in sample: if type(item) not in valid_types: if feast_value_type in int_collection_types: # Check if the float values are due to NULL upcast if not any(np.isnan(i) for i in sample if isinstance(i, float)): logger.error( f"{feast_value_type.name} has NULL values. to_numpy() upcasts to Float64 automatically." ) raise _type_err(item, valid_types[0]) def _python_set_to_proto_values( feast_value_type: ValueType, values: List[Any] ) -> List[ProtoValue]: """ Converts Python set values to Feast Proto Values. Args: feast_value_type: The target set value type values: List of set values that will be converted Returns: List of Feast Value Proto """ # Feature can be set but None is still valid if feast_value_type not in PYTHON_SET_VALUE_TYPE_TO_PROTO_VALUE: return [] set_proto_type, set_field_name, set_valid_types = ( PYTHON_SET_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type] ) # Convert set to list for proto (proto doesn't have native set type) def convert_set_to_list(value: Any) -> Any: if value is None: return None if isinstance(value, set): return list(value) if isinstance(value, (list, tuple, np.ndarray)): return list(set(value)) return value converted_values = [convert_set_to_list(v) for v in values] sample = next(filter(_non_empty_value, converted_values), None) # Bytes to array type conversion if isinstance(sample, (bytes, bytearray)): if feast_value_type == ValueType.BYTES_SET: raise _type_err(sample, ValueType.BYTES_SET) json_sample = json.loads(sample) if isinstance(json_sample, list): json_values = [ json.loads(value) if value is not None else None for value in converted_values ] if feast_value_type == ValueType.BOOL_SET: json_values = [ [bool(item) for item in list_item] if list_item is not None else None for list_item in json_values ] return [ ProtoValue(**{set_field_name: set_proto_type(val=v)}) # type: ignore[arg-type] if v is not None else ProtoValue() for v in json_values ] raise _type_err(sample, set_valid_types[0]) # Validate item types using shared helper _validate_collection_item_types(sample, set_valid_types, feast_value_type) # Handle special types using shared helpers if feast_value_type == ValueType.UNIX_TIMESTAMP_SET: return _convert_timestamp_collection_to_proto( converted_values, "unix_timestamp_set_val", Int64Set ) if feast_value_type == ValueType.BOOL_SET: return _convert_bool_collection_to_proto( converted_values, set_field_name, set_proto_type ) if feast_value_type in (ValueType.UUID_SET, ValueType.TIME_UUID_SET): # uuid.UUID objects must be converted to str for StringSet proto. return [ ( ProtoValue( **{set_field_name: set_proto_type(val=[str(e) for e in value])} # type: ignore[arg-type, misc] ) if value is not None else ProtoValue() ) for value in converted_values ] if feast_value_type == ValueType.DECIMAL_SET: # decimal.Decimal objects must be converted to str for StringSet proto. return [ ( ProtoValue( **{set_field_name: set_proto_type(val=[str(e) for e in value])} # type: ignore[arg-type, misc] ) if value is not None else ProtoValue() ) for value in converted_values ] # Generic set conversion return [ ProtoValue(**{set_field_name: set_proto_type(val=value)}) # type: ignore[arg-type] if value is not None else ProtoValue() for value in converted_values ] def _convert_list_values_to_proto( feast_value_type: ValueType, values: List[Any], sample: Any, ) -> List[ProtoValue]: """Convert list-type values to proto. Args: feast_value_type: The target list value type. values: List of list values to convert. sample: First non-empty value for type checking. Returns: List of ProtoValue. """ if feast_value_type not in PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: raise Exception(f"Unsupported list type: {feast_value_type}") proto_type, field_name, valid_types = PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE[ feast_value_type ] # Bytes to array type conversion if isinstance(sample, (bytes, bytearray)): if feast_value_type == ValueType.BYTES_LIST: raise _type_err(sample, ValueType.BYTES_LIST) json_sample = json.loads(sample) if isinstance(json_sample, list): json_values = [json.loads(value) for value in values] if feast_value_type == ValueType.BOOL_LIST: json_values = [ [bool(item) for item in list_item] for list_item in json_values ] return [ ProtoValue(**{field_name: proto_type(val=v)}) # type: ignore[arg-type] for v in json_values ] raise _type_err(sample, valid_types[0]) # Validate item types using shared helper _validate_collection_item_types(sample, valid_types, feast_value_type) # Handle special types using shared helpers if feast_value_type == ValueType.UNIX_TIMESTAMP_LIST: return _convert_timestamp_collection_to_proto( values, "unix_timestamp_list_val", Int64List ) if feast_value_type == ValueType.BOOL_LIST: return _convert_bool_collection_to_proto(values, field_name, proto_type) if feast_value_type in (ValueType.UUID_LIST, ValueType.TIME_UUID_LIST): # uuid.UUID objects must be converted to str for StringList proto. return [ ( ProtoValue( **{field_name: proto_type(val=[str(e) for e in value])} # type: ignore[arg-type, misc] ) if value is not None else ProtoValue() ) for value in values ] if feast_value_type == ValueType.DECIMAL_LIST: # decimal.Decimal objects must be converted to str for StringList proto. return [ ( ProtoValue( **{field_name: proto_type(val=[str(e) for e in value])} # type: ignore[arg-type, misc] ) if value is not None else ProtoValue() ) for value in values ] # Generic list conversion return [ ProtoValue(**{field_name: proto_type(val=value)}) # type: ignore[arg-type] if value is not None else ProtoValue() for value in values ] def _convert_scalar_values_to_proto( feast_value_type: ValueType, values: List[Any], sample: Any, ) -> List[ProtoValue]: """Convert scalar-type values to proto. Args: feast_value_type: The target scalar value type. values: List of scalar values to convert. sample: First non-empty value for type checking. Returns: List of ProtoValue. """ if sample is None: # All input values are None return [ProtoValue()] * len(values) if feast_value_type == ValueType.UNIX_TIMESTAMP: int_timestamps = _python_datetime_to_int_timestamp(values) return [ProtoValue(unix_timestamp_val=ts) for ts in int_timestamps] # type: ignore field_name, func, valid_scalar_types = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[ feast_value_type ] # Validate scalar types if valid_scalar_types: if (sample == 0 or sample == 0.0) and feast_value_type != ValueType.BOOL: # Numpy converts 0 to int, but column type may be float allowed_types = {np.int64, int, np.float64, float, decimal.Decimal} assert type(sample) in allowed_types, ( f"Type `{type(sample)}` not in {allowed_types}" ) else: assert type(sample) in valid_scalar_types, ( f"Type `{type(sample)}` not in {valid_scalar_types}" ) # Handle BOOL specially due to np.bool_ conversion requirement if feast_value_type == ValueType.BOOL: return [ ProtoValue( **{field_name: func(bool(value) if type(value) is np.bool_ else value)} ) # type: ignore if not pd.isnull(value) else ProtoValue() for value in values ] # Generic scalar conversion out = [] for value in values: if isinstance(value, ProtoValue): out.append(value) elif not pd.isnull(value): out.append(ProtoValue(**{field_name: func(value)})) else: out.append(ProtoValue()) return out def _python_value_to_proto_value( feast_value_type: ValueType, values: List[Any] ) -> List[ProtoValue]: """ Converts a Python (native, pandas) value to a Feast Proto Value based on a provided value type. Args: feast_value_type: The target value type values: List of Values that will be converted Returns: List of Feast Value Proto """ # Handle nested collection types (VALUE_LIST, VALUE_SET) if feast_value_type in (ValueType.VALUE_LIST, ValueType.VALUE_SET): return _convert_nested_collection_to_proto(feast_value_type, values) # Handle Map types if feast_value_type == ValueType.MAP: result = [] for value in values: if value is None: result.append(ProtoValue()) else: if isinstance(value, str): value = json.loads(value) if not isinstance(value, dict): raise TypeError( f"Expected dict for MAP type, got {type(value).__name__}: {value!r}" ) result.append(ProtoValue(map_val=_python_dict_to_map_proto(value))) return result if feast_value_type == ValueType.MAP_LIST: result = [] for value in values: if value is None: result.append(ProtoValue()) else: if isinstance(value, str): value = json.loads(value) if not isinstance(value, list): raise TypeError( f"Expected list for MAP_LIST type, got {type(value).__name__}: {value!r}" ) result.append( ProtoValue(map_list_val=_python_list_to_map_list_proto(value)) ) return result # Handle JSON type — serialize Python objects as JSON strings if feast_value_type == ValueType.JSON: result = [] for value in values: if value is None: result.append(ProtoValue()) else: if isinstance(value, str): try: json.loads(value) except (json.JSONDecodeError, TypeError) as e: raise ValueError( f"Invalid JSON string for JSON type: {e}" ) from e json_str = value else: json_str = json.dumps(value) result.append(ProtoValue(json_val=json_str)) return result if feast_value_type == ValueType.JSON_LIST: result = [] for value in values: if value is None: result.append(ProtoValue()) else: json_strings = [] for v in value: if isinstance(v, str): try: json.loads(v) except (json.JSONDecodeError, TypeError) as e: raise ValueError( f"Invalid JSON string in JSON_LIST: {e}" ) from e json_strings.append(v) else: json_strings.append(json.dumps(v)) result.append(ProtoValue(json_list_val=StringList(val=json_strings))) return result # Handle Struct type — reuses Map proto for storage if feast_value_type == ValueType.STRUCT: result = [] for value in values: if value is None: result.append(ProtoValue()) else: if isinstance(value, str): value = json.loads(value) if not isinstance(value, dict): value = ( dict(value) if hasattr(value, "items") else {"_value": str(value)} ) result.append(ProtoValue(struct_val=_python_dict_to_map_proto(value))) return result if feast_value_type == ValueType.STRUCT_LIST: result = [] for value in values: if value is None: result.append(ProtoValue()) else: if isinstance(value, str): value = json.loads(value) result.append( ProtoValue(struct_list_val=_python_list_to_map_list_proto(value)) ) return result # Get sample for type checking sample = next(filter(_non_empty_value, values), None) # Dispatch to appropriate converter based on type category type_name_lower = feast_value_type.name.lower() if "list" in type_name_lower: return _convert_list_values_to_proto(feast_value_type, values, sample) if "set" in type_name_lower: return _python_set_to_proto_values(feast_value_type, values) # Scalar types if ( feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE or feast_value_type == ValueType.UNIX_TIMESTAMP ): return _convert_scalar_values_to_proto(feast_value_type, values, sample) raise Exception(f"Unsupported data type: {feast_value_type}") def _convert_nested_collection_to_proto( feast_value_type: ValueType, values: List[Any] ) -> List[ProtoValue]: """Convert nested collection values (list-of-lists, list-of-sets, etc.) to proto.""" val_attr = "list_val" if feast_value_type == ValueType.VALUE_LIST else "set_val" result = [] for value in values: if value is None: result.append(ProtoValue()) else: inner_values = [] for inner_collection in value: if inner_collection is None: inner_values.append(ProtoValue()) else: inner_list = list(inner_collection) if len(inner_list) == 0: # Empty inner collection: store as empty ProtoValue inner_values.append(ProtoValue()) elif any( isinstance(item, (list, set, tuple, np.ndarray)) for item in inner_list ): # Deeper nesting (3+ levels): recurse using VALUE_LIST inner_proto = _convert_nested_collection_to_proto( ValueType.VALUE_LIST, [inner_list] ) inner_values.append(inner_proto[0]) else: # Leaf level: wrap as a single list-typed Value proto_vals = python_values_to_proto_values( [inner_list], ValueType.UNKNOWN ) inner_values.append(proto_vals[0]) repeated = RepeatedValue(val=inner_values) proto = ProtoValue() getattr(proto, val_attr).CopyFrom(repeated) result.append(proto) return result def _python_dict_to_map_proto(python_dict: Dict[str, Any]) -> Map: """Convert a Python dictionary to a Map proto message.""" map_proto = Map() for key, value in python_dict.items(): # Handle None values explicitly if value is None: map_proto.val[key].CopyFrom( ProtoValue() ) # Empty ProtoValue represents None continue if isinstance(value, dict): # Nested map nested_map_proto = _python_dict_to_map_proto(value) map_proto.val[key].CopyFrom(ProtoValue(map_val=nested_map_proto)) elif isinstance(value, list) and value and isinstance(value[0], dict): # List of maps (MapList) map_list_proto = _python_list_to_map_list_proto(value) map_proto.val[key].CopyFrom(ProtoValue(map_list_val=map_list_proto)) else: # Handle scalar values and regular lists # Let python_values_to_proto_values infer the type proto_values = python_values_to_proto_values([value], ValueType.UNKNOWN) map_proto.val[key].CopyFrom(proto_values[0]) return map_proto def _python_list_to_map_list_proto(python_list: List[Dict[str, Any]]) -> MapList: """Convert a Python list of dictionaries to a MapList proto message.""" map_list_proto = MapList() for item in python_list: if isinstance(item, dict): map_proto = _python_dict_to_map_proto(item) map_list_proto.val.append(map_proto) else: raise ValueError(f"MapList can only contain dictionaries, got {type(item)}") return map_list_proto def python_values_to_proto_values( values: List[Any], feature_type: ValueType = ValueType.UNKNOWN ) -> List[ProtoValue]: value_type = feature_type sample = next(filter(_non_empty_value, values), None) # first not empty value if sample is not None and feature_type == ValueType.UNKNOWN: if isinstance(sample, (list, np.ndarray)): value_type = ( feature_type if len(sample) == 0 else python_type_to_feast_value_type("", sample) ) else: value_type = python_type_to_feast_value_type("", sample) if value_type == ValueType.UNKNOWN: raise TypeError("Couldn't infer value type from empty value") proto_values = _python_value_to_proto_value(value_type, values) if len(proto_values) != len(values): raise ValueError( f"Number of proto values {len(proto_values)} does not match number of values {len(values)}" ) return proto_values PROTO_VALUE_TO_VALUE_TYPE_MAP: Dict[str, ValueType] = { "int32_val": ValueType.INT32, "int64_val": ValueType.INT64, "double_val": ValueType.DOUBLE, "float_val": ValueType.FLOAT, "string_val": ValueType.STRING, "bytes_val": ValueType.BYTES, "bool_val": ValueType.BOOL, "unix_timestamp_val": ValueType.UNIX_TIMESTAMP, "int32_list_val": ValueType.INT32_LIST, "int64_list_val": ValueType.INT64_LIST, "double_list_val": ValueType.DOUBLE_LIST, "float_list_val": ValueType.FLOAT_LIST, "string_list_val": ValueType.STRING_LIST, "bytes_list_val": ValueType.BYTES_LIST, "bool_list_val": ValueType.BOOL_LIST, "unix_timestamp_list_val": ValueType.UNIX_TIMESTAMP_LIST, "map_val": ValueType.MAP, "map_list_val": ValueType.MAP_LIST, "json_val": ValueType.JSON, "json_list_val": ValueType.JSON_LIST, "struct_val": ValueType.STRUCT, "struct_list_val": ValueType.STRUCT_LIST, "list_val": ValueType.VALUE_LIST, "set_val": ValueType.VALUE_SET, "int32_set_val": ValueType.INT32_SET, "int64_set_val": ValueType.INT64_SET, "double_set_val": ValueType.DOUBLE_SET, "float_set_val": ValueType.FLOAT_SET, "string_set_val": ValueType.STRING_SET, "bytes_set_val": ValueType.BYTES_SET, "bool_set_val": ValueType.BOOL_SET, "unix_timestamp_set_val": ValueType.UNIX_TIMESTAMP_SET, "uuid_set_val": ValueType.UUID_SET, "time_uuid_set_val": ValueType.TIME_UUID_SET, "uuid_val": ValueType.UUID, "time_uuid_val": ValueType.TIME_UUID, "uuid_list_val": ValueType.UUID_LIST, "time_uuid_list_val": ValueType.TIME_UUID_LIST, "decimal_val": ValueType.DECIMAL, "decimal_list_val": ValueType.DECIMAL_LIST, "decimal_set_val": ValueType.DECIMAL_SET, } VALUE_TYPE_TO_PROTO_VALUE_MAP: Dict[ValueType, str] = { v: k for k, v in PROTO_VALUE_TO_VALUE_TYPE_MAP.items() } def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType: """ Returns Feast ValueType given Feast ValueType string. Args: proto_str: str Returns: A variant of ValueType. """ proto_str = proto_value.WhichOneof("val") if proto_str is None: return ValueType.UNKNOWN return PROTO_VALUE_TO_VALUE_TYPE_MAP[proto_str] def pa_to_feast_value_type(pa_type_as_str: str) -> ValueType: is_list = False if pa_type_as_str.startswith("list": ValueType.DOUBLE_LIST, "list": ValueType.INT64_LIST, "list": ValueType.INT32_LIST, "list": ValueType.STRING_LIST, "list": ValueType.BOOL_LIST, "list": ValueType.BYTES_LIST, "list": ValueType.FLOAT_LIST, } value_type = type_map[pa_type_as_str] if is_list: value_type = ValueType[value_type.name + "_LIST"] return value_type def bq_to_feast_value_type(bq_type_as_str: str) -> ValueType: is_list = False if bq_type_as_str.startswith("ARRAY<"): is_list = True bq_type_as_str = bq_type_as_str[6:-1] type_map: Dict[str, ValueType] = { "DATETIME": ValueType.UNIX_TIMESTAMP, "TIMESTAMP": ValueType.UNIX_TIMESTAMP, "INTEGER": ValueType.INT64, "NUMERIC": ValueType.INT64, "INT64": ValueType.INT64, "STRING": ValueType.STRING, "FLOAT": ValueType.DOUBLE, "FLOAT64": ValueType.DOUBLE, "BYTES": ValueType.BYTES, "BOOL": ValueType.BOOL, "BOOLEAN": ValueType.BOOL, # legacy sql data type "NULL": ValueType.NULL, "JSON": ValueType.JSON, "STRUCT": ValueType.STRUCT, "RECORD": ValueType.STRUCT, } value_type = type_map.get(bq_type_as_str, ValueType.STRING) if is_list: value_type = ValueType[value_type.name + "_LIST"] return value_type def mssql_to_feast_value_type(mssql_type_as_str: str) -> ValueType: type_map = { "bigint": ValueType.FLOAT, "binary": ValueType.BYTES, "bit": ValueType.BOOL, "char": ValueType.STRING, "date": ValueType.UNIX_TIMESTAMP, "datetime": ValueType.UNIX_TIMESTAMP, "datetimeoffset": ValueType.UNIX_TIMESTAMP, "float": ValueType.FLOAT, "int": ValueType.INT32, "nchar": ValueType.STRING, "nvarchar": ValueType.STRING, "nvarchar(max)": ValueType.STRING, "json": ValueType.JSON, "real": ValueType.FLOAT, "smallint": ValueType.INT32, "tinyint": ValueType.INT32, "varbinary": ValueType.BYTES, "varchar": ValueType.STRING, "None": ValueType.NULL, # skip date, geometry, hllsketch, time, timetz } if mssql_type_as_str.lower() not in type_map: raise ValueError(f"Mssql type not supported by feast {mssql_type_as_str}") return type_map[mssql_type_as_str.lower()] def oracle_to_feast_value_type(oracle_type_as_str: str) -> ValueType: """Convert an Oracle/ibis type string to a Feast ValueType. Handles type strings returned by ibis schema introspection for the Oracle backend (e.g. "int64", "float64", "string", "timestamp", "decimal") as well as Oracle native type names. """ type_str = oracle_type_as_str.lower().strip() # Handle parameterized types like "decimal(10, 2)" if "(" in type_str: type_str = type_str.split("(")[0].strip() type_map: Dict[str, ValueType] = { # Ibis types returned by Oracle backend "int8": ValueType.INT32, "int16": ValueType.INT32, "int32": ValueType.INT32, "int64": ValueType.INT64, "float16": ValueType.FLOAT, "float32": ValueType.FLOAT, "float64": ValueType.DOUBLE, "decimal": ValueType.DOUBLE, "string": ValueType.STRING, "binary": ValueType.BYTES, "boolean": ValueType.BOOL, "timestamp": ValueType.UNIX_TIMESTAMP, "date": ValueType.UNIX_TIMESTAMP, "time": ValueType.UNIX_TIMESTAMP, "null": ValueType.NULL, # Oracle native type names "number": ValueType.DOUBLE, "varchar2": ValueType.STRING, "nvarchar2": ValueType.STRING, "char": ValueType.STRING, "nchar": ValueType.STRING, "clob": ValueType.STRING, "nclob": ValueType.STRING, "blob": ValueType.BYTES, "raw": ValueType.BYTES, "long raw": ValueType.BYTES, "long": ValueType.STRING, "integer": ValueType.INT32, "smallint": ValueType.INT32, "float": ValueType.DOUBLE, "double precision": ValueType.DOUBLE, "real": ValueType.FLOAT, "binary_float": ValueType.FLOAT, "binary_double": ValueType.DOUBLE, "interval": ValueType.UNIX_TIMESTAMP, } return type_map.get(type_str, ValueType.STRING) def pa_to_mssql_type(pa_type: "pyarrow.DataType") -> str: # PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html # MS Sql types: https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16 pa_type_as_str = str(pa_type).lower() if pa_type_as_str.startswith("timestamp"): if "tz=" in pa_type_as_str: return "datetime2" else: return "datetime" if pa_type_as_str.startswith("date"): return "date" if pa_type_as_str.startswith("decimal"): return pa_type_as_str if pa_type_as_str.startswith("map<"): return "nvarchar(max)" if pa_type_as_str == "large_string": return "nvarchar(max)" if pa_type_as_str.startswith("struct<") or pa_type_as_str.startswith("struct{"): return "nvarchar(max)" # We have to take into account how arrow types map to parquet types as well. # For example, null type maps to int32 in parquet, so we have to use int4 in Redshift. # Other mappings have also been adjusted accordingly. type_map = { "null": "None", "bool": "bit", "int8": "tinyint", "int16": "smallint", "int32": "int", "int64": "bigint", "uint8": "tinyint", "uint16": "smallint", "uint32": "int", "uint64": "bigint", "float": "float", "double": "real", "binary": "binary", "string": "varchar", } if pa_type_as_str.lower() not in type_map: raise ValueError(f"MS SQL Server type not supported by feast {pa_type_as_str}") return type_map[pa_type_as_str] def redshift_to_feast_value_type(redshift_type_as_str: str) -> ValueType: # Type names from https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html type_map = { "int2": ValueType.INT32, "int4": ValueType.INT32, "int8": ValueType.INT64, "numeric": ValueType.DOUBLE, "float4": ValueType.FLOAT, "float8": ValueType.DOUBLE, "bool": ValueType.BOOL, "character": ValueType.STRING, "varchar": ValueType.STRING, "timestamp": ValueType.UNIX_TIMESTAMP, "timestamptz": ValueType.UNIX_TIMESTAMP, "super": ValueType.MAP, "json": ValueType.JSON, # skip date, geometry, hllsketch, time, timetz } return type_map[redshift_type_as_str.lower()] def snowflake_type_to_feast_value_type(snowflake_type: str) -> ValueType: type_map = { "BINARY": ValueType.BYTES, "VARCHAR": ValueType.STRING, "NUMBER32": ValueType.INT32, "NUMBER64": ValueType.INT64, "NUMBERwSCALE": ValueType.DOUBLE, "DOUBLE": ValueType.DOUBLE, "BOOLEAN": ValueType.BOOL, "DATE": ValueType.UNIX_TIMESTAMP, "TIMESTAMP": ValueType.UNIX_TIMESTAMP, "TIMESTAMP_TZ": ValueType.UNIX_TIMESTAMP, "TIMESTAMP_LTZ": ValueType.UNIX_TIMESTAMP, "TIMESTAMP_NTZ": ValueType.UNIX_TIMESTAMP, "VARIANT": ValueType.MAP, "OBJECT": ValueType.MAP, "ARRAY": ValueType.STRING_LIST, "JSON": ValueType.JSON, } return type_map[snowflake_type] def _convert_value_name_to_snowflake_udf(value_name: str, project_name: str) -> str: name_map = { "BYTES": f"feast_{project_name}_snowflake_binary_to_bytes_proto", "STRING": f"feast_{project_name}_snowflake_varchar_to_string_proto", "INT32": f"feast_{project_name}_snowflake_number_to_int32_proto", "INT64": f"feast_{project_name}_snowflake_number_to_int64_proto", "DOUBLE": f"feast_{project_name}_snowflake_float_to_double_proto", "FLOAT": f"feast_{project_name}_snowflake_float_to_double_proto", "BOOL": f"feast_{project_name}_snowflake_boolean_to_bool_proto", "UNIX_TIMESTAMP": f"feast_{project_name}_snowflake_timestamp_to_unix_timestamp_proto", "BYTES_LIST": f"feast_{project_name}_snowflake_array_bytes_to_list_bytes_proto", "STRING_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", "INT32_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int32_proto", "INT64_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int64_proto", "DOUBLE_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto", "FLOAT_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto", "BOOL_LIST": f"feast_{project_name}_snowflake_array_boolean_to_list_bool_proto", "UNIX_TIMESTAMP_LIST": f"feast_{project_name}_snowflake_array_timestamp_to_list_unix_timestamp_proto", "UUID": f"feast_{project_name}_snowflake_varchar_to_string_proto", "TIME_UUID": f"feast_{project_name}_snowflake_varchar_to_string_proto", "UUID_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", "TIME_UUID_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", "UUID_SET": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", "TIME_UUID_SET": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", "DECIMAL": f"feast_{project_name}_snowflake_varchar_to_string_proto", "DECIMAL_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", "DECIMAL_SET": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", } return name_map[value_name].upper() def pa_to_redshift_value_type(pa_type: "pyarrow.DataType") -> str: # PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html # Redshift type: https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html pa_type_as_str = str(pa_type).lower() if pa_type_as_str.startswith("timestamp"): if "tz=" in pa_type_as_str: return "timestamptz" else: return "timestamp" if pa_type_as_str.startswith("date"): return "date" if pa_type_as_str.startswith("decimal"): # PyArrow decimal types (e.g. "decimal(38,37)") luckily directly map to the Redshift type. return pa_type_as_str if pa_type_as_str.startswith("list"): return "super" if pa_type_as_str.startswith("map<"): return "super" if pa_type_as_str == "large_string": return "super" if pa_type_as_str.startswith("struct<"): return "super" # We have to take into account how arrow types map to parquet types as well. # For example, null type maps to int32 in parquet, so we have to use int4 in Redshift. # Other mappings have also been adjusted accordingly. type_map = { "null": "int4", "bool": "bool", "int8": "int4", "int16": "int4", "int32": "int4", "int64": "int8", "uint8": "int4", "uint16": "int4", "uint32": "int8", "uint64": "int8", "float": "float4", "double": "float8", "binary": "varchar", "string": "varchar", } return type_map[pa_type_as_str] def _non_empty_value(value: Any) -> bool: """ Check that there's enough data we can use for type inference. If primitive type - just checking that it's not None If iterable - checking that there's some elements (len > 0) String is special case: "" - empty string is considered non empty """ return value is not None and ( not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str) ) def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType: # Current non-convertible types: interval, struct, structfield, binary type_map: Dict[str, ValueType] = { "null": ValueType.UNKNOWN, "byte": ValueType.BYTES, "string": ValueType.STRING, "int": ValueType.INT32, "short": ValueType.INT32, "bigint": ValueType.INT64, "long": ValueType.INT64, "double": ValueType.DOUBLE, "decimal": ValueType.DOUBLE, "float": ValueType.FLOAT, "boolean": ValueType.BOOL, "timestamp": ValueType.UNIX_TIMESTAMP, "date": ValueType.UNIX_TIMESTAMP, "array": ValueType.BYTES_LIST, "array": ValueType.STRING_LIST, "array": ValueType.INT32_LIST, "array": ValueType.INT64_LIST, "array": ValueType.DOUBLE_LIST, "array": ValueType.DOUBLE_LIST, "array": ValueType.FLOAT_LIST, "array": ValueType.BOOL_LIST, "array": ValueType.UNIX_TIMESTAMP_LIST, "array": ValueType.UNIX_TIMESTAMP_LIST, } if not isinstance(spark_type_as_str, str): return ValueType.NULL spark_type_lower = spark_type_as_str.lower() if spark_type_lower.startswith("map<"): return ValueType.MAP if spark_type_lower.startswith("array Iterator[np.dtype]: # TODO recheck all typing (also tz for timestamp) # https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics type_map = defaultdict( lambda: np.dtype("O"), { "boolean": np.dtype("bool"), "double": np.dtype("float64"), "float": np.dtype("float64"), "int": np.dtype("int64"), "bigint": np.dtype("int64"), "smallint": np.dtype("int64"), "timestamp": np.dtype("datetime64[ns]"), }, ) return (type_map[t] for _, t in dtypes) def arrow_to_pg_type(t_str: str) -> str: try: if t_str.startswith("timestamp") or t_str.startswith("datetime"): return "timestamptz" if "tz=" in t_str else "timestamp" if t_str.startswith("map<"): return "jsonb" if t_str == "large_string": return "jsonb" if t_str.startswith("struct<") or t_str.startswith("struct{"): return "jsonb" return { "null": "null", "bool": "boolean", "int8": "tinyint", "int16": "smallint", "int32": "int", "int64": "bigint", "list": "int[]", "list": "bigint[]", "list": "boolean[]", "list": "double precision[]", "list": "timestamp[]", "uint8": "smallint", "uint16": "int", "uint32": "bigint", "uint64": "bigint", "float": "float", "double": "double precision", "binary": "binary", "string": "text", }[t_str] except KeyError: raise ValueError(f"Unsupported type: {t_str}") def pg_type_to_feast_value_type(type_str: str) -> ValueType: type_map: Dict[str, ValueType] = { "boolean": ValueType.BOOL, "bytea": ValueType.BYTES, "char": ValueType.STRING, "bigint": ValueType.INT64, "smallint": ValueType.INT32, "integer": ValueType.INT32, "real": ValueType.DOUBLE, "double precision": ValueType.DOUBLE, "boolean[]": ValueType.BOOL_LIST, "bytea[]": ValueType.BYTES_LIST, "char[]": ValueType.STRING_LIST, "smallint[]": ValueType.INT32_LIST, "integer[]": ValueType.INT32_LIST, "text": ValueType.STRING, "text[]": ValueType.STRING_LIST, "character[]": ValueType.STRING_LIST, "bigint[]": ValueType.INT64_LIST, "real[]": ValueType.DOUBLE_LIST, "double precision[]": ValueType.DOUBLE_LIST, "character": ValueType.STRING, "character varying": ValueType.STRING, "date": ValueType.UNIX_TIMESTAMP, "time without time zone": ValueType.UNIX_TIMESTAMP, "timestamp without time zone": ValueType.UNIX_TIMESTAMP, "timestamp without time zone[]": ValueType.UNIX_TIMESTAMP_LIST, "date[]": ValueType.UNIX_TIMESTAMP_LIST, "time without time zone[]": ValueType.UNIX_TIMESTAMP_LIST, "timestamp with time zone": ValueType.UNIX_TIMESTAMP, "timestamp with time zone[]": ValueType.UNIX_TIMESTAMP_LIST, "numeric[]": ValueType.DOUBLE_LIST, "numeric": ValueType.DOUBLE, "uuid": ValueType.UUID, "uuid[]": ValueType.UUID_LIST, "json": ValueType.MAP, "jsonb": ValueType.MAP, "json[]": ValueType.MAP_LIST, "jsonb[]": ValueType.MAP_LIST, } value = ( type_map[type_str.lower()] if type_str.lower() in type_map else ValueType.UNKNOWN ) if value == ValueType.UNKNOWN: print("unknown type:", type_str) return value def feast_value_type_to_pa( feast_type: ValueType, timestamp_unit: str = "us" ) -> "pyarrow.DataType": import pyarrow type_map = { ValueType.INT32: pyarrow.int32(), ValueType.INT64: pyarrow.int64(), ValueType.DOUBLE: pyarrow.float64(), ValueType.FLOAT: pyarrow.float32(), ValueType.STRING: pyarrow.string(), ValueType.BYTES: pyarrow.binary(), ValueType.BOOL: pyarrow.bool_(), ValueType.UNIX_TIMESTAMP: pyarrow.timestamp(timestamp_unit), ValueType.INT32_LIST: pyarrow.list_(pyarrow.int32()), ValueType.INT64_LIST: pyarrow.list_(pyarrow.int64()), ValueType.DOUBLE_LIST: pyarrow.list_(pyarrow.float64()), ValueType.FLOAT_LIST: pyarrow.list_(pyarrow.float32()), ValueType.STRING_LIST: pyarrow.list_(pyarrow.string()), ValueType.BYTES_LIST: pyarrow.list_(pyarrow.binary()), ValueType.BOOL_LIST: pyarrow.list_(pyarrow.bool_()), ValueType.UNIX_TIMESTAMP_LIST: pyarrow.list_(pyarrow.timestamp(timestamp_unit)), ValueType.MAP: pyarrow.map_(pyarrow.string(), pyarrow.string()), ValueType.MAP_LIST: pyarrow.list_( pyarrow.map_(pyarrow.string(), pyarrow.string()) ), ValueType.JSON: pyarrow.large_string(), ValueType.JSON_LIST: pyarrow.list_(pyarrow.large_string()), ValueType.STRUCT: pyarrow.struct([]), ValueType.STRUCT_LIST: pyarrow.list_(pyarrow.struct([])), # Placeholder: inner type is unknown from ValueType alone. # Callers needing accurate inner types should use from_feast_to_pyarrow_type() with a FeastType. ValueType.VALUE_LIST: pyarrow.list_(pyarrow.list_(pyarrow.string())), ValueType.VALUE_SET: pyarrow.list_(pyarrow.list_(pyarrow.string())), ValueType.NULL: pyarrow.null(), ValueType.UUID: pyarrow.string(), ValueType.TIME_UUID: pyarrow.string(), ValueType.UUID_LIST: pyarrow.list_(pyarrow.string()), ValueType.TIME_UUID_LIST: pyarrow.list_(pyarrow.string()), ValueType.UUID_SET: pyarrow.list_(pyarrow.string()), ValueType.TIME_UUID_SET: pyarrow.list_(pyarrow.string()), ValueType.DECIMAL: pyarrow.string(), ValueType.DECIMAL_LIST: pyarrow.list_(pyarrow.string()), ValueType.DECIMAL_SET: pyarrow.list_(pyarrow.string()), } return type_map[feast_type] def pg_type_code_to_pg_type(code: int) -> str: """Map the postgres type code a Feast type string Rather than raise an exception on an unknown type, we return the string representation of the type code. This way rather than raising an exception on unknown types, Feast will just skip the problem columns. Note that json and jsonb are not supported but this shows up in the log as a warning. Since postgres allows custom types we return an unknown for those cases. See: https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html """ PG_TYPE_MAP = { 16: "boolean", 17: "bytea", 20: "bigint", 21: "smallint", 23: "integer", 25: "text", 114: "json", 199: "json[]", 700: "real", 701: "double precision", 1000: "boolean[]", 1001: "bytea[]", 1005: "smallint[]", 1007: "integer[]", 1009: "text[]", 1014: "character[]", 1016: "bigint[]", 1021: "real[]", 1022: "double precision[]", 1042: "character", 1043: "character varying", 1082: "date", 1083: "time without time zone", 1114: "timestamp without time zone", 1115: "timestamp without time zone[]", 1182: "date[]", 1183: "time without time zone[]", 1184: "timestamp with time zone", 1185: "timestamp with time zone[]", 1231: "numeric[]", 1700: "numeric", 2950: "uuid", 2951: "uuid[]", 3802: "jsonb", 3807: "jsonb[]", } return PG_TYPE_MAP.get(code, "unknown") def pg_type_code_to_arrow(code: int) -> str: return feast_value_type_to_pa( pg_type_to_feast_value_type(pg_type_code_to_pg_type(code)) ) def athena_to_feast_value_type(athena_type_as_str: str) -> ValueType: # Type names from https://docs.aws.amazon.com/athena/latest/ug/data-types.html type_map = { "null": ValueType.UNKNOWN, "boolean": ValueType.BOOL, "tinyint": ValueType.INT32, "smallint": ValueType.INT32, "int": ValueType.INT32, "bigint": ValueType.INT64, "double": ValueType.DOUBLE, "float": ValueType.FLOAT, "binary": ValueType.BYTES, "char": ValueType.STRING, "varchar": ValueType.STRING, "string": ValueType.STRING, "timestamp": ValueType.UNIX_TIMESTAMP, "json": ValueType.JSON, "struct": ValueType.STRUCT, "map": ValueType.MAP, } return type_map[athena_type_as_str.lower()] def pa_to_athena_value_type(pa_type: "pyarrow.DataType") -> str: # PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html # Type names from https://docs.aws.amazon.com/athena/latest/ug/data-types.html pa_type_as_str = str(pa_type).lower() if pa_type_as_str.startswith("timestamp"): return "timestamp" if pa_type_as_str.startswith("date"): return "date" if pa_type_as_str.startswith("python_values_to_proto_values"): return pa_type_as_str if pa_type_as_str.startswith("list"): return "array" if pa_type_as_str.startswith("map<"): return "string" if pa_type_as_str == "large_string": return "string" if pa_type_as_str.startswith("struct<"): return "string" # We have to take into account how arrow types map to parquet types as well. # For example, null type maps to int32 in parquet, so we have to use int4 in Redshift. # Other mappings have also been adjusted accordingly. type_map = { "null": "null", "bool": "boolean", "int8": "tinyint", "int16": "smallint", "int32": "int", "int64": "bigint", "uint8": "tinyint", "uint16": "tinyint", "uint32": "tinyint", "uint64": "tinyint", "float": "float", "double": "double", "binary": "binary", "string": "string", } return type_map[pa_type_as_str] def cb_columnar_type_to_feast_value_type(type_str: str) -> ValueType: """ Convert a Couchbase Columnar type string to a Feast ValueType """ type_map: Dict[str, ValueType] = { # primitive types "boolean": ValueType.BOOL, "string": ValueType.STRING, "bigint": ValueType.INT64, "double": ValueType.DOUBLE, # special types "null": ValueType.NULL, "missing": ValueType.UNKNOWN, # composite types # todo: support for arrays of primitives "object": ValueType.UNKNOWN, "array": ValueType.UNKNOWN, "multiset": ValueType.UNKNOWN, "uuid": ValueType.UUID, } value = ( type_map[type_str.lower()] if type_str.lower() in type_map else ValueType.UNKNOWN ) if value == ValueType.UNKNOWN: print("unknown type:", type_str) return value def convert_scalar_column( series: pd.Series, value_type: ValueType, target_pandas_type: str ) -> pd.Series: """Convert a scalar feature column to the appropriate pandas type.""" if value_type == ValueType.INT32: return pd.to_numeric(series, errors="coerce").astype("Int32") elif value_type == ValueType.INT64: return pd.to_numeric(series, errors="coerce").astype("Int64") elif value_type in [ValueType.FLOAT, ValueType.DOUBLE]: return pd.to_numeric(series, errors="coerce").astype("float64") elif value_type == ValueType.BOOL: return series.astype("boolean") elif value_type == ValueType.STRING: return series.astype("string") elif value_type in [ValueType.UUID, ValueType.TIME_UUID]: return series.astype("string") elif value_type == ValueType.UNIX_TIMESTAMP: return pd.to_datetime(series, unit="s", errors="coerce") elif value_type in (ValueType.JSON, ValueType.STRUCT, ValueType.MAP): return series else: return series.astype(target_pandas_type) def convert_array_column(series: pd.Series, value_type: ValueType) -> pd.Series: """Convert an array feature column to the appropriate type with proper empty array handling.""" base_type_map = { ValueType.INT32_LIST: np.int32, ValueType.INT64_LIST: np.int64, ValueType.FLOAT_LIST: np.float32, ValueType.DOUBLE_LIST: np.float64, ValueType.BOOL_LIST: np.bool_, ValueType.STRING_LIST: object, ValueType.BYTES_LIST: object, ValueType.UNIX_TIMESTAMP_LIST: "datetime64[s]", ValueType.UUID_LIST: object, ValueType.TIME_UUID_LIST: object, ValueType.BYTES_SET: object, ValueType.STRING_SET: object, ValueType.INT32_SET: np.int32, ValueType.INT64_SET: np.int64, ValueType.FLOAT_SET: np.float32, ValueType.DOUBLE_SET: np.float64, ValueType.BOOL_SET: np.bool_, ValueType.UNIX_TIMESTAMP_SET: "datetime64[s]", ValueType.UUID_SET: object, ValueType.TIME_UUID_SET: object, } target_dtype = base_type_map.get(value_type, object) def convert_array_item(item) -> Union[np.ndarray, Any]: if item is None or (isinstance(item, list) and len(item) == 0): if target_dtype == object: return np.empty(0, dtype=object) else: return np.empty(0, dtype=target_dtype) # type: ignore else: return item return series.apply(convert_array_item)