Skip to content

Commit b791284

Browse files
feat: Updating protos for Projections to include more info
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
1 parent 334e5d7 commit b791284

File tree

4 files changed

+165
-44
lines changed

4 files changed

+165
-44
lines changed

protos/feast/core/FeatureViewProjection.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ option java_outer_classname = "FeatureReferenceProto";
66
option java_package = "feast.proto.core";
77

88
import "feast/core/Feature.proto";
9+
import "feast/core/DataSource.proto";
910

1011

1112
// A projection to be applied on top of a FeatureView.
@@ -22,4 +23,13 @@ message FeatureViewProjection {
2223

2324
// Map for entity join_key overrides of feature data entity join_key to entity data join_key
2425
map<string,string> join_key_map = 4;
26+
27+
string timestamp_field = 5;
28+
string date_partition_column = 6;
29+
string created_timestamp_column = 7;
30+
// Batch/Offline DataSource where this view can retrieve offline feature data.
31+
DataSource batch_source = 8;
32+
// Streaming DataSource from where this view can consume "online" feature data.
33+
DataSource stream_source = 9;
34+
2535
}

protos/feast/core/OnDemandFeatureView.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ message OnDemandFeatureViewSpec {
6363
// Owner of the on demand feature view.
6464
string owner = 8;
6565
string mode = 11;
66+
bool write_to_online_store = 12;
67+
68+
// List of names of entities associated with this feature view.
69+
repeated string entities = 13;
70+
// List of specifications for each entity defined as part of this feature view.
71+
repeated FeatureSpecV2 entity_columns = 14;
6672
}
6773

6874
message OnDemandFeatureViewMeta {

sdk/python/feast/feature_view_projection.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from attr import dataclass
44

5+
from feast.data_source import DataSource
56
from feast.field import Field
67
from feast.protos.feast.core.FeatureViewProjection_pb2 import (
78
FeatureViewProjection as FeatureViewProjectionProto,
@@ -27,13 +28,24 @@ class FeatureViewProjection:
2728
is not ready to be projected, i.e. still needs to go through feature inference.
2829
join_key_map: A map to modify join key columns during retrieval of this feature
2930
view projection.
31+
timestamp_field: The timestamp field of the feature view projection.
32+
date_partition_column: The date partition column of the feature view projection.
33+
created_timestamp_column: The created timestamp column of the feature view projection.
34+
batch_source: The batch source of data where this group of features
35+
is stored. This is optional ONLY if a push source is specified as the
36+
stream_source, since push sources contain their own batch sources.
37+
3038
"""
3139

3240
name: str
3341
name_alias: Optional[str]
3442
desired_features: List[str]
3543
features: List[Field]
3644
join_key_map: Dict[str, str] = {}
45+
timestamp_field: Optional[str] = None
46+
date_partition_column: Optional[str] = None
47+
created_timestamp_column: Optional[str] = None
48+
batch_source: Optional[DataSource] = None
3749

3850
def name_to_use(self):
3951
return self.name_alias or self.name
@@ -43,6 +55,10 @@ def to_proto(self) -> FeatureViewProjectionProto:
4355
feature_view_name=self.name,
4456
feature_view_name_alias=self.name_alias or "",
4557
join_key_map=self.join_key_map,
58+
timestamp_field=self.timestamp_field,
59+
date_partition_column=self.date_partition_column,
60+
created_timestamp_column=self.created_timestamp_column,
61+
batch_source=self.batch_source.to_proto() or None,
4662
)
4763
for feature in self.features:
4864
feature_reference_proto.feature_columns.append(feature.to_proto())
@@ -57,6 +73,10 @@ def from_proto(proto: FeatureViewProjectionProto):
5773
features=[],
5874
join_key_map=dict(proto.join_key_map),
5975
desired_features=[],
76+
timestamp_field=proto.timestamp_field or None,
77+
date_partition_column=proto.date_partition_column or None,
78+
created_timestamp_column=proto.created_timestamp_column or None,
79+
batch_source=proto.batch_source or None,
6080
)
6181
for feature_column in proto.feature_columns:
6282
feature_view_projection.features.append(Field.from_proto(feature_column))
@@ -65,6 +85,21 @@ def from_proto(proto: FeatureViewProjectionProto):
6585

6686
@staticmethod
6787
def from_definition(base_feature_view: "BaseFeatureView"):
88+
# TODO need to implement this for StreamFeatureViews
89+
if getattr(base_feature_view, "batch_source", None):
90+
return FeatureViewProjection(
91+
name=base_feature_view.name,
92+
name_alias=None,
93+
features=base_feature_view.features,
94+
desired_features=[],
95+
timestamp_field=base_feature_view.batch_source.created_timestamp_column
96+
or None,
97+
created_timestamp_column=base_feature_view.batch_source.created_timestamp_column
98+
or None,
99+
date_partition_column=base_feature_view.batch_source.date_partition_column
100+
or None,
101+
batch_source=base_feature_view.batch_source or None,
102+
)
68103
return FeatureViewProjection(
69104
name=base_feature_view.name,
70105
name_alias=None,

sdk/python/feast/inference.py

Lines changed: 114 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from feast.infra.offline_stores.file_source import FileSource
1414
from feast.infra.offline_stores.redshift_source import RedshiftSource
1515
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
16+
from feast.on_demand_feature_view import OnDemandFeatureView
1617
from feast.repo_config import RepoConfig
1718
from feast.stream_feature_view import StreamFeatureView
1819
from feast.types import String
@@ -94,7 +95,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
9495

9596

9697
def update_feature_views_with_inferred_features_and_entities(
97-
fvs: Union[List[FeatureView], List[StreamFeatureView]],
98+
fvs: Union[List[FeatureView], List[StreamFeatureView], List[OnDemandFeatureView]],
9899
entities: List[Entity],
99100
config: RepoConfig,
100101
) -> None:
@@ -127,13 +128,14 @@ def update_feature_views_with_inferred_features_and_entities(
127128

128129
# Fields whose names match a join key are considered to be entity columns; all
129130
# other fields are considered to be feature columns.
131+
entity_columns = fv.entity_columns if fv.entity_columns else []
130132
for field in fv.schema:
131133
if field.name in join_keys:
132134
# Do not override a preexisting field with the same name.
133135
if field.name not in [
134-
entity_column.name for entity_column in fv.entity_columns
136+
entity_column.name for entity_column in entity_columns
135137
]:
136-
fv.entity_columns.append(field)
138+
entity_columns.append(field)
137139
else:
138140
if field.name not in [feature.name for feature in fv.features]:
139141
fv.features.append(field)
@@ -146,10 +148,10 @@ def update_feature_views_with_inferred_features_and_entities(
146148
continue
147149
if (
148150
entity.join_key
149-
not in [entity_column.name for entity_column in fv.entity_columns]
151+
not in [entity_column.name for entity_column in entity_columns]
150152
and entity.value_type != ValueType.UNKNOWN
151153
):
152-
fv.entity_columns.append(
154+
entity_columns.append(
153155
Field(
154156
name=entity.join_key,
155157
dtype=from_value_type(entity.value_type),
@@ -160,10 +162,11 @@ def update_feature_views_with_inferred_features_and_entities(
160162
if (
161163
len(fv.entities) == 1
162164
and fv.entities[0] == DUMMY_ENTITY_NAME
163-
and not fv.entity_columns
165+
and not entity_columns
164166
):
165-
fv.entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String))
167+
entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String))
166168

169+
fv.entity_columns = entity_columns
167170
# Run inference for entity columns if there are fewer entity fields than expected.
168171
run_inference_for_entities = len(fv.entity_columns) < len(join_keys)
169172

@@ -200,49 +203,116 @@ def _infer_features_and_entities(
200203
run_inference_for_features: Whether to run inference for features.
201204
config: The config for the current feature store.
202205
"""
203-
columns_to_exclude = {
204-
fv.batch_source.timestamp_field,
205-
fv.batch_source.created_timestamp_column,
206-
}
207-
for original_col, mapped_col in fv.batch_source.field_mapping.items():
208-
if mapped_col in columns_to_exclude:
209-
columns_to_exclude.remove(mapped_col)
210-
columns_to_exclude.add(original_col)
211-
212-
table_column_names_and_types = fv.batch_source.get_table_column_names_and_types(
213-
config
214-
)
215-
216-
for col_name, col_datatype in table_column_names_and_types:
217-
if col_name in columns_to_exclude:
218-
continue
219-
elif col_name in join_keys:
220-
field = Field(
221-
name=col_name,
222-
dtype=from_value_type(
223-
fv.batch_source.source_datatype_to_feast_value_type()(col_datatype)
224-
),
206+
entity_columns = []
207+
if isinstance(fv, OnDemandFeatureView):
208+
columns_to_exclude = set()
209+
for (
210+
source_feature_view_name,
211+
source_feature_view,
212+
) in fv.source_feature_view_projections.items():
213+
columns_to_exclude.add(source_feature_view.timestamp_field)
214+
columns_to_exclude.add(source_feature_view.created_timestamp_column)
215+
216+
for (
217+
original_col,
218+
mapped_col,
219+
) in source_feature_view.batch_source.field_mapping.items():
220+
if mapped_col in columns_to_exclude:
221+
columns_to_exclude.remove(mapped_col)
222+
columns_to_exclude.add(original_col)
223+
224+
table_column_names_and_types = (
225+
source_feature_view.batch_source.get_table_column_names_and_types(
226+
config
227+
)
225228
)
226-
if field.name not in [
227-
entity_column.name for entity_column in fv.entity_columns
228-
]:
229-
fv.entity_columns.append(field)
230-
elif not re.match(
231-
"^__|__$", col_name
232-
): # double underscores often signal an internal-use column
233-
if run_inference_for_features:
234-
feature_name = (
235-
fv.batch_source.field_mapping[col_name]
236-
if col_name in fv.batch_source.field_mapping
237-
else col_name
229+
230+
for col_name, col_datatype in table_column_names_and_types:
231+
if col_name in columns_to_exclude:
232+
continue
233+
elif col_name in join_keys:
234+
field = Field(
235+
name=col_name,
236+
dtype=from_value_type(
237+
source_feature_view.batch_source.source_datatype_to_feast_value_type()(
238+
col_datatype
239+
)
240+
),
238241
)
242+
if field.name not in [
243+
entity_column.name for entity_column in entity_columns
244+
]:
245+
entity_columns.append(field)
246+
elif not re.match(
247+
"^__|__$", col_name
248+
): # double underscores often signal an internal-use column
249+
if run_inference_for_features:
250+
feature_name = (
251+
source_feature_view.batch_source.field_mapping[col_name]
252+
if col_name in source_feature_view.batch_source.field_mapping
253+
else col_name
254+
)
255+
field = Field(
256+
name=feature_name,
257+
dtype=from_value_type(
258+
source_feature_view.batch_source.source_datatype_to_feast_value_type()(
259+
col_datatype
260+
)
261+
),
262+
)
263+
if field.name not in [
264+
feature.name for feature in source_feature_view.features
265+
]:
266+
source_feature_view.features.append(field)
267+
268+
else:
269+
columns_to_exclude = {
270+
fv.batch_source.timestamp_field,
271+
fv.batch_source.created_timestamp_column,
272+
}
273+
for original_col, mapped_col in fv.batch_source.field_mapping.items():
274+
if mapped_col in columns_to_exclude:
275+
columns_to_exclude.remove(mapped_col)
276+
columns_to_exclude.add(original_col)
277+
278+
table_column_names_and_types = fv.batch_source.get_table_column_names_and_types(
279+
config
280+
)
281+
282+
for col_name, col_datatype in table_column_names_and_types:
283+
if col_name in columns_to_exclude:
284+
continue
285+
elif col_name in join_keys:
239286
field = Field(
240-
name=feature_name,
287+
name=col_name,
241288
dtype=from_value_type(
242289
fv.batch_source.source_datatype_to_feast_value_type()(
243290
col_datatype
244291
)
245292
),
246293
)
247-
if field.name not in [feature.name for feature in fv.features]:
248-
fv.features.append(field)
294+
if field.name not in [
295+
entity_column.name for entity_column in entity_columns
296+
]:
297+
entity_columns.append(field)
298+
elif not re.match(
299+
"^__|__$", col_name
300+
): # double underscores often signal an internal-use column
301+
if run_inference_for_features:
302+
feature_name = (
303+
fv.batch_source.field_mapping[col_name]
304+
if col_name in fv.batch_source.field_mapping
305+
else col_name
306+
)
307+
field = Field(
308+
name=feature_name,
309+
dtype=from_value_type(
310+
fv.batch_source.source_datatype_to_feast_value_type()(
311+
col_datatype
312+
)
313+
),
314+
)
315+
if field.name not in [feature.name for feature in fv.features]:
316+
fv.features.append(field)
317+
318+
fv.entity_columns = entity_columns

0 commit comments

Comments
 (0)