Skip to content

Commit 627055a

Browse files
committed
Add ingestion to Python SDK
1 parent 4814306 commit 627055a

5 files changed

Lines changed: 267 additions & 41 deletions

File tree

sdk/python/feast/entity.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from feast.value_type import ValueType
1616
from feast.core.FeatureSet_pb2 import EntitySpec as EntityProto
17+
from feast.types import Value_pb2 as ValueTypeProto
1718

1819

1920
class Entity:
@@ -29,8 +30,9 @@ def name(self):
2930
def dtype(self):
3031
return self._dtype
3132

32-
def to_proto(self):
33-
pass
33+
def to_proto(self) -> EntityProto:
34+
value_type = ValueTypeProto.ValueType.Enum.Value(self.dtype.name)
35+
return EntityProto(name=self.name, valueType=value_type)
3436

3537
@classmethod
3638
def from_proto(cls, entity_proto: EntityProto):

sdk/python/feast/feature.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
from feast.value_type import ValueType
1616
from feast.core.FeatureSet_pb2 import FeatureSpec as FeatureProto
17-
from feast.types import Value_pb2 as ValueProto
17+
from feast.types import Value_pb2 as ValueTypeProto
1818

1919

2020
class Feature:
2121
def __init__(self, name: str, dtype: ValueType):
2222
self._name = name
23+
if not isinstance(dtype, ValueType):
24+
raise ValueError("dtype is not a valid ValueType")
2325
self._dtype = dtype
2426

2527
@property
@@ -31,9 +33,8 @@ def dtype(self) -> ValueType:
3133
return self._dtype
3234

3335
def to_proto(self) -> FeatureProto:
34-
return FeatureProto(
35-
name=self.name, valueType=ValueProto.ValueType.Enum.Value(self._dtype.name)
36-
)
36+
value_type = ValueTypeProto.ValueType.Enum.Value(self.dtype.name)
37+
return FeatureProto(name=self.name, valueType=value_type)
3738

3839
@classmethod
3940
def from_proto(cls, feature_proto: FeatureProto):

sdk/python/feast/feature_set.py

Lines changed: 154 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,38 @@
1313
# limitations under the License.
1414

1515

16+
import logging
17+
1618
import pandas as pd
1719
from typing import List
1820
from collections import OrderedDict
1921
from typing import Dict
22+
from feast.source import Source, KafkaSource
2023
from feast.type_map import dtype_to_value_type
2124
from feast.value_type import ValueType
2225
from pandas.api.types import is_datetime64_ns_dtype
2326
from feast.entity import Entity
2427
from feast.feature import Feature
2528
from feast.core.FeatureSet_pb2 import (
26-
FeatureSetSpec as FeatureSetProto,
27-
FeatureSpec as FeatureProto,
29+
FeatureSetSpec as FeatureSetSpecProto,
30+
FeatureSpec as FeatureSpecProto,
31+
)
32+
from feast.core.Source_pb2 import Source as SourceProto
33+
from feast.types import FeatureRow_pb2 as FeatureRow
34+
from google.protobuf.timestamp_pb2 import Timestamp
35+
from kafka import KafkaProducer
36+
from pandas.core.dtypes.common import is_datetime64_any_dtype
37+
from tqdm import tqdm
38+
from type_map import dtype_to_feast_value_type
39+
from feast.types import (
40+
Value_pb2 as ValueProto,
41+
FeatureRow_pb2 as FeatureRowProto,
42+
Feature_pb2 as FeatureProto,
43+
Field_pb2 as FieldProto,
2844
)
29-
from feast.core.Source_pb2 import Source
45+
from feast.type_map import dtype_to_feast_value_attr
46+
47+
_logger = logging.getLogger(__name__)
3048

3149
DATETIME_COLUMN = "datetime" # type: str
3250

@@ -41,6 +59,7 @@ def __init__(
4159
name: str,
4260
features: List[Feature] = None,
4361
entities: List[Entity] = None,
62+
source: Source = None,
4463
max_age: int = -1,
4564
):
4665
self._name = name
@@ -53,7 +72,9 @@ def __init__(
5372
self._max_age = max_age
5473
self._version = None
5574
self._client = None
56-
self._source = None
75+
self._source = source
76+
self._message_producer = None
77+
self._busy_ingesting = False
5778

5879
@property
5980
def features(self) -> List[Feature]:
@@ -77,6 +98,14 @@ def name(self):
7798
def source(self):
7899
return self._source
79100

101+
@source.setter
102+
def source(self, source: Source):
103+
self._source = source
104+
105+
# Create Kafka FeatureRow producer
106+
if self._message_producer is not None:
107+
self._message_producer = KafkaProducer(bootstrap_servers=source.brokers)
108+
80109
@property
81110
def version(self):
82111
return self._version
@@ -152,11 +181,14 @@ def _add_entities(self, entities: List[Entity]):
152181
for entity in entities:
153182
self.add(entity)
154183

155-
def update_from_dataset(self, df: pd.DataFrame):
184+
def update_from_dataset(self, df: pd.DataFrame, column_mapping=None):
156185
"""
157186
Updates Feature Set values based on the data set. Only Pandas dataframes are supported.
187+
:param column_mapping: Dictionary mapping column names to resources (Entity or Feature). Forces the interpretation
188+
of a column as either an entity or feature. Example: {"driver_id": Entity(name="driver", dtype=ValueType.INT64)}
158189
:param df: Pandas dataframe containing datetime column, entity columns, and feature columns.
159190
"""
191+
160192
features = OrderedDict()
161193
entities = OrderedDict()
162194
existing_entities = None
@@ -181,6 +213,19 @@ def update_from_dataset(self, df: pd.DataFrame):
181213
if DATETIME_COLUMN in column:
182214
continue
183215

216+
# Use entity or feature value if provided
217+
if column_mapping and column in column_mapping:
218+
resource = column_mapping[column]
219+
if isinstance(resource, Entity):
220+
entities[column] = resource
221+
continue
222+
if isinstance(resource, Feature):
223+
features[column] = resource
224+
continue
225+
raise ValueError(
226+
"Invalid resource type specified at column name " + column
227+
)
228+
184229
# Test whether this column is an existing entity. If it is named exactly the same
185230
# as an existing entity then it will be detected as such
186231
if existing_entities and column in existing_entities:
@@ -199,13 +244,94 @@ def update_from_dataset(self, df: pd.DataFrame):
199244

200245
# Store this field as a feature
201246
features[column] = Feature(
202-
name=column, dtype=dtype_to_value_type(df[column].dtype)
247+
name=column, dtype=dtype_to_feast_value_type(df[column].dtype)
248+
)
249+
250+
if len(entities) == 0:
251+
raise Exception(
252+
"Could not detect entity column(s). Please provide entity column(s)."
253+
)
254+
if len(features) == 0:
255+
raise Exception(
256+
"Could not detect feature columns. Please provide feature column(s)."
203257
)
204258
self._entities = entities
205259
self._features = features
206260

261+
def ingest(
262+
self, dataframe: pd.DataFrame, force_update: bool = False, timeout: int = 5
263+
):
264+
# Update feature set from data set and re-register if changed
265+
if force_update:
266+
self.update_from_dataset(dataframe)
267+
self._client.apply(self)
268+
269+
# Validate feature set version with Feast Core
270+
self._validate_feature_set()
271+
272+
# Validate data schema w.r.t this feature set
273+
self._validate_dataframe_schema(dataframe)
274+
275+
# Create Kafka FeatureRow producer
276+
if self._message_producer is None:
277+
self._message_producer = KafkaProducer(
278+
bootstrap_servers=self._get_kafka_source_brokers()
279+
)
280+
281+
_logger.info(
282+
f"Publishing features to topic: '{self._get_kafka_source_topic()}' "
283+
f"on brokers: '{self._get_kafka_source_brokers()}'"
284+
)
285+
286+
# Convert rows to FeatureRows and push them to the stream
287+
for index, row in tqdm(
288+
dataframe.iterrows(), unit="rows", total=dataframe.shape[0]
289+
):
290+
feature_row = self._pandas_row_to_feature_row(dataframe, row)
291+
self._message_producer.send(
292+
self._get_kafka_source_topic(), feature_row.SerializeToString()
293+
)
294+
295+
# Wait for all messages to be completely sent
296+
self._message_producer.flush(timeout=timeout)
297+
298+
def _pandas_row_to_feature_row(
299+
self, dataframe: pd.DataFrame, row
300+
) -> FeatureRow.FeatureRow:
301+
if len(self.features) + len(self.entities) + 1 != len(dataframe.columns):
302+
raise Exception(
303+
"Amount of entities and features in feature set do not match dataset columns"
304+
)
305+
306+
event_timestamp = Timestamp()
307+
event_timestamp.FromNanoseconds(row[DATETIME_COLUMN].value)
308+
feature_row = FeatureRowProto.FeatureRow(
309+
eventTimestamp=event_timestamp,
310+
featureSet=self.name + ":" + str(self.version),
311+
)
312+
313+
for column in dataframe.columns:
314+
if column == DATETIME_COLUMN:
315+
continue
316+
317+
feature_value = ValueProto.Value()
318+
feature_value_attr = dtype_to_feast_value_attr(dataframe[column].dtype)
319+
try:
320+
feature_value.__setattr__(feature_value_attr, row[column])
321+
except TypeError as type_error:
322+
# Numpy treats NaN as float. So if there are NaN values in a column of
323+
# "str" type, __setattr__ will raise TypeError. This handles that case.
324+
if feature_value_attr == "stringVal" and pd.isnull(row[column]):
325+
feature_value.__setattr__("stringVal", "")
326+
else:
327+
raise type_error
328+
feature_row.fields.extend(
329+
[FieldProto.Field(name=f"column", value=feature_value)]
330+
)
331+
return feature_row
332+
207333
@classmethod
208-
def from_proto(cls, feature_set_proto: FeatureSetProto):
334+
def from_proto(cls, feature_set_proto: FeatureSetSpecProto):
209335
feature_set = cls(
210336
name=feature_set_proto.name,
211337
features=[
@@ -219,16 +345,28 @@ def from_proto(cls, feature_set_proto: FeatureSetProto):
219345
feature_set._source = feature_set_proto.source
220346
return feature_set
221347

222-
def to_proto(self) -> FeatureSetProto:
223-
return FeatureSetProto(
348+
def to_proto(self) -> FeatureSetSpecProto:
349+
return FeatureSetSpecProto(
224350
name=self.name,
225351
version=self.version,
226352
maxAge=self.max_age,
227-
source=Source(),
228-
features=[
229-
feature.to_proto() for featureName, feature in self._features.items()
230-
],
231-
entities=[
232-
entity.to_proto() for entityName, entity in self._entities.items()
233-
],
353+
source=SourceProto(),
354+
features=[feature.to_proto() for feature in self._features.values()],
355+
entities=[entity.to_proto() for entity in self._entities.values()],
234356
)
357+
358+
def _validate_feature_set(self):
359+
pass
360+
361+
def _get_kafka_source_brokers(self) -> str:
362+
if self.source and self.source.source_type is "Kafka":
363+
return self.source.brokers
364+
raise Exception("Source type could not be identified")
365+
366+
def _get_kafka_source_topic(self) -> str:
367+
if self.source and self.source.source_type == "Kafka":
368+
return self.source.topics
369+
raise Exception("Source type could not be identified")
370+
371+
def _validate_dataframe_schema(self, dataframe: pd.DataFrame) -> bool:
372+
return True

0 commit comments

Comments
 (0)