2020
2121from feast import utils
2222from feast .base_feature_view import BaseFeatureView
23- from feast .data_source import DataSource , PushSource
23+ from feast .data_source import DataSource , KafkaSource , KinesisSource , PushSource
2424from feast .entity import Entity
2525from feast .feature import Feature
2626from feast .feature_view_projection import FeatureViewProjection
@@ -61,9 +61,9 @@ class FeatureView(BaseFeatureView):
6161 can result in extremely computationally intensive queries.
6262 batch_source (optional): The batch source of data where this group of features
6363 is stored. This is optional ONLY if a push source is specified as the
64- stream_source, since push sources contain their own batch sources.
64+ stream_source, since push sources contain their own batch sources. This is deprecated in favor of `source`.
6565 stream_source (optional): The stream source of data where this group of features
66- is stored.
66+ is stored. This is deprecated in favor of `source`.
6767 schema: The schema of the feature view, including feature, timestamp, and entity
6868 columns.
6969 features: The list of features defined as part of this feature view. Each
@@ -74,6 +74,8 @@ class FeatureView(BaseFeatureView):
7474 tags: A dictionary of key-value pairs to store arbitrary metadata.
7575 owner: The owner of the feature view, typically the email of the primary
7676 maintainer.
77+ source (optional): The source of data for this group of features. May be a stream source, or a batch source.
78+ If a stream source, the source should contain a batch_source for backfills & batch materialization.
7779 """
7880
7981 name : str
@@ -88,6 +90,7 @@ class FeatureView(BaseFeatureView):
8890 tags : Dict [str , str ]
8991 owner : str
9092 materialization_intervals : List [Tuple [datetime , datetime ]]
93+ source : Optional [DataSource ]
9194
9295 @log_exceptions
9396 def __init__ (
@@ -104,6 +107,7 @@ def __init__(
104107 description : str = "" ,
105108 owner : str = "" ,
106109 schema : Optional [List [Field ]] = None ,
110+ source : Optional [DataSource ] = None ,
107111 ):
108112 """
109113 Creates a FeatureView object.
@@ -126,6 +130,8 @@ def __init__(
126130 primary maintainer.
127131 schema (optional): The schema of the feature view, including feature, timestamp,
128132 and entity columns.
133+ source (optional): The source of data for this group of features. May be a stream source, or a batch source.
134+ If a stream source, the source should contain a batch_source for backfills & batch materialization.
129135
130136 Raises:
131137 ValueError: A field mapping conflicts with an Entity or a Feature.
@@ -163,6 +169,8 @@ def __init__(
163169 self .name = _name
164170 self .entities = _entities if _entities else [DUMMY_ENTITY_NAME ]
165171
172+ self ._initialize_sources (_name , batch_source , stream_source , source )
173+
166174 if isinstance (_ttl , Duration ):
167175 self .ttl = timedelta (seconds = int (_ttl .seconds ))
168176 warnings .warn (
@@ -199,21 +207,6 @@ def __init__(
199207 # current `features` parameter only accepts feature columns.
200208 _features = _schema
201209
202- if stream_source is not None and isinstance (stream_source , PushSource ):
203- if stream_source .batch_source is None or not isinstance (
204- stream_source .batch_source , DataSource
205- ):
206- raise ValueError (
207- f"A batch_source needs to be specified for feature view `{ name } `"
208- )
209- self .batch_source = stream_source .batch_source
210- else :
211- if batch_source is None :
212- raise ValueError (
213- f"A batch_source needs to be specified for feature view `{ name } `"
214- )
215- self .batch_source = batch_source
216-
217210 cols = [entity for entity in self .entities ] + [
218211 field .name for field in _features
219212 ]
@@ -236,9 +229,43 @@ def __init__(
236229 owner = owner ,
237230 )
238231 self .online = online
239- self .stream_source = stream_source
240232 self .materialization_intervals = []
241233
def _initialize_sources(self, name, batch_source, stream_source, source):
    """Resolve and store the batch, stream, and unified sources of this view.

    Sets ``self.batch_source``, ``self.stream_source`` and ``self.source``.

    Args:
        name: Name of the feature view; only used in error messages.
        batch_source: Deprecated. Batch source used when ``source`` is not given.
        stream_source: Deprecated. Stream source used when ``source`` is not
            given, or alongside a batch ``source``.
        source: Preferred way to specify the data source. When it is a
            PushSource, KafkaSource or KinesisSource it is stored as the stream
            source and its ``batch_source`` becomes the batch source; any other
            source is stored as the batch source.

    Raises:
        ValueError: No batch source can be determined, i.e. a stream source
            lacks a ``batch_source`` or neither ``source`` nor ``batch_source``
            was provided.
    """
    if source is not None:
        if isinstance(source, (PushSource, KafkaSource, KinesisSource)):
            # A stream source must bring its own batch source; validate before
            # touching any attribute.
            if not source.batch_source:
                raise ValueError(
                    f"A batch_source needs to be specified for stream source `{source.name}`"
                )
            self.stream_source = source
            self.batch_source = source.batch_source
        else:
            self.stream_source = stream_source
            self.batch_source = source
    else:
        # Only the deprecated parameters can have been used on this path.
        warnings.warn(
            "batch_source and stream_source have been deprecated in favor of `source`. "
            "The deprecated fields will be removed in Feast 0.23.",
            DeprecationWarning,
        )
        if stream_source is not None and isinstance(stream_source, PushSource):
            # Same requirement as on the `source` path: a push source without a
            # batch source would leave the view with no batch source at all.
            if stream_source.batch_source is None:
                raise ValueError(
                    f"A batch_source needs to be specified for feature view `{name}`"
                )
            self.stream_source = stream_source
            self.batch_source = stream_source.batch_source
        else:
            if batch_source is None:
                raise ValueError(
                    f"A batch_source needs to be specified for feature view `{name}`"
                )
            self.stream_source = stream_source
            self.batch_source = batch_source
    self.source = source
268+
242269 # Note: Python sets __hash__ to None on a class that overrides __eq__ without also defining __hash__, so it is redefined here and simply delegates to the parent class.
243270 def __hash__ (self ):
244271 return super ().__hash__ ()
0 commit comments