|
| 1 | +"""Fast serialization utilities for Feature Server responses. |
| 2 | +
|
| 3 | +Matches the output format of MessageToDict with proto_json.patch() applied. |
| 4 | +Values are serialized as native Python types (not wrapped dicts). |
| 5 | +""" |
| 6 | + |
| 7 | +from datetime import datetime, timezone |
| 8 | +from typing import Any, Dict, Optional |
| 9 | + |
| 10 | +from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse |
| 11 | +from feast.protos.feast.types.Value_pb2 import Value |
| 12 | + |
| 13 | +# FieldStatus enum mapping (protos/feast/serving/ServingService.proto) |
| 14 | +_STATUS_NAMES: Dict[int, str] = { |
| 15 | + 0: "INVALID", |
| 16 | + 1: "PRESENT", |
| 17 | + 2: "NULL_VALUE", |
| 18 | + 3: "NOT_FOUND", |
| 19 | + 4: "OUTSIDE_MAX_AGE", |
| 20 | +} |
| 21 | + |
| 22 | + |
| 23 | +def response_to_dict_fast(response: GetOnlineFeaturesResponse) -> Dict[str, Any]: |
| 24 | + """Convert GetOnlineFeaturesResponse to dict (matches proto_json.patch() format).""" |
| 25 | + result: Dict[str, Any] = { |
| 26 | + "results": [ |
| 27 | + { |
| 28 | + "values": [_value_to_native(v) for v in feature_vector.values], |
| 29 | + "statuses": [ |
| 30 | + _STATUS_NAMES.get(s, "INVALID") for s in feature_vector.statuses |
| 31 | + ], |
| 32 | + "event_timestamps": [ |
| 33 | + _timestamp_to_str(ts) for ts in feature_vector.event_timestamps |
| 34 | + ] |
| 35 | + if feature_vector.event_timestamps |
| 36 | + else [], |
| 37 | + } |
| 38 | + for feature_vector in response.results |
| 39 | + ] |
| 40 | + } |
| 41 | + |
| 42 | + if response.HasField("metadata"): |
| 43 | + result["metadata"] = _metadata_to_dict(response.metadata) |
| 44 | + |
| 45 | + return result |
| 46 | + |
| 47 | + |
| 48 | +def _value_to_native(v: Value) -> Optional[Any]: |
| 49 | + """Convert a Value proto to native Python type (matches proto_json.patch() format).""" |
| 50 | + which = v.WhichOneof("val") |
| 51 | + if which is None or which == "null_val": |
| 52 | + return None |
| 53 | + elif "_list_" in which: |
| 54 | + return list(getattr(v, which).val) |
| 55 | + else: |
| 56 | + return getattr(v, which) |
| 57 | + |
| 58 | + |
| 59 | +def _timestamp_to_str(ts) -> str: |
| 60 | + """Convert protobuf Timestamp to RFC 3339 format with Z suffix.""" |
| 61 | + if ts.seconds == 0 and ts.nanos == 0: |
| 62 | + return "1970-01-01T00:00:00Z" |
| 63 | + dt = datetime.fromtimestamp(ts.seconds + ts.nanos / 1e9, tz=timezone.utc) |
| 64 | + return dt.strftime("%Y-%m-%dT%H:%M:%S") + ( |
| 65 | + ".%06dZ" % (ts.nanos // 1000) if ts.nanos else "Z" |
| 66 | + ) |
| 67 | + |
| 68 | + |
| 69 | +def _metadata_to_dict(metadata) -> Dict[str, Any]: |
| 70 | + """Convert FeatureResponseMeta to dict (matches proto_json.patch() format).""" |
| 71 | + result: Dict[str, Any] = {} |
| 72 | + if metadata.HasField("feature_names"): |
| 73 | + result["feature_names"] = list(metadata.feature_names.val) |
| 74 | + return result |
0 commit comments