1313# limitations under the License.
1414
1515
16+ import logging
17+
1618import pandas as pd
1719from typing import List
1820from collections import OrderedDict
1921from typing import Dict
22+ from feast .source import Source , KafkaSource
2023from feast .type_map import dtype_to_value_type
2124from feast .value_type import ValueType
2225from pandas .api .types import is_datetime64_ns_dtype
2326from feast .entity import Entity
2427from feast .feature import Feature
2528from feast .core .FeatureSet_pb2 import (
26- FeatureSetSpec as FeatureSetProto ,
27- FeatureSpec as FeatureProto ,
29+ FeatureSetSpec as FeatureSetSpecProto ,
30+ FeatureSpec as FeatureSpecProto ,
31+ )
32+ from feast .core .Source_pb2 import Source as SourceProto
33+ from feast .types import FeatureRow_pb2 as FeatureRow
34+ from google .protobuf .timestamp_pb2 import Timestamp
35+ from kafka import KafkaProducer
36+ from pandas .core .dtypes .common import is_datetime64_any_dtype
37+ from tqdm import tqdm
38+ from type_map import dtype_to_feast_value_type
39+ from feast .types import (
40+ Value_pb2 as ValueProto ,
41+ FeatureRow_pb2 as FeatureRowProto ,
42+ Feature_pb2 as FeatureProto ,
43+ Field_pb2 as FieldProto ,
2844)
29- from feast .core .Source_pb2 import Source
45+ from feast .type_map import dtype_to_feast_value_attr
46+
47+ _logger = logging .getLogger (__name__ )
3048
3149DATETIME_COLUMN = "datetime" # type: str
3250
@@ -41,6 +59,7 @@ def __init__(
4159 name : str ,
4260 features : List [Feature ] = None ,
4361 entities : List [Entity ] = None ,
62+ source : Source = None ,
4463 max_age : int = - 1 ,
4564 ):
4665 self ._name = name
@@ -53,7 +72,9 @@ def __init__(
5372 self ._max_age = max_age
5473 self ._version = None
5574 self ._client = None
56- self ._source = None
75+ self ._source = source
76+ self ._message_producer = None
77+ self ._busy_ingesting = False
5778
5879 @property
5980 def features (self ) -> List [Feature ]:
@@ -77,6 +98,14 @@ def name(self):
7798 def source (self ):
7899 return self ._source
79100
101+ @source .setter
102+ def source (self , source : Source ):
103+ self ._source = source
104+
105+ # Create Kafka FeatureRow producer
106+ if self ._message_producer is not None :
107+ self ._message_producer = KafkaProducer (bootstrap_servers = source .brokers )
108+
80109 @property
81110 def version (self ):
82111 return self ._version
@@ -152,11 +181,14 @@ def _add_entities(self, entities: List[Entity]):
152181 for entity in entities :
153182 self .add (entity )
154183
155- def update_from_dataset (self , df : pd .DataFrame ):
184+ def update_from_dataset (self , df : pd .DataFrame , column_mapping = None ):
156185 """
157186 Updates Feature Set values based on the data set. Only Pandas dataframes are supported.
187+ :param column_mapping: Dictionary of column names to resource (entity, feature) mapping. Forces the interpretation
188+ of a column as either an entity or feature. Example: {"driver_id": Entity(name="driver", dtype=ValueType.INT64)}
158189 :param df: Pandas dataframe containing datetime column, entity columns, and feature columns.
159190 """
191+
160192 features = OrderedDict ()
161193 entities = OrderedDict ()
162194 existing_entities = None
@@ -181,6 +213,19 @@ def update_from_dataset(self, df: pd.DataFrame):
181213 if DATETIME_COLUMN in column :
182214 continue
183215
216+ # Use entity or feature value if provided
217+ if column_mapping and column in column_mapping :
218+ resource = column_mapping [column ]
219+ if isinstance (resource , Entity ):
220+ entities [column ] = resource
221+ continue
222+ if isinstance (resource , Feature ):
223+ features [column ] = resource
224+ continue
225+ raise ValueError (
226+ "Invalid resource type specified at column name " + column
227+ )
228+
184229 # Test whether this column is an existing entity. If it is named exactly the same
185230 # as an existing entity then it will be detected as such
186231 if existing_entities and column in existing_entities :
@@ -199,13 +244,94 @@ def update_from_dataset(self, df: pd.DataFrame):
199244
200245 # Store this field as a feature
201246 features [column ] = Feature (
202- name = column , dtype = dtype_to_value_type (df [column ].dtype )
247+ name = column , dtype = dtype_to_feast_value_type (df [column ].dtype )
248+ )
249+
250+ if len (entities ) == 0 :
251+ raise Exception (
252+ "Could not detect entity column(s). Please provide entity column(s)."
253+ )
254+ if len (features ) == 0 :
255+ raise Exception (
256+ "Could not detect feature columns. Please provide feature column(s)."
203257 )
204258 self ._entities = entities
205259 self ._features = features
206260
261+ def ingest (
262+ self , dataframe : pd .DataFrame , force_update : bool = False , timeout : int = 5
263+ ):
264+ # Update feature set from data set and re-register if changed
265+ if force_update :
266+ self .update_from_dataset (dataframe )
267+ self ._client .apply (self )
268+
269+ # Validate feature set version with Feast Core
270+ self ._validate_feature_set ()
271+
272+ # Validate data schema w.r.t this feature set
273+ self ._validate_dataframe_schema (dataframe )
274+
275+ # Create Kafka FeatureRow producer
276+ if self ._message_producer is None :
277+ self ._message_producer = KafkaProducer (
278+ bootstrap_servers = self ._get_kafka_source_brokers ()
279+ )
280+
281+ _logger .info (
282+ f"Publishing features to topic: '{ self ._get_kafka_source_topic ()} ' "
283+ f"on brokers: '{ self ._get_kafka_source_brokers ()} '"
284+ )
285+
286+ # Convert rows to FeatureRows and and push to stream
287+ for index , row in tqdm (
288+ dataframe .iterrows (), unit = "rows" , total = dataframe .shape [0 ]
289+ ):
290+ feature_row = self ._pandas_row_to_feature_row (dataframe , row )
291+ self ._message_producer .send (
292+ self ._get_kafka_source_topic (), feature_row .SerializeToString ()
293+ )
294+
295+ # Wait for all messages to be completely sent
296+ self ._message_producer .flush (timeout = timeout )
297+
298+ def _pandas_row_to_feature_row (
299+ self , dataframe : pd .DataFrame , row
300+ ) -> FeatureRow .FeatureRow :
301+ if len (self .features ) + len (self .entities ) + 1 != len (dataframe .columns ):
302+ raise Exception (
303+ "Amount of entities and features in feature set do not match dataset columns"
304+ )
305+
306+ event_timestamp = Timestamp ()
307+ event_timestamp .FromNanoseconds (row [DATETIME_COLUMN ].value )
308+ feature_row = FeatureRowProto .FeatureRow (
309+ eventTimestamp = event_timestamp ,
310+ featureSet = self .name + ":" + str (self .version ),
311+ )
312+
313+ for column in dataframe .columns :
314+ if column == DATETIME_COLUMN :
315+ continue
316+
317+ feature_value = ValueProto .Value ()
318+ feature_value_attr = dtype_to_feast_value_attr (dataframe [column ].dtype )
319+ try :
320+ feature_value .__setattr__ (feature_value_attr , row [column ])
321+ except TypeError as type_error :
322+ # Numpy treats NaN as float. So if there is NaN values in column of
323+ # "str" type, __setattr__ will raise TypeError. This handles that case.
324+ if feature_value_attr == "stringVal" and pd .isnull (row [column ]):
325+ feature_value .__setattr__ ("stringVal" , "" )
326+ else :
327+ raise type_error
328+ feature_row .fields .extend (
329+ [FieldProto .Field (name = f"column" , value = feature_value )]
330+ )
331+ return feature_row
332+
207333 @classmethod
208- def from_proto (cls , feature_set_proto : FeatureSetProto ):
334+ def from_proto (cls , feature_set_proto : FeatureSetSpecProto ):
209335 feature_set = cls (
210336 name = feature_set_proto .name ,
211337 features = [
@@ -219,16 +345,28 @@ def from_proto(cls, feature_set_proto: FeatureSetProto):
219345 feature_set ._source = feature_set_proto .source
220346 return feature_set
221347
222- def to_proto (self ) -> FeatureSetProto :
223- return FeatureSetProto (
348+ def to_proto (self ) -> FeatureSetSpecProto :
349+ return FeatureSetSpecProto (
224350 name = self .name ,
225351 version = self .version ,
226352 maxAge = self .max_age ,
227- source = Source (),
228- features = [
229- feature .to_proto () for featureName , feature in self ._features .items ()
230- ],
231- entities = [
232- entity .to_proto () for entityName , entity in self ._entities .items ()
233- ],
353+ source = SourceProto (),
354+ features = [feature .to_proto () for feature in self ._features .values ()],
355+ entities = [entity .to_proto () for entity in self ._entities .values ()],
234356 )
357+
358+ def _validate_feature_set (self ):
359+ pass
360+
361+ def _get_kafka_source_brokers (self ) -> str :
362+ if self .source and self .source .source_type is "Kafka" :
363+ return self .source .brokers
364+ raise Exception ("Source type could not be identified" )
365+
366+ def _get_kafka_source_topic (self ) -> str :
367+ if self .source and self .source .source_type == "Kafka" :
368+ return self .source .topics
369+ raise Exception ("Source type could not be identified" )
370+
371+ def _validate_dataframe_schema (self , dataframe : pd .DataFrame ) -> bool :
372+ return True