diff --git a/.github/workflows/build_wheels.yml b/.github/workflows/build_wheels.yml index 07ef92d070..4dcc17927f 100644 --- a/.github/workflows/build_wheels.yml +++ b/.github/workflows/build_wheels.yml @@ -108,12 +108,15 @@ jobs: node-version: '17.x' registry-url: 'https://registry.npmjs.org' - name: Build and install dependencies + # There's a `git restore` in here because `make install-go-ci-dependencies` is actually messing up go.mod & go.sum. run: | pip install -U pip setuptools wheel twine make install-protoc-dependencies make install-go-proto-dependencies make install-go-ci-dependencies make build-ui + git status + git restore go.mod go.sum - name: Build run: | python3 setup.py sdist diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index c9e38bf344..522a496c35 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -75,6 +75,9 @@ message FeatureViewSpec { // Whether these features should be served online or not bool online = 8; + + // Needed for backwards compatible behaviour when fixing + int32 entity_key_serialization_version = 13; } message FeatureViewMeta { diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 33c51f5c4d..b3bf431f75 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -58,6 +58,9 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; + + // Needed for backwards compatible behaviour when fixing + int32 entity_key_serialization_version = 9; } message OnDemandFeatureViewMeta { diff --git a/protos/feast/core/RequestFeatureView.proto b/protos/feast/core/RequestFeatureView.proto index 4049053c2b..5a3a55bb88 100644 --- a/protos/feast/core/RequestFeatureView.proto +++ b/protos/feast/core/RequestFeatureView.proto @@ -48,4 +48,7 @@ message RequestFeatureViewSpec { // Owner of the request feature view. string owner = 6; + + // Needed for backwards compatible behaviour when fixing + int32 entity_key_serialization_version = 7; } diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 5feb1d7d89..d2daf040b4 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -49,6 +49,8 @@ class BaseFeatureView(ABC): created_timestamp: Optional[datetime] last_updated_timestamp: Optional[datetime] + entity_key_serialization_version: int + @abstractmethod def __init__( self, @@ -58,6 +60,7 @@ def __init__( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + entity_key_serialization_version: int = 1, ): """ Creates a BaseFeatureView object. @@ -82,6 +85,7 @@ def __init__( self.projection = FeatureViewProjection.from_definition(self) self.created_timestamp = None self.last_updated_timestamp = None + self.entity_key_serialization_version = entity_key_serialization_version @property @abstractmethod diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py index 2f9fb080db..77830d70dc 100644 --- a/sdk/python/feast/batch_feature_view.py +++ b/sdk/python/feast/batch_feature_view.py @@ -30,6 +30,7 @@ def __init__( owner: str = "", schema: Optional[List[Field]] = None, source: Optional[DataSource] = None, + entity_key_serialization_version: int = 1, ): if source is None: @@ -55,4 +56,5 @@ def __init__( owner=owner, schema=schema, source=source, + entity_key_serialization_version=entity_key_serialization_version, ) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 348c3019c5..80a600c307 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -110,6 +110,7 @@ def __init__( owner: str = "", schema: Optional[List[Field]] = None, source: Optional[DataSource] = None, + entity_key_serialization_version: int = 1, ): """ Creates a FeatureView object. @@ -260,6 +261,7 @@ def __init__( description=description, tags=tags, owner=owner, + entity_key_serialization_version=entity_key_serialization_version, ) self.online = online self.materialization_intervals = [] @@ -443,6 +445,7 @@ def to_proto(self) -> FeatureViewProto: online=self.online, batch_source=batch_source_proto, stream_source=stream_source_proto, + entity_key_serialization_version=self.entity_key_serialization_version, ) return FeatureViewProto(spec=spec, meta=meta) @@ -514,6 +517,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ) ) + feature_view.entity_key_serialization_version = ( + feature_view_proto.spec.entity_key_serialization_version + ) + return feature_view @property diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index 8333610473..a64f759356 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -6,7 +6,9 @@ from feast.protos.feast.types.Value_pb2 import ValueType -def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]: +def _serialize_val( + value_type, v: ValueProto, entity_key_serialization_version=1 +) -> Tuple[bytes, int]: if value_type == "string_val": return v.string_val.encode("utf8"), ValueType.STRING elif value_type == "bytes_val": @@ -14,7 +16,7 @@ def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]: elif value_type == "int32_val": return struct.pack(" bytes: return b"".join(output) -def serialize_entity_key(entity_key: EntityKeyProto) -> bytes: +def serialize_entity_key( + entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> bytes: """ Serialize entity key to a bytestring so it can be used as a lookup key in a hash table. @@ -54,7 +58,11 @@ def serialize_entity_key(entity_key: EntityKeyProto) -> bytes: output.append(struct.pack(" OnlineStore: return online_store_class() -def _redis_key(project: str, entity_key: EntityKeyProto) -> bytes: - key: List[bytes] = [serialize_entity_key(entity_key), project.encode("utf-8")] +def _redis_key( + project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> bytes: + key: List[bytes] = [ + serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ), + project.encode("utf-8"), + ] return b"".join(key) @@ -40,10 +48,17 @@ def _mmh3(key: str): return bytes.fromhex(struct.pack(" str: +def compute_entity_id( + entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> str: """ Compute Entity id given Feast Entity Key for online stores. Remember that Entity here refers to `EntityKeyProto` which is used in some online stores to encode the keys. It has nothing to do with the Entity concept we have in Feast. """ - return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() + return mmh3.hash_bytes( + serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ) + ).hex() diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 2f0e902942..b691e43c66 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -95,7 +95,10 @@ def online_write_batch( with conn: for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key(entity_key) + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=table.entity_key_serialization_version, + ) timestamp = to_naive_utc(timestamp) if created_ts is not None: created_ts = to_naive_utc(created_ts) @@ -161,7 +164,10 @@ def online_read( k: list(group) for k, group in itertools.groupby(rows, key=lambda r: r[0]) } for entity_key in entity_keys: - entity_key_bin = serialize_entity_key(entity_key) + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=table.entity_key_serialization_version, + ) res = {} res_ts = None for _, feature_name, val_bin, ts in rows.get(entity_key_bin, []): diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index bad4edba81..8ea0aa25f6 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -88,6 +88,7 @@ def __init__( # noqa: C901 description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + entity_key_serialization_version=1, ): """ Creates an OnDemandFeatureView object. @@ -219,6 +220,7 @@ def __init__( # noqa: C901 description=description, tags=tags, owner=owner, + entity_key_serialization_version=entity_key_serialization_version, ) assert _sources is not None self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} @@ -310,6 +312,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: description=self.description, tags=self.tags, owner=self.owner, + entity_key_serialization_version=self.entity_key_serialization_version, ) return OnDemandFeatureViewProto(spec=spec, meta=meta) @@ -357,6 +360,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): description=on_demand_feature_view_proto.spec.description, tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, + entity_key_serialization_version=on_demand_feature_view_proto.spec.entity_key_serialization_version, ) # FeatureViewProjections are not saved in the OnDemandFeatureView proto. @@ -524,6 +528,7 @@ def on_demand_feature_view( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + entity_key_serialization_version=1, ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -650,6 +655,7 @@ def decorator(user_function): description=description, tags=tags, owner=owner, + entity_key_serialization_version=entity_key_serialization_version, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index c721bd648a..2f9b3d85ad 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -1114,6 +1114,7 @@ def apply_feature_view( else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") + saved_entity_key_serialization_version: Optional[int] = None for idx, existing_feature_view_proto in enumerate( existing_feature_views_of_same_type ): @@ -1127,8 +1128,17 @@ def apply_feature_view( ): return else: + saved_entity_key_serialization_version = existing_feature_views_of_same_type[ + idx + ].spec.entity_key_serialization_version del existing_feature_views_of_same_type[idx] break + + if saved_entity_key_serialization_version: + feature_view_proto.spec.entity_key_serialization_version = ( + saved_entity_key_serialization_version + ) + existing_feature_views_of_same_type.append(feature_view_proto) if commit: self.commit() diff --git a/sdk/python/feast/request_feature_view.py b/sdk/python/feast/request_feature_view.py index 7248ffe989..65bbfdde1d 100644 --- a/sdk/python/feast/request_feature_view.py +++ b/sdk/python/feast/request_feature_view.py @@ -44,6 +44,7 @@ def __init__( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + entity_key_serialization_version=1, ): """ Creates a RequestFeatureView object. @@ -77,6 +78,7 @@ def __init__( description=description, tags=tags, owner=owner, + entity_key_serialization_version=entity_key_serialization_version, ) self.request_source = request_data_source @@ -97,6 +99,7 @@ def to_proto(self) -> RequestFeatureViewProto: description=self.description, tags=self.tags, owner=self.owner, + entity_key_serialization_version=self.entity_key_serialization_version, ) return RequestFeatureViewProto(spec=spec) @@ -121,6 +124,7 @@ def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto): description=request_feature_view_proto.spec.description, tags=dict(request_feature_view_proto.spec.tags), owner=request_feature_view_proto.spec.owner, + entity_key_serialization_version=request_feature_view_proto.spec.entity_key_serialization_version, ) # FeatureViewProjections are not saved in the RequestFeatureView proto. diff --git a/sdk/python/tests/unit/infra/test_key_encoding_utils.py b/sdk/python/tests/unit/infra/test_key_encoding_utils.py new file mode 100644 index 0000000000..58429ca5b1 --- /dev/null +++ b/sdk/python/tests/unit/infra/test_key_encoding_utils.py @@ -0,0 +1,28 @@ +import pytest + +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto + + +@pytest.mark.parametrize( + "entity_key,expected_contains", + [ + ( + EntityKeyProto( + join_keys=["customer"], + entity_values=[ValueProto(int64_val=int(2 ** 31))], + ), + b"customer", + ), + ( + EntityKeyProto( + join_keys=["user"], entity_values=[ValueProto(int32_val=int(2 ** 15))] + ), + b"user", + ), + ], +) +def test_serialize_entity_key(entity_key, expected_contains): + output = serialize_entity_key(entity_key) + assert output.find(expected_contains) >= 0