Skip to content

Commit 5116e77

Browse files
committed
Refactoring SDK
* Cleaned up type mapping in SDK * Added tracking of dirty state * Added type comparison * Added support for sources
1 parent 7565fe6 commit 5116e77

File tree

7 files changed

+320
-105
lines changed

7 files changed

+320
-105
lines changed

sdk/python/feast/client.py

Lines changed: 185 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,32 @@
1414

1515

1616
import grpc
17-
1817
from feast.core.CoreService_pb2_grpc import CoreServiceStub
1918
from feast.core.CoreService_pb2 import (
19+
GetFeastCoreVersionRequest,
2020
GetFeatureSetsResponse,
2121
ApplyFeatureSetRequest,
2222
GetFeatureSetsRequest,
2323
ApplyFeatureSetResponse,
2424
)
25-
from feast.core.FeatureSet_pb2 import FeatureSetSpec, FeatureSpec, EntitySpec
26-
from feast.core.Source_pb2 import Source
25+
from feast.serving.ServingService_pb2 import (
26+
GetFeaturesRequest,
27+
GetFeastServingVersionRequest,
28+
GetOnlineFeaturesResponse,
29+
)
2730
from feast.feature_set import FeatureSet, Entity
2831
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
29-
from google.protobuf import empty_pb2 as empty
32+
3033
from typing import List
3134
from collections import OrderedDict
3235
from typing import Dict
33-
from feast.type_map import dtype_to_value_type
3436
import os
37+
import pandas as pd
38+
from feast.type_map import (
39+
pandas_value_to_proto_value,
40+
dtype_to_feast_value_attr,
41+
FEAST_VALUETYPE_TO_DTYPE,
42+
)
3543

3644
GRPC_CONNECTION_TIMEOUT = 5 # type: int
3745
FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str
@@ -42,16 +50,14 @@ class Client:
4250
def __init__(
4351
self, core_url: str = None, serving_url: str = None, verbose: bool = False
4452
):
45-
self._feature_sets = [] # type: List[FeatureSet]
53+
self._feature_sets = OrderedDict() # type: Dict[FeatureSet]
4654
self._core_url = core_url
4755
self._serving_url = serving_url
4856
self._verbose = verbose
4957
self.__core_channel: grpc.Channel = None
5058
self.__serving_channel: grpc.Channel = None
5159
self._core_service_stub: CoreServiceStub = None
5260
self._serving_service_stub: ServingServiceStub = None
53-
self._is_serving_connected = False
54-
self._is_core_connected = False
5561

5662
@property
5763
def core_url(self) -> str:
@@ -83,14 +89,14 @@ def version(self):
8389

8490
try:
8591
core_version = self._core_service_stub.GetFeastCoreVersion(
86-
empty, timeout=GRPC_CONNECTION_TIMEOUT
92+
GetFeastCoreVersionRequest(), timeout=GRPC_CONNECTION_TIMEOUT
8793
).version
8894
except grpc.FutureCancelledError:
8995
core_version = "not connected"
9096

9197
try:
9298
serving_version = self._serving_service_stub.GetFeastServingVersion(
93-
empty, timeout=GRPC_CONNECTION_TIMEOUT
99+
GetFeastServingVersionRequest(), timeout=GRPC_CONNECTION_TIMEOUT
94100
).version
95101
except grpc.FutureCancelledError:
96102
serving_version = "not connected"
@@ -100,10 +106,13 @@ def version(self):
100106
"serving": {"url": self.serving_url, "version": serving_version},
101107
}
102108

103-
def _connect_core(self):
109+
def _connect_core(self, skip_if_connected=True):
104110
"""
105111
Connect to Core API
106112
"""
113+
if skip_if_connected and self._core_service_stub:
114+
return
115+
107116
if not self.core_url:
108117
raise ValueError("Please set Feast Core URL.")
109118

@@ -121,13 +130,15 @@ def _connect_core(self):
121130
)
122131
else:
123132
self._core_service_stub = CoreServiceStub(self.__core_channel)
124-
self._is_core_connected = True
125133

126-
def _connect_serving(self):
134+
def _connect_serving(self, skip_if_connected=True):
127135
"""
128136
Connect to Serving API
129137
"""
130138

139+
if skip_if_connected and self._serving_service_stub:
140+
return
141+
131142
if not self.serving_url:
132143
raise ValueError("Please set Feast Serving URL.")
133144

@@ -145,49 +156,194 @@ def _connect_serving(self):
145156
)
146157
else:
147158
self._serving_service_stub = ServingServiceStub(self.__serving_channel)
148-
self._is_serving_connected = True
149159

150160
def apply(self, resource):
151161
if isinstance(resource, FeatureSet):
152162
self._apply_feature_set(resource)
163+
return
164+
raise Exception("Could not determine resource type to apply")
153165

154166
def refresh(self):
155167
"""
156168
Refresh list of Feature Sets from Feast Core
157169
"""
158-
self._connect_core()
170+
self._connect_core(skip_if_connected=True)
159171

160172
# Get latest Feature Sets from Feast Core
161173
feature_set_protos = self._core_service_stub.GetFeatureSets(
162174
GetFeatureSetsRequest(filter=GetFeatureSetsRequest.Filter())
163175
) # type: GetFeatureSetsResponse
164176

165177
# Store list of Feature Sets
166-
self._feature_sets = [
167-
FeatureSet.from_proto(feature_set)
168-
for feature_set in feature_set_protos.featureSets
169-
]
178+
for feature_set_proto in feature_set_protos.feature_sets:
179+
feature_set = FeatureSet.from_proto(feature_set_proto)
180+
feature_set._is_dirty = False
181+
feature_set._client = self
182+
self._feature_sets[feature_set.name.strip()] = feature_set
170183

171184
@property
172185
def feature_sets(self) -> List[FeatureSet]:
173186
self.refresh()
174-
return self._feature_sets
187+
return list(self._feature_sets.values())
175188

176189
@property
177190
def entities(self) -> Dict[str, Entity]:
178191
entities_dict = OrderedDict()
179-
for fs in self._feature_sets:
180-
for entityName, entity in fs.entities:
181-
entities_dict[entityName] = entity
192+
for fs in list(self._feature_sets.values()):
193+
for entity in fs.entities:
194+
entities_dict[entity.name] = entity
182195
return entities_dict
183196

184197
def _apply_feature_set(self, feature_set: FeatureSet):
185-
self._connect_core()
186-
apply_feature_set_response = self._core_service_stub.ApplyFeatureSet(
187-
ApplyFeatureSetRequest(featureSet=feature_set.to_proto()),
198+
self._connect_core(skip_if_connected=True)
199+
feature_set._client = self
200+
201+
apply_fs_response = self._core_service_stub.ApplyFeatureSet(
202+
ApplyFeatureSetRequest(feature_set=feature_set.to_proto()),
188203
timeout=GRPC_CONNECTION_TIMEOUT,
189204
) # type: ApplyFeatureSetResponse
190205

191-
feature_set._version = apply_feature_set_response.featureSet.version
192-
feature_set._is_dirty = False
193-
feature_set._client = self
206+
if apply_fs_response.status == ApplyFeatureSetResponse.Status.ERROR:
207+
raise Exception(
208+
"Error while trying to apply feature set " + feature_set.name
209+
)
210+
211+
# Refresh state from Feast Core to local client
212+
self.refresh()
213+
214+
# Replace applied feature set with refreshed feature set from Feast Core
215+
deep_update_feature_set(
216+
source=self._feature_sets[feature_set.name], target=feature_set
217+
)
218+
219+
def get(
220+
self,
221+
entity_data: pd.DataFrame,
222+
feature_ids: List[str],
223+
join_on: Dict[str, str] = None,
224+
) -> pd.DataFrame:
225+
self._connect_serving(skip_if_connected=True)
226+
227+
if "datetime" != entity_data.columns[0]:
228+
raise ValueError("The first column in entity_data should be 'datetime'")
229+
230+
entity_data_field_names = ["datetime"]
231+
for column in entity_data.columns[1:]:
232+
if column not in self.entities.keys():
233+
raise Exception("Entity " + column + " could not be found")
234+
entity_data_field_names.append(column)
235+
236+
entity_dataset_rows = []
237+
for row in entity_data.iterrows():
238+
entity_dataset_row = GetFeaturesRequest.EntityDataSetRow()
239+
for i in range(len(entity_data.columns[1:])):
240+
proto_value = pandas_value_to_proto_value(
241+
entity_data[entity_data.columns[i]].dtype, row[i]
242+
)
243+
entity_dataset_row.value.append(proto_value)
244+
entity_dataset_rows.append(entity_dataset_row)
245+
246+
feature_set_request = create_feature_set_request_from_feature_strings(
247+
feature_ids
248+
)
249+
250+
get_online_features_response_proto = self._serving_service_stub.GetOnlineFeatures(
251+
GetFeaturesRequest(
252+
entityDataSet=GetFeaturesRequest.EntityDataSet(
253+
entity_data_set_rows=entity_dataset_rows,
254+
fieldNames=entity_data_field_names,
255+
),
256+
featureSets=feature_set_request,
257+
)
258+
) # type: GetOnlineFeaturesResponse
259+
260+
feature_dataframe = feature_data_sets_to_pandas_dataframe(
261+
entity_data_set=entity_data.copy(),
262+
feature_data_sets=list(
263+
get_online_features_response_proto.feature_data_sets
264+
),
265+
)
266+
return feature_dataframe
267+
268+
269+
def feature_data_sets_to_pandas_dataframe(
270+
entity_data_set: pd.DataFrame,
271+
feature_data_sets: List[GetOnlineFeaturesResponse.FeatureDataSet],
272+
):
273+
feature_data_set_dataframes = []
274+
for feature_data_set in feature_data_sets:
275+
# Validate feature data set length
276+
if len(feature_data_set.feature_rows) != len(entity_data_set.index):
277+
raise Exception(
278+
"Feature data set response is of different size "
279+
+ str(len(feature_data_set.feature_rows))
280+
+ " than the entity data set request "
281+
+ str(len(entity_data_set.index))
282+
)
283+
284+
# Convert to Pandas DataFrame
285+
feature_data_set_dataframes.append(
286+
feature_data_set_to_pandas_dataframe(feature_data_set)
287+
)
288+
289+
# Join dataframes into a single feature dataframe
290+
dataframe = join_feature_set_dataframes(feature_data_set_dataframes)
291+
return dataframe
292+
293+
294+
def join_feature_set_dataframes(
295+
feature_data_set_dataframes: List[pd.DataFrame]
296+
) -> pd.DataFrame:
297+
return (
298+
feature_data_set_dataframes[0]
299+
if len(feature_data_set_dataframes) > 0
300+
else pd.DataFrame
301+
)
302+
303+
304+
def feature_data_set_to_pandas_dataframe(
305+
feature_data_set: GetOnlineFeaturesResponse.FeatureDataSet
306+
) -> pd.DataFrame:
307+
feature_set_name = feature_data_set.name
308+
dtypes = {}
309+
columns = []
310+
for field in list(feature_data_set.feature_rows[0].fields):
311+
feature_id = feature_set_name + "." + field.name
312+
columns.append(feature_id)
313+
dtypes[feature_id] = FEAST_VALUETYPE_TO_DTYPE[field.value.WhichOneof("val")]
314+
315+
dataframe = pd.DataFrame(columns=columns).reset_index(drop=True).astype(dtypes)
316+
317+
for featureRow in list(feature_data_set.feature_rows):
318+
pandas_row = {}
319+
for field in list(featureRow.fields):
320+
pandas_row[feature_set_name + "." + field.name] = getattr(
321+
field.value, field.value.WhichOneof("val")
322+
)
323+
dataframe = dataframe.append(pandas_row, ignore_index=True)
324+
325+
return dataframe
326+
327+
328+
def create_feature_set_request_from_feature_strings(
329+
feature_ids: List[str]
330+
) -> List[GetFeaturesRequest.FeatureSet]:
331+
feature_set_request = dict() # type: Dict[str, GetFeaturesRequest.FeatureSet]
332+
for feature_id in feature_ids:
333+
feature_set, feature = feature_id.split(".")
334+
if feature_set not in feature_set_request:
335+
feature_set_request[feature_set] = GetFeaturesRequest.FeatureSet(
336+
name=feature_set
337+
)
338+
feature_set_request[feature_set].feature_names.append(feature)
339+
return list(feature_set_request.values())
340+
341+
342+
def deep_update_feature_set(source: FeatureSet, target: FeatureSet):
343+
target._name = source.name
344+
target._version = source.version
345+
target._source = source.source
346+
target._max_age = source.max_age
347+
target._features = source.features
348+
target._entities = source.entities
349+
target._is_dirty = source._is_dirty

sdk/python/feast/entity.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
class Entity(Field):
2222
def to_proto(self) -> EntityProto:
2323
value_type = ValueTypeProto.ValueType.Enum.Value(self.dtype.name)
24-
return EntityProto(name=self.name, valueType=value_type)
24+
return EntityProto(name=self.name, value_type=value_type)
2525

2626
@classmethod
2727
def from_proto(cls, entity_proto: EntityProto):
28-
return cls(name=entity_proto.name, dtype=ValueType(entity_proto.valueType))
28+
return cls(name=entity_proto.name, dtype=ValueType(entity_proto.value_type))

sdk/python/feast/feature.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
class Feature(Field):
2222
def to_proto(self) -> FeatureProto:
2323
value_type = ValueTypeProto.ValueType.Enum.Value(self.dtype.name)
24-
return FeatureProto(name=self.name, valueType=value_type)
24+
return FeatureProto(name=self.name, value_type=value_type)
2525

2626
@classmethod
2727
def from_proto(cls, feature_proto: FeatureProto):
28-
return cls(name=feature_proto.name, dtype=ValueType(feature_proto.valueType))
28+
return cls(name=feature_proto.name, dtype=ValueType(feature_proto.value_type))

0 commit comments

Comments
 (0)