|
49 | 49 | if typing.TYPE_CHECKING: |
50 | 50 | from feast.feature_service import FeatureService |
51 | 51 | from feast.feature_view import FeatureView |
| 52 | + from feast.infra.registry.base_registry import BaseRegistry |
52 | 53 | from feast.on_demand_feature_view import OnDemandFeatureView |
53 | 54 |
|
54 | 55 | APPLICATION_NAME = "feast-dev/feast" |
@@ -756,61 +757,64 @@ def _list_feature_views( |
756 | 757 |
|
757 | 758 |
|
758 | 759 | def _get_feature_views_to_use( |
759 | | - registry, |
| 760 | + registry: "BaseRegistry", |
760 | 761 | project, |
761 | 762 | features: Optional[Union[List[str], "FeatureService"]], |
762 | 763 | allow_cache=False, |
763 | 764 | hide_dummy_entity: bool = True, |
764 | 765 | ) -> Tuple[List["FeatureView"], List["OnDemandFeatureView"]]: |
765 | 766 | from feast.feature_service import FeatureService |
766 | | - |
767 | | - fvs = { |
768 | | - fv.name: fv |
769 | | - for fv in [ |
770 | | - *_list_feature_views(registry, project, allow_cache, hide_dummy_entity), |
771 | | - *registry.list_stream_feature_views( |
772 | | - project=project, allow_cache=allow_cache |
773 | | - ), |
774 | | - ] |
775 | | - } |
776 | | - |
777 | | - od_fvs = { |
778 | | - fv.name: fv |
779 | | - for fv in registry.list_on_demand_feature_views( |
780 | | - project=project, allow_cache=allow_cache |
781 | | - ) |
782 | | - } |
| 767 | + from feast.feature_view import DUMMY_ENTITY_NAME |
| 768 | + from feast.on_demand_feature_view import OnDemandFeatureView |
783 | 769 |
|
784 | 770 | if isinstance(features, FeatureService): |
785 | | - fvs_to_use, od_fvs_to_use = [], [] |
786 | | - for fv_name, projection in [ |
| 771 | + feature_views = [ |
787 | 772 | (projection.name, projection) |
788 | 773 | for projection in features.feature_view_projections |
789 | | - ]: |
790 | | - if fv_name in fvs: |
791 | | - fvs_to_use.append(fvs[fv_name].with_projection(copy.copy(projection))) |
792 | | - elif fv_name in od_fvs: |
793 | | - odfv = od_fvs[fv_name].with_projection(copy.copy(projection)) |
794 | | - od_fvs_to_use.append(odfv) |
795 | | - # Let's make sure to include an FVs which the ODFV requires Features from. |
796 | | - for projection in odfv.source_feature_view_projections.values(): |
797 | | - fv = fvs[projection.name].with_projection(copy.copy(projection)) |
798 | | - if fv not in fvs_to_use: |
799 | | - fvs_to_use.append(fv) |
800 | | - else: |
801 | | - raise ValueError( |
802 | | - f"The provided feature service {features.name} contains a reference to a feature view" |
803 | | - f"{fv_name} which doesn't exist. Please make sure that you have created the feature view" |
804 | | - f'{fv_name} and that you have registered it by running "apply".' |
805 | | - ) |
806 | | - views_to_use = (fvs_to_use, od_fvs_to_use) |
| 774 | + ] |
807 | 775 | else: |
808 | | - views_to_use = ( |
809 | | - [*fvs.values()], |
810 | | - [*od_fvs.values()], |
811 | | - ) |
| 776 | + assert features is not None |
| 777 | + feature_views = [(feature.split(":")[0], None) for feature in features] # type: ignore[misc] |
| 778 | + |
| 779 | + fvs_to_use, od_fvs_to_use = [], [] |
| 780 | + for name, projection in feature_views: |
| 781 | + fv = registry.get_any_feature_view(name, project, allow_cache) |
| 782 | + |
| 783 | + if isinstance(fv, OnDemandFeatureView): |
| 784 | + od_fvs_to_use.append( |
| 785 | + fv.with_projection(copy.copy(projection)) if projection else fv |
| 786 | + ) |
| 787 | + |
| 788 | + for source_projection in fv.source_feature_view_projections.values(): |
| 789 | + source_fv = registry.get_any_feature_view( |
| 790 | + source_projection.name, project, allow_cache |
| 791 | + ) |
| 792 | + # TODO better way to handler dummy entities |
| 793 | + if ( |
| 794 | + hide_dummy_entity |
| 795 | + and source_fv.entities # type: ignore[attr-defined] |
| 796 | + and source_fv.entities[0] == DUMMY_ENTITY_NAME # type: ignore[attr-defined] |
| 797 | + ): |
| 798 | + source_fv.entities = [] # type: ignore[attr-defined] |
| 799 | + source_fv.entity_columns = [] # type: ignore[attr-defined] |
| 800 | + |
| 801 | + if source_fv not in fvs_to_use: |
| 802 | + fvs_to_use.append( |
| 803 | + source_fv.with_projection(copy.copy(source_projection)) |
| 804 | + ) |
| 805 | + else: |
| 806 | + if ( |
| 807 | + hide_dummy_entity |
| 808 | + and fv.entities # type: ignore[attr-defined] |
| 809 | + and fv.entities[0] == DUMMY_ENTITY_NAME # type: ignore[attr-defined] |
| 810 | + ): |
| 811 | + fv.entities = [] # type: ignore[attr-defined] |
| 812 | + fv.entity_columns = [] # type: ignore[attr-defined] |
| 813 | + fvs_to_use.append( |
| 814 | + fv.with_projection(copy.copy(projection)) if projection else fv |
| 815 | + ) |
812 | 816 |
|
813 | | - return views_to_use |
| 817 | + return (fvs_to_use, od_fvs_to_use) |
814 | 818 |
|
815 | 819 |
|
816 | 820 | def _get_online_request_context( |
|
0 commit comments