Skip to content

Commit e99fdd1

Browse files
committed
Added Field type as super class to Feature and Entity in Python SDK
1 parent 89cd543 commit e99fdd1

File tree

8 files changed

+127
-138
lines changed

8 files changed

+127
-138
lines changed

sdk/python/feast/client.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
GetFeatureSetsResponse,
2121
ApplyFeatureSetRequest,
2222
GetFeatureSetsRequest,
23+
ApplyFeatureSetResponse,
2324
)
2425
from feast.core.FeatureSet_pb2 import FeatureSetSpec, FeatureSpec, EntitySpec
2526
from feast.core.Source_pb2 import Source
@@ -182,8 +183,11 @@ def entities(self) -> Dict[str, Entity]:
182183

183184
def _apply_feature_set(self, feature_set: FeatureSet):
184185
self._connect_core()
185-
self._core_service_stub.ApplyFeatureSet(
186+
apply_feature_set_response = self._core_service_stub.ApplyFeatureSet(
186187
ApplyFeatureSetRequest(featureSet=feature_set.to_proto()),
187188
timeout=GRPC_CONNECTION_TIMEOUT,
188-
)
189+
) # type: ApplyFeatureSetResponse
190+
191+
feature_set._version = apply_feature_set_response.featureSet.version
192+
feature_set._is_dirty = False
189193
feature_set._client = self

sdk/python/feast/entity.py

Lines changed: 2 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -15,21 +15,10 @@
1515
from feast.value_type import ValueType
1616
from feast.core.FeatureSet_pb2 import EntitySpec as EntityProto
1717
from feast.types import Value_pb2 as ValueTypeProto
18+
from feast.field import Field
1819

1920

20-
class Entity:
21-
def __init__(self, name: str, dtype: ValueType):
22-
self._name = name
23-
self._dtype = dtype
24-
25-
@property
26-
def name(self):
27-
return self._name
28-
29-
@property
30-
def dtype(self):
31-
return self._dtype
32-
21+
class Entity(Field):
3322
def to_proto(self) -> EntityProto:
3423
value_type = ValueTypeProto.ValueType.Enum.Value(self.dtype.name)
3524
return EntityProto(name=self.name, valueType=value_type)

sdk/python/feast/feature.py

Lines changed: 2 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -15,23 +15,10 @@
1515
from feast.value_type import ValueType
1616
from feast.core.FeatureSet_pb2 import FeatureSpec as FeatureProto
1717
from feast.types import Value_pb2 as ValueTypeProto
18+
from feast.field import Field
1819

1920

20-
class Feature:
21-
def __init__(self, name: str, dtype: ValueType):
22-
self._name = name
23-
if not isinstance(dtype, ValueType):
24-
raise ValueError("dtype is not a valid ValueType")
25-
self._dtype = dtype
26-
27-
@property
28-
def name(self):
29-
return self._name
30-
31-
@property
32-
def dtype(self) -> ValueType:
33-
return self._dtype
34-
21+
class Feature(Field):
3522
def to_proto(self) -> FeatureProto:
3623
value_type = ValueTypeProto.ValueType.Enum.Value(self.dtype.name)
3724
return FeatureProto(name=self.name, valueType=value_type)

sdk/python/feast/feature_set.py

Lines changed: 74 additions & 88 deletions
Original file line number | Diff line number | Diff line change
@@ -19,33 +19,26 @@
1919
from typing import List
2020
from collections import OrderedDict
2121
from typing import Dict
22-
from feast.source import Source, KafkaSource
22+
from feast.source import Source
2323
from feast.type_map import dtype_to_value_type
24-
from feast.value_type import ValueType
2524
from pandas.api.types import is_datetime64_ns_dtype
2625
from feast.entity import Entity
27-
from feast.feature import Feature
28-
from feast.core.FeatureSet_pb2 import (
29-
FeatureSetSpec as FeatureSetSpecProto,
30-
FeatureSpec as FeatureSpecProto,
31-
)
26+
from feast.feature import Feature, Field
27+
from feast.core.FeatureSet_pb2 import FeatureSetSpec as FeatureSetSpecProto
3228
from feast.core.Source_pb2 import Source as SourceProto
3329
from feast.types import FeatureRow_pb2 as FeatureRow
3430
from google.protobuf.timestamp_pb2 import Timestamp
3531
from kafka import KafkaProducer
36-
from pandas.core.dtypes.common import is_datetime64_any_dtype
3732
from tqdm import tqdm
3833
from type_map import dtype_to_feast_value_type
3934
from feast.types import (
4035
Value_pb2 as ValueProto,
4136
FeatureRow_pb2 as FeatureRowProto,
42-
Feature_pb2 as FeatureProto,
4337
Field_pb2 as FieldProto,
4438
)
4539
from feast.type_map import dtype_to_feast_value_attr
4640

4741
_logger = logging.getLogger(__name__)
48-
4942
DATETIME_COLUMN = "datetime" # type: str
5043

5144

@@ -63,32 +56,38 @@ def __init__(
6356
max_age: int = -1,
6457
):
6558
self._name = name
66-
self._features = OrderedDict() # type: Dict[str, Feature]
67-
self._entities = OrderedDict() # type: Dict[str, Entity]
59+
self._fields = OrderedDict() # type: Dict[str, Field]
6860
if features is not None:
69-
self._add_features(features)
61+
self._add_fields(features)
7062
if entities is not None:
71-
self._add_entities(entities)
63+
self._add_fields(entities)
7264
self._max_age = max_age
7365
self._version = None
7466
self._client = None
7567
self._source = source
7668
self._message_producer = None
7769
self._busy_ingesting = False
70+
self._is_dirty = True
71+
72+
def __eq__(self, other):
73+
if not isinstance(other, FeatureSet):
74+
return NotImplemented
75+
76+
return self.name == other.name and self.version == other.version
7877

7978
@property
8079
def features(self) -> List[Feature]:
8180
"""
8281
Returns a list of features from this feature set
8382
"""
84-
return list(self._features.values())
83+
return [field for field in self._fields.values() if isinstance(field, Feature)]
8584

8685
@property
8786
def entities(self) -> List[Entity]:
8887
"""
8988
Returns list of entities from this feature set
9089
"""
91-
return list(self._entities.values())
90+
return [field for field in self._fields.values() if isinstance(field, Entity)]
9291

9392
@property
9493
def name(self):
@@ -121,10 +120,7 @@ def add(self, resource):
121120
:param resource: A resource can be either a Feature or an Entity object
122121
:return:
123122
"""
124-
if (
125-
resource.name in self._features.keys()
126-
or resource.name in self._entities.keys()
127-
):
123+
if resource.name in self._fields.keys():
128124
raise ValueError(
129125
'could not add field "'
130126
+ resource.name
@@ -133,53 +129,33 @@ def add(self, resource):
133129
+ '"'
134130
)
135131

136-
if isinstance(resource, Feature):
137-
return self._add_feature(resource)
138-
139-
if isinstance(resource, Entity):
140-
return self._add_entity(resource)
132+
if issubclass(type(resource), Field):
133+
return self._add_field(resource)
141134

142135
raise ValueError("Could not identify the resource being added")
143136

144-
def _add_entity(self, entity: Entity):
145-
self._entities[entity.name] = entity
146-
return
147-
148-
def _add_feature(self, feature: Feature):
149-
self._features[feature.name] = feature
137+
def _add_field(self, field: Field):
138+
self._fields[field.name] = field
150139
return
151140

152141
def drop(self, name: str):
153142
"""
154143
Removes a Feature or Entity from a Feature Set
155144
:param name: Name of Feature or Entity to be removed
156145
"""
157-
if name not in self._features and name not in self._entities:
146+
if name not in self._fields:
158147
raise ValueError("Could not find field " + name + ", no action taken")
159-
if name in self._features and name in self._entities:
160-
raise ValueError("Duplicate field found for " + name + "!")
161-
if name in self._features:
162-
del self._features[name]
163-
return
164-
if name in self._entities:
165-
del self._entities[name]
148+
if name in self._fields:
149+
del self._fields[name]
166150
return
167151

168-
def _add_features(self, features: List[Feature]):
169-
"""
170-
Adds multiple Features to a Feature Set
171-
:param features: List of Feature Objects
172-
"""
173-
for feature in features:
174-
self.add(feature)
175-
176-
def _add_entities(self, entities: List[Entity]):
152+
def _add_fields(self, fields: List[Field]):
177153
"""
178-
Adds multiple Entities to a Feature Set
179-
:param entities: List of Entity Objects
154+
Adds multiple Fields to a Feature Set
155+
:param fields: List of Feature or Entity Objects
180156
"""
181-
for entity in entities:
182-
self.add(entity)
157+
for field in fields:
158+
self.add(field)
183159

184160
def update_from_dataset(self, df: pd.DataFrame, column_mapping=None):
185161
"""
@@ -189,11 +165,8 @@ def update_from_dataset(self, df: pd.DataFrame, column_mapping=None):
189165
:param df: Pandas dataframe containing datetime column, entity columns, and feature columns.
190166
"""
191167

192-
features = OrderedDict()
193-
entities = OrderedDict()
194-
existing_entities = None
195-
if self._client:
196-
existing_entities = self._client.entities
168+
fields = OrderedDict()
169+
existing_entities = self._client.entities if self._client is not None else None
197170

198171
# Validate whether the datetime column exists with the right name
199172
if DATETIME_COLUMN not in df:
@@ -209,54 +182,47 @@ def update_from_dataset(self, df: pd.DataFrame, column_mapping=None):
209182
for column in df.columns:
210183
column = column.strip()
211184

212-
# Validate whether the datetime column exists with the right name
185+
# Skip datetime column
213186
if DATETIME_COLUMN in column:
214187
continue
215188

216-
# Use entity or feature value if provided
189+
# Use entity or feature value if provided by the column mapping
217190
if column_mapping and column in column_mapping:
218-
resource = column_mapping[column]
219-
if isinstance(resource, Entity):
220-
entities[column] = resource
221-
continue
222-
if isinstance(resource, Feature):
223-
features[column] = resource
191+
if issubclass(type(column_mapping[column]), Field):
192+
fields[column] = column_mapping[column]
224193
continue
225194
raise ValueError(
226195
"Invalid resource type specified at column name " + column
227196
)
228197

229-
# Test whether this column is an existing entity. If it is named exactly the same
230-
# as an existing entity then it will be detected as such
198+
# Test whether this column is an existing entity (globally).
231199
if existing_entities and column in existing_entities:
232200
entity = existing_entities[column]
233201

234202
# test whether registered entity type matches user provided type
235203
if entity.dtype == dtype_to_value_type(df[column].dtype):
236204
# Store this field as an entity
237-
entities[column] = entity
205+
fields[column] = entity
238206
continue
239207

240-
for feature in self.features:
241-
# Ignore features that already exist
242-
if feature.name == column:
243-
continue
208+
# Ignore fields that already exist
209+
if column in self._fields:
210+
continue
244211

245212
# Store this field as a feature
246-
features[column] = Feature(
213+
fields[column] = Feature(
247214
name=column, dtype=dtype_to_feast_value_type(df[column].dtype)
248215
)
249216

250-
if len(entities) == 0:
217+
if len([field for field in fields.values() if type(field) == Entity]) == 0:
251218
raise Exception(
252219
"Could not detect entity column(s). Please provide entity column(s)."
253220
)
254-
if len(features) == 0:
221+
if len([field for field in fields.values() if type(field) == Feature]) == 0:
255222
raise Exception(
256-
"Could not detect feature columns. Please provide feature column(s)."
223+
"Could not detect feature column(s). Please provide feature column(s)."
257224
)
258-
self._entities = entities
259-
self._features = features
225+
self._add_fields(list(fields.values()))
260226

261227
def ingest(
262228
self, dataframe: pd.DataFrame, force_update: bool = False, timeout: int = 5
@@ -269,9 +235,6 @@ def ingest(
269235
# Validate feature set version with Feast Core
270236
self._validate_feature_set()
271237

272-
# Validate data schema w.r.t this feature set
273-
self._validate_dataframe_schema(dataframe)
274-
275238
# Create Kafka FeatureRow producer
276239
if self._message_producer is None:
277240
self._message_producer = KafkaProducer(
@@ -298,7 +261,7 @@ def ingest(
298261
def _pandas_row_to_feature_row(
299262
self, dataframe: pd.DataFrame, row
300263
) -> FeatureRow.FeatureRow:
301-
if len(self.features) + len(self.entities) + 1 != len(dataframe.columns):
264+
if len(self._fields) != len(dataframe.columns) - 1:
302265
raise Exception(
303266
"Amount of entities and features in feature set do not match dataset columns"
304267
)
@@ -351,12 +314,38 @@ def to_proto(self) -> FeatureSetSpecProto:
351314
version=self.version,
352315
maxAge=self.max_age,
353316
source=SourceProto(),
354-
features=[feature.to_proto() for feature in self._features.values()],
355-
entities=[entity.to_proto() for entity in self._entities.values()],
317+
features=[
318+
field.to_proto()
319+
for field in self._fields.values()
320+
if type(field) == Feature
321+
],
322+
entities=[
323+
field.to_proto()
324+
for field in self._fields.values()
325+
if type(field) == Entity
326+
],
356327
)
357328

358329
def _validate_feature_set(self):
359-
pass
330+
331+
# Validate whether the feature set has been modified and needs to be saved
332+
if self._is_dirty:
333+
raise Exception("Feature set has been modified and must be saved first")
334+
335+
refreshed_feature_set = [
336+
fs
337+
for fs in self._client.feature_sets
338+
if fs.name == self.name and fs.version == self.version
339+
]
340+
341+
if not (len(refreshed_feature_set) == 1 and refreshed_feature_set[0] == self):
342+
raise Exception(
343+
"Feature set (name:"
344+
+ self.name
345+
+ ", version:"
346+
+ str(self.version)
347+
+ ") is inconsistent with Feast core"
348+
)
360349

361350
def _get_kafka_source_brokers(self) -> str:
362351
if self.source and self.source.source_type is "Kafka":
@@ -367,6 +356,3 @@ def _get_kafka_source_topic(self) -> str:
367356
if self.source and self.source.source_type == "Kafka":
368357
return self.source.topics
369358
raise Exception("Source type could not be identified")
370-
371-
def _validate_dataframe_schema(self, dataframe: pd.DataFrame) -> bool:
372-
return True

0 commit comments

Comments
 (0)