1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ from datetime import datetime
1415from pathlib import Path
15- from typing import List , Optional , Union
16+ from typing import Dict , List , Optional , Tuple , Union
1617
1718import pandas as pd
19+ import pyarrow
1820
21+ from feast .data_source import FileSource
1922from feast .entity import Entity
2023from feast .feature_view import FeatureView
2124from feast .infra .provider import Provider , get_provider
22- from feast .offline_store import RetrievalJob , get_offline_store_for_retrieval
25+ from feast .offline_store import (
26+ RetrievalJob ,
27+ get_offline_store ,
28+ get_offline_store_for_retrieval ,
29+ )
30+ from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
31+ from feast .protos .feast .types .Value_pb2 import Value as ValueProto
2332from feast .registry import Registry
2433from feast .repo_config import (
2534 LocalOnlineStoreConfig ,
2635 OnlineStoreConfig ,
2736 RepoConfig ,
2837 load_repo_config ,
2938)
39+ from feast .type_map import python_value_to_proto_value
3040
3141
3242class FeatureStore :
@@ -153,6 +163,88 @@ def get_historical_features(
153163 )
154164 return job
155165
166+ def materialize (
167+ self ,
168+ feature_views : Optional [List [str ]],
169+ start_date : datetime ,
170+ end_date : datetime ,
171+ ) -> None :
172+ """
173+ Materialize data from the offline store into the online store.
174+
175+ This method loads feature data in the specified interval from either
176+ the specified feature views, or all feature views if none are specified,
177+ into the online store where it is available for online serving.
178+
179+ Args:
180+ feature_views (List[str]): Optional list of feature view names. If selected, will only run
181+ materialization for the specified feature views.
182+ start_date (datetime): Start date for time range of data to materialize into the online store
183+ end_date (datetime): End date for time range of data to materialize into the online store
184+
185+ Examples:
186+ Materialize all features into the online store over the interval
187+ from 3 hours ago to 10 minutes ago.
188+ >>> from datetime import datetime, timedelta
189+ >>> from feast.feature_store import FeatureStore
190+ >>>
191+ >>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
192+ >>> fs.materialize(
193+ >>> start_date=datetime.utcnow() - timedelta(hours=3),
194+ >>> end_date=datetime.utcnow() - timedelta(minutes=10)
195+ >>> )
196+ """
197+ feature_views_to_materialize = []
198+ registry = self ._get_registry ()
199+ if feature_views is None :
200+ feature_views_to_materialize = registry .list_feature_views (
201+ self .config .project
202+ )
203+ else :
204+ for name in feature_views :
205+ feature_view = registry .get_feature_view (name , self .config .project )
206+ feature_views_to_materialize .append (feature_view )
207+
208+ # TODO paging large loads
209+ for feature_view in feature_views_to_materialize :
210+ if isinstance (feature_view .input , FileSource ):
211+ raise NotImplementedError (
212+ "This function is not yet implemented for File data sources"
213+ )
214+ if not feature_view .input .table_ref :
215+ raise NotImplementedError (
216+ f"This function is only implemented for FeatureViews with a table_ref; { feature_view .name } does not have one."
217+ )
218+ (
219+ entity_names ,
220+ feature_names ,
221+ event_timestamp_column ,
222+ created_timestamp_column ,
223+ ) = _run_reverse_field_mapping (feature_view )
224+
225+ offline_store = get_offline_store (self .config )
226+ table = offline_store .pull_latest_from_table (
227+ feature_view .input ,
228+ entity_names ,
229+ feature_names ,
230+ event_timestamp_column ,
231+ created_timestamp_column ,
232+ start_date ,
233+ end_date ,
234+ )
235+
236+ if feature_view .input .field_mapping is not None :
237+ table = _run_forward_field_mapping (
238+ table , feature_view .input .field_mapping
239+ )
240+
241+ rows_to_write = _convert_arrow_to_proto (table , feature_view )
242+
243+ provider = self ._get_provider ()
244+ provider .online_write_batch (
245+ self .config .project , feature_view , rows_to_write
246+ )
247+
156248
157249def _get_requested_feature_views (
158250 feature_refs : List [str ], all_feature_views : List [FeatureView ]
@@ -176,3 +268,102 @@ def _get_requested_feature_views(
176268 feature_views_list .append (view )
177269
178270 return feature_views_list
271+
272+
def _run_reverse_field_mapping(
    feature_view: FeatureView,
) -> Tuple[List[str], List[str], str, Optional[str]]:
    """
    Translate the feature view's declared names back to source column names.

    When the feature view's input defines a field mapping (source column ->
    feature view name), apply it in reverse to the entity names, feature
    names, event timestamp column, and created timestamp column, yielding the
    column names to use when querying the offline store.

    Args:
        feature_view: FeatureView object containing the field mapping
            as well as the names to reverse-map.
    Returns:
        Tuple of (entity names, feature names, event timestamp column,
        created timestamp column), each reverse-mapped where a mapping entry
        exists and left unchanged otherwise.
    """
    source = feature_view.input
    entity_names = list(feature_view.entities)
    feature_names = [feature.name for feature in feature_view.features]
    event_timestamp_column = source.event_timestamp_column
    created_timestamp_column = source.created_timestamp_column

    if source.field_mapping is not None:
        # Invert source->view into view->source for the reverse lookup.
        reverse_mapping = {dest: src for src, dest in source.field_mapping.items()}
        event_timestamp_column = reverse_mapping.get(
            event_timestamp_column, event_timestamp_column
        )
        if created_timestamp_column:
            created_timestamp_column = reverse_mapping.get(
                created_timestamp_column, created_timestamp_column
            )
        entity_names = [reverse_mapping.get(name, name) for name in entity_names]
        feature_names = [reverse_mapping.get(name, name) for name in feature_names]

    return (
        entity_names,
        feature_names,
        event_timestamp_column,
        created_timestamp_column,
    )
324+
325+
def _run_forward_field_mapping(
    table: pyarrow.Table, field_mapping: Dict[str, str],
) -> pyarrow.Table:
    """Rename table columns per field_mapping (source -> feature view name).

    Columns absent from the mapping keep their original names.
    """
    renamed = [field_mapping.get(name, name) for name in table.column_names]
    return table.rename_columns(renamed)
336+
337+
def _convert_arrow_to_proto(
    table: pyarrow.Table, feature_view: FeatureView
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """
    Convert an Arrow table into rows ready for the online store.

    Each row becomes a tuple of (entity key proto, feature-name -> value-proto
    dict, event timestamp, optional created timestamp), matching the input
    expected by ``Provider.online_write_batch``.

    Args:
        table: Arrow table whose columns include the feature view's entities,
            features, and timestamp column(s).
        feature_view: FeatureView describing which columns to extract.
    Returns:
        List of (EntityKeyProto, feature dict, event timestamp,
        created timestamp or None) tuples, one per table row.
    """
    # Hoist column-index lookups out of the per-row loop: list.index() is an
    # O(columns) scan and these positions are identical for every row.
    column_names = table.column_names
    entity_indices = [
        (entity_name, column_names.index(entity_name))
        for entity_name in feature_view.entities
    ]
    feature_indices = [
        (feature.name, column_names.index(feature.name))
        for feature in feature_view.features
    ]
    event_timestamp_idx = column_names.index(
        feature_view.input.event_timestamp_column
    )
    created_timestamp_idx = (
        column_names.index(feature_view.input.created_timestamp_column)
        if feature_view.input.created_timestamp_column is not None
        else None
    )

    rows_to_write = []
    # to_pydict() preserves column order, so zip yields row tuples aligned
    # with column_names.
    for row in zip(*table.to_pydict().values()):
        entity_key = EntityKeyProto()
        for entity_name, idx in entity_indices:
            entity_key.entity_names.append(entity_name)
            entity_key.entity_values.append(python_value_to_proto_value(row[idx]))
        feature_dict = {
            name: python_value_to_proto_value(row[idx])
            for name, idx in feature_indices
        }
        event_timestamp = row[event_timestamp_idx]
        created_timestamp = (
            row[created_timestamp_idx] if created_timestamp_idx is not None else None
        )
        rows_to_write.append(
            (entity_key, feature_dict, event_timestamp, created_timestamp)
        )
    return rows_to_write
0 commit comments