From 7017ada845731ba5a2e29c6b6292092e6fb2377b Mon Sep 17 00:00:00 2001 From: hao-affirm <104030690+hao-affirm@users.noreply.github.com> Date: Tue, 4 Oct 2022 00:53:17 -0700 Subject: [PATCH 1/5] fix udf in stream feature view UI shows pickled data Signed-off-by: hao-affirm <104030690+hao-affirm@users.noreply.github.com> --- sdk/python/feast/infra/registry/base_registry.py | 9 ++++++--- sdk/python/feast/stream_feature_view.py | 13 +++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index e1e9ba99e1..fc65932ea1 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -638,9 +638,12 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: self.list_stream_feature_views(project=project), key=lambda stream_feature_view: stream_feature_view.name, ): - registry_dict["streamFeatureViews"].append( - self._message_to_sorted_dict(stream_feature_view.to_proto()) - ) + sfv_dict = self._message_to_sorted_dict(stream_feature_view.to_proto()) + + sfv_dict["spec"]["userDefinedFunction"][ + "body" + ] = stream_feature_view.udf_string + registry_dict["streamFeatureViews"].append(sfv_dict) for saved_dataset in sorted( self.list_saved_datasets(project=project), key=lambda item: item.name ): diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 176f38d093..17c9a2c79e 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -71,6 +71,7 @@ class StreamFeatureView(FeatureView): timestamp_field: str materialization_intervals: List[Tuple[datetime, datetime]] udf: Optional[FunctionType] + udf_string: str def __init__( self, @@ -88,6 +89,7 @@ def __init__( mode: Optional[str] = "spark", timestamp_field: Optional[str] = "", udf: Optional[FunctionType] = None, + udf_string: str ): if not flags_helper.is_test(): 
warnings.warn( @@ -114,6 +116,7 @@ def __init__( self.mode = mode or "" self.timestamp_field = timestamp_field or "" self.udf = udf + self.udf_string = udf_string super().__init__( name=name, @@ -143,6 +146,7 @@ def __eq__(self, other): self.mode != other.mode or self.timestamp_field != other.timestamp_field or self.udf.__code__.co_code != other.udf.__code__.co_code + or self.udf_string != other.udf_string or self.aggregations != other.aggregations ): return False @@ -171,6 +175,7 @@ def to_proto(self): udf_proto = UserDefinedFunctionProto( name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, ) spec = StreamFeatureViewSpecProto( name=self.name, @@ -209,6 +214,11 @@ def from_proto(cls, sfv_proto): if sfv_proto.spec.HasField("user_defined_function") else None ) + udf_string = ( + sfv_proto.spec.user_defined_function.body_text + if sfv_proto.spec.HasField("user_defined_function") + else None + ) stream_feature_view = cls( name=sfv_proto.spec.name, description=sfv_proto.spec.description, @@ -226,6 +236,7 @@ def from_proto(cls, sfv_proto): source=stream_source, mode=sfv_proto.spec.mode, udf=udf, + udf_string=udf_string, aggregations=[ Aggregation.from_proto(agg_proto) for agg_proto in sfv_proto.spec.aggregations @@ -315,6 +326,7 @@ def mainify(obj): obj.__module__ = "__main__" def decorator(user_function): + udf_string = dill.source.getsource(user_function) mainify(user_function) stream_feature_view_obj = StreamFeatureView( name=user_function.__name__, @@ -323,6 +335,7 @@ def decorator(user_function): source=source, schema=schema, udf=user_function, + udf_string=udf_string, description=description, tags=tags, online=online, From d3e1c0b59d2e0c77544e73503bb7220689532eb5 Mon Sep 17 00:00:00 2001 From: hao-affirm <104030690+hao-affirm@users.noreply.github.com> Date: Tue, 4 Oct 2022 11:39:10 -0700 Subject: [PATCH 2/5] fix lint Signed-off-by: hao-affirm <104030690+hao-affirm@users.noreply.github.com> --- 
sdk/python/feast/stream_feature_view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 17c9a2c79e..5c68a4a2b4 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -89,7 +89,7 @@ def __init__( mode: Optional[str] = "spark", timestamp_field: Optional[str] = "", udf: Optional[FunctionType] = None, - udf_string: str + udf_string: str = "" ): if not flags_helper.is_test(): warnings.warn( From a6039bc55901fbfc679e6f7a7cd9fd9e3b5e5bb3 Mon Sep 17 00:00:00 2001 From: hao-affirm <104030690+hao-affirm@users.noreply.github.com> Date: Tue, 4 Oct 2022 11:50:09 -0700 Subject: [PATCH 3/5] fix lint Signed-off-by: hao-affirm <104030690+hao-affirm@users.noreply.github.com> --- sdk/python/feast/stream_feature_view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 5c68a4a2b4..d8c4d8546c 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -89,7 +89,7 @@ def __init__( mode: Optional[str] = "spark", timestamp_field: Optional[str] = "", udf: Optional[FunctionType] = None, - udf_string: str = "" + udf_string: str = "", ): if not flags_helper.is_test(): warnings.warn( From 80f6a360ce722386383887100d1fc1ce6133be8d Mon Sep 17 00:00:00 2001 From: hao-affirm <104030690+hao-affirm@users.noreply.github.com> Date: Tue, 4 Oct 2022 11:54:03 -0700 Subject: [PATCH 4/5] fix test Signed-off-by: hao-affirm <104030690+hao-affirm@users.noreply.github.com> --- sdk/python/feast/stream_feature_view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index d8c4d8546c..9765c3cc6e 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -89,7 +89,7 @@ def __init__( 
mode: Optional[str] = "spark", timestamp_field: Optional[str] = "", udf: Optional[FunctionType] = None, - udf_string: str = "", + udf_string: Optional[str] = "", ): if not flags_helper.is_test(): warnings.warn( From 4db494ffeb24b669eac597b09bfbd95f97f5b831 Mon Sep 17 00:00:00 2001 From: hao-affirm <104030690+hao-affirm@users.noreply.github.com> Date: Tue, 4 Oct 2022 12:08:02 -0700 Subject: [PATCH 5/5] fix lint Signed-off-by: hao-affirm <104030690+hao-affirm@users.noreply.github.com> --- sdk/python/feast/stream_feature_view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 9765c3cc6e..d3a2164788 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -71,7 +71,7 @@ class StreamFeatureView(FeatureView): timestamp_field: str materialization_intervals: List[Tuple[datetime, datetime]] udf: Optional[FunctionType] - udf_string: str + udf_string: Optional[str] def __init__( self,