Skip to content

Commit 51fe128

Browse files
authored
Add streaming sources to the FeatureView API (#1664)
* Add a streaming source to the FeatureView API
  This diff only updates the API. It is currently up to the providers to actually use this information to spin up resources to consume events from the stream sources.
  Signed-off-by: Achal Shah <achals@gmail.com>
* remove stuff from rebase
  Signed-off-by: Achal Shah <achals@gmail.com>
* make format
  Signed-off-by: Achal Shah <achals@gmail.com>
* Update protos
  Signed-off-by: Achal Shah <achals@gmail.com>
* lint
  Signed-off-by: Achal Shah <achals@gmail.com>
* format
  Signed-off-by: Achal Shah <achals@gmail.com>
* CR
  Signed-off-by: Achal Shah <achals@gmail.com>
* fix test
  Signed-off-by: Achal Shah <achals@gmail.com>
* lint
  Signed-off-by: Achal Shah <achals@gmail.com>
1 parent b9dd955 commit 51fe128

File tree

6 files changed

+42
-10
lines changed

6 files changed

+42
-10
lines changed

protos/feast/core/FeatureView.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ message FeatureViewSpec {
5959
google.protobuf.Duration ttl = 6;
6060

6161
// Batch/Offline DataSource where this view can retrieve offline feature data.
62-
DataSource input = 7;
62+
DataSource batch_source = 7;
63+
// Streaming DataSource from where this view can consume "online" feature data.
64+
DataSource stream_source = 9;
6365

6466
// Whether these features should be served online or not
6567
bool online = 8;

sdk/python/feast/data_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,9 @@ def __init__(
869869
)
870870

871871
def __eq__(self, other):
872+
if other is None:
873+
return False
874+
872875
if not isinstance(other, KinesisSource):
873876
raise TypeError(
874877
"Comparisons should only involve KinesisSource class objects."

sdk/python/feast/feature_view.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ class FeatureView:
5050
ttl: Optional[timedelta]
5151
online: bool
5252
input: DataSource
53-
53+
batch_source: Optional[DataSource] = None
54+
stream_source: Optional[DataSource] = None
5455
created_timestamp: Optional[Timestamp] = None
5556
last_updated_timestamp: Optional[Timestamp] = None
5657
materialization_intervals: List[Tuple[datetime, datetime]]
@@ -62,15 +63,22 @@ def __init__(
6263
entities: List[str],
6364
ttl: Optional[Union[Duration, timedelta]],
6465
input: DataSource,
66+
batch_source: Optional[DataSource] = None,
67+
stream_source: Optional[DataSource] = None,
6568
features: List[Feature] = [],
6669
tags: Optional[Dict[str, str]] = None,
6770
online: bool = True,
6871
):
72+
_input = input or batch_source
73+
assert _input is not None
74+
6975
cols = [entity for entity in entities] + [feat.name for feat in features]
7076
for col in cols:
71-
if input.field_mapping is not None and col in input.field_mapping.keys():
77+
if _input.field_mapping is not None and col in _input.field_mapping.keys():
7278
raise ValueError(
73-
f"The field {col} is mapped to {input.field_mapping[col]} for this data source. Please either remove this field mapping or use {input.field_mapping[col]} as the Entity or Feature name."
79+
f"The field {col} is mapped to {_input.field_mapping[col]} for this data source. "
80+
f"Please either remove this field mapping or use {_input.field_mapping[col]} as the "
81+
f"Entity or Feature name."
7482
)
7583

7684
self.name = name
@@ -84,7 +92,9 @@ def __init__(
8492
self.ttl = ttl
8593

8694
self.online = online
87-
self.input = input
95+
self.input = _input
96+
self.batch_source = _input
97+
self.stream_source = stream_source
8898

8999
self.materialization_intervals = []
90100

@@ -118,6 +128,8 @@ def __eq__(self, other):
118128
return False
119129
if self.input != other.input:
120130
return False
131+
if self.stream_source != other.stream_source:
132+
return False
121133

122134
return True
123135

@@ -157,14 +169,21 @@ def to_proto(self) -> FeatureViewProto:
157169
ttl_duration = Duration()
158170
ttl_duration.FromTimedelta(self.ttl)
159171

172+
print(f"Stream soruce: {self.stream_source}, {type(self.stream_source)}")
173+
160174
spec = FeatureViewSpecProto(
161175
name=self.name,
162176
entities=self.entities,
163177
features=[feature.to_proto() for feature in self.features],
164178
tags=self.tags,
165179
ttl=(ttl_duration if ttl_duration is not None else None),
166180
online=self.online,
167-
input=self.input.to_proto(),
181+
batch_source=self.input.to_proto(),
182+
stream_source=(
183+
self.stream_source.to_proto()
184+
if self.stream_source is not None
185+
else None
186+
),
168187
)
169188

170189
return FeatureViewProto(spec=spec, meta=meta)
@@ -181,6 +200,12 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
181200
Returns a FeatureViewProto object based on the feature view protobuf
182201
"""
183202

203+
_input = DataSource.from_proto(feature_view_proto.spec.batch_source)
204+
stream_source = (
205+
DataSource.from_proto(feature_view_proto.spec.stream_source)
206+
if feature_view_proto.spec.HasField("stream_source")
207+
else None
208+
)
184209
feature_view = cls(
185210
name=feature_view_proto.spec.name,
186211
entities=[entity for entity in feature_view_proto.spec.entities],
@@ -200,7 +225,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
200225
and feature_view_proto.spec.ttl.nanos == 0
201226
else feature_view_proto.spec.ttl
202227
),
203-
input=DataSource.from_proto(feature_view_proto.spec.input),
228+
input=_input,
229+
batch_source=_input,
230+
stream_source=stream_source,
204231
)
205232

206233
feature_view.created_timestamp = feature_view_proto.meta.created_timestamp

sdk/python/tensorflow_metadata/proto/v0/path_pb2.py

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)