@@ -181,6 +181,55 @@ def get_historical_features(
181181 )
182182 return job
183183
184+ def materialize_incremental (
185+ self , feature_views : Optional [List [str ]], end_date : datetime ,
186+ ) -> None :
187+ """
188+ Materialize incremental new data from the offline store into the online store.
189+
190+ This method loads incremental new feature data up to the specified end time from either
191+ the specified feature views, or all feature views if none are specified,
192+ into the online store where it is available for online serving. The start time of
193+ the interval materialized is either the most recent end time of a prior materialization or
194+ (now - ttl) if no such prior materialization exists.
195+
196+ Args:
197+ feature_views (List[str]): Optional list of feature view names. If selected, will only run
198+ materialization for the specified feature views.
199+ end_date (datetime): End date for time range of data to materialize into the online store
200+
201+ Examples:
202+ Materialize all features into the online store up to 5 minutes ago.
203+ >>> from datetime import datetime, timedelta
204+ >>> from feast.feature_store import FeatureStore
205+ >>>
206+ >>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
207+ >>> fs.materialize_incremental(
208+ >>> end_date=datetime.utcnow() - timedelta(minutes=5)
209+ >>> )
210+ """
211+ feature_views_to_materialize = []
212+ registry = self ._get_registry ()
213+ if feature_views is None :
214+ feature_views_to_materialize = registry .list_feature_views (
215+ self .config .project
216+ )
217+ else :
218+ for name in feature_views :
219+ feature_view = registry .get_feature_view (name , self .config .project )
220+ feature_views_to_materialize .append (feature_view )
221+
222+ # TODO paging large loads
223+ for feature_view in feature_views_to_materialize :
224+ start_date = feature_view .most_recent_end_time
225+ if start_date is None :
226+ if feature_view .ttl is None :
227+ raise Exception (
228+ f"No start time found for feature view { feature_view .name } . materialize_incremental() requires either a ttl to be set or for materialize() to have been run at least once."
229+ )
230+ start_date = datetime .utcnow () - feature_view .ttl
231+ self ._materialize_single_feature_view (feature_view , start_date , end_date )
232+
184233 def materialize (
185234 self ,
186235 feature_views : Optional [List [str ]],
@@ -225,39 +274,43 @@ def materialize(
225274
226275 # TODO paging large loads
227276 for feature_view in feature_views_to_materialize :
228- if isinstance (feature_view .input , FileSource ):
229- raise NotImplementedError (
230- "This function is not yet implemented for File data sources"
231- )
232- (
233- entity_names ,
234- feature_names ,
235- event_timestamp_column ,
236- created_timestamp_column ,
237- ) = _run_reverse_field_mapping (feature_view )
238-
239- offline_store = get_offline_store (self .config )
240- table = offline_store .pull_latest_from_table_or_query (
241- feature_view .input ,
242- entity_names ,
243- feature_names ,
244- event_timestamp_column ,
245- created_timestamp_column ,
246- start_date ,
247- end_date ,
277+ self ._materialize_single_feature_view (feature_view , start_date , end_date )
278+
    def _materialize_single_feature_view(
        self, feature_view: FeatureView, start_date: datetime, end_date: datetime
    ) -> None:
        # Materialize one feature view's data for the [start_date, end_date]
        # interval from the offline store into the online store, then record the
        # interval on the view and persist it via apply().
        if isinstance(feature_view.input, FileSource):
            raise NotImplementedError(
                "This function is not yet implemented for File data sources"
            )
        # Map the feature view's field names back to the source's native column
        # names so the pull below can query the underlying table/query directly.
        (
            entity_names,
            feature_names,
            event_timestamp_column,
            created_timestamp_column,
        ) = _run_reverse_field_mapping(feature_view)

        offline_store = get_offline_store(self.config)
        table = offline_store.pull_latest_from_table_or_query(
            feature_view.input,
            entity_names,
            feature_names,
            event_timestamp_column,
            created_timestamp_column,
            start_date,
            end_date,
        )

        # Rename the pulled columns forward to the feature view's field names.
        if feature_view.input.field_mapping is not None:
            table = _run_forward_field_mapping(table, feature_view.input.field_mapping)

        rows_to_write = _convert_arrow_to_proto(table, feature_view)

        provider = self._get_provider()
        provider.online_write_batch(self.config.project, feature_view, rows_to_write)

        # Record the materialized interval so materialize_incremental() can
        # resume from end_date, and persist the updated view to the registry.
        feature_view.materialization_intervals.append((start_date, end_date))
        self.apply([feature_view])
261314
262315 def get_online_features (
263316 self , feature_refs : List [str ], entity_rows : List [Dict [str , Any ]],
0 commit comments