From a7b4f23b9433fb19ad00681160c750dbee7428e7 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Thu, 12 May 2022 11:30:11 -0400 Subject: [PATCH 1/3] fix: Forcing ODFV udfs to be __main__ module so clients don't need the udf's module. Fixing duplicate data source conflict between push source and batch source Signed-off-by: Danny Chiao --- sdk/python/feast/on_demand_feature_view.py | 7 +++++++ sdk/python/feast/repo_operations.py | 6 ++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 244000c6ded..1cddc0b8814 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -628,7 +628,14 @@ def on_demand_feature_view( if not _sources: raise ValueError("The `sources` parameter must be specified.") + def mainify(obj): + # Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same + # name as the original file defining the ODFV. 
+ if obj.__module__ != "__main__": + obj.__module__ = "__main__" + def decorator(user_function): + mainify(user_function) on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, sources=_sources, diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 261e501a876..53749ec53c2 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -110,6 +110,7 @@ def parse_repo(repo_root: Path) -> RepoContents: request_feature_views=[], ) + data_sources_set = set() for repo_file in get_repo_files(repo_root): module_path = py_path_to_module(repo_file) module = importlib.import_module(module_path) @@ -118,7 +119,7 @@ def parse_repo(repo_root: Path) -> RepoContents: if isinstance(obj, DataSource) and not any( (obj is ds) for ds in res.data_sources ): - res.data_sources.append(obj) + data_sources_set.add(obj) if isinstance(obj, FeatureView) and not any( (obj is fv) for fv in res.feature_views ): @@ -126,7 +127,7 @@ def parse_repo(repo_root: Path) -> RepoContents: if isinstance(obj.stream_source, PushSource) and not any( (obj is ds) for ds in res.data_sources ): - res.data_sources.append(obj.stream_source.batch_source) + data_sources_set.add(obj.stream_source.batch_source) elif isinstance(obj, Entity) and not any( (obj is entity) for entity in res.entities ): @@ -144,6 +145,7 @@ def parse_repo(repo_root: Path) -> RepoContents: ): res.request_feature_views.append(obj) res.entities.append(DUMMY_ENTITY) + res.data_sources.extend(data_sources_set) return res From 48f83215563375e9098b791a0383bc9331e2075b Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Thu, 12 May 2022 14:46:59 -0400 Subject: [PATCH 2/3] fix: Skip a push source's batch source when it duplicates an already registered data source Signed-off-by: Danny Chiao --- sdk/python/feast/repo_operations.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 53749ec53c2..a5d358dc98a 100644 --- 
a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -119,6 +119,7 @@ def parse_repo(repo_root: Path) -> RepoContents: if isinstance(obj, DataSource) and not any( (obj is ds) for ds in res.data_sources ): + res.data_sources.append(obj) data_sources_set.add(obj) if isinstance(obj, FeatureView) and not any( (obj is fv) for fv in res.feature_views @@ -127,7 +128,10 @@ def parse_repo(repo_root: Path) -> RepoContents: if isinstance(obj.stream_source, PushSource) and not any( (obj is ds) for ds in res.data_sources ): - data_sources_set.add(obj.stream_source.batch_source) + push_source_dep = obj.stream_source.batch_source + # Don't add if the push source is a duplicate of an existing batch source + if push_source_dep not in data_sources_set: + res.data_sources.append(push_source_dep) elif isinstance(obj, Entity) and not any( (obj is entity) for entity in res.entities ): @@ -145,7 +149,6 @@ def parse_repo(repo_root: Path) -> RepoContents: ): res.request_feature_views.append(obj) res.entities.append(DUMMY_ENTITY) - res.data_sources.extend(data_sources_set) return res From 673064b5e3a656d14cc48e587e42adc0d23e556a Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Thu, 12 May 2022 14:48:04 -0400 Subject: [PATCH 3/3] fix: Clarify comment about duplicate batch sources of push sources Signed-off-by: Danny Chiao --- sdk/python/feast/repo_operations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index a5d358dc98a..0e82fdf47ad 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -129,7 +129,7 @@ def parse_repo(repo_root: Path) -> RepoContents: (obj is ds) for ds in res.data_sources ): push_source_dep = obj.stream_source.batch_source - # Don't add if the push source is a duplicate of an existing batch source + # Don't add if the push source's batch source is a duplicate of an existing batch source if push_source_dep not in data_sources_set: res.data_sources.append(push_source_dep) 
elif isinstance(obj, Entity) and not any(