Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
allowing using entity's join_key in get_online_features
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Mar 18, 2022
commit c455a25222379d08c29fe0541d46258f3fde38ff
34 changes: 25 additions & 9 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,9 +1266,11 @@ def _get_online_features(
features=features, allow_cache=True, hide_dummy_entity=False
)

entity_name_to_join_key_map, entity_type_map = self._get_entity_maps(
requested_feature_views
)
(
entity_name_to_join_key_map,
entity_type_map,
join_keys_set,
) = self._get_entity_maps(requested_feature_views)

# Extract Sequence from RepeatedValue Protobuf.
entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = {
Expand Down Expand Up @@ -1334,10 +1336,18 @@ def _get_online_features(
requested_result_row_names.add(entity_name)
request_data_features[entity_name] = values
else:
try:
join_key = entity_name_to_join_key_map[entity_name]
except KeyError:
raise EntityNotFoundException(entity_name, self.project)
if entity_name in join_keys_set:
join_key = entity_name
else:
try:
join_key = entity_name_to_join_key_map[entity_name]
except KeyError:
raise EntityNotFoundException(entity_name, self.project)
else:
warnings.warn(
"Using entity name is deprecated. Use join_key instead."
)

# All join keys should be returned in the result.
requested_result_row_names.add(join_key)
join_key_values[join_key] = values
Expand Down Expand Up @@ -1422,7 +1432,9 @@ def _get_columnar_entity_values(
return res
return cast(Dict[str, List[Any]], columnar)

def _get_entity_maps(self, feature_views):
def _get_entity_maps(
self, feature_views
) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]:
entities = self._list_entities(allow_cache=True, hide_dummy_entity=False)
entity_name_to_join_key_map: Dict[str, str] = {}
entity_type_map: Dict[str, ValueType] = {}
Expand All @@ -1444,7 +1456,11 @@ def _get_entity_maps(self, feature_views):
)
entity_name_to_join_key_map[entity_name] = join_key
entity_type_map[join_key] = entity.value_type
return entity_name_to_join_key_map, entity_type_map
return (
entity_name_to_join_key_map,
entity_type_map,
set(entity_name_to_join_key_map.values()),
)

@staticmethod
def _get_table_entity_values(
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@

driver = Entity(
name="driver", # The name is derived from this argument, not object name.
join_key="driver_id",
value_type=ValueType.INT64,
description="driver id",
)

customer = Entity(
name="customer", # The name is derived from this argument, not object name.
join_key="customer_id",
value_type=ValueType.STRING,
)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def check_offline_and_online_features(
# Check online store
response_dict = fs.get_online_features(
[f"{fv.name}:value"],
[{"driver": driver_id}],
[{"driver_id": driver_id}],
full_feature_names=full_feature_names,
).to_dict()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_online() -> None:
provider = store._get_provider()

driver_key = EntityKeyProto(
join_keys=["driver"], entity_values=[ValueProto(int64_val=1)]
join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1)]
)
provider.online_write_batch(
config=store.config,
Expand All @@ -54,7 +54,7 @@ def test_online() -> None:
)

customer_key = EntityKeyProto(
join_keys=["customer"], entity_values=[ValueProto(string_val="5")]
join_keys=["customer_id"], entity_values=[ValueProto(string_val="5")]
)
provider.online_write_batch(
config=store.config,
Expand All @@ -75,7 +75,7 @@ def test_online() -> None:
)

customer_key = EntityKeyProto(
join_keys=["customer", "driver"],
join_keys=["customer_id", "driver_id"],
entity_values=[ValueProto(string_val="5"), ValueProto(int64_val=1)],
)
provider.online_write_batch(
Expand All @@ -100,15 +100,18 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": "5"}, {"driver": 1, "customer": 5}],
entity_rows=[
{"driver_id": 1, "customer_id": "5"},
{"driver_id": 1, "customer_id": 5},
],
full_feature_names=False,
).to_dict()

assert "lon" in result
assert "avg_orders_day" in result
assert "name" in result
assert result["driver"] == [1, 1]
assert result["customer"] == ["5", "5"]
assert result["driver_id"] == [1, 1]
assert result["customer_id"] == ["5", "5"]
assert result["lon"] == ["1.0", "1.0"]
assert result["avg_orders_day"] == [1.0, 1.0]
assert result["name"] == ["John", "John"]
Expand All @@ -117,7 +120,7 @@ def test_online() -> None:
# Ensure features are still in result when keys not found
result = store.get_online_features(
features=["customer_driver_combined:trips"],
entity_rows=[{"driver": 0, "customer": 0}],
entity_rows=[{"driver_id": 0, "customer_id": 0}],
full_feature_names=False,
).to_dict()

Expand All @@ -127,7 +130,7 @@ def test_online() -> None:
with pytest.raises(FeatureViewNotFoundException):
store.get_online_features(
features=["driver_locations_bad:lon"],
entity_rows=[{"driver": 1}],
entity_rows=[{"driver_id": 1}],
full_feature_names=False,
)

Expand All @@ -152,7 +155,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()
assert result["lon"] == ["1.0"]
Expand All @@ -173,7 +176,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()

Expand All @@ -188,7 +191,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()
assert result["lon"] == ["1.0"]
Expand All @@ -214,7 +217,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()
assert result["lon"] == ["1.0"]
Expand All @@ -234,7 +237,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()
assert result["lon"] == ["1.0"]
Expand Down Expand Up @@ -284,7 +287,7 @@ def test_online_to_df():
3 3.0 0.3
"""
driver_key = EntityKeyProto(
join_keys=["driver"], entity_values=[ValueProto(int64_val=d)]
join_keys=["driver_id"], entity_values=[ValueProto(int64_val=d)]
)
provider.online_write_batch(
config=store.config,
Expand All @@ -311,7 +314,7 @@ def test_online_to_df():
6 6.0 foo6 60
"""
customer_key = EntityKeyProto(
join_keys=["customer"], entity_values=[ValueProto(string_val=str(c))]
join_keys=["customer_id"], entity_values=[ValueProto(string_val=str(c))]
)
provider.online_write_batch(
config=store.config,
Expand Down Expand Up @@ -340,7 +343,7 @@ def test_online_to_df():
6 3 18
"""
combo_keys = EntityKeyProto(
join_keys=["customer", "driver"],
join_keys=["customer_id", "driver_id"],
entity_values=[ValueProto(string_val=str(c)), ValueProto(int64_val=d)],
)
provider.online_write_batch(
Expand Down Expand Up @@ -369,7 +372,7 @@ def test_online_to_df():
],
# Reverse the row order
entity_rows=[
{"driver": d, "customer": c}
{"driver_id": d, "customer_id": c}
for (d, c) in zip(reversed(driver_ids), reversed(customer_ids))
],
).to_df()
Expand All @@ -381,8 +384,8 @@ def test_online_to_df():
1 4 1.0 0.1 4.0 foo4 40 4
"""
df_dict = {
"driver": driver_ids,
"customer": [str(c) for c in customer_ids],
"driver_id": driver_ids,
"customer_id": [str(c) for c in customer_ids],
"lon": [str(d * lon_multiply) for d in driver_ids],
"lat": [d * lat_multiply for d in driver_ids],
"avg_orders_day": [c * avg_order_day_multiply for c in customer_ids],
Expand All @@ -392,8 +395,8 @@ def test_online_to_df():
}
# Requested column order
ordered_column = [
"driver",
"customer",
"driver_id",
"customer_id",
"lon",
"lat",
"avg_orders_day",
Expand Down
Loading