1919from typing import List
2020from collections import OrderedDict
2121from typing import Dict
22- from feast .source import Source , KafkaSource
22+ from feast .source import Source
2323from feast .type_map import dtype_to_value_type
24- from feast .value_type import ValueType
2524from pandas .api .types import is_datetime64_ns_dtype
2625from feast .entity import Entity
27- from feast .feature import Feature
28- from feast .core .FeatureSet_pb2 import (
29- FeatureSetSpec as FeatureSetSpecProto ,
30- FeatureSpec as FeatureSpecProto ,
31- )
26+ from feast .feature import Feature , Field
27+ from feast .core .FeatureSet_pb2 import FeatureSetSpec as FeatureSetSpecProto
3228from feast .core .Source_pb2 import Source as SourceProto
3329from feast .types import FeatureRow_pb2 as FeatureRow
3430from google .protobuf .timestamp_pb2 import Timestamp
3531from kafka import KafkaProducer
36- from pandas .core .dtypes .common import is_datetime64_any_dtype
3732from tqdm import tqdm
3833from type_map import dtype_to_feast_value_type
3934from feast .types import (
4035 Value_pb2 as ValueProto ,
4136 FeatureRow_pb2 as FeatureRowProto ,
42- Feature_pb2 as FeatureProto ,
4337 Field_pb2 as FieldProto ,
4438)
4539from feast .type_map import dtype_to_feast_value_attr
4640
4741_logger = logging .getLogger (__name__ )
48-
4942DATETIME_COLUMN = "datetime" # type: str
5043
5144
@@ -63,32 +56,38 @@ def __init__(
6356 max_age : int = - 1 ,
6457 ):
6558 self ._name = name
66- self ._features = OrderedDict () # type: Dict[str, Feature]
67- self ._entities = OrderedDict () # type: Dict[str, Entity]
59+ self ._fields = OrderedDict () # type: Dict[str, Field]
6860 if features is not None :
69- self ._add_features (features )
61+ self ._add_fields (features )
7062 if entities is not None :
71- self ._add_entities (entities )
63+ self ._add_fields (entities )
7264 self ._max_age = max_age
7365 self ._version = None
7466 self ._client = None
7567 self ._source = source
7668 self ._message_producer = None
7769 self ._busy_ingesting = False
70+ self ._is_dirty = True
71+
72+ def __eq__ (self , other ):
73+ if not isinstance (other , FeatureSet ):
74+ return NotImplemented
75+
76+ return self .name == other .name and self .version == other .version
7877
7978 @property
8079 def features (self ) -> List [Feature ]:
8180 """
8281 Returns a list of features from this feature set
8382 """
84- return list ( self ._features .values ())
83+ return [ field for field in self ._fields .values () if isinstance ( field , Feature )]
8584
8685 @property
8786 def entities (self ) -> List [Entity ]:
8887 """
8988 Returns list of entities from this feature set
9089 """
91- return list ( self ._entities .values ())
90+ return [ field for field in self ._fields .values () if isinstance ( field , Entity )]
9291
9392 @property
9493 def name (self ):
@@ -121,10 +120,7 @@ def add(self, resource):
121120 :param resource: A resource can be either a Feature or an Entity object
122121 :return:
123122 """
124- if (
125- resource .name in self ._features .keys ()
126- or resource .name in self ._entities .keys ()
127- ):
123+ if resource .name in self ._fields .keys ():
128124 raise ValueError (
129125 'could not add field "'
130126 + resource .name
@@ -133,53 +129,33 @@ def add(self, resource):
133129 + '"'
134130 )
135131
136- if isinstance (resource , Feature ):
137- return self ._add_feature (resource )
138-
139- if isinstance (resource , Entity ):
140- return self ._add_entity (resource )
132+ if issubclass (type (resource ), Field ):
133+ return self ._add_field (resource )
141134
142135 raise ValueError ("Could not identify the resource being added" )
143136
144- def _add_entity (self , entity : Entity ):
145- self ._entities [entity .name ] = entity
146- return
147-
148- def _add_feature (self , feature : Feature ):
149- self ._features [feature .name ] = feature
137+ def _add_field (self , field : Field ):
138+ self ._fields [field .name ] = field
150139 return
151140
152141 def drop (self , name : str ):
153142 """
154143 Removes a Feature or Entity from a Feature Set
155144 :param name: Name of Feature or Entity to be removed
156145 """
157- if name not in self ._features and name not in self . _entities :
146+ if name not in self ._fields :
158147 raise ValueError ("Could not find field " + name + ", no action taken" )
159- if name in self ._features and name in self ._entities :
160- raise ValueError ("Duplicate field found for " + name + "!" )
161- if name in self ._features :
162- del self ._features [name ]
163- return
164- if name in self ._entities :
165- del self ._entities [name ]
148+ if name in self ._fields :
149+ del self ._fields [name ]
166150 return
167151
168- def _add_features (self , features : List [Feature ]):
169- """
170- Adds multiple Features to a Feature Set
171- :param features: List of Feature Objects
172- """
173- for feature in features :
174- self .add (feature )
175-
176- def _add_entities (self , entities : List [Entity ]):
152+ def _add_fields (self , fields : List [Field ]):
177153 """
178- Adds multiple Entities to a Feature Set
179- :param entities : List of Entity Objects
154+ Adds multiple Fields to a Feature Set
155+ :param fields : List of Feature or Entity Objects
180156 """
181- for entity in entities :
182- self .add (entity )
157+ for field in fields :
158+ self .add (field )
183159
184160 def update_from_dataset (self , df : pd .DataFrame , column_mapping = None ):
185161 """
@@ -189,11 +165,8 @@ def update_from_dataset(self, df: pd.DataFrame, column_mapping=None):
189165 :param df: Pandas dataframe containing datetime column, entity columns, and feature columns.
190166 """
191167
192- features = OrderedDict ()
193- entities = OrderedDict ()
194- existing_entities = None
195- if self ._client :
196- existing_entities = self ._client .entities
168+ fields = OrderedDict ()
169+ existing_entities = self ._client .entities if self ._client is not None else None
197170
198171 # Validate whether the datetime column exists with the right name
199172 if DATETIME_COLUMN not in df :
@@ -209,54 +182,47 @@ def update_from_dataset(self, df: pd.DataFrame, column_mapping=None):
209182 for column in df .columns :
210183 column = column .strip ()
211184
212- # Validate whether the datetime column exists with the right name
185+ # Skip datetime column
213186 if DATETIME_COLUMN in column :
214187 continue
215188
216- # Use entity or feature value if provided
189+ # Use entity or feature value if provided by the column mapping
217190 if column_mapping and column in column_mapping :
218- resource = column_mapping [column ]
219- if isinstance (resource , Entity ):
220- entities [column ] = resource
221- continue
222- if isinstance (resource , Feature ):
223- features [column ] = resource
191+ if issubclass (type (column_mapping [column ]), Field ):
192+ fields [column ] = column_mapping [column ]
224193 continue
225194 raise ValueError (
226195 "Invalid resource type specified at column name " + column
227196 )
228197
229- # Test whether this column is an existing entity. If it is named exactly the same
230- # as an existing entity then it will be detected as such
198+ # Test whether this column is an existing entity (globally).
231199 if existing_entities and column in existing_entities :
232200 entity = existing_entities [column ]
233201
234202 # test whether registered entity type matches user provided type
235203 if entity .dtype == dtype_to_value_type (df [column ].dtype ):
236204 # Store this field as an entity
237- entities [column ] = entity
205+ fields [column ] = entity
238206 continue
239207
240- for feature in self .features :
241- # Ignore features that already exist
242- if feature .name == column :
243- continue
208+ # Ignore fields that already exist
209+ if column in self ._fields :
210+ continue
244211
245212 # Store this field as a feature
246- features [column ] = Feature (
213+ fields [column ] = Feature (
247214 name = column , dtype = dtype_to_feast_value_type (df [column ].dtype )
248215 )
249216
250- if len (entities ) == 0 :
217+ if len ([ field for field in fields . values () if type ( field ) == Entity ] ) == 0 :
251218 raise Exception (
252219 "Could not detect entity column(s). Please provide entity column(s)."
253220 )
254- if len (features ) == 0 :
221+ if len ([ field for field in fields . values () if type ( field ) == Feature ] ) == 0 :
255222 raise Exception (
256- "Could not detect feature columns . Please provide feature column(s)."
223+ "Could not detect feature column(s) . Please provide feature column(s)."
257224 )
258- self ._entities = entities
259- self ._features = features
225+ self ._add_fields (list (fields .values ()))
260226
261227 def ingest (
262228 self , dataframe : pd .DataFrame , force_update : bool = False , timeout : int = 5
@@ -269,9 +235,6 @@ def ingest(
269235 # Validate feature set version with Feast Core
270236 self ._validate_feature_set ()
271237
272- # Validate data schema w.r.t this feature set
273- self ._validate_dataframe_schema (dataframe )
274-
275238 # Create Kafka FeatureRow producer
276239 if self ._message_producer is None :
277240 self ._message_producer = KafkaProducer (
@@ -298,7 +261,7 @@ def ingest(
298261 def _pandas_row_to_feature_row (
299262 self , dataframe : pd .DataFrame , row
300263 ) -> FeatureRow .FeatureRow :
301- if len (self .features ) + len ( self . entities ) + 1 != len (dataframe .columns ):
264+ if len (self ._fields ) != len (dataframe .columns ) - 1 :
302265 raise Exception (
303266 "Amount of entities and features in feature set do not match dataset columns"
304267 )
@@ -351,12 +314,38 @@ def to_proto(self) -> FeatureSetSpecProto:
351314 version = self .version ,
352315 maxAge = self .max_age ,
353316 source = SourceProto (),
354- features = [feature .to_proto () for feature in self ._features .values ()],
355- entities = [entity .to_proto () for entity in self ._entities .values ()],
317+ features = [
318+ field .to_proto ()
319+ for field in self ._fields .values ()
320+ if type (field ) == Feature
321+ ],
322+ entities = [
323+ field .to_proto ()
324+ for field in self ._fields .values ()
325+ if type (field ) == Entity
326+ ],
356327 )
357328
358329 def _validate_feature_set (self ):
359- pass
330+
331+ # Validate whether the feature set has been modified and needs to be saved
332+ if self ._is_dirty :
333+ raise Exception ("Feature set has been modified and must be saved first" )
334+
335+ refreshed_feature_set = [
336+ fs
337+ for fs in self ._client .feature_sets
338+ if fs .name == self .name and fs .version == self .version
339+ ]
340+
341+ if not (len (refreshed_feature_set ) == 1 and refreshed_feature_set [0 ] == self ):
342+ raise Exception (
343+ "Feature set (name:"
344+ + self .name
345+ + ", version:"
346+ + str (self .version )
347+ + ") is inconsistent with Feast core"
348+ )
360349
361350 def _get_kafka_source_brokers (self ) -> str :
362351 if self .source and self .source .source_type is "Kafka" :
@@ -367,6 +356,3 @@ def _get_kafka_source_topic(self) -> str:
367356 if self .source and self .source .source_type == "Kafka" :
368357 return self .source .topics
369358 raise Exception ("Source type could not be identified" )
370-
371- def _validate_dataframe_schema (self , dataframe : pd .DataFrame ) -> bool :
372- return True
0 commit comments