Skip to content
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
21e47d2
test
mavysavydav Jun 12, 2021
06e0c77
refactored existing tests to test full_feature_names feature on data …
Mwad22 Jun 16, 2021
4b7dd18
removed full_feature_names usage from quickstart and README to have m…
Mwad22 Jun 16, 2021
579e08f
Update CHANGELOG for Feast v0.10.8
Jun 17, 2021
462da43
GitBook: [master] 2 pages modified
achals Jun 17, 2021
df95ee8
Schema Inferencing should happen at apply time (#1646)
mavysavydav Jun 18, 2021
e383575
GitBook: [master] 80 pages modified
woop Jun 19, 2021
dd25ad6
GitBook: [master] 80 pages modified
woop Jun 20, 2021
cef2869
Provide descriptive error on invalid table reference (#1627)
codyjlin Jun 21, 2021
c2e2b4d
Refactor OnlineStoreConfig classes into owning modules (#1649)
achals Jun 21, 2021
d2cda24
Possibility to specify a project for BigQuery queries (#1656)
MattDelac Jun 21, 2021
4ab4c60
Refactor OfflineStoreConfig classes into their owning modules (#1657)
achals Jun 22, 2021
64a2cb5
Run python unit tests in parallel (#1652)
achals Jun 22, 2021
9e4c907
Rename telemetry to usage (#1660)
Jun 22, 2021
b951282
resolved final comments on PR (variable renaming, refactor tests)
Mwad22 Jun 23, 2021
a68b12b
reformatted after merge conflict
Mwad22 Jun 23, 2021
094dbf3
Update CHANGELOG for Feast v0.11.0
woop Jun 24, 2021
0a148f9
Update charts README (#1659)
szalai1 Jun 25, 2021
0ce8210
Added Redis to list of online stores for local provider in providers …
nels Jun 25, 2021
d71e4c5
Grouped inferencing statements together in apply methods for easier r…
mavysavydav Jun 25, 2021
c14023f
Add RedshiftDataSource (#1669)
Jun 28, 2021
d138648
Provide the user with more options for setting the to_bigquery config…
codyjlin Jun 28, 2021
c02b9eb
Add streaming sources to the FeatureView API (#1664)
achals Jun 28, 2021
12dbbea
Add to_table() to RetrievalJob object (#1663)
MattDelac Jun 29, 2021
d0fe0a9
Rename to_table to to_arrow (#1671)
MattDelac Jun 29, 2021
6e8670e
Cancel BigQuery job if timeout hits (#1672)
MattDelac Jun 29, 2021
5314024
Fix Feature References example (#1674)
GregKuhlmann Jun 30, 2021
eb1da5e
Allow strings for online/offline store instead of dicts (#1673)
achals Jun 30, 2021
183a0b9
Remove default list from the FeatureView constructor (#1679)
achals Jul 1, 2021
b714a12
made changes requested by @tsotnet
Mwad22 Jul 2, 2021
c78894f
Fix unit tests that got broken by Pandas 1.3.0 release (#1683)
Jul 3, 2021
20c9461
Add support for DynamoDB and S3 registry (#1483)
leonid133 Jul 3, 2021
d36d1a0
Parallelize integration tests (#1684)
Jul 4, 2021
651bce3
BQ exception should be raised first before we check the timedout (#1675)
MattDelac Jul 5, 2021
f3b92c3
Update sdk/python/feast/infra/provider.py
Mwad22 Jul 5, 2021
f400d65
Update sdk/python/feast/feature_store.py
Mwad22 Jul 5, 2021
082fca7
made error logic/messages more descriptive
Mwad22 Jul 5, 2021
3aca976
made error logic/messages more descriptive.
Mwad22 Jul 5, 2021
79aa736
Simplified error messages
Mwad22 Jul 6, 2021
d7d08ef
ran formatter, issue in errors.py
Mwad22 Jul 7, 2021
2ab8eea
Merge branch 'master' into mwad22-1618-PR
Mwad22 Jul 7, 2021
650340d
python linter issues resolved
Mwad22 Jul 7, 2021
5d582a6
removed unnecessary default assignment in get_historical_features. de…
Mwad22 Jul 8, 2021
8724e0b
added error message assertion for feature name collisions, and other …
Mwad22 Jul 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ print(training_df.head())
# model = ml.fit(training_df)
```
```commandline
event_timestamp driver_id driver_hourly_stats__conv_rate driver_hourly_stats__acc_rate
2021-04-12 08:12:10 1002 0.497279 0.357702
Comment thread
Mwad22 marked this conversation as resolved.
Outdated
2021-04-12 10:59:42 1001 0.979747 0.008166
2021-04-12 15:01:12 1004 0.151432 0.551748
2021-04-12 16:40:26 1003 0.951506 0.753572
event_timestamp driver_id conv_rate acc_rate avg_daily_trips
0 2021-04-12 08:12:10+00:00 1002 0.713465 0.597095 531
1 2021-04-12 10:59:42+00:00 1001 0.072752 0.044344 11
Comment thread
woop marked this conversation as resolved.
Outdated
2 2021-04-12 15:01:12+00:00 1004 0.658182 0.079150 220
3 2021-04-12 16:40:26+00:00 1003 0.162092 0.309035 959

```

Expand Down
6 changes: 3 additions & 3 deletions docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ pprint(feature_vector)
```python
{
'driver_id': [1001],
'driver_hourly_stats__conv_rate': [0.49274],
'driver_hourly_stats__acc_rate': [0.92743],
'driver_hourly_stats__avg_daily_trips': [72],
'conv_rate': [0.49274],
'acc_rate': [0.92743],
'avg_daily_trips': [72],
}
```

Expand Down
23 changes: 22 additions & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Set
from typing import List, Set

from colorama import Fore, Style

Expand Down Expand Up @@ -88,6 +88,27 @@ def __init__(self, offline_store_name: str, data_source_name: str):
)


class FeatureNameCollisionError(Exception):
def __init__(self, feature_refs_collisions: List[str], full_feature_names):
Comment thread
Mwad22 marked this conversation as resolved.
Outdated
if full_feature_names:
collisions = [ref.replace(":", "__") for ref in feature_refs_collisions]
error_message = (
"To resolve this collision, please ensure that the features in question "
"have different names."
)
else:
collisions = [ref.split(":")[1] for ref in feature_refs_collisions]
error_message = (
"To resolve this collision, either use the full feature name by setting "
"'full_feature_names=True', or ensure that the features in question have different names."
)

feature_names = ", ".join(set(collisions))
super().__init__(
f"Duplicate features named {feature_names} found.\n{error_message}"
)


class FeastOnlineStoreInvalidName(Exception):
def __init__(self, online_store_class_name: str):
super().__init__(
Expand Down
107 changes: 70 additions & 37 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from collections import OrderedDict, defaultdict
from collections import Counter, OrderedDict, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
Expand All @@ -24,7 +23,7 @@

from feast import utils
from feast.entity import Entity
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
from feast.errors import FeatureNameCollisionError, FeatureViewNotFoundException
from feast.feature_view import FeatureView
from feast.inference import (
update_data_sources_with_inferred_event_timestamp_col,
Expand Down Expand Up @@ -230,9 +229,11 @@ def apply(
update_entities_with_inferred_types_from_feature_views(
entities_to_update, views_to_update, self.config
)

update_data_sources_with_inferred_event_timestamp_col(
[view.input for view in views_to_update], self.config
)

for view in views_to_update:
view.infer_features_from_input_source(self.config)

Expand All @@ -255,7 +256,10 @@ def apply(

@log_exceptions_and_usage
def get_historical_features(
self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str],
self,
entity_df: Union[pd.DataFrame, str],
feature_refs: List[str],
full_feature_names: bool = False,
) -> RetrievalJob:
"""Enrich an entity dataframe with historical feature values for either training or batch scoring.

Expand All @@ -277,6 +281,9 @@ def get_historical_features(
SQL query. The query must be of a format supported by the configured offline store (e.g., BigQuery)
feature_refs: A list of features that should be retrieved from the offline store. Feature references are of
the format "feature_view:feature", e.g., "customer_fv:daily_transactions".
full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names,
changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to
"customer_fv__daily_transactions"). By default, this value is set to False.

Returns:
RetrievalJob which can be used to materialize the results.
Expand All @@ -289,32 +296,29 @@ def get_historical_features(
>>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
>>> retrieval_job = fs.get_historical_features(
>>> entity_df="SELECT event_timestamp, order_id, customer_id from gcp_project.my_ds.customer_orders",
>>> feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"]
>>> )
>>> feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"],
>>> )
>>> feature_data = retrieval_job.to_df()
>>> model.fit(feature_data) # insert your modeling framework here.
"""

all_feature_views = self._registry.list_feature_views(project=self.project)
try:
feature_views = _get_requested_feature_views(
feature_refs, all_feature_views
)
except FeatureViewNotFoundException as e:
sys.exit(e)

_validate_feature_refs(feature_refs, full_feature_names)
feature_views = list(
view for view, _ in _group_feature_refs(feature_refs, all_feature_views)
)

provider = self._get_provider()
try:
job = provider.get_historical_features(
self.config,
feature_views,
feature_refs,
entity_df,
self._registry,
self.project,
)
except FeastProviderLoginError as e:
sys.exit(e)

job = provider.get_historical_features(
self.config,
feature_views,
feature_refs,
entity_df,
self._registry,
self.project,
full_feature_names,
)

return job

Expand Down Expand Up @@ -480,7 +484,10 @@ def tqdm_builder(length):

@log_exceptions_and_usage
def get_online_features(
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
self,
feature_refs: List[str],
entity_rows: List[Dict[str, Any]],
full_feature_names: bool = False,
) -> OnlineResponse:
"""
Retrieves the latest online feature data.
Expand Down Expand Up @@ -548,7 +555,8 @@ def get_online_features(
project=self.project, allow_cache=True
)

grouped_refs = _group_refs(feature_refs, all_feature_views)
_validate_feature_refs(feature_refs, full_feature_names)
grouped_refs = _group_feature_refs(feature_refs, all_feature_views)
for table, requested_features in grouped_refs:
entity_keys = _get_table_entity_keys(
table, union_of_entity_keys, entity_name_to_join_key_map
Expand All @@ -565,13 +573,21 @@ def get_online_features(

if feature_data is None:
for feature_name in requested_features:
feature_ref = f"{table.name}__{feature_name}"
feature_ref = (
f"{table.name}__{feature_name}"
if full_feature_names
else feature_name
)
result_row.statuses[
feature_ref
] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND
else:
for feature_name in feature_data:
feature_ref = f"{table.name}__{feature_name}"
feature_ref = (
f"{table.name}__{feature_name}"
if full_feature_names
else feature_name
)
if feature_name in requested_features:
result_row.fields[feature_ref].CopyFrom(
feature_data[feature_name]
Expand Down Expand Up @@ -599,7 +615,31 @@ def _entity_row_to_field_values(
return result


def _group_refs(
def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = False):
Comment thread
Mwad22 marked this conversation as resolved.
Outdated
collided_feature_refs = []

if full_feature_names:
collided_feature_refs = [
ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1
]
else:
feature_names = [ref.split(":")[1] for ref in feature_refs]
collided_feature_names = [
ref
for ref, occurrences in Counter(feature_names).items()
if occurrences > 1
]

for feature_name in collided_feature_names:
collided_feature_refs.extend(
[ref for ref in feature_refs if ref.endswith(":" + feature_name)]
)

if len(collided_feature_refs) > 0:
raise FeatureNameCollisionError(collided_feature_refs, full_feature_names)


def _group_feature_refs(
feature_refs: List[str], all_feature_views: List[FeatureView]
) -> List[Tuple[FeatureView, List[str]]]:
""" Get list of feature views and corresponding feature names based on feature references"""
Expand All @@ -612,6 +652,7 @@ def _group_refs(

for ref in feature_refs:
view_name, feat_name = ref.split(":")

if view_name not in view_index:
raise FeatureViewNotFoundException(view_name)
views_features[view_name].append(feat_name)
Expand All @@ -622,14 +663,6 @@ def _group_refs(
return result


def _get_requested_feature_views(
feature_refs: List[str], all_feature_views: List[FeatureView]
) -> List[FeatureView]:
"""Get list of feature views based on feature references"""
# TODO: Get rid of this function. We only need _group_refs
return list(view for view, _ in _group_refs(feature_refs, all_feature_views))


def _get_table_entity_keys(
table: FeatureView, entity_keys: List[EntityKeyProto], join_key_map: Dict[str, str],
) -> List[EntityKeyProto]:
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ def to_proto(self) -> FeatureViewProto:
ttl_duration = Duration()
ttl_duration.FromTimedelta(self.ttl)

print(f"Stream soruce: {self.stream_source}, {type(self.stream_source)}")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be removed?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been removed in #1678

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove and push, must have gotten added when I rebased.


spec = FeatureViewSpecProto(
name=self.name,
entities=self.entities,
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def get_historical_features(
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
Comment thread
Mwad22 marked this conversation as resolved.
Outdated
) -> RetrievalJob:
job = self.offline_store.get_historical_features(
config=config,
Expand All @@ -137,5 +138,6 @@ def get_historical_features(
entity_df=entity_df,
registry=registry,
project=project,
full_feature_names=full_feature_names,
)
return job
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def get_historical_features(
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
job = self.offline_store.get_historical_features(
config=config,
Expand All @@ -139,5 +140,6 @@ def get_historical_features(
entity_df=entity_df,
registry=registry,
project=project,
full_feature_names=full_feature_names,
)
return job
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def get_historical_features(
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
return self.offline_store.get_historical_features(
config=config,
Expand All @@ -138,6 +139,7 @@ def get_historical_features(
entity_df=entity_df,
registry=registry,
project=project,
full_feature_names=full_feature_names,
)


Expand Down
11 changes: 8 additions & 3 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def get_historical_features(
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
# TODO: Add entity_df validation in order to fail before interacting with BigQuery
assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
Expand All @@ -121,7 +122,7 @@ def get_historical_features(

# Build a query context containing all information required to template the BigQuery SQL query
query_context = get_feature_view_query_context(
feature_refs, feature_views, registry, project
feature_refs, feature_views, registry, project, full_feature_names
Comment thread
Mwad22 marked this conversation as resolved.
Outdated
)

# TODO: Infer min_timestamp and max_timestamp from entity_df
Expand All @@ -132,6 +133,7 @@ def get_historical_features(
max_timestamp=datetime.now() + timedelta(days=1),
left_table_query_string=str(table.reference),
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
full_feature_names=full_feature_names,
)

job = BigQueryRetrievalJob(query=query, client=client, config=config)
Expand Down Expand Up @@ -373,6 +375,7 @@ def get_feature_view_query_context(
feature_views: List[FeatureView],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> List[FeatureViewQueryContext]:
"""Build a query context containing all information required to template a BigQuery point-in-time SQL query"""

Expand Down Expand Up @@ -432,6 +435,7 @@ def build_point_in_time_query(
max_timestamp: datetime,
left_table_query_string: str,
entity_df_event_timestamp_col: str,
full_feature_names: bool = False,
):
"""Build point-in-time query between each feature view table and the entity dataframe"""
template = Environment(loader=BaseLoader()).from_string(
Expand All @@ -448,6 +452,7 @@ def build_point_in_time_query(
[entity for fv in feature_view_query_contexts for entity in fv.entities]
),
"featureviews": [asdict(context) for context in feature_view_query_contexts],
"full_feature_names": full_feature_names,
}

query = template.render(template_context)
Expand Down Expand Up @@ -521,7 +526,7 @@ def _get_bigquery_client(project: Optional[str] = None):
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
{% for feature in featureview.features %}
{{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
),
Expand Down Expand Up @@ -614,7 +619,7 @@ def _get_bigquery_client(project: Optional[str] = None):
SELECT
entity_row_unique_id,
{% for feature in featureview.features %}
{{ featureview.name }}__{{ feature }},
{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %},
{% endfor %}
FROM {{ featureview.name }}__cleaned
) USING (entity_row_unique_id)
Expand Down
Loading