Skip to content

Commit 24470b6

Browse files
authored
chore: Add support for push sources in feature views (#2452)
* chore: Add support for push sources in feature views Signed-off-by: Achal Shah <achals@gmail.com> * fix test Signed-off-by: Achal Shah <achals@gmail.com> * cr Signed-off-by: Achal Shah <achals@gmail.com> * Add universal test Signed-off-by: Achal Shah <achals@gmail.com> * cr Signed-off-by: Achal Shah <achals@gmail.com>
1 parent f202f92 commit 24470b6

File tree

12 files changed

+206
-56
lines changed

12 files changed

+206
-56
lines changed

go/internal/feast/featurestore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -733,7 +733,7 @@ func groupFeatureRefs(requestedFeatureViews []*featureViewAndRefs,
733733
joinKeys := make([]string, 0)
734734
fv := featuresAndView.view
735735
featureNames := featuresAndView.featureRefs
736-
for entity, _ := range fv.entities {
736+
for entity := range fv.entities {
737737
joinKeys = append(joinKeys, entityNameToJoinKeyMap[entity])
738738
}
739739

sdk/python/feast/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from feast.infra.offline_stores.redshift_source import RedshiftSource
88
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
99

10-
from .data_source import KafkaSource, KinesisSource, SourceType
10+
from .data_source import KafkaSource, KinesisSource, PushSource, SourceType
1111
from .entity import Entity
1212
from .feature import Feature
1313
from .feature_service import FeatureService
@@ -47,4 +47,5 @@
4747
"RedshiftSource",
4848
"RequestFeatureView",
4949
"SnowflakeSource",
50+
"PushSource",
5051
]

sdk/python/feast/data_source.py

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -396,8 +396,8 @@ def get_table_column_names_and_types(
396396
def from_proto(data_source: DataSourceProto):
397397
schema_pb = data_source.request_data_options.schema
398398
schema = {}
399-
for key in schema_pb.keys():
400-
schema[key] = ValueType(schema_pb.get(key))
399+
for key, val in schema_pb.items():
400+
schema[key] = ValueType(val)
401401
return RequestDataSource(name=data_source.name, schema=schema)
402402

403403
def to_proto(self) -> DataSourceProto:
@@ -510,27 +510,41 @@ def to_proto(self) -> DataSourceProto:
510510

511511
class PushSource(DataSource):
512512
"""
513-
PushSource that can be used to ingest features on request
514-
515-
Args:
516-
name: Name of the push source
517-
schema: Schema mapping from the input feature name to a ValueType
513+
A source that can be used to ingest features on request
518514
"""
519515

520516
name: str
521517
schema: Dict[str, ValueType]
522-
batch_source: Optional[DataSource]
518+
batch_source: DataSource
519+
event_timestamp_column: str
523520

524521
def __init__(
525522
self,
526523
name: str,
527524
schema: Dict[str, ValueType],
528-
batch_source: Optional[DataSource] = None,
525+
batch_source: DataSource,
526+
event_timestamp_column="timestamp",
529527
):
530-
"""Creates a PushSource object."""
528+
"""
529+
Creates a PushSource object.
530+
Args:
531+
name: Name of the push source
532+
schema: Schema mapping from the input feature name to a ValueType
533+
batch_source: The batch source that backs this push source. It's used when materializing from the offline
534+
store to the online store, and when retrieving historical features.
535+
event_timestamp_column (optional): Event timestamp column used for point in time
536+
joins of feature values.
537+
"""
531538
super().__init__(name)
532539
self.schema = schema
533540
self.batch_source = batch_source
541+
if not self.batch_source:
542+
raise ValueError(f"batch_source is needed for push source {self.name}")
543+
self.event_timestamp_column = event_timestamp_column
544+
if not self.event_timestamp_column:
545+
raise ValueError(
546+
f"event_timestamp_column is needed for push source {self.name}"
547+
)
534548

535549
def validate(self, config: RepoConfig):
536550
pass
@@ -544,21 +558,23 @@ def get_table_column_names_and_types(
544558
def from_proto(data_source: DataSourceProto):
545559
schema_pb = data_source.push_options.schema
546560
schema = {}
547-
for key, value in schema_pb.items():
548-
schema[key] = value
561+
for key, val in schema_pb.items():
562+
schema[key] = ValueType(val)
549563

550-
batch_source = None
551-
if data_source.push_options.HasField("batch_source"):
552-
batch_source = DataSource.from_proto(data_source.push_options.batch_source)
564+
assert data_source.push_options.HasField("batch_source")
565+
batch_source = DataSource.from_proto(data_source.push_options.batch_source)
553566

554567
return PushSource(
555-
name=data_source.name, schema=schema, batch_source=batch_source
568+
name=data_source.name,
569+
schema=schema,
570+
batch_source=batch_source,
571+
event_timestamp_column=data_source.event_timestamp_column,
556572
)
557573

558574
def to_proto(self) -> DataSourceProto:
559575
schema_pb = {}
560576
for key, value in self.schema.items():
561-
schema_pb[key] = value
577+
schema_pb[key] = value.value
562578
batch_source_proto = None
563579
if self.batch_source:
564580
batch_source_proto = self.batch_source.to_proto()
@@ -567,7 +583,10 @@ def to_proto(self) -> DataSourceProto:
567583
schema=schema_pb, batch_source=batch_source_proto
568584
)
569585
data_source_proto = DataSourceProto(
570-
name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options,
586+
name=self.name,
587+
type=DataSourceProto.PUSH_SOURCE,
588+
push_options=options,
589+
event_timestamp_column=self.event_timestamp_column,
571590
)
572591

573592
return data_source_proto

sdk/python/feast/feature_store.py

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -282,20 +282,23 @@ def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]:
282282
return self._registry.list_data_sources(self.project, allow_cache=allow_cache)
283283

284284
@log_exceptions_and_usage
285-
def get_entity(self, name: str) -> Entity:
285+
def get_entity(self, name: str, allow_registry_cache: bool = False) -> Entity:
286286
"""
287287
Retrieves an entity.
288288
289289
Args:
290290
name: Name of entity.
291+
allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry
291292
292293
Returns:
293294
The specified entity.
294295
295296
Raises:
296297
EntityNotFoundException: The entity could not be found.
297298
"""
298-
return self._registry.get_entity(name, self.project)
299+
return self._registry.get_entity(
300+
name, self.project, allow_cache=allow_registry_cache
301+
)
299302

300303
@log_exceptions_and_usage
301304
def get_feature_service(
@@ -317,25 +320,33 @@ def get_feature_service(
317320
return self._registry.get_feature_service(name, self.project, allow_cache)
318321

319322
@log_exceptions_and_usage
320-
def get_feature_view(self, name: str) -> FeatureView:
323+
def get_feature_view(
324+
self, name: str, allow_registry_cache: bool = False
325+
) -> FeatureView:
321326
"""
322327
Retrieves a feature view.
323328
324329
Args:
325330
name: Name of feature view.
331+
allow_registry_cache: (Optional) Whether to allow returning this feature view from a cached registry
326332
327333
Returns:
328334
The specified feature view.
329335
330336
Raises:
331337
FeatureViewNotFoundException: The feature view could not be found.
332338
"""
333-
return self._get_feature_view(name)
339+
return self._get_feature_view(name, allow_registry_cache=allow_registry_cache)
334340

335341
def _get_feature_view(
336-
self, name: str, hide_dummy_entity: bool = True
342+
self,
343+
name: str,
344+
hide_dummy_entity: bool = True,
345+
allow_registry_cache: bool = False,
337346
) -> FeatureView:
338-
feature_view = self._registry.get_feature_view(name, self.project)
347+
feature_view = self._registry.get_feature_view(
348+
name, self.project, allow_cache=allow_registry_cache
349+
)
339350
if hide_dummy_entity and feature_view.entities[0] == DUMMY_ENTITY_NAME:
340351
feature_view.entities = []
341352
return feature_view
@@ -1144,6 +1155,31 @@ def tqdm_builder(length):
11441155
feature_view, self.project, start_date, end_date,
11451156
)
11461157

1158+
@log_exceptions_and_usage
1159+
def push(self, push_source_name: str, df: pd.DataFrame):
1160+
"""
1161+
Push features to a push source. This updates all the feature views that have the push source as stream source.
1162+
Args:
1163+
push_source_name: The name of the push source we want to push data to.
1164+
df: The data being pushed.
1165+
"""
1166+
from feast.data_source import PushSource
1167+
1168+
all_fvs = self.list_feature_views(allow_cache=True)
1169+
1170+
fvs_with_push_sources = {
1171+
fv
1172+
for fv in all_fvs
1173+
if (
1174+
fv.stream_source is not None
1175+
and isinstance(fv.stream_source, PushSource)
1176+
and fv.stream_source.name == push_source_name
1177+
)
1178+
}
1179+
1180+
for fv in fvs_with_push_sources:
1181+
self.write_to_online_store(fv.name, df, allow_registry_cache=True)
1182+
11471183
@log_exceptions_and_usage
11481184
def write_to_online_store(
11491185
self,
@@ -1155,12 +1191,14 @@ def write_to_online_store(
11551191
ingests data directly into the Online store
11561192
"""
11571193
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
1158-
feature_view = self._registry.get_feature_view(
1159-
feature_view_name, self.project, allow_cache=allow_registry_cache
1194+
feature_view = self.get_feature_view(
1195+
feature_view_name, allow_registry_cache=allow_registry_cache
11601196
)
11611197
entities = []
11621198
for entity_name in feature_view.entities:
1163-
entities.append(self._registry.get_entity(entity_name, self.project))
1199+
entities.append(
1200+
self.get_entity(entity_name, allow_registry_cache=allow_registry_cache)
1201+
)
11641202
provider = self._get_provider()
11651203
provider.ingest_df(feature_view, entities, df)
11661204

sdk/python/feast/feature_view.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from feast import utils
2222
from feast.base_feature_view import BaseFeatureView
23-
from feast.data_source import DataSource
23+
from feast.data_source import DataSource, PushSource
2424
from feast.entity import Entity
2525
from feast.feature import Feature
2626
from feast.feature_view_projection import FeatureViewProjection
@@ -58,7 +58,9 @@ class FeatureView(BaseFeatureView):
5858
ttl: The amount of time this group of features lives. A ttl of 0 indicates that
5959
this group of features lives forever. Note that large ttl's or a ttl of 0
6060
can result in extremely computationally intensive queries.
61-
batch_source: The batch source of data where this group of features is stored.
61+
batch_source (optional): The batch source of data where this group of features is stored.
62+
This is optional ONLY if a push source is specified as the stream_source, since push sources
63+
contain their own batch sources.
6264
stream_source (optional): The stream source of data where this group of features
6365
is stored.
6466
features: The list of features defined as part of this feature view.
@@ -88,7 +90,7 @@ def __init__(
8890
name: str,
8991
entities: List[str],
9092
ttl: Union[Duration, timedelta],
91-
batch_source: DataSource,
93+
batch_source: Optional[DataSource] = None,
9294
stream_source: Optional[DataSource] = None,
9395
features: Optional[List[Feature]] = None,
9496
online: bool = True,
@@ -121,15 +123,30 @@ def __init__(
121123
"""
122124
_features = features or []
123125

126+
if stream_source is not None and isinstance(stream_source, PushSource):
127+
if stream_source.batch_source is None or not isinstance(
128+
stream_source.batch_source, DataSource
129+
):
130+
raise ValueError(
131+
f"A batch_source needs to be specified for feature view `{name}`"
132+
)
133+
self.batch_source = stream_source.batch_source
134+
else:
135+
if batch_source is None:
136+
raise ValueError(
137+
f"A batch_source needs to be specified for feature view `{name}`"
138+
)
139+
self.batch_source = batch_source
140+
124141
cols = [entity for entity in entities] + [feat.name for feat in _features]
125142
for col in cols:
126143
if (
127-
batch_source.field_mapping is not None
128-
and col in batch_source.field_mapping.keys()
144+
self.batch_source.field_mapping is not None
145+
and col in self.batch_source.field_mapping.keys()
129146
):
130147
raise ValueError(
131-
f"The field {col} is mapped to {batch_source.field_mapping[col]} for this data source. "
132-
f"Please either remove this field mapping or use {batch_source.field_mapping[col]} as the "
148+
f"The field {col} is mapped to {self.batch_source.field_mapping[col]} for this data source. "
149+
f"Please either remove this field mapping or use {self.batch_source.field_mapping[col]} as the "
133150
f"Entity or Feature name."
134151
)
135152

@@ -149,9 +166,8 @@ def __init__(
149166
else:
150167
self.ttl = ttl
151168

152-
self.batch_source = batch_source
153-
self.stream_source = stream_source
154169
self.online = online
170+
self.stream_source = stream_source
155171
self.materialization_intervals = []
156172

157173
# Note: Python requires redefining hash in child classes that override __eq__

sdk/python/feast/repo_operations.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import click
1313
from click.exceptions import BadParameter
1414

15+
from feast import PushSource
1516
from feast.data_source import DataSource
1617
from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add
1718
from feast.entity import Entity
@@ -112,6 +113,8 @@ def parse_repo(repo_root: Path) -> RepoContents:
112113
res.data_sources.add(obj)
113114
if isinstance(obj, FeatureView):
114115
res.feature_views.add(obj)
116+
if isinstance(obj.stream_source, PushSource):
117+
res.data_sources.add(obj.stream_source.batch_source)
115118
elif isinstance(obj, Entity):
116119
res.entities.add(obj)
117120
elif isinstance(obj, FeatureService):

sdk/python/tests/example_repos/example_feature_repo_1.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
Feature,
77
FeatureService,
88
FeatureView,
9+
PushSource,
910
ValueType,
1011
)
1112

@@ -26,6 +27,16 @@
2627
event_timestamp_column="event_timestamp",
2728
)
2829

30+
driver_locations_push_source = PushSource(
31+
name="driver_locations_push",
32+
schema={
33+
"driver_id": ValueType.STRING,
34+
"driver_lat": ValueType.FLOAT,
35+
"driver_long": ValueType.STRING,
36+
},
37+
batch_source=driver_locations_source,
38+
)
39+
2940
driver = Entity(
3041
name="driver", # The name is derived from this argument, not object name.
3142
join_key="driver_id",
@@ -53,6 +64,19 @@
5364
tags={},
5465
)
5566

67+
pushed_driver_locations = FeatureView(
68+
name="pushed_driver_locations",
69+
entities=["driver"],
70+
ttl=timedelta(days=1),
71+
features=[
72+
Feature(name="driver_lat", dtype=ValueType.FLOAT),
73+
Feature(name="driver_long", dtype=ValueType.STRING),
74+
],
75+
online=True,
76+
stream_source=driver_locations_push_source,
77+
tags={},
78+
)
79+
5680
customer_profile = FeatureView(
5781
name="customer_profile",
5882
entities=["customer"],

0 commit comments

Comments
 (0)