Skip to content

Commit efeea6c

Browse files
authored
Adding telemetry for on demand feature views and making existing usage calls async (feast-dev#1873)
* Change telemetry to be async. Add telemetry calls for applys that have on demand feature views Signed-off-by: Danny Chiao <danny@tecton.ai> * Add telemetry for get_historical_features with odfv Signed-off-by: Danny Chiao <danny@tecton.ai> * Add telemetry for get_online_features. Refactor to have event enum Signed-off-by: Danny Chiao <danny@tecton.ai> * Refactor method out Signed-off-by: Danny Chiao <danny@tecton.ai> * Clean up async Signed-off-by: Danny Chiao <danny@tecton.ai> * Fix async submission Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent d0a2443 commit efeea6c

2 files changed

Lines changed: 137 additions & 55 deletions

File tree

sdk/python/feast/feature_store.py

Lines changed: 66 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
from feast.registry import Registry
5757
from feast.repo_config import RepoConfig, load_repo_config
5858
from feast.type_map import python_value_to_proto_value
59-
from feast.usage import log_exceptions, log_exceptions_and_usage
59+
from feast.usage import UsageEvent, log_event, log_exceptions, log_exceptions_and_usage
6060
from feast.value_type import ValueType
6161
from feast.version import get_version
6262

@@ -387,6 +387,9 @@ def apply(
387387
):
388388
raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME)
389389

390+
if len(odfvs_to_update) > 0:
391+
log_event(UsageEvent.APPLY_WITH_ODFV)
392+
390393
_validate_feature_views(views_to_update)
391394
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
392395
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
@@ -552,6 +555,8 @@ def get_historical_features(
552555
)
553556
feature_views = list(view for view, _ in fvs)
554557
on_demand_feature_views = list(view for view, _ in odfvs)
558+
if len(on_demand_feature_views) > 0:
559+
log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_ODFV)
555560

556561
# Check that the right request data is present in the entity_df
557562
if type(entity_df) == pd.DataFrame:
@@ -810,6 +815,9 @@ def get_online_features(
810815
grouped_refs, grouped_odfv_refs = _group_feature_refs(
811816
_feature_refs, all_feature_views, all_on_demand_feature_views
812817
)
818+
if len(grouped_odfv_refs) > 0:
819+
log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV)
820+
813821
feature_views = list(view for view, _ in grouped_refs)
814822
entityless_case = DUMMY_ENTITY_NAME in [
815823
entity_name
@@ -877,46 +885,16 @@ def get_online_features(
877885
feature_name
878886
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
879887

880-
# Note: each "table" is a feature view
881888
for table, requested_features in grouped_refs:
882-
entity_keys = _get_table_entity_keys(
883-
table, union_of_entity_keys, entity_name_to_join_key_map
884-
)
885-
read_rows = provider.online_read(
886-
config=self.config,
887-
table=table,
888-
entity_keys=entity_keys,
889-
requested_features=requested_features,
889+
self._populate_result_rows_from_feature_view(
890+
entity_name_to_join_key_map,
891+
full_feature_names,
892+
provider,
893+
requested_features,
894+
result_rows,
895+
table,
896+
union_of_entity_keys,
890897
)
891-
# Each row is a set of features for a given entity key
892-
for row_idx, read_row in enumerate(read_rows):
893-
row_ts, feature_data = read_row
894-
result_row = result_rows[row_idx]
895-
896-
if feature_data is None:
897-
for feature_name in requested_features:
898-
feature_ref = (
899-
f"{table.name}__{feature_name}"
900-
if full_feature_names
901-
else feature_name
902-
)
903-
result_row.statuses[
904-
feature_ref
905-
] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND
906-
else:
907-
for feature_name in feature_data:
908-
feature_ref = (
909-
f"{table.name}__{feature_name}"
910-
if full_feature_names
911-
else feature_name
912-
)
913-
if feature_name in requested_features:
914-
result_row.fields[feature_ref].CopyFrom(
915-
feature_data[feature_name]
916-
)
917-
result_row.statuses[
918-
feature_ref
919-
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
920898

921899
initial_response = OnlineResponse(
922900
GetOnlineFeaturesResponse(field_values=result_rows)
@@ -925,6 +903,55 @@ def get_online_features(
925903
_feature_refs, full_feature_names, initial_response, result_rows
926904
)
927905

906+
def _populate_result_rows_from_feature_view(
907+
self,
908+
entity_name_to_join_key_map: Dict[str, str],
909+
full_feature_names: bool,
910+
provider: Provider,
911+
requested_features: List[str],
912+
result_rows: List[GetOnlineFeaturesResponse.FieldValues],
913+
table: FeatureView,
914+
union_of_entity_keys: List[EntityKeyProto],
915+
):
916+
entity_keys = _get_table_entity_keys(
917+
table, union_of_entity_keys, entity_name_to_join_key_map
918+
)
919+
read_rows = provider.online_read(
920+
config=self.config,
921+
table=table,
922+
entity_keys=entity_keys,
923+
requested_features=requested_features,
924+
)
925+
# Each row is a set of features for a given entity key
926+
for row_idx, read_row in enumerate(read_rows):
927+
row_ts, feature_data = read_row
928+
result_row = result_rows[row_idx]
929+
930+
if feature_data is None:
931+
for feature_name in requested_features:
932+
feature_ref = (
933+
f"{table.name}__{feature_name}"
934+
if full_feature_names
935+
else feature_name
936+
)
937+
result_row.statuses[
938+
feature_ref
939+
] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND
940+
else:
941+
for feature_name in feature_data:
942+
feature_ref = (
943+
f"{table.name}__{feature_name}"
944+
if full_feature_names
945+
else feature_name
946+
)
947+
if feature_name in requested_features:
948+
result_row.fields[feature_ref].CopyFrom(
949+
feature_data[feature_name]
950+
)
951+
result_row.statuses[
952+
feature_ref
953+
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
954+
928955
def _get_needed_request_data_features(self, grouped_odfv_refs) -> Set[str]:
929956
needed_request_data_features = set()
930957
for odfv_to_feature_names in grouped_odfv_refs:

sdk/python/feast/usage.py

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import concurrent.futures
15+
import enum
1416
import logging
1517
import os
1618
import sys
@@ -28,6 +30,23 @@
2830
USAGE_ENDPOINT = "https://usage.feast.dev"
2931
_logger = logging.getLogger(__name__)
3032

33+
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
34+
35+
36+
@enum.unique
37+
class UsageEvent(enum.Enum):
38+
"""
39+
An event meant to be logged
40+
"""
41+
42+
UNKNOWN = 0
43+
APPLY_WITH_ODFV = 1
44+
GET_HISTORICAL_FEATURES_WITH_ODFV = 2
45+
GET_ONLINE_FEATURES_WITH_ODFV = 3
46+
47+
def __str__(self):
48+
return self.name.lower()
49+
3150

3251
class Usage:
3352
def __init__(self):
@@ -50,7 +69,7 @@ def check_env_and_configure(self):
5069
usage_filepath = join(feast_home_dir, "usage")
5170

5271
self._is_test = os.getenv("FEAST_IS_USAGE_TEST", "False") == "True"
53-
self._usage_counter = {"get_online_features": 0}
72+
self._usage_counter = {}
5473

5574
if os.path.exists(usage_filepath):
5675
with open(usage_filepath, "r") as f:
@@ -73,14 +92,25 @@ def usage_id(self) -> Optional[str]:
7392
return os.getenv("FEAST_FORCE_USAGE_UUID")
7493
return self._usage_id
7594

76-
def log(self, function_name: str):
95+
def _send_usage_request(self, json):
96+
try:
97+
future = executor.submit(requests.post, USAGE_ENDPOINT, json=json)
98+
if self._is_test:
99+
concurrent.futures.wait([future])
100+
except Exception as e:
101+
if self._is_test:
102+
raise e
103+
else:
104+
pass
105+
106+
def log_function(self, function_name: str):
77107
self.check_env_and_configure()
78108
if self._usage_enabled and self.usage_id:
79-
if function_name == "get_online_features":
80-
self._usage_counter["get_online_features"] += 1
81-
if self._usage_counter["get_online_features"] % 10000 != 2:
82-
return
83-
self._usage_counter["get_online_features"] = 2 # avoid overflow
109+
if (
110+
function_name == "get_online_features"
111+
and not self.should_log_for_get_online_features_event(function_name)
112+
):
113+
return
84114
json = {
85115
"function_name": function_name,
86116
"usage_id": self.usage_id,
@@ -89,14 +119,35 @@ def log(self, function_name: str):
89119
"os": sys.platform,
90120
"is_test": self._is_test,
91121
}
92-
try:
93-
requests.post(USAGE_ENDPOINT, json=json)
94-
except Exception as e:
95-
if self._is_test:
96-
raise e
97-
else:
98-
pass
99-
return
122+
self._send_usage_request(json)
123+
124+
def should_log_for_get_online_features_event(self, event_name: str):
125+
if event_name not in self._usage_counter:
126+
self._usage_counter[event_name] = 0
127+
self._usage_counter[event_name] += 1
128+
if self._usage_counter[event_name] % 10000 != 2:
129+
return False
130+
self._usage_counter[event_name] = 2 # avoid overflow
131+
return True
132+
133+
def log_event(self, event: UsageEvent):
134+
self.check_env_and_configure()
135+
if self._usage_enabled and self.usage_id:
136+
event_name = str(event)
137+
if (
138+
event == UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV
139+
and not self.should_log_for_get_online_features_event(event_name)
140+
):
141+
return
142+
json = {
143+
"event_name": event_name,
144+
"usage_id": self.usage_id,
145+
"timestamp": datetime.utcnow().isoformat(),
146+
"version": get_version(),
147+
"os": sys.platform,
148+
"is_test": self._is_test,
149+
}
150+
self._send_usage_request(json)
100151

101152
def log_exception(self, error_type: str, traceback: List[Tuple[str, int, str]]):
102153
self.check_env_and_configure()
@@ -149,7 +200,7 @@ def log_exceptions_and_usage(func):
149200
def exception_logging_wrapper(*args, **kwargs):
150201
try:
151202
result = func(*args, **kwargs)
152-
usage.log(func.__name__)
203+
usage.log_function(func.__name__)
153204
except Exception as e:
154205
error_type = type(e).__name__
155206
trace_to_log = []
@@ -170,6 +221,10 @@ def exception_logging_wrapper(*args, **kwargs):
170221
return exception_logging_wrapper
171222

172223

224+
def log_event(event: UsageEvent):
225+
usage.log_event(event)
226+
227+
173228
def _trim_filename(filename: str) -> str:
174229
return filename.split("/")[-1]
175230

0 commit comments

Comments
 (0)