Skip to content

Commit 396f729

Browse files
authored
Optimize _populate_result_rows_from_feature_view (feast-dev#2223)
* Optimize `_populate_result_rows_from_feature_view` This commit optimizes the fetching of features by only fetching the features for each unique Entity once and then expands the result to the shape of input EntityKeys. Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Initialize the minimum number of EntityKeyProtos per FeatureView Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Refactor for readability Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Update `mypy` and `mypy-protobuf` to allow Enum typing Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Fix newly detected linting errors Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Fix test failure Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Remove accidentally committed typo Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Define Entity type Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Ensure entities are sorted for `itertools.groupby` Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Add simple unittest for `_get_unique_entities` Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
1 parent b3174c9 commit 396f729

26 files changed

Lines changed: 398 additions & 156 deletions

sdk/python/feast/feature_store.py

Lines changed: 136 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,27 +1150,24 @@ def _get_online_features(
11501150
[DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type
11511151
)
11521152

1153-
# Initialize the set of EntityKeyProtos once and reuse them for each FeatureView
1154-
# to avoid initialization overhead.
1155-
entity_keys = [EntityKeyProto() for _ in range(num_rows)]
11561153
provider = self._get_provider()
11571154
for table, requested_features in grouped_refs:
11581155
# Get the correct set of entity values with the correct join keys.
1159-
table_entity_values = self._get_table_entity_values(
1160-
table, entity_name_to_join_key_map, join_key_values,
1156+
table_entity_values, idxs = self._get_unique_entities(
1157+
table, join_key_values, entity_name_to_join_key_map,
11611158
)
11621159

1163-
# Set the EntityKeyProtos inplace.
1164-
self._set_table_entity_keys(
1165-
table_entity_values, entity_keys,
1160+
# Fetch feature data for the minimum set of Entities.
1161+
feature_data = self._read_from_online_store(
1162+
table_entity_values, provider, requested_features, table,
11661163
)
11671164

11681165
# Populate the result_rows with the Features from the OnlineStore inplace.
1169-
self._populate_result_rows_from_feature_view(
1166+
self._populate_response_from_feature_data(
1167+
feature_data,
1168+
idxs,
11701169
online_features_response,
1171-
entity_keys,
11721170
full_feature_names,
1173-
provider,
11741171
requested_features,
11751172
table,
11761173
)
@@ -1255,22 +1252,6 @@ def _get_table_entity_values(
12551252
}
12561253
return entity_values
12571254

1258-
@staticmethod
1259-
def _set_table_entity_keys(
1260-
entity_values: Dict[str, List[Value]], entity_keys: List[EntityKeyProto],
1261-
):
1262-
"""
1263-
This method sets the a list of EntityKeyProtos inplace.
1264-
"""
1265-
keys = entity_values.keys()
1266-
# Columar to rowise (dict keys and values are guaranteed to have the same order).
1267-
rowise_values = zip(*entity_values.values())
1268-
for entity_key in entity_keys:
1269-
# Make sure entity_keys are empty before setting.
1270-
entity_key.Clear()
1271-
entity_key.join_keys.extend(keys)
1272-
entity_key.entity_values.extend(next(rowise_values))
1273-
12741255
@staticmethod
12751256
def _populate_result_rows_from_columnar(
12761257
online_features_response: GetOnlineFeaturesResponse,
@@ -1323,21 +1304,134 @@ def ensure_request_data_values_exist(
13231304
feature_names=missing_features
13241305
)
13251306

1326-
def _populate_result_rows_from_feature_view(
1307+
def _get_unique_entities(
13271308
self,
1328-
online_features_response: GetOnlineFeaturesResponse,
1329-
entity_keys: List[EntityKeyProto],
1330-
full_feature_names: bool,
1309+
table: FeatureView,
1310+
join_key_values: Dict[str, List[Value]],
1311+
entity_name_to_join_key_map: Dict[str, str],
1312+
) -> Tuple[Tuple[Dict[str, Value], ...], Tuple[List[int], ...]]:
1313+
""" Return the set of unique composite Entities for a Feature View and the indexes at which they appear.
1314+
1315+
This method allows us to query the OnlineStore for data we need only once
1316+
rather than requesting and processing data for the same combination of
1317+
Entities multiple times.
1318+
"""
1319+
# Get the correct set of entity values with the correct join keys.
1320+
table_entity_values = self._get_table_entity_values(
1321+
table, entity_name_to_join_key_map, join_key_values,
1322+
)
1323+
1324+
# Convert back to rowise.
1325+
keys = table_entity_values.keys()
1326+
# Sort the rowise data to allow for grouping but keep original index. This lambda is
1327+
# sufficient as Entity types cannot be complex (ie. lists).
1328+
rowise = list(enumerate(zip(*table_entity_values.values())))
1329+
rowise.sort(
1330+
key=lambda row: tuple(getattr(x, x.WhichOneof("val")) for x in row[1])
1331+
)
1332+
1333+
# Identify unique entities and the indexes at which they occur.
1334+
unique_entities: Tuple[Dict[str, Value], ...]
1335+
indexes: Tuple[List[int], ...]
1336+
unique_entities, indexes = tuple(
1337+
zip(
1338+
*[
1339+
(dict(zip(keys, k)), [_[0] for _ in g])
1340+
for k, g in itertools.groupby(rowise, key=lambda x: x[1])
1341+
]
1342+
)
1343+
)
1344+
return unique_entities, indexes
1345+
1346+
def _read_from_online_store(
1347+
self,
1348+
entity_rows: Iterable[Mapping[str, Value]],
13311349
provider: Provider,
13321350
requested_features: List[str],
13331351
table: FeatureView,
1334-
):
1352+
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
1353+
""" Read and process data from the OnlineStore for a given FeatureView.
1354+
1355+
This method guarentees that the order of the data in each element of the
1356+
List returned is the same as the order of `requested_features`.
1357+
1358+
This method assumes that `provider.online_read` returns data for each
1359+
combination of Entities in `entity_rows` in the same order as they
1360+
are provided.
1361+
"""
1362+
# Instantiate one EntityKeyProto per Entity.
1363+
entity_key_protos = [
1364+
EntityKeyProto(join_keys=row.keys(), entity_values=row.values())
1365+
for row in entity_rows
1366+
]
1367+
1368+
# Fetch data for Entities.
13351369
read_rows = provider.online_read(
13361370
config=self.config,
13371371
table=table,
1338-
entity_keys=entity_keys,
1372+
entity_keys=entity_key_protos,
13391373
requested_features=requested_features,
13401374
)
1375+
1376+
# Each row is a set of features for a given entity key. We only need to convert
1377+
# the data to Protobuf once.
1378+
row_ts_proto = Timestamp()
1379+
null_value = Value()
1380+
read_row_protos = []
1381+
for read_row in read_rows:
1382+
row_ts, feature_data = read_row
1383+
if row_ts is not None:
1384+
row_ts_proto.FromDatetime(row_ts)
1385+
event_timestamps = [row_ts_proto] * len(requested_features)
1386+
if feature_data is None:
1387+
statuses = [FieldStatus.NOT_FOUND] * len(requested_features)
1388+
values = [null_value] * len(requested_features)
1389+
else:
1390+
statuses = []
1391+
values = []
1392+
for feature_name in requested_features:
1393+
# Make sure order of data is the same as requested_features.
1394+
if feature_name not in feature_data:
1395+
statuses.append(FieldStatus.NOT_FOUND)
1396+
values.append(null_value)
1397+
else:
1398+
statuses.append(FieldStatus.PRESENT)
1399+
values.append(feature_data[feature_name])
1400+
read_row_protos.append((event_timestamps, statuses, values))
1401+
return read_row_protos
1402+
1403+
@staticmethod
1404+
def _populate_response_from_feature_data(
1405+
feature_data: Iterable[
1406+
Tuple[
1407+
Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value]
1408+
]
1409+
],
1410+
indexes: Iterable[Iterable[int]],
1411+
online_features_response: GetOnlineFeaturesResponse,
1412+
full_feature_names: bool,
1413+
requested_features: Iterable[str],
1414+
table: FeatureView,
1415+
):
1416+
""" Populate the GetOnlineFeaturesReponse with feature data.
1417+
1418+
This method assumes that `_read_from_online_store` returns data for each
1419+
combination of Entities in `entity_rows` in the same order as they
1420+
are provided.
1421+
1422+
Args:
1423+
feature_data: A list of data in Protobuf form which was retrieved from the OnlineStore.
1424+
indexes: A list of indexes which should be the same length as `feature_data`. Each list
1425+
of indexes corresponds to a set of result rows in `online_features_response`.
1426+
online_features_response: The object to populate.
1427+
full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names,
1428+
changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to
1429+
"customer_fv__daily_transactions").
1430+
requested_features: The names of the features in `feature_data`. This should be ordered in the same way as the
1431+
data in `feature_data`.
1432+
table: The FeatureView that `feature_data` was retrieved from.
1433+
"""
1434+
# Add the feature names to the response.
13411435
requested_feature_refs = [
13421436
f"{table.projection.name_to_use()}__{feature_name}"
13431437
if full_feature_names
@@ -1347,28 +1441,16 @@ def _populate_result_rows_from_feature_view(
13471441
online_features_response.metadata.feature_names.val.extend(
13481442
requested_feature_refs
13491443
)
1350-
# Each row is a set of features for a given entity key
1351-
for row_idx, read_row in enumerate(read_rows):
1352-
row_ts, feature_data = read_row
1353-
result_row = online_features_response.results[row_idx]
1354-
row_ts_proto = Timestamp()
1355-
if row_ts is not None:
1356-
row_ts_proto.FromDatetime(row_ts)
1357-
result_row.event_timestamps.extend([row_ts_proto] * len(requested_features))
13581444

1359-
if feature_data is None:
1360-
result_row.statuses.extend(
1361-
[FieldStatus.NOT_FOUND] * len(requested_features)
1362-
)
1363-
result_row.values.extend([Value()] * len(requested_features))
1364-
else:
1365-
for feature_name in requested_features:
1366-
if feature_name not in feature_data:
1367-
result_row.statuses.append(FieldStatus.NOT_FOUND)
1368-
result_row.values.append(Value())
1369-
else:
1370-
result_row.statuses.append(FieldStatus.PRESENT)
1371-
result_row.values.append(feature_data[feature_name])
1445+
# Populate the result with data fetched from the OnlineStore
1446+
# which is guarenteed to be aligned with `requested_features`.
1447+
for feature_row, dest_idxs in zip(feature_data, indexes):
1448+
event_timestamps, statuses, values = feature_row
1449+
for dest_idx in dest_idxs:
1450+
result_row = online_features_response.results[dest_idx]
1451+
result_row.event_timestamps.extend(event_timestamps)
1452+
result_row.statuses.extend(statuses)
1453+
result_row.values.extend(values)
13721454

13731455
@staticmethod
13741456
def _augment_response_with_on_demand_transforms(

sdk/python/feast/infra/feature_servers/aws_lambda/__init__.py

Whitespace-only changes.

sdk/python/feast/infra/feature_servers/gcp_cloudrun/__init__.py

Whitespace-only changes.

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,13 @@ def _get_features_for_entity(
277277
res_ts = Timestamp()
278278
ts_val = res_val.pop(f"_ts:{feature_view}")
279279
if ts_val:
280-
res_ts.ParseFromString(ts_val)
280+
res_ts.ParseFromString(bytes(ts_val))
281281

282282
res = {}
283283
for feature_name, val_bin in res_val.items():
284284
val = ValueProto()
285285
if val_bin:
286-
val.ParseFromString(val_bin)
286+
val.ParseFromString(bytes(val_bin))
287287
res[feature_name] = val
288288

289289
if not res:

sdk/python/feast/repo_operations.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import re
66
import sys
77
from importlib.abc import Loader
8+
from importlib.machinery import ModuleSpec
89
from pathlib import Path
910
from typing import List, Set, Union
1011

@@ -82,7 +83,11 @@ def get_repo_files(repo_root: Path) -> List[Path]:
8283
ignore_files = get_ignore_files(repo_root, ignore_paths)
8384

8485
# List all Python files in the root directory (recursively)
85-
repo_files = {p.resolve() for p in repo_root.glob("**/*.py") if p.is_file()}
86+
repo_files = {
87+
p.resolve()
88+
for p in repo_root.glob("**/*.py")
89+
if p.is_file() and "__init__.py" != p.name
90+
}
8691
# Ignore all files that match any of the ignore paths in .feastignore
8792
repo_files -= ignore_files
8893

@@ -352,6 +357,7 @@ def init_repo(repo_name: str, template: str):
352357
import importlib.util
353358

354359
spec = importlib.util.spec_from_file_location("bootstrap", str(bootstrap_path))
360+
assert isinstance(spec, ModuleSpec)
355361
bootstrap = importlib.util.module_from_spec(spec)
356362
assert isinstance(spec.loader, Loader)
357363
spec.loader.exec_module(bootstrap)

sdk/python/feast/templates/aws/__init__.py

Whitespace-only changes.

sdk/python/feast/templates/gcp/__init__.py

Whitespace-only changes.

sdk/python/feast/templates/local/__init__.py

Whitespace-only changes.

sdk/python/feast/type_map.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import numpy as np
2020
import pandas as pd
2121
import pyarrow
22-
from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType
2322
from google.protobuf.timestamp_pb2 import Timestamp
2423

2524
from feast.protos.feast.types.Value_pb2 import (
@@ -32,7 +31,7 @@
3231
StringList,
3332
)
3433
from feast.protos.feast.types.Value_pb2 import Value as ProtoValue
35-
from feast.value_type import ValueType
34+
from feast.value_type import ListType, ValueType
3635

3736

3837
def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any:
@@ -195,7 +194,7 @@ def _type_err(item, dtype):
195194

196195

197196
PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: Dict[
198-
ValueType, Tuple[GeneratedProtocolMessageType, str, List[Type]]
197+
ValueType, Tuple[ListType, str, List[Type]]
199198
] = {
200199
ValueType.FLOAT_LIST: (
201200
FloatList,
@@ -273,7 +272,7 @@ def _python_value_to_proto_value(
273272
raise _type_err(first_invalid, valid_types[0])
274273

275274
return [
276-
ProtoValue(**{field_name: proto_type(val=value)})
275+
ProtoValue(**{field_name: proto_type(val=value)}) # type: ignore
277276
if value is not None
278277
else ProtoValue()
279278
for value in values

sdk/python/feast/value_type.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,17 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import enum
15+
from typing import Type, Union
16+
17+
from feast.protos.feast.types.Value_pb2 import (
18+
BoolList,
19+
BytesList,
20+
DoubleList,
21+
FloatList,
22+
Int32List,
23+
Int64List,
24+
StringList,
25+
)
1526

1627

1728
class ValueType(enum.Enum):
@@ -37,3 +48,14 @@ class ValueType(enum.Enum):
3748
BOOL_LIST = 17
3849
UNIX_TIMESTAMP_LIST = 18
3950
NULL = 19
51+
52+
53+
ListType = Union[
54+
Type[BoolList],
55+
Type[BytesList],
56+
Type[DoubleList],
57+
Type[FloatList],
58+
Type[Int32List],
59+
Type[Int64List],
60+
Type[StringList],
61+
]

0 commit comments

Comments
 (0)