@@ -857,6 +857,11 @@ def apply(
857857 ]
858858 sfvs_to_update = [ob for ob in objects if isinstance (ob , StreamFeatureView )]
859859 odfvs_to_update = [ob for ob in objects if isinstance (ob , OnDemandFeatureView )]
860+ odfvs_with_writes_to_update = [
861+ ob
862+ for ob in objects
863+ if isinstance (ob , OnDemandFeatureView ) and ob .write_to_online_store
864+ ]
860865 services_to_update = [ob for ob in objects if isinstance (ob , FeatureService )]
861866 data_sources_set_to_update = {
862867 ob for ob in objects if isinstance (ob , DataSource )
@@ -878,10 +883,20 @@ def apply(
878883 for batch_source in batch_sources_to_add :
879884 data_sources_set_to_update .add (batch_source )
880885
881- for fv in itertools .chain (views_to_update , sfvs_to_update ):
882- data_sources_set_to_update .add (fv .batch_source )
883- if fv .stream_source :
886+ for fv in itertools .chain (
887+ views_to_update , sfvs_to_update , odfvs_with_writes_to_update
888+ ):
889+ if isinstance (fv , FeatureView ):
890+ data_sources_set_to_update .add (fv .batch_source )
891+ if isinstance (fv , StreamFeatureView ):
884892 data_sources_set_to_update .add (fv .stream_source )
893+ if isinstance (fv , OnDemandFeatureView ):
894+ for source_fvp in fv .source_feature_view_projections :
895+ data_sources_set_to_update .add (
896+ fv .source_feature_view_projections [source_fvp ].batch_source
897+ )
898+ else :
899+ pass
885900
886901 for odfv in odfvs_to_update :
887902 for v in odfv .source_request_sources .values ():
@@ -999,7 +1014,9 @@ def apply(
9991014 tables_to_delete : List [FeatureView ] = (
10001015 views_to_delete + sfvs_to_delete if not partial else [] # type: ignore
10011016 )
1002- tables_to_keep : List [FeatureView ] = views_to_update + sfvs_to_update # type: ignore
1017+ tables_to_keep : List [FeatureView ] = (
1018+ views_to_update + sfvs_to_update + odfvs_with_writes_to_update
1019+ ) # type: ignore
10031020
10041021 self ._get_provider ().update_infra (
10051022 project = self .project ,
0 commit comments