Skip to content
Merged
Next Next commit
fix: version the entity serialization mechanism to fix issue with int…
…64 vals

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jul 19, 2022
commit af4ccef9de9e43dc781e104f55d724f60d794928
3 changes: 3 additions & 0 deletions protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ message FeatureViewSpec {

// Whether these features should be served online or not
bool online = 8;

// Needed for backwards compatible behaviour when fixing
Comment thread
achals marked this conversation as resolved.
Outdated
int32 entity_key_serialization_version = 13;
}

message FeatureViewMeta {
Expand Down
3 changes: 3 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ message OnDemandFeatureViewSpec {

// Owner of the on demand feature view.
string owner = 8;

// Needed for backwards compatible behaviour when fixing
int32 entity_key_serialization_version = 9;
}

message OnDemandFeatureViewMeta {
Expand Down
3 changes: 3 additions & 0 deletions protos/feast/core/RequestFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ message RequestFeatureViewSpec {

// Owner of the request feature view.
string owner = 6;

// Needed for backwards compatible behaviour when fixing
int32 entity_key_serialization_version = 7;
}
3 changes: 3 additions & 0 deletions protos/feast/core/StreamFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,8 @@ message StreamFeatureViewSpec {

// Timestamp field for aggregation
string timestamp_field = 16;

// Needed for backwards compatible behaviour when fixing
int32 entity_key_serialization_version = 17;
}

13 changes: 13 additions & 0 deletions sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class BaseFeatureView(ABC):
created_timestamp: Optional[datetime]
last_updated_timestamp: Optional[datetime]

_entity_key_serialization_version: Optional[int]
Comment thread
achals marked this conversation as resolved.
Outdated

@abstractmethod
def __init__(
self,
Expand All @@ -58,6 +60,7 @@ def __init__(
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
entity_key_serialization_version: Optional[int] = None,
):
"""
Creates a BaseFeatureView object.
Expand All @@ -82,6 +85,16 @@ def __init__(
self.projection = FeatureViewProjection.from_definition(self)
self.created_timestamp = None
self.last_updated_timestamp = None
self._entity_key_serialization_version = entity_key_serialization_version

@property
def entity_key_serialization_version(self) -> int:
if self._entity_key_serialization_version:
return self._entity_key_serialization_version
return 2 # The default entity key serialization version.

def set_entity_key_serialization_version(self, v: int):
self._entity_key_serialization_version = v

@property
@abstractmethod
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/batch_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
entity_key_serialization_version: Optional[int] = None,
):

if source is None:
Expand All @@ -55,4 +56,5 @@ def __init__(
owner=owner,
schema=schema,
source=source,
entity_key_serialization_version=entity_key_serialization_version,
)
8 changes: 8 additions & 0 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
entity_key_serialization_version: Optional[int] = None,
):
"""
Creates a FeatureView object.
Expand Down Expand Up @@ -260,6 +261,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
entity_key_serialization_version=entity_key_serialization_version,
)
self.online = online
self.materialization_intervals = []
Expand Down Expand Up @@ -430,6 +432,7 @@ def to_proto(self) -> FeatureViewProto:
online=self.online,
batch_source=batch_source_proto,
stream_source=stream_source_proto,
entity_key_serialization_version=self.entity_key_serialization_version,
)

return FeatureViewProto(spec=spec, meta=meta)
Expand Down Expand Up @@ -521,6 +524,11 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
)
)

if feature_view_proto.spec.entity_key_serialization_version <= 1:
feature_view.set_entity_key_serialization_version(1)
Comment thread
achals marked this conversation as resolved.
Outdated
else:
feature_view.set_entity_key_serialization_version(2)

return feature_view

@property
Expand Down
19 changes: 16 additions & 3 deletions sdk/python/feast/infra/key_encoding_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
from feast.protos.feast.types.Value_pb2 import ValueType


def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]:
def _serialize_val(
value_type, v: ValueProto, entity_key_serialization_version=1
) -> Tuple[bytes, int]:
if value_type == "string_val":
return v.string_val.encode("utf8"), ValueType.STRING
elif value_type == "bytes_val":
return v.bytes_val, ValueType.BYTES
elif value_type == "int32_val":
return struct.pack("<i", v.int32_val), ValueType.INT32
elif value_type == "int64_val":
if (
entity_key_serialization_version >= 0
and entity_key_serialization_version <= 1
):
return struct.pack("<q", v.int64_val), ValueType.INT64
return struct.pack("<l", v.int64_val), ValueType.INT64
else:
raise ValueError(f"Value type not supported for Firestore: {v}")
Expand All @@ -35,7 +42,9 @@ def serialize_entity_key_prefix(entity_keys: List[str]) -> bytes:
return b"".join(output)


def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
def serialize_entity_key(
entity_key: EntityKeyProto, entity_key_serialization_version=1
) -> bytes:
"""
Serialize entity key to a bytestring so it can be used as a lookup key in a hash table.

Expand All @@ -54,7 +63,11 @@ def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
output.append(struct.pack("<I", ValueType.STRING))
output.append(k.encode("utf8"))
for v in sorted_values:
val_bytes, value_type = _serialize_val(v.WhichOneof("val"), v)
val_bytes, value_type = _serialize_val(
v.WhichOneof("val"),
v,
entity_key_serialization_version=entity_key_serialization_version,
)

output.append(struct.pack("<I", value_type))

Expand Down
12 changes: 10 additions & 2 deletions sdk/python/feast/infra/online_stores/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def online_write_batch(
with self._get_conn(config) as conn, conn.cursor() as cur:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(entity_key)
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=table.entity_key_serialization_version,
)
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)
Expand Down Expand Up @@ -104,7 +107,12 @@ def online_read(
# to PostgreSQL
keys = []
for entity_key in entity_keys:
keys.append(serialize_entity_key(entity_key))
keys.append(
serialize_entity_key(
entity_key,
entity_key_serialization_version=table.entity_key_serialization_version,
)
)

cur.execute(
sql.SQL(
Expand Down
23 changes: 19 additions & 4 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@ def get_online_store_from_config(online_store_config: Any) -> OnlineStore:
return online_store_class()


def _redis_key(project: str, entity_key: EntityKeyProto) -> bytes:
key: List[bytes] = [serialize_entity_key(entity_key), project.encode("utf-8")]
def _redis_key(
project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1
) -> bytes:
key: List[bytes] = [
serialize_entity_key(
entity_key,
entity_key_serialization_version=entity_key_serialization_version,
),
project.encode("utf-8"),
]
return b"".join(key)


Expand All @@ -40,10 +48,17 @@ def _mmh3(key: str):
return bytes.fromhex(struct.pack("<Q", key_hash).hex()[:8])


def compute_entity_id(entity_key: EntityKeyProto) -> str:
def compute_entity_id(
entity_key: EntityKeyProto, entity_key_serialization_version=1
) -> str:
"""
Compute Entity id given Feast Entity Key for online stores.
Remember that Entity here refers to `EntityKeyProto` which is used in some online stores to encode the keys.
It has nothing to do with the Entity concept we have in Feast.
"""
return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
return mmh3.hash_bytes(
serialize_entity_key(
entity_key,
entity_key_serialization_version=entity_key_serialization_version,
)
).hex()
10 changes: 8 additions & 2 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ def online_write_batch(

with conn:
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(entity_key)
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=table.entity_key_serialization_version,
)
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
created_ts = to_naive_utc(created_ts)
Expand Down Expand Up @@ -161,7 +164,10 @@ def online_read(
k: list(group) for k, group in itertools.groupby(rows, key=lambda r: r[0])
}
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(entity_key)
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=table.entity_key_serialization_version,
)
res = {}
res_ts = None
for _, feature_name, val_bin, ts in rows.get(entity_key_bin, []):
Expand Down
11 changes: 11 additions & 0 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__( # noqa: C901
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
entity_key_serialization_version=1,
):
"""
Creates an OnDemandFeatureView object.
Expand Down Expand Up @@ -219,6 +220,7 @@ def __init__( # noqa: C901
description=description,
tags=tags,
owner=owner,
entity_key_serialization_version=entity_key_serialization_version,
)
assert _sources is not None
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
Expand Down Expand Up @@ -310,6 +312,7 @@ def to_proto(self) -> OnDemandFeatureViewProto:
description=self.description,
tags=self.tags,
owner=self.owner,
entity_key_serialization_version=self.entity_key_serialization_version,
)

return OnDemandFeatureViewProto(spec=spec, meta=meta)
Expand Down Expand Up @@ -341,6 +344,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
sources.append(
RequestSource.from_proto(on_demand_source.request_data_source)
)

on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
schema=[
Expand All @@ -359,6 +363,11 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
owner=on_demand_feature_view_proto.spec.owner,
)

if on_demand_feature_view_proto.spec.entity_key_serialization_version <= 1:
on_demand_feature_view_obj.set_entity_key_serialization_version(1)
else:
on_demand_feature_view_obj.set_entity_key_serialization_version(2)

# FeatureViewProjections are not saved in the OnDemandFeatureView proto.
# Create the default projection.
on_demand_feature_view_obj.projection = FeatureViewProjection.from_definition(
Expand Down Expand Up @@ -524,6 +533,7 @@ def on_demand_feature_view(
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
entity_key_serialization_version=1,
):
"""
Creates an OnDemandFeatureView object with the given user function as udf.
Expand Down Expand Up @@ -650,6 +660,7 @@ def decorator(user_function):
description=description,
tags=tags,
owner=owner,
entity_key_serialization_version=entity_key_serialization_version,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
15 changes: 15 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ def apply_feature_view(
else:
raise ValueError(f"Unexpected feature view type: {type(feature_view)}")

saved_entity_key_serialization_version: Optional[int] = None
for idx, existing_feature_view_proto in enumerate(
existing_feature_views_of_same_type
):
Expand All @@ -1175,8 +1176,22 @@ def apply_feature_view(
):
return
else:
saved_entity_key_serialization_version = existing_feature_views_of_same_type[
idx
].spec.entity_key_serialization_version
del existing_feature_views_of_same_type[idx]
break

if (
not saved_entity_key_serialization_version
or saved_entity_key_serialization_version <= 1
):
feature_view_proto.spec.entity_key_serialization_version = 1
feature_view.set_entity_key_serialization_version(1)
else:
feature_view_proto.spec.entity_key_serialization_version = 2
feature_view.set_entity_key_serialization_version(2)

existing_feature_views_of_same_type.append(feature_view_proto)
if commit:
self.commit()
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/request_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
entity_key_serialization_version=1,
):
"""
Creates a RequestFeatureView object.
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
entity_key_serialization_version=entity_key_serialization_version,
)
self.request_source = request_data_source

Expand All @@ -97,6 +99,7 @@ def to_proto(self) -> RequestFeatureViewProto:
description=self.description,
tags=self.tags,
owner=self.owner,
entity_key_serialization_version=self.entity_key_serialization_version,
)

return RequestFeatureViewProto(spec=spec)
Expand All @@ -123,6 +126,11 @@ def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto):
owner=request_feature_view_proto.spec.owner,
)

if request_feature_view_proto.spec.entity_key_serialization_version <= 1:
request_feature_view_obj.set_entity_key_serialization_version(1)
else:
request_feature_view_obj.set_entity_key_serialization_version(2)

# FeatureViewProjections are not saved in the RequestFeatureView proto.
# Create the default projection.
request_feature_view_obj.projection = FeatureViewProjection.from_definition(
Expand Down
Loading