Skip to content

Commit 4d8cdb8

Browse files
committed
version the serialization mechanism and plumb through all the places
Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 097da5c commit 4d8cdb8

File tree

13 files changed

+90
-11
lines changed

13 files changed

+90
-11
lines changed

protos/feast/core/FeatureView.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ message FeatureViewSpec {
7575

7676
// Whether these features should be served online or not
7777
bool online = 8;
78+
79+
// Needed for backwards compatible behaviour when fixing
80+
int32 entity_key_serialization_version = 13;
7881
}
7982

8083
message FeatureViewMeta {

protos/feast/core/OnDemandFeatureView.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ message OnDemandFeatureViewSpec {
5858

5959
// Owner of the on demand feature view.
6060
string owner = 8;
61+
62+
// Needed for backwards compatible behaviour when fixing
63+
int32 entity_key_serialization_version = 9;
6164
}
6265

6366
message OnDemandFeatureViewMeta {

protos/feast/core/RequestFeatureView.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,7 @@ message RequestFeatureViewSpec {
4848

4949
// Owner of the request feature view.
5050
string owner = 6;
51+
52+
// Needed for backwards compatible behaviour when fixing
53+
int32 entity_key_serialization_version = 7;
5154
}

sdk/python/feast/base_feature_view.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ class BaseFeatureView(ABC):
4949
created_timestamp: Optional[datetime]
5050
last_updated_timestamp: Optional[datetime]
5151

52+
entity_key_serialization_version: int
53+
5254
@abstractmethod
5355
def __init__(
5456
self,
@@ -58,6 +60,7 @@ def __init__(
5860
description: str = "",
5961
tags: Optional[Dict[str, str]] = None,
6062
owner: str = "",
63+
entity_key_serialization_version: int = 1,
6164
):
6265
"""
6366
Creates a BaseFeatureView object.
@@ -82,6 +85,7 @@ def __init__(
8285
self.projection = FeatureViewProjection.from_definition(self)
8386
self.created_timestamp = None
8487
self.last_updated_timestamp = None
88+
self.entity_key_serialization_version = entity_key_serialization_version
8589

8690
@property
8791
@abstractmethod

sdk/python/feast/batch_feature_view.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def __init__(
3030
owner: str = "",
3131
schema: Optional[List[Field]] = None,
3232
source: Optional[DataSource] = None,
33+
entity_key_serialization_version: int = 1,
3334
):
3435

3536
if source is None:
@@ -55,4 +56,5 @@ def __init__(
5556
owner=owner,
5657
schema=schema,
5758
source=source,
59+
entity_key_serialization_version=entity_key_serialization_version,
5860
)

sdk/python/feast/feature_view.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def __init__(
110110
owner: str = "",
111111
schema: Optional[List[Field]] = None,
112112
source: Optional[DataSource] = None,
113+
entity_key_serialization_version: int = 1,
113114
):
114115
"""
115116
Creates a FeatureView object.
@@ -260,6 +261,7 @@ def __init__(
260261
description=description,
261262
tags=tags,
262263
owner=owner,
264+
entity_key_serialization_version=entity_key_serialization_version,
263265
)
264266
self.online = online
265267
self.materialization_intervals = []
@@ -443,6 +445,7 @@ def to_proto(self) -> FeatureViewProto:
443445
online=self.online,
444446
batch_source=batch_source_proto,
445447
stream_source=stream_source_proto,
448+
entity_key_serialization_version=self.entity_key_serialization_version,
446449
)
447450

448451
return FeatureViewProto(spec=spec, meta=meta)
@@ -514,6 +517,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
514517
)
515518
)
516519

520+
feature_view.entity_key_serialization_version = (
521+
feature_view_proto.spec.entity_key_serialization_version
522+
)
523+
517524
return feature_view
518525

519526
@property

sdk/python/feast/infra/key_encoding_utils.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
from feast.protos.feast.types.Value_pb2 import ValueType
77

88

9-
def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]:
9+
def _serialize_val(
10+
value_type, v: ValueProto, entity_key_serialization_version=1
11+
) -> Tuple[bytes, int]:
1012
if value_type == "string_val":
1113
return v.string_val.encode("utf8"), ValueType.STRING
1214
elif value_type == "bytes_val":
@@ -35,7 +37,9 @@ def serialize_entity_key_prefix(entity_keys: List[str]) -> bytes:
3537
return b"".join(output)
3638

3739

38-
def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
40+
def serialize_entity_key(
41+
entity_key: EntityKeyProto, entity_key_serialization_version=1
42+
) -> bytes:
3943
"""
4044
Serialize entity key to a bytestring so it can be used as a lookup key in a hash table.
4145
@@ -54,7 +58,11 @@ def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
5458
output.append(struct.pack("<I", ValueType.STRING))
5559
output.append(k.encode("utf8"))
5660
for v in sorted_values:
57-
val_bytes, value_type = _serialize_val(v.WhichOneof("val"), v)
61+
val_bytes, value_type = _serialize_val(
62+
v.WhichOneof("val"),
63+
v,
64+
entity_key_serialization_version=entity_key_serialization_version,
65+
)
5866

5967
output.append(struct.pack("<I", value_type))
6068

sdk/python/feast/infra/online_stores/contrib/postgres.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ def online_write_batch(
4949
with self._get_conn(config) as conn, conn.cursor() as cur:
5050
insert_values = []
5151
for entity_key, values, timestamp, created_ts in data:
52-
entity_key_bin = serialize_entity_key(entity_key)
52+
entity_key_bin = serialize_entity_key(
53+
entity_key,
54+
entity_key_serialization_version=table.entity_key_serialization_version,
55+
)
5356
timestamp = _to_naive_utc(timestamp)
5457
if created_ts is not None:
5558
created_ts = _to_naive_utc(created_ts)
@@ -104,7 +107,12 @@ def online_read(
104107
# to PostgreSQL
105108
keys = []
106109
for entity_key in entity_keys:
107-
keys.append(serialize_entity_key(entity_key))
110+
keys.append(
111+
serialize_entity_key(
112+
entity_key,
113+
entity_key_serialization_version=table.entity_key_serialization_version,
114+
)
115+
)
108116

109117
cur.execute(
110118
sql.SQL(

sdk/python/feast/infra/online_stores/helpers.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,16 @@ def get_online_store_from_config(online_store_config: Any) -> OnlineStore:
2121
return online_store_class()
2222

2323

24-
def _redis_key(project: str, entity_key: EntityKeyProto) -> bytes:
25-
key: List[bytes] = [serialize_entity_key(entity_key), project.encode("utf-8")]
24+
def _redis_key(
25+
project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1
26+
) -> bytes:
27+
key: List[bytes] = [
28+
serialize_entity_key(
29+
entity_key,
30+
entity_key_serialization_version=entity_key_serialization_version,
31+
),
32+
project.encode("utf-8"),
33+
]
2634
return b"".join(key)
2735

2836

@@ -40,10 +48,17 @@ def _mmh3(key: str):
4048
return bytes.fromhex(struct.pack("<Q", key_hash).hex()[:8])
4149

4250

43-
def compute_entity_id(entity_key: EntityKeyProto) -> str:
51+
def compute_entity_id(
52+
entity_key: EntityKeyProto, entity_key_serialization_version=1
53+
) -> str:
4454
"""
4555
Compute Entity id given Feast Entity Key for online stores.
4656
Remember that Entity here refers to `EntityKeyProto` which is used in some online stores to encode the keys.
4757
It has nothing to do with the Entity concept we have in Feast.
4858
"""
49-
return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
59+
return mmh3.hash_bytes(
60+
serialize_entity_key(
61+
entity_key,
62+
entity_key_serialization_version=entity_key_serialization_version,
63+
)
64+
).hex()

sdk/python/feast/infra/online_stores/sqlite.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,10 @@ def online_write_batch(
9595

9696
with conn:
9797
for entity_key, values, timestamp, created_ts in data:
98-
entity_key_bin = serialize_entity_key(entity_key)
98+
entity_key_bin = serialize_entity_key(
99+
entity_key,
100+
entity_key_serialization_version=table.entity_key_serialization_version,
101+
)
99102
timestamp = to_naive_utc(timestamp)
100103
if created_ts is not None:
101104
created_ts = to_naive_utc(created_ts)
@@ -161,7 +164,10 @@ def online_read(
161164
k: list(group) for k, group in itertools.groupby(rows, key=lambda r: r[0])
162165
}
163166
for entity_key in entity_keys:
164-
entity_key_bin = serialize_entity_key(entity_key)
167+
entity_key_bin = serialize_entity_key(
168+
entity_key,
169+
entity_key_serialization_version=table.entity_key_serialization_version,
170+
)
165171
res = {}
166172
res_ts = None
167173
for _, feature_name, val_bin, ts in rows.get(entity_key_bin, []):

0 commit comments

Comments
 (0)