|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 | from pathlib import Path |
15 | | -from typing import Optional |
| 15 | +from typing import List, Optional, Union |
16 | 16 |
|
| 17 | +import pandas as pd |
| 18 | + |
| 19 | +from feast.entity import Entity |
| 20 | +from feast.feature_view import FeatureView |
17 | 21 | from feast.infra.provider import Provider, get_provider |
| 22 | +from feast.offline_store import RetrievalJob, get_offline_store_for_retrieval |
18 | 23 | from feast.registry import Registry |
19 | 24 | from feast.repo_config import ( |
20 | 25 | LocalOnlineStoreConfig, |
@@ -55,3 +60,119 @@ def _get_provider(self) -> Provider: |
55 | 60 |
|
def _get_registry(self) -> Registry:
    """Build a Registry from the metadata store setting in this store's config."""
    metadata_store = self.config.metadata_store
    return Registry(metadata_store)
| 63 | + |
def apply(self, objects: List[Union[FeatureView, Entity]]):
    """Register Entity and FeatureView definitions in the Feast registry.

    Every object is type-checked before anything is written, so an unsupported
    object cannot leave the objects that precede it registered on their own.

    Only the registry is written at the moment; updating related infrastructure
    (e.g., creating tables in an online store) is still a TODO.

    Args:
        objects: FeatureView and Entity objects to register under the project
            named by ``self.config.project``. They are applied in the order
            given.

    Raises:
        ValueError: If any element is neither a FeatureView nor an Entity. In
            that case nothing from this call has been written to the registry.

    Examples:
        Register a single Entity and FeatureView.
        >>> from feast.feature_store import FeatureStore
        >>> from feast import Entity, FeatureView, Feature, ValueType, FileSource
        >>> from datetime import timedelta
        >>>
        >>> fs = FeatureStore()
        >>> customer_entity = Entity(name="customer", value_type=ValueType.INT64, description="customer entity")
        >>> customer_feature_view = FeatureView(
        ...     name="customer_fv",
        ...     entities=["customer"],
        ...     features=[Feature(name="age", dtype=ValueType.INT64)],
        ...     input=FileSource(path="file.parquet", event_timestamp_column="timestamp"),
        ...     ttl=timedelta(days=1)
        ... )
        >>> fs.apply([customer_entity, customer_feature_view])
    """

    # TODO: Add locking
    # TODO: Optimize by only making a single call (read/write)
    # TODO: Add infra update operation (currently we are just writing to registry)
    registry = self._get_registry()

    # Materialize once so that a one-shot iterable survives the two passes below.
    pending = list(objects)

    # Validate everything up front: rejecting a bad object after earlier ones
    # were already written would leave the registry partially updated.
    for ob in pending:
        if not isinstance(ob, (FeatureView, Entity)):
            raise ValueError(
                f"Unknown object type ({type(ob)}) provided as part of apply() call"
            )

    for ob in pending:
        if isinstance(ob, FeatureView):
            registry.apply_feature_view(ob, project=self.config.project)
        else:
            # Only Entity remains after the validation pass above.
            registry.apply_entity(ob, project=self.config.project)
| 106 | + |
def get_historical_features(
    self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str],
) -> RetrievalJob:
    """Join historical feature values onto an entity dataframe for training or batch scoring.

    Historical data from one or more feature views is joined to the entity dataframe with a time travel join.

    Each feature view is joined on all entities configured for that view, and every one of those entities must be
    present in the entity dataframe. The entity dataframe therefore needs the union of the entities of all requested
    feature views, while the individual feature views may use different entities.

    Time travel is bounded by the TTL configured on each feature view. A shorter TTL reduces how much data is
    scanned when looking up feature data for an entity key, but may result in null values being returned.

    Args:
        entity_df (Union[pd.DataFrame, str]): Rows holding all entity columns (e.g., customer_id, driver_id) on
            which features are joined, plus an event_timestamp column used for point-in-time correctness. Either a
            Pandas DataFrame or a SQL query string; a query must be in a format supported by the configured
            offline store (e.g., BigQuery).
        feature_refs: Features to retrieve from the offline store, written as "feature_view:feature", e.g.,
            "customer_fv:daily_transactions".

    Returns:
        RetrievalJob which can be used to materialize the results.

    Examples:
        Retrieve historical features using a BigQuery SQL entity dataframe
        >>> from feast.feature_store import FeatureStore
        >>>
        >>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
        >>> retrieval_job = fs.get_historical_features(
        ...     entity_df="SELECT event_timestamp, order_id, customer_id from gcp_project.my_ds.customer_orders",
        ...     feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"]
        ... )
        >>> feature_data = retrieval_job.to_df()
        >>> model.fit(feature_data) # insert your modeling framework here.
    """

    # Narrow the project's registered feature views down to those the references name.
    project_views = self._get_registry().list_feature_views(
        project=self.config.project
    )
    requested_views = _get_requested_feature_views(feature_refs, project_views)

    # The offline store is selected from the requested views and builds the retrieval job.
    store = get_offline_store_for_retrieval(requested_views)
    return store.get_historical_features(
        self.config, requested_views, feature_refs, entity_df
    )
| 155 | + |
| 156 | + |
| 157 | +def _get_requested_feature_views( |
| 158 | + feature_refs: List[str], all_feature_views: List[FeatureView] |
| 159 | +) -> List[FeatureView]: |
| 160 | + """Get list of feature views based on feature references""" |
| 161 | + |
| 162 | + feature_views_dict = {} |
| 163 | + for ref in feature_refs: |
| 164 | + ref_parts = ref.split(":") |
| 165 | + found = False |
| 166 | + for feature_view in all_feature_views: |
| 167 | + if feature_view.name == ref_parts[0]: |
| 168 | + found = True |
| 169 | + feature_views_dict[feature_view.name] = feature_view |
| 170 | + continue |
| 171 | + |
| 172 | + if not found: |
| 173 | + raise ValueError(f"Could not find feature view from reference {ref}") |
| 174 | + feature_views_list = [] |
| 175 | + for view in feature_views_dict.values(): |
| 176 | + feature_views_list.append(view) |
| 177 | + |
| 178 | + return feature_views_list |
0 commit comments