From 2e0892b3c2f97901e074d7dd9aa8a60447d53efb Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 23 Jun 2021 10:41:05 -0700 Subject: [PATCH 1/9] Add a streaming source to the FeatureView API This diff only updates the API. It is currently up to the providers to actually use this information to spin up resources to consume events from the stream sources. Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 44 ++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index db22fd7e4a3..fe4114a3be1 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -20,8 +20,8 @@ from google.protobuf.timestamp_pb2 import Timestamp from feast import utils -from feast.data_source import DataSource from feast.errors import RegistryInferenceFailure +from feast.data_source import BigQuerySource, DataSource, FileSource, KafkaSource, KinesisSource from feast.feature import Feature from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto from feast.protos.feast.core.FeatureView_pb2 import ( @@ -50,7 +50,8 @@ class FeatureView: ttl: Optional[timedelta] online: bool input: DataSource - + batch_source: Optional[DataSource] = None + stream_source: Optional[DataSource] = None created_timestamp: Optional[Timestamp] = None last_updated_timestamp: Optional[Timestamp] = None materialization_intervals: List[Tuple[datetime, datetime]] @@ -62,15 +63,46 @@ def __init__( entities: List[str], ttl: Optional[Union[Duration, timedelta]], input: DataSource, + batch_source: Optional[DataSource] = None, + stream_source: Optional[DataSource] = None, features: List[Feature] = [], tags: Optional[Dict[str, str]] = None, online: bool = True, ): + assert input or batch_source + _input = input or batch_source + + if not features: + features = [] # to handle python's mutable default arguments + columns_to_exclude = { + _input.event_timestamp_column, + _input.created_timestamp_column, + } | set(entities) + + for col_name, col_datatype in _input.get_table_column_names_and_types(): + if col_name not in columns_to_exclude and not re.match( + "^__|__$", col_name + ): + features.append( + Feature( + col_name, + _input.source_datatype_to_feast_value_type()(col_datatype), + ) + ) + + if not features: + raise ValueError( + f"Could not infer Features for the FeatureView named {name}." + f" Please specify Features explicitly for this FeatureView." + ) + cols = [entity for entity in entities] + [feat.name for feat in features] for col in cols: - if input.field_mapping is not None and col in input.field_mapping.keys(): + if _input.field_mapping is not None and col in _input.field_mapping.keys(): raise ValueError( - f"The field {col} is mapped to {input.field_mapping[col]} for this data source. Please either remove this field mapping or use {input.field_mapping[col]} as the Entity or Feature name." + f"The field {col} is mapped to {_input.field_mapping[col]} for this data source. " + f"Please either remove this field mapping or use {_input.field_mapping[col]} as the " + f"Entity or Feature name." ) self.name = name @@ -84,7 +116,9 @@ def __init__( self.ttl = ttl self.online = online - self.input = input + self.input = _input + self.batch_source = _input + self.stream_source = stream_source self.materialization_intervals = [] From 40969793c6fa4b7b326b4095f8b542cacda6ef94 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 23 Jun 2021 10:46:28 -0700 Subject: [PATCH 2/9] remove stuff from rebase Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index fe4114a3be1..d7a0da0c278 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -72,30 +72,6 @@ def __init__( assert input or batch_source _input = input or batch_source - if not features: - features = [] # to handle python's mutable default arguments - columns_to_exclude = { - _input.event_timestamp_column, - _input.created_timestamp_column, - } | set(entities) - - for col_name, col_datatype in _input.get_table_column_names_and_types(): - if col_name not in columns_to_exclude and not re.match( - "^__|__$", col_name - ): - features.append( - Feature( - col_name, - _input.source_datatype_to_feast_value_type()(col_datatype), - ) - ) - - if not features: - raise ValueError( - f"Could not infer Features for the FeatureView named {name}." - f" Please specify Features explicitly for this FeatureView." - ) - cols = [entity for entity in entities] + [feat.name for feat in features] for col in cols: if _input.field_mapping is not None and col in _input.field_mapping.keys(): From 286b6194874c84ff5413856ede3294d49be9b05b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 23 Jun 2021 10:59:37 -0700 Subject: [PATCH 3/9] make format Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index d7a0da0c278..634da14f44e 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -20,8 +20,14 @@ from google.protobuf.timestamp_pb2 import Timestamp from feast import utils +from feast.data_source import ( + BigQuerySource, + DataSource, + FileSource, + KafkaSource, + KinesisSource, +) from feast.errors import RegistryInferenceFailure -from feast.data_source import BigQuerySource, DataSource, FileSource, KafkaSource, KinesisSource from feast.feature import Feature from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto from feast.protos.feast.core.FeatureView_pb2 import ( @@ -69,8 +75,8 @@ def __init__( tags: Optional[Dict[str, str]] = None, online: bool = True, ): - assert input or batch_source _input = input or batch_source + assert _input is not None cols = [entity for entity in entities] + [feat.name for feat in features] for col in cols: From df447b33ea50209952b7a21329fac51f4c2701ba Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 24 Jun 2021 22:33:49 -0700 Subject: [PATCH 4/9] Update protos Signed-off-by: Achal Shah --- protos/feast/core/FeatureView.proto | 3 ++- sdk/python/feast/feature_view.py | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index d98f54825a6..0f2cb4a3e7c 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -59,7 +59,8 @@ message FeatureViewSpec { google.protobuf.Duration ttl = 6; // Batch/Offline DataSource where this view can retrieve offline feature data. - DataSource input = 7; + DataSource batch_source = 7; + DataSource stream_source = 9; // Whether these features should be served online or not bool online = 8; diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 634da14f44e..6ec1fd1ff6d 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -134,6 +134,8 @@ def __eq__(self, other): return False if self.input != other.input: return False + if self.stream_source != other.stream_source: + return False return True @@ -180,7 +182,8 @@ def to_proto(self) -> FeatureViewProto: tags=self.tags, ttl=(ttl_duration if ttl_duration is not None else None), online=self.online, - input=self.input.to_proto(), + batch_source=self.input.to_proto(), + stream_source=self.stream_source.to_proto() ) return FeatureViewProto(spec=spec, meta=meta) From 62a64319ef380f21eb2f708bf78bedf73a5ad92c Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 24 Jun 2021 22:47:27 -0700 Subject: [PATCH 5/9] lint Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 6ec1fd1ff6d..d17a14955b9 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -183,7 +183,7 @@ def to_proto(self) -> FeatureViewProto: ttl=(ttl_duration if ttl_duration is not None else None), online=self.online, batch_source=self.input.to_proto(), - stream_source=self.stream_source.to_proto() + stream_source=(self.stream_source.to_proto() if self.stream_source is not None else None) ) return FeatureViewProto(spec=spec, meta=meta) @@ -200,6 +200,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): Returns a FeatureViewProto object based on the feature view protobuf """ + _input = DataSource.from_proto(feature_view_proto.spec.batch_source) feature_view = cls( name=feature_view_proto.spec.name, entities=[entity for entity in feature_view_proto.spec.entities], @@ -219,7 +220,11 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): and feature_view_proto.spec.ttl.nanos == 0 else feature_view_proto.spec.ttl ), - input=DataSource.from_proto(feature_view_proto.spec.input), + input=_input, + batch_source=_input, + stream_source=(feature_view_proto.spec.stream_source + if feature_view_proto.spec.stream_source is None + else None) ) feature_view.created_timestamp = feature_view_proto.meta.created_timestamp From ac1dbfab8fe1fb385665136fd1b1e52f1fc0c1c9 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 25 Jun 2021 08:35:32 -0700 Subject: [PATCH 6/9] format Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index d17a14955b9..60da93b14d3 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -183,7 +183,11 @@ def to_proto(self) -> FeatureViewProto: ttl=(ttl_duration if ttl_duration is not None else None), online=self.online, batch_source=self.input.to_proto(), - stream_source=(self.stream_source.to_proto() if self.stream_source is not None else None) + stream_source=( + self.stream_source.to_proto() + if self.stream_source is not None + else None + ), ) return FeatureViewProto(spec=spec, meta=meta) @@ -222,9 +226,11 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ), input=_input, batch_source=_input, - stream_source=(feature_view_proto.spec.stream_source - if feature_view_proto.spec.stream_source is None - else None) + stream_source=( + feature_view_proto.spec.stream_source + if feature_view_proto.spec.stream_source is None + else None + ), ) feature_view.created_timestamp = feature_view_proto.meta.created_timestamp From 81d6d41d38d2fc35e428f97115b81d0f1d480f8c Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 28 Jun 2021 10:44:31 -0700 Subject: [PATCH 7/9] CR Signed-off-by: Achal Shah --- protos/feast/core/FeatureView.proto | 1 + sdk/python/feast/data_source.py | 3 +++ sdk/python/feast/feature_view.py | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index 0f2cb4a3e7c..f39fcf5e732 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -60,6 +60,7 @@ message FeatureViewSpec { // Batch/Offline DataSource where this view can retrieve offline feature data. DataSource batch_source = 7; + // Streaming DataSource from where this view can consume "online" feature data. DataSource stream_source = 9; // Whether these features should be served online or not diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index a5a620b55bb..7480d7fd4f4 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -869,6 +869,9 @@ def __init__( ) def __eq__(self, other): + if other is None: + return False + if not isinstance(other, KinesisSource): raise TypeError( "Comparisons should only involve KinesisSource class objects." diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 60da93b14d3..1d3c4c56a97 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -228,7 +228,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): batch_source=_input, stream_source=( feature_view_proto.spec.stream_source - if feature_view_proto.spec.stream_source is None + if feature_view_proto.spec.stream_source is not None else None ), ) From 6d4006341d37ac77766ddb397d6d04189dcae0d0 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 28 Jun 2021 11:28:11 -0700 Subject: [PATCH 8/9] fix test Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 1d3c4c56a97..0c29579d8f0 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -175,6 +175,8 @@ def to_proto(self) -> FeatureViewProto: ttl_duration = Duration() ttl_duration.FromTimedelta(self.ttl) + print(f"Stream soruce: {self.stream_source}, {type(self.stream_source)}") + spec = FeatureViewSpecProto( name=self.name, entities=self.entities, @@ -205,6 +207,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): """ _input = DataSource.from_proto(feature_view_proto.spec.batch_source) + stream_source = DataSource.from_proto(feature_view_proto.spec.stream_source)\ + if feature_view_proto.spec.HasField("stream_source") \ + else None feature_view = cls( name=feature_view_proto.spec.name, entities=[entity for entity in feature_view_proto.spec.entities], @@ -226,11 +231,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ), input=_input, batch_source=_input, - stream_source=( - feature_view_proto.spec.stream_source - if feature_view_proto.spec.stream_source is not None - else None - ), + stream_source=stream_source, ) feature_view.created_timestamp = feature_view_proto.meta.created_timestamp From dcfa59a24dc36b712df7f1d93a68d052f9bb6ec9 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 28 Jun 2021 12:34:25 -0700 Subject: [PATCH 9/9] lint Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 14 +++++--------- .../tensorflow_metadata/proto/v0/path_pb2.py | 2 +- .../tensorflow_metadata/proto/v0/schema_pb2.py | 2 +- .../tensorflow_metadata/proto/v0/statistics_pb2.py | 2 +- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 0c29579d8f0..3d20b9334f4 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -20,13 +20,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from feast import utils -from feast.data_source import ( - BigQuerySource, - DataSource, - FileSource, - KafkaSource, - KinesisSource, -) +from feast.data_source import DataSource from feast.errors import RegistryInferenceFailure from feast.feature import Feature from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto @@ -207,9 +201,11 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): """ _input = DataSource.from_proto(feature_view_proto.spec.batch_source) - stream_source = DataSource.from_proto(feature_view_proto.spec.stream_source)\ - if feature_view_proto.spec.HasField("stream_source") \ + stream_source = ( + DataSource.from_proto(feature_view_proto.spec.stream_source) + if feature_view_proto.spec.HasField("stream_source") else None + ) feature_view = cls( name=feature_view_proto.spec.name, entities=[entity for entity in feature_view_proto.spec.entities], diff --git a/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py index d732119ead5..4b6dec828cf 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: tensorflow_metadata/proto/v0/path.proto - +"""Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection diff --git a/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py index 78fda8003da..d3bfc50616c 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: tensorflow_metadata/proto/v0/schema.proto - +"""Generated protocol buffer code.""" from google.protobuf.internal import enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message diff --git a/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py index d8e12bd1209..21473adc75c 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: tensorflow_metadata/proto/v0/statistics.proto - +"""Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection