import struct import warnings from typing import List, Tuple, Union from google.protobuf.internal.containers import RepeatedScalarFieldContainer from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.protos.feast.types.Value_pb2 import ValueType def _serialize_val( value_type, v: ValueProto, entity_key_serialization_version=3 ) -> Tuple[bytes, int]: if value_type == "string_val": return v.string_val.encode("utf8"), ValueType.STRING elif value_type == "bytes_val": return v.bytes_val, ValueType.BYTES elif value_type == "int32_val": return struct.pack(" ValueProto: if value_type == ValueType.INT64: value = struct.unpack(" bytes: """ Serialize keys to a bytestring, so it can be used to prefix-scan through items stored in the online store using serialize_entity_key. This encoding is a partial implementation of serialize_entity_key, only operating on the keys of entities, and not the values. """ # Fast path optimization for single entity if len(entity_keys) == 1: sorted_keys = [entity_keys[0]] else: sorted_keys = sorted(entity_keys) output: List[bytes] = [] if entity_key_serialization_version > 2: output.append(struct.pack(" 2: output.append(struct.pack(" bytes: """ Deserialize version 2 entity key and reserialize it to version 3. Args: serialized_key_v2: serialized entity key of version 2 Returns: bytes of the serialized entity key in version 3 """ offset = 0 keys = [] values = [] num_keys = 1 for _ in range(num_keys): value_type = struct.unpack_from(" bytes: """ Serialize entity key to a bytestring so it can be used as a lookup key in a hash table. We need this encoding to be stable; therefore we cannot just use protobuf serialization here since it does not guarantee that two proto messages containing the same data will serialize to the same byte string[1]. [1] https://developers.google.com/protocol-buffers/docs/encoding Args: entity_key_serialization_version: version of the entity key serialization Versions: version 3: entity_key size is added to the serialization for deserialization purposes entity_key: EntityKeyProto Returns: bytes of the serialized entity key """ if entity_key_serialization_version < 3: # Not raising the error, keeping it in warning state for reserialization purpose # We should remove this after few releases warnings.warn( "Serialization of entity key with version < 3 is removed. Please use version 3 by setting entity_key_serialization_version=3." "To reserializa your online store featrues refer - https://github.com/feast-dev/feast/blob/master/docs/how-to-guides/entity-reserialization-of-from-v2-to-v3.md" ) sorted_keys: List[str] sorted_values: List[ValueProto] if not entity_key.join_keys: sorted_keys = [] sorted_values = [] elif len(entity_key.join_keys) == 1: # Fast path: single entity, no sorting needed sorted_keys = [entity_key.join_keys[0]] sorted_values = [entity_key.entity_values[0]] else: # Multi-entity: use sorting pairs = sorted(zip(entity_key.join_keys, entity_key.entity_values)) sorted_keys = [k for k, _ in pairs] sorted_values = [v for _, v in pairs] output: List[bytes] = [] if entity_key_serialization_version > 2: output.append(struct.pack(" 2: output.append(struct.pack(" EntityKeyProto: """ Deserialize entity key from a bytestring. This function can only be used with entity_key_serialization_version > 2. Args: entity_key_serialization_version: version of the entity key serialization serialized_entity_key: serialized entity key bytes Returns: EntityKeyProto """ if entity_key_serialization_version < 3: # Not raising the error, keeping it in warning state for reserialization purpose # We should remove this after few releases warnings.warn( "Deserialization of entity key with version < 3 is removed. Please use version 3 by setting entity_key_serialization_version=3." "To reserializa your online store featrues refer - https://github.com/feast-dev/feast/blob/master/docs/how-to-guides/entity-reserialization-of-from-v2-to-v3.md" ) # Optimized deserialization using memoryview for zero-copy slicing buffer = memoryview(serialized_entity_key) pos = 0 keys = [] values = [] # Read number of keys if len(buffer) < pos + 4: raise ValueError( "Invalid serialized entity key: insufficient data for key count" ) num_keys = struct.unpack(" bytes: """serializes a list of floats into a compact "raw bytes" format""" return struct.pack(f"{vector_length}f", *vector) def deserialize_f32(byte_vector: bytes, vector_length: int) -> List[float]: """deserializes a list of floats from a compact "raw bytes" format""" num_floats = vector_length // 4 # 4 bytes per float return list(struct.unpack(f"{num_floats}f", byte_vector))