Skip to content

Commit a31545b

Browse files
woop and oavdeev authored
Add historical retrieval for BigQuery and Parquet (feast-dev#1389)
* Add ParquetSource Signed-off-by: Willem Pienaar <git@willem.co>
* Add Parquet source, TTL validation, and fix BQ source conditional Signed-off-by: Willem Pienaar <git@willem.co>
* Add partial Parquet Historical retrieval Signed-off-by: Willem Pienaar <git@willem.co>
* Reformat and lint Signed-off-by: Willem Pienaar <git@willem.co>
* Remove feature_views_to_query from get_historical_features Signed-off-by: Willem Pienaar <git@willem.co>
* Clean up get_hist_features test Signed-off-by: Willem Pienaar <git@willem.co>
* Added point-in-time query for BigQueryOfflineStore Signed-off-by: Willem Pienaar <git@willem.co>
* Fix ParquetSource tests Signed-off-by: Willem Pienaar <git@willem.co>
* Clean up and refactor offline_store.py functions Signed-off-by: Willem Pienaar <git@willem.co>
* Relax Python dependencies and fix broken TTL Signed-off-by: Willem Pienaar <git@willem.co>
* Fix time zones in point-in-time join Signed-off-by: Willem Pienaar <git@willem.co>
* Reformat using black Signed-off-by: Willem Pienaar <git@willem.co>
* Move get_historical_features into feature_store and get mypy to pass Signed-off-by: Willem Pienaar <git@willem.co>
* Replace ParquetSource with FileSource Signed-off-by: Willem Pienaar <git@willem.co>
* Fix feature request building Signed-off-by: Willem Pienaar <git@willem.co>
* Clean up context building code Signed-off-by: Willem Pienaar <git@willem.co>
* Remove RegistryState Class Signed-off-by: Willem Pienaar <git@willem.co>
* Fix tests writing to disk Signed-off-by: Willem Pienaar <git@willem.co>
* Skip BigQuery test during unit tests Signed-off-by: Willem Pienaar <git@willem.co>
* Revert documentation changes Signed-off-by: Willem Pienaar <git@willem.co>
* Add documentation for historical retrieval and apply Signed-off-by: Willem Pienaar <git@willem.co>
* Add support for loading entity dataframes Signed-off-by: Willem Pienaar <git@willem.co>
* Fix typo in query context Signed-off-by: Willem Pienaar <git@willem.co>
* Remove hardcoding of created timestamp Signed-off-by: Willem Pienaar <git@willem.co>
* Set created timestamp column in BigQuery test Signed-off-by: Willem Pienaar <git@willem.co>
* Revert broken documentation and formatting Signed-off-by: Willem Pienaar <git@willem.co>
* Set context query to be immutable Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com> Signed-off-by: Willem Pienaar <git@willem.co>
* Add comments based on PR review Signed-off-by: Willem Pienaar <git@willem.co>
* Clarify TODO comment for entity dataframe time ranges Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com> Signed-off-by: Willem Pienaar <git@willem.co>

Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
1 parent 0da112b commit a31545b

9 files changed

Lines changed: 1195 additions & 33 deletions

File tree

sdk/python/feast/data_format.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def from_proto(cls, proto):
4343
fmt = proto.WhichOneof("format")
4444
if fmt == "parquet_format":
4545
return ParquetFormat()
46+
if fmt is None:
47+
return None
4648
raise NotImplementedError(f"FileFormat is unsupported: {fmt}")
4749

4850
def __str__(self):

sdk/python/feast/data_source.py

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class FileOptions:
3838
"""
3939

4040
def __init__(
41-
self, file_format: FileFormat, file_url: str,
41+
self, file_format: Optional[FileFormat], file_url: Optional[str],
4242
):
4343
self._file_format = file_format
4444
self._file_url = file_url
@@ -97,7 +97,10 @@ def to_proto(self) -> DataSourceProto.FileOptions:
9797
"""
9898

9999
file_options_proto = DataSourceProto.FileOptions(
100-
file_format=self.file_format.to_proto(), file_url=self.file_url,
100+
file_format=(
101+
None if self.file_format is None else self.file_format.to_proto()
102+
),
103+
file_url=self.file_url,
101104
)
102105

103106
return file_options_proto
@@ -108,10 +111,23 @@ class BigQueryOptions:
108111
DataSource BigQuery options used to source features from BigQuery query
109112
"""
110113

111-
def __init__(
112-
self, table_ref: str,
113-
):
114+
def __init__(self, table_ref: Optional[str], query: Optional[str]):
114115
self._table_ref = table_ref
116+
self._query = query
117+
118+
@property
119+
def query(self):
120+
"""
121+
Returns the BigQuery SQL query referenced by this source
122+
"""
123+
return self._query
124+
125+
@query.setter
126+
def query(self, query):
127+
"""
128+
Sets the BigQuery SQL query referenced by this source
129+
"""
130+
self._query = query
115131

116132
@property
117133
def table_ref(self):
@@ -139,7 +155,10 @@ def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
139155
Returns a BigQueryOptions object based on the bigquery_options protobuf
140156
"""
141157

142-
bigquery_options = cls(table_ref=bigquery_options_proto.table_ref,)
158+
bigquery_options = cls(
159+
table_ref=bigquery_options_proto.table_ref,
160+
query=bigquery_options_proto.query,
161+
)
143162

144163
return bigquery_options
145164

@@ -498,18 +517,48 @@ class FileSource(DataSource):
498517
def __init__(
499518
self,
500519
event_timestamp_column: str,
501-
file_format: FileFormat,
502-
file_url: str,
520+
file_url: Optional[str] = None,
521+
path: Optional[str] = None,
522+
file_format: FileFormat = None,
503523
created_timestamp_column: Optional[str] = "",
504524
field_mapping: Optional[Dict[str, str]] = None,
505525
date_partition_column: Optional[str] = "",
506526
):
527+
"""Create a FileSource from a file containing feature data. Only Parquet format supported.
528+
529+
Args:
530+
531+
path: File path to file containing feature data. Must contain an event_timestamp column, entity columns and
532+
feature columns.
533+
event_timestamp_column: Event timestamp column used for point in time joins of feature values.
534+
created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
535+
file_url: [Deprecated] Please see path
536+
file_format (optional): Explicitly set the file format. Allows Feast to bypass inferring the file format.
537+
field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
538+
or view. Only used for feature columns, not entities or timestamp columns.
539+
540+
Examples:
541+
>>> FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")
542+
"""
507543
super().__init__(
508544
event_timestamp_column,
509545
created_timestamp_column,
510546
field_mapping,
511547
date_partition_column,
512548
)
549+
if path is None and file_url is None:
550+
raise ValueError(
551+
'No "path" argument provided. Please set "path" to the location of your file source.'
552+
)
553+
554+
if file_url is not None:
555+
from warnings import warn
556+
557+
warn(
558+
'Argument "file_url" is being deprecated. Please use the "path" argument.'
559+
)
560+
else:
561+
file_url = path
513562
self._file_options = FileOptions(file_format=file_format, file_url=file_url)
514563

515564
def __eq__(self, other):
@@ -537,6 +586,13 @@ def file_options(self, file_options):
537586
"""
538587
self._file_options = file_options
539588

589+
@property
590+
def path(self):
591+
"""
592+
Returns the file path of this feature data source
593+
"""
594+
return self._file_options.file_url
595+
540596
def to_proto(self) -> DataSourceProto:
541597
data_source_proto = DataSourceProto(
542598
type=DataSourceProto.BATCH_FILE,
@@ -555,18 +611,19 @@ class BigQuerySource(DataSource):
555611
def __init__(
556612
self,
557613
event_timestamp_column: str,
558-
table_ref: str,
614+
table_ref: Optional[str] = None,
559615
created_timestamp_column: Optional[str] = "",
560616
field_mapping: Optional[Dict[str, str]] = None,
561617
date_partition_column: Optional[str] = "",
618+
query: Optional[str] = None,
562619
):
563620
super().__init__(
564621
event_timestamp_column,
565622
created_timestamp_column,
566623
field_mapping,
567624
date_partition_column,
568625
)
569-
self._bigquery_options = BigQueryOptions(table_ref=table_ref,)
626+
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)
570627

571628
def __eq__(self, other):
572629
if not isinstance(other, BigQuerySource):
@@ -583,6 +640,10 @@ def __eq__(self, other):
583640
def table_ref(self):
584641
return self._bigquery_options.table_ref
585642

643+
@property
644+
def query(self):
645+
return self._bigquery_options.query
646+
586647
@property
587648
def bigquery_options(self):
588649
"""
@@ -610,6 +671,13 @@ def to_proto(self) -> DataSourceProto:
610671

611672
return data_source_proto
612673

674+
def get_table_query_string(self) -> str:
675+
"""Returns a string that can directly be used to reference this table in SQL"""
676+
if self.table_ref is not None:
677+
return f"`{self.table_ref}`"
678+
else:
679+
return f"({self.query})"
680+
613681

614682
class KafkaSource(DataSource):
615683
def __init__(

sdk/python/feast/feature_store.py

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
from pathlib import Path
15-
from typing import Optional
15+
from typing import List, Optional, Union
1616

17+
import pandas as pd
18+
19+
from feast.entity import Entity
20+
from feast.feature_view import FeatureView
1721
from feast.infra.provider import Provider, get_provider
22+
from feast.offline_store import RetrievalJob, get_offline_store_for_retrieval
1823
from feast.registry import Registry
1924
from feast.repo_config import (
2025
LocalOnlineStoreConfig,
@@ -55,3 +60,119 @@ def _get_provider(self) -> Provider:
5560

5661
def _get_registry(self) -> Registry:
5762
return Registry(self.config.metadata_store)
63+
64+
def apply(self, objects: List[Union[FeatureView, Entity]]):
65+
"""Register objects to metadata store and update related infrastructure.
66+
67+
The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these
68+
objects in the Feast registry. Once the registry has been updated, the apply method will update related
69+
infrastructure (e.g., create tables in an online store) in order to reflect these new definitions. All
70+
operations are idempotent, meaning they can safely be rerun.
71+
72+
Args: objects (List[Union[FeatureView, Entity]]): A list of FeatureView or Entity objects that should be
73+
registered
74+
75+
Examples:
76+
Register a single Entity and FeatureView.
77+
>>> from feast.feature_store import FeatureStore
78+
>>> from feast import Entity, FeatureView, Feature, ValueType, FileSource
79+
>>> from datetime import timedelta
80+
>>>
81+
>>> fs = FeatureStore()
82+
>>> customer_entity = Entity(name="customer", value_type=ValueType.INT64, description="customer entity")
83+
>>> customer_feature_view = FeatureView(
84+
>>> name="customer_fv",
85+
>>> entities=["customer"],
86+
>>> features=[Feature(name="age", dtype=ValueType.INT64)],
87+
>>> input=FileSource(path="file.parquet", event_timestamp_column="timestamp"),
88+
>>> ttl=timedelta(days=1)
89+
>>> )
90+
>>> fs.apply([customer_entity, customer_feature_view])
91+
"""
92+
93+
# TODO: Add locking
94+
# TODO: Optimize by only making a single call (read/write)
95+
# TODO: Add infra update operation (currently we are just writing to registry)
96+
registry = self._get_registry()
97+
for ob in objects:
98+
if isinstance(ob, FeatureView):
99+
registry.apply_feature_view(ob, project=self.config.project)
100+
elif isinstance(ob, Entity):
101+
registry.apply_entity(ob, project=self.config.project)
102+
else:
103+
raise ValueError(
104+
f"Unknown object type ({type(ob)}) provided as part of apply() call"
105+
)
106+
107+
    def get_historical_features(
        self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str],
    ) -> RetrievalJob:
        """Enrich an entity dataframe with historical feature values for either training or batch scoring.

        This method joins historical feature data from one or more feature views to an entity dataframe by using a time
        travel join.

        Each feature view is joined to the entity dataframe using all entities configured for the respective feature
        view. All configured entities must be available in the entity dataframe. Therefore, the entity dataframe must
        contain all entities found in all feature views, but the individual feature views can have different entities.

        Time travel is based on the configured TTL for each feature view. A shorter TTL will limit the
        amount of scanning that will be done in order to find feature data for a specific entity key. Setting a short
        TTL may result in null values being returned.

        Args:
            entity_df (Union[pd.DataFrame, str]): An entity dataframe is a collection of rows containing all entity
                columns (e.g., customer_id, driver_id) on which features need to be joined, as well as a event_timestamp
                column used to ensure point-in-time correctness. Either a Pandas DataFrame can be provided or a string
                SQL query. The query must be of a format supported by the configured offline store (e.g., BigQuery)
            feature_refs: A list of features that should be retrieved from the offline store. Feature references are of
                the format "feature_view:feature", e.g., "customer_fv:daily_transactions".

        Returns:
            RetrievalJob which can be used to materialize the results.

        Raises:
            ValueError: If a feature reference names a feature view that is not registered.

        Examples:
            Retrieve historical features using a BigQuery SQL entity dataframe
            >>> from feast.feature_store import FeatureStore
            >>>
            >>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
            >>> retrieval_job = fs.get_historical_features(
            >>>     entity_df="SELECT event_timestamp, order_id, customer_id from gcp_project.my_ds.customer_orders",
            >>>     feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"]
            >>> )
            >>> feature_data = retrieval_job.to_df()
            >>> model.fit(feature_data) # insert your modeling framework here.
        """

        # Resolve the requested feature references against the registered
        # feature views for this project (raises ValueError on unknown views).
        registry = self._get_registry()
        all_feature_views = registry.list_feature_views(project=self.config.project)
        feature_views = _get_requested_feature_views(feature_refs, all_feature_views)
        # Select an offline store implementation compatible with the sources of
        # the requested feature views, then delegate the point-in-time join to it.
        offline_store = get_offline_store_for_retrieval(feature_views)
        job = offline_store.get_historical_features(
            self.config, feature_views, feature_refs, entity_df
        )
        return job
155+
156+
157+
def _get_requested_feature_views(
    feature_refs: List[str], all_feature_views: List[FeatureView]
) -> List[FeatureView]:
    """Get list of feature views based on feature references.

    Args:
        feature_refs: Feature references of the form "feature_view:feature".
        all_feature_views: All registered feature views to resolve against.

    Returns:
        De-duplicated list of the feature views named in ``feature_refs``,
        in first-referenced order.

    Raises:
        ValueError: If a reference names a feature view that is not registered.
    """
    # Keyed by view name so repeated references to the same view collapse to a
    # single entry while preserving first-seen order (dicts preserve insertion
    # order).
    feature_views_dict: Dict[str, FeatureView] = {}
    for ref in feature_refs:
        view_name = ref.split(":")[0]
        for feature_view in all_feature_views:
            if feature_view.name == view_name:
                feature_views_dict[view_name] = feature_view
                break
        else:
            # for/else: no matching view found for this reference.
            raise ValueError(f"Could not find feature view from reference {ref}")

    return list(feature_views_dict.values())

sdk/python/feast/feature_view.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,23 @@
2020
from feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
2121
from feast.core.FeatureView_pb2 import FeatureViewMeta as FeatureViewMetaProto
2222
from feast.core.FeatureView_pb2 import FeatureViewSpec as FeatureViewSpecProto
23-
from feast.data_source import BigQuerySource, DataSource
23+
from feast.data_source import BigQuerySource, DataSource, FileSource
2424
from feast.feature import Feature
2525
from feast.value_type import ValueType
2626

2727

2828
class FeatureView:
2929
"""
30-
A FeatureView defines a logical grouping of servable features.
30+
A FeatureView defines a logical grouping of serveable features.
3131
"""
3232

3333
name: str
3434
entities: List[str]
3535
features: List[Feature]
36-
tags: Dict[str, str]
37-
ttl: Optional[Duration]
36+
tags: Optional[Dict[str, str]]
37+
ttl: Optional[timedelta]
3838
online: bool
39-
input: BigQuerySource
39+
input: Union[BigQuerySource, FileSource]
4040

4141
created_timestamp: Optional[Timestamp] = None
4242
last_updated_timestamp: Optional[Timestamp] = None
@@ -46,10 +46,10 @@ def __init__(
4646
name: str,
4747
entities: List[str],
4848
features: List[Feature],
49-
tags: Dict[str, str],
5049
ttl: Optional[Union[Duration, timedelta]],
51-
online: bool,
52-
input: BigQuerySource,
50+
input: Union[BigQuerySource, FileSource],
51+
tags: Optional[Dict[str, str]] = None,
52+
online: bool = True,
5353
):
5454
cols = [entity for entity in entities] + [feat.name for feat in features]
5555
for col in cols:
@@ -62,10 +62,9 @@ def __init__(
6262
self.entities = entities
6363
self.features = features
6464
self.tags = tags
65-
if isinstance(ttl, timedelta):
66-
proto_ttl = Duration()
67-
proto_ttl.FromTimedelta(ttl)
68-
self.ttl = proto_ttl
65+
66+
if isinstance(ttl, Duration):
67+
self.ttl = timedelta(seconds=int(ttl.seconds))
6968
else:
7069
self.ttl = ttl
7170

@@ -97,12 +96,16 @@ def to_proto(self) -> FeatureViewProto:
9796
last_updated_timestamp=self.last_updated_timestamp,
9897
)
9998

99+
if self.ttl is not None:
100+
ttl_duration = Duration()
101+
ttl_duration.FromTimedelta(self.ttl)
102+
100103
spec = FeatureViewSpecProto(
101104
name=self.name,
102105
entities=self.entities,
103106
features=[feature.to_proto() for feature in self.features],
104107
tags=self.tags,
105-
ttl=self.ttl,
108+
ttl=(ttl_duration if ttl_duration is not None else None),
106109
online=self.online,
107110
input=self.input.to_proto(),
108111
)

0 commit comments

Comments (0)