Skip to content

Commit bfc3403

Browse files
committed
Optimize retrieval performance
1 parent 5f1cd69 commit bfc3403

6 files changed

Lines changed: 95 additions & 108 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ vendor
5656
__pycache__/
5757
*.py[cod]
5858
*$py.class
59+
*.prof
5960

6061
# C extensions
6162
*.so

sdk/python/feast/client.py

Lines changed: 32 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,19 @@
2222
GetFeatureSetsRequest,
2323
ApplyFeatureSetResponse,
2424
)
25-
from feast.types.Value_pb2 import ValueType
2625
from feast.serving.ServingService_pb2 import (
2726
GetFeaturesRequest,
2827
GetFeastServingVersionRequest,
2928
GetOnlineFeaturesResponse,
3029
)
3130
from feast.feature_set import FeatureSet, Entity
3231
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
33-
3432
from typing import List
3533
from collections import OrderedDict
3634
from typing import Dict
37-
from datetime import datetime
3835
import os
3936
import pandas as pd
40-
from feast.type_map import (
41-
pandas_value_to_proto_value,
42-
dtype_to_feast_value_attr,
43-
FEAST_VALUETYPE_TO_DTYPE,
44-
)
37+
from feast.type_map import pandas_value_to_proto_value, FEAST_VALUE_ATTR_TO_DTYPE
4538

4639
GRPC_CONNECTION_TIMEOUT = 600 # type: int
4740
FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str
@@ -206,7 +199,7 @@ def _apply_feature_set(self, feature_set: FeatureSet):
206199
"Error while trying to apply feature set " + feature_set.name
207200
)
208201
applied_fs = FeatureSet.from_proto(apply_fs_response.feature_set)
209-
feature_set.update_from_feature_set(applied_fs, is_dirty=False)
202+
feature_set._update_from_feature_set(applied_fs, is_dirty=False)
210203
return
211204

212205
def get(
@@ -222,13 +215,11 @@ def get(
222215

223216
entity_names = []
224217
for column in entity_data.columns[1:]:
225-
if column not in self.entities.keys():
226-
raise Exception("Entity " + column + " could not be found")
227218
entity_names.append(column)
228219

229220
entity_dataset_rows = entity_data.apply(
230221
_convert_to_proto_value_fn(entity_data.dtypes), axis=1
231-
).to_list()
222+
)
232223

233224
feature_set_request = create_feature_set_request_from_feature_strings(
234225
feature_ids
@@ -244,7 +235,6 @@ def get(
244235
) # type: GetOnlineFeaturesResponse
245236

246237
feature_dataframe = feature_data_sets_to_pandas_dataframe(
247-
feature_sets=self._feature_sets,
248238
entity_data_set=entity_data.copy(),
249239
feature_data_sets=list(get_online_features_response_proto.feature_datasets),
250240
)
@@ -255,15 +245,15 @@ def _convert_to_proto_value_fn(dtypes: pd.core.generic.NDFrame):
255245
def convert_to_proto_value(row: pd.Series):
256246
entity_dataset_row = GetFeaturesRequest.EntityDatasetRow()
257247
for i in range(len(row) - 1):
258-
proto_value = pandas_value_to_proto_value(dtypes[i + 1], row[i + 1])
259-
entity_dataset_row.value.append(proto_value)
248+
entity_dataset_row.entity_ids.append(
249+
pandas_value_to_proto_value(dtypes[i + 1], row[i + 1])
250+
)
260251
return entity_dataset_row
261252

262253
return convert_to_proto_value
263254

264255

265256
def feature_data_sets_to_pandas_dataframe(
266-
feature_sets: List[FeatureSet],
267257
entity_data_set: pd.DataFrame,
268258
feature_data_sets: List[GetOnlineFeaturesResponse.FeatureDataset],
269259
):
@@ -280,9 +270,7 @@ def feature_data_sets_to_pandas_dataframe(
280270

281271
# Convert to Pandas DataFrame
282272
feature_data_set_dataframes.append(
283-
feature_data_set_to_pandas_dataframe(
284-
feature_sets[feature_data_set.name], feature_data_set
285-
)
273+
feature_data_set_to_pandas_dataframe(feature_data_set)
286274
)
287275

288276
# Join dataframes into a single feature dataframe
@@ -301,29 +289,35 @@ def join_feature_set_dataframes(
301289

302290

303291
def feature_data_set_to_pandas_dataframe(
304-
feature_set: FeatureSet, feature_data_set: GetOnlineFeaturesResponse.FeatureDataset
292+
feature_data_set: GetOnlineFeaturesResponse.FeatureDataset
305293
) -> pd.DataFrame:
306294
feature_set_name = feature_data_set.name
307295
dtypes = {}
296+
value_attr = {}
308297
columns = []
309-
for field in feature_set.entities + feature_set.features:
310-
field_proto = field.to_proto()
311-
feature_id = feature_set_name + "." + field_proto.name
312-
columns.append(feature_id)
313-
feast_value_type = ValueType.Enum.Name(field_proto.value_type)
314-
dtypes[feature_id] = FEAST_VALUETYPE_TO_DTYPE[feast_value_type]
315-
316-
dataframe = pd.DataFrame(columns=columns).reset_index(drop=True).astype(dtypes)
317-
318-
for featureRow in list(feature_data_set.feature_rows):
319-
pandas_row = {}
320-
for field in list(featureRow.fields):
321-
if field.value.WhichOneof("val") is None:
322-
feature_value = None
298+
data = {}
299+
first_run_done = False
300+
301+
for featureRow in feature_data_set.feature_rows:
302+
for field in featureRow.fields:
303+
feature_id = feature_set_name + "." + field.name
304+
305+
if not first_run_done:
306+
columns.append(feature_id)
307+
data[feature_id] = []
308+
value_attr[feature_id] = field.value.WhichOneof("val")
309+
dtypes[feature_id] = FEAST_VALUE_ATTR_TO_DTYPE[value_attr[feature_id]]
310+
311+
if not field.value.HasField(value_attr[feature_id]):
312+
data[feature_id].append(None)
323313
else:
324-
feature_value = getattr(field.value, field.value.WhichOneof("val"))
325-
pandas_row[feature_set_name + "." + field.name] = feature_value
326-
dataframe = dataframe.append(pandas_row, ignore_index=True)
314+
data[feature_id].append(getattr(field.value, value_attr[feature_id]))
315+
316+
first_run_done = True
317+
318+
dataframe = (
319+
pd.DataFrame(columns=columns, data=data).reset_index(drop=True).astype(dtypes)
320+
)
327321

328322
return dataframe
329323

@@ -337,7 +331,7 @@ def create_feature_set_request_from_feature_strings(
337331
if feature_set not in feature_set_request:
338332
feature_set_name, feature_set_version = feature_set.split(":")
339333
feature_set_request[feature_set] = GetFeaturesRequest.FeatureSet(
340-
name=feature_set_name, version=feature_set_version
334+
name=feature_set_name, version=int(feature_set_version)
341335
)
342336
feature_set_request[feature_set].feature_names.append(feature)
343337
return list(feature_set_request.values())

sdk/python/feast/tests/feast_serving_server.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,6 @@ def GetFeastServingVersion(self, request, context):
9696

9797
def GetOnlineFeatures(self, request: GetFeaturesRequest, context):
9898

99-
# for feature_set_request in list(request.featureSets):
100-
# feature_data_set = self._store.get_feature_data(
101-
# feature_set_request=feature_set_request,
102-
# entity_data=request.entityDataSet,
103-
# )
104-
10599
response = GetOnlineFeaturesResponse(
106100
feature_data_sets=[
107101
GetOnlineFeaturesResponse.FeatureDataSet(

sdk/python/feast/tests/test_client.py

Lines changed: 49 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,26 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import time
1514

1615
import grpc
1716
import pandas as pd
17+
import numpy as np
1818
import feast.core.CoreService_pb2_grpc as Core
1919
import feast.serving.ServingService_pb2_grpc as Serving
20-
from feast.entity import Entity
2120
from feast.core.CoreService_pb2 import GetFeastCoreVersionResponse
22-
from feast.core.CoreService_pb2 import GetFeatureSetsResponse
23-
from feast.core.FeatureSet_pb2 import FeatureSetSpec
24-
from feast.serving.ServingService_pb2 import GetFeastServingVersionResponse
21+
from feast.serving.ServingService_pb2 import (
22+
GetFeastServingVersionResponse,
23+
GetOnlineFeaturesResponse,
24+
)
25+
from google.protobuf.timestamp_pb2 import Timestamp
2526
import pytest
2627
from feast.client import Client
27-
from feast.tests import dataframes
2828
from concurrent import futures
2929
from feast.tests.feast_core_server import CoreServicer
3030
from feast.tests.feast_serving_server import ServingServicer
31-
from feast.feature import Feature
32-
from feast.feature_set import FeatureSet
33-
from feast.value_type import ValueType
34-
from feast.source import KafkaSource
35-
from feast.tests.fake_kafka import FakeKafka
31+
from feast.types import FeatureRow_pb2 as FeatureRowProto
32+
from feast.types import Field_pb2 as FieldProto
33+
from feast.types import Value_pb2 as ValueProto
3634

3735
CORE_URL = "core.feast.example.com"
3836
SERVING_URL = "serving.example.com"
@@ -94,63 +92,56 @@ def test_version(self, mock_client, mocker):
9492
and status["serving"]["version"] == "0.3.0"
9593
)
9694

97-
@pytest.mark.parametrize("dataframe", [dataframes.GOOD])
98-
def test_ingest_then_get_one_feature_set_success(
99-
self, core_server, dataframe: pd.DataFrame
100-
):
101-
# Create and register Fake Kafka
102-
fake_kafka = FakeKafka()
95+
def test_get_feature(self, mock_client, mocker):
96+
ROW_COUNT = 300
10397

104-
# Set up Feast Serving with Fake Kafka
105-
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
106-
Serving.add_ServingServiceServicer_to_server(
107-
ServingServicer(kafka=fake_kafka), server
98+
mock_client._serving_service_stub = Serving.ServingServiceStub(
99+
grpc.insecure_channel("")
108100
)
109-
server.add_insecure_port("[::]:50052")
110-
server.start()
111101

112-
# Create Feast client and register with Core and Serving
113-
client = Client(core_url="localhost:50051", serving_url="localhost:50052")
114-
115-
# Create feature set and update based on dataframe
116-
feature_set_1 = FeatureSet(
117-
name="feature_set_1",
118-
entities=[Entity(name="entity_id", dtype=ValueType.INT32)],
119-
features=[
120-
Feature(name="feature_1", dtype=ValueType.FLOAT),
121-
Feature(name="feature_2", dtype=ValueType.STRING),
122-
Feature(name="feature_3", dtype=ValueType.INT32),
123-
],
102+
response = GetOnlineFeaturesResponse()
103+
feature_row = FeatureRowProto.FeatureRow(
104+
event_timestamp=Timestamp(), feature_set="feature_set_1:1"
105+
)
106+
for feature_num in range(1, 10):
107+
field = FieldProto.Field(
108+
name="feature_" + str(feature_num),
109+
value=ValueProto.Value(int64_val=feature_num),
110+
)
111+
feature_row.fields.append(field)
112+
113+
feature_data_set = GetOnlineFeaturesResponse.FeatureDataset(
114+
name="feature_set_1", version=1
124115
)
125116

126-
# Register feature set with Feast core
127-
client.apply(feature_set_1)
117+
for row_number in range(1, ROW_COUNT + 1):
118+
feature_data_set.feature_rows.append(feature_row)
128119

129-
# Register Fake Kafka with feature set
130-
feature_set_1._message_producer = fake_kafka
120+
response.feature_datasets.append(feature_data_set)
131121

132-
# Ingest data into Feast using Fake Kafka
133-
feature_set_1.ingest(dataframe)
122+
mocker.patch.object(
123+
mock_client._serving_service_stub,
124+
"GetOnlineFeatures",
125+
return_value=response,
126+
)
134127

135-
time.sleep(2)
128+
entity_data = pd.DataFrame(
129+
{"datetime": np.repeat(4, ROW_COUNT), "entity_id": np.repeat(4, ROW_COUNT)}
130+
)
136131

137-
# Retrieve feature values from Feast serving
138-
feature_dataframe = client.get(
139-
entity_data=dataframe[["datetime", "entity_id"]],
132+
feature_dataframe = mock_client.get(
133+
entity_data=entity_data,
140134
feature_ids=[
141-
"feature_set_1.feature_1",
142-
"feature_set_1.feature_2",
143-
"feature_set_1.feature_3",
135+
"feature_set_1:1.feature_1",
136+
"feature_set_1:1.feature_2",
137+
"feature_set_1:1.feature_3",
138+
"feature_set_1:1.feature_4",
139+
"feature_set_1:1.feature_5",
140+
"feature_set_1:1.feature_6",
141+
"feature_set_1:1.feature_7",
142+
"feature_set_1:1.feature_8",
143+
"feature_set_1:1.feature_9",
144144
],
145145
)
146146

147-
assert True
148-
149-
# assert (
150-
# feature_dataframe["feature_set_1.feature_1"][0] == 0.2
151-
# and feature_dataframe["feature_set_1.feature_2"][0] == "string1"
152-
# and feature_dataframe["feature_set_1.feature_3"][0] == 1
153-
# )
154-
155-
# Stop Feast Serving server
156-
server.stop(0)
147+
feature_dataframe.head()

sdk/python/feast/tests/test_feature_set.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from datetime import datetime
16-
17-
import pytz
1815
from unittest.mock import MagicMock
1916
from feast.feature_set import FeatureSet, Feature
2017
from feast.value_type import ValueType
@@ -109,7 +106,7 @@ def test_feature_set_ingest_success(self, dataframe, client):
109106
dataframe,
110107
column_mapping={"entity_id": Entity(name="entity", dtype=ValueType.INT64)},
111108
)
112-
driver_fs.source = KafkaSource(topic="feature-topic", brokers="fake.broker.com")
109+
driver_fs.source = KafkaSource(topic="feature-topic", brokers="127.0.0.1")
113110
driver_fs._message_producer = MagicMock()
114111
driver_fs._message_producer.send = MagicMock()
115112

sdk/python/feast/type_map.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
# Mapping of feast value type to Pandas DataFrame dtypes
2222
# Integer and floating values are all 64-bit for better integration
2323
# with BigQuery data types
24-
FEAST_VALUETYPE_TO_DTYPE = {
24+
FEAST_VALUE_TYPE_TO_DTYPE = {
2525
"BYTES": np.byte,
2626
"STRING": np.object,
2727
"INT32": "Int32", # Use pandas nullable int type
@@ -31,6 +31,16 @@
3131
"BOOL": np.bool,
3232
}
3333

34+
FEAST_VALUE_ATTR_TO_DTYPE = {
35+
"bytes_val": np.byte,
36+
"string_val": np.object,
37+
"int32_val": "Int32",
38+
"int64_val": "Int64",
39+
"double_val": np.float64,
40+
"float_val": np.float64,
41+
"bool_val": np.bool,
42+
}
43+
3444

3545
def dtype_to_feast_value_attr(dtype):
3646
# Mapping of Pandas dtype to attribute name in Feast Value
@@ -104,7 +114,7 @@ def pandas_dtype_to_feast_value_type(dtype: pd.DataFrame.dtypes) -> ValueType:
104114
def pandas_value_to_proto_value(pandas_dtype, pandas_value) -> ProtoValue:
105115
value = ProtoValue()
106116
value_attr = dtype_to_feast_value_attr(pandas_dtype)
107-
if pandas_dtype.__str__() in ["datetime64[ns]", "datetime64[ns, UTC]"]:
117+
if pandas_dtype.__str__() in ["datetime64[ns]", "datetime64[ns, UTC]"]:
108118
pandas_value = int(pandas_value.timestamp())
109119
try:
110120
value.__setattr__(value_attr, pandas_value)

0 commit comments

Comments
 (0)