Skip to content

Commit f371c2d

Browse files
jklegarwoop
andauthored
Port telemetry to FeatureStore API (feast-dev#1446)
* Port telemetry to FeatureStore API Signed-off-by: Jacob Klegar <jacob@tecton.ai> * formatting Signed-off-by: Jacob Klegar <jacob@tecton.ai> * lint Signed-off-by: Jacob Klegar <jacob@tecton.ai> * Add telemetry to all FeatureStore methods Signed-off-by: Jacob Klegar <jacob@tecton.ai> * Move telemetry into module Signed-off-by: Willem Pienaar <git@willem.co> * Remove version code Signed-off-by: Willem Pienaar <git@willem.co> * Fix broken telemetry tests Signed-off-by: Willem Pienaar <git@willem.co> * Remove debug code Signed-off-by: Willem Pienaar <git@willem.co> Co-authored-by: Willem Pienaar <git@willem.co>
1 parent 41a7a4a commit f371c2d

File tree

6 files changed

+219
-113
lines changed

6 files changed

+219
-113
lines changed

sdk/python/feast/client.py

Lines changed: 7 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,8 @@
1313
# limitations under the License.
1414
import logging
1515
import multiprocessing
16-
import os
1716
import shutil
18-
import uuid
1917
import warnings
20-
from datetime import datetime
21-
from os.path import expanduser, join
2218
from typing import Any, Dict, List, Optional, Union
2319

2420
import grpc
@@ -73,7 +69,7 @@
7369
)
7470
from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub
7571
from feast.registry import Registry
76-
from feast.telemetry import log_usage
72+
from feast.telemetry import Telemetry
7773

7874
_logger = logging.getLogger(__name__)
7975

@@ -121,7 +117,7 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs):
121117
if self._config.getboolean(opt.ENABLE_AUTH):
122118
self._auth_metadata = feast_auth.get_auth_metadata_plugin(self._config)
123119

124-
self._configure_telemetry()
120+
self._tele = Telemetry()
125121

126122
@property
127123
def config(self) -> Config:
@@ -351,27 +347,6 @@ def version(self, sdk_only=False):
351347

352348
return result
353349

354-
def _configure_telemetry(self):
355-
telemetry_filepath = join(expanduser("~"), ".feast", "telemetry")
356-
self._telemetry_enabled = (
357-
self._config.get(opt.TELEMETRY, "True") == "True"
358-
) # written this way to turn the env var string into a boolean
359-
if self._telemetry_enabled:
360-
self._telemetry_counter = {"get_online_features": 0}
361-
if os.path.exists(telemetry_filepath):
362-
with open(telemetry_filepath, "r") as f:
363-
self._telemetry_id = f.read()
364-
else:
365-
self._telemetry_id = str(uuid.uuid4())
366-
print(
367-
"Feast is an open source project that collects anonymized usage statistics. To opt out or learn more see https://docs.feast.dev/v/master/advanced/telemetry"
368-
)
369-
with open(telemetry_filepath, "w") as f:
370-
f.write(self._telemetry_id)
371-
else:
372-
if os.path.exists(telemetry_filepath):
373-
os.remove(telemetry_filepath)
374-
375350
@property
376351
def project(self) -> str:
377352
"""
@@ -492,13 +467,7 @@ def apply(
492467
>>> feast_client.apply(entity)
493468
"""
494469

495-
if self._telemetry_enabled:
496-
log_usage(
497-
"apply",
498-
self._telemetry_id,
499-
datetime.utcnow(),
500-
self.version(sdk_only=True),
501-
)
470+
self._tele.log("apply")
502471
if project is None:
503472
project = self.project
504473

@@ -612,13 +581,7 @@ def get_entity(self, name: str, project: str = None) -> Entity:
612581
none is found
613582
"""
614583

615-
if self._telemetry_enabled:
616-
log_usage(
617-
"get_entity",
618-
self._telemetry_id,
619-
datetime.utcnow(),
620-
self.version(sdk_only=True),
621-
)
584+
self._tele.log("get_entity")
622585

623586
if project is None:
624587
project = self.project
@@ -743,13 +706,7 @@ def get_feature_table(self, name: str, project: str = None) -> FeatureTable:
743706
none is found
744707
"""
745708

746-
if self._telemetry_enabled:
747-
log_usage(
748-
"get_feature_table",
749-
self._telemetry_id,
750-
datetime.utcnow(),
751-
self.version(sdk_only=True),
752-
)
709+
self._tele.log("get_feature_table")
753710

754711
if project is None:
755712
project = self.project
@@ -890,13 +847,7 @@ def ingest(
890847
>>> client.ingest(driver_ft, ft_df)
891848
"""
892849

893-
if self._telemetry_enabled:
894-
log_usage(
895-
"ingest",
896-
self._telemetry_id,
897-
datetime.utcnow(),
898-
self.version(sdk_only=True),
899-
)
850+
self._tele.log("ingest")
900851
if project is None:
901852
project = self.project
902853
if isinstance(feature_table, str):
@@ -1021,15 +972,7 @@ def get_online_features(
1021972
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
1022973
"""
1023974

1024-
if self._telemetry_enabled:
1025-
if self._telemetry_counter["get_online_features"] % 1000 == 0:
1026-
log_usage(
1027-
"get_online_features",
1028-
self._telemetry_id,
1029-
datetime.utcnow(),
1030-
self.version(sdk_only=True),
1031-
)
1032-
self._telemetry_counter["get_online_features"] += 1
975+
self._tele.log("get_online_features")
1033976
try:
1034977
response = self._serving_service.GetOnlineFeaturesV2(
1035978
GetOnlineFeaturesRequestV2(

sdk/python/feast/feature_store.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
RepoConfig,
3838
load_repo_config,
3939
)
40+
from feast.telemetry import Telemetry
4041
from feast.type_map import python_value_to_proto_value
42+
from feast.version import get_version
4143

4244

4345
class FeatureStore:
@@ -74,6 +76,12 @@ def __init__(
7476
registry_path=registry_config.path,
7577
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
7678
)
79+
self._tele = Telemetry()
80+
81+
def version(self) -> str:
82+
"""Returns the version of the current Feast SDK/CLI"""
83+
84+
return get_version()
7785

7886
@property
7987
def project(self) -> str:
@@ -96,6 +104,7 @@ def refresh_registry(self):
96104
greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be
97105
downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
98106
"""
107+
self._tele.log("refresh_registry")
99108

100109
registry_config = self.config.get_registry_config()
101110
self._registry = Registry(
@@ -111,6 +120,8 @@ def list_entities(self) -> List[Entity]:
111120
Returns:
112121
List of entities
113122
"""
123+
self._tele.log("list_entities")
124+
114125
return self._registry.list_entities(self.project)
115126

116127
def list_feature_views(self) -> List[FeatureView]:
@@ -120,6 +131,8 @@ def list_feature_views(self) -> List[FeatureView]:
120131
Returns:
121132
List of feature views
122133
"""
134+
self._tele.log("list_feature_views")
135+
123136
return self._registry.list_feature_views(self.project)
124137

125138
def get_entity(self, name: str) -> Entity:
@@ -133,6 +146,8 @@ def get_entity(self, name: str) -> Entity:
133146
Returns either the specified entity, or raises an exception if
134147
none is found
135148
"""
149+
self._tele.log("get_entity")
150+
136151
return self._registry.get_entity(name, self.project)
137152

138153
def get_feature_view(self, name: str) -> FeatureView:
@@ -146,6 +161,8 @@ def get_feature_view(self, name: str) -> FeatureView:
146161
Returns either the specified feature view, or raises an exception if
147162
none is found
148163
"""
164+
self._tele.log("get_feature_view")
165+
149166
return self._registry.get_feature_view(name, self.project)
150167

151168
def delete_feature_view(self, name: str):
@@ -155,6 +172,8 @@ def delete_feature_view(self, name: str):
155172
Args:
156173
name: Name of feature view
157174
"""
175+
self._tele.log("delete_feature_view")
176+
158177
return self._registry.delete_feature_view(name, self.project)
159178

160179
def apply(self, objects: List[Union[FeatureView, Entity]]):
@@ -186,6 +205,8 @@ def apply(self, objects: List[Union[FeatureView, Entity]]):
186205
>>> fs.apply([customer_entity, customer_feature_view])
187206
"""
188207

208+
self._tele.log("apply")
209+
189210
# TODO: Add locking
190211
# TODO: Optimize by only making a single call (read/write)
191212

@@ -246,6 +267,7 @@ def get_historical_features(
246267
>>> feature_data = job.to_df()
247268
>>> model.fit(feature_data) # insert your modeling framework here.
248269
"""
270+
self._tele.log("get_historical_features")
249271

250272
all_feature_views = self._registry.list_feature_views(
251273
project=self.config.project
@@ -282,6 +304,8 @@ def materialize_incremental(
282304
>>> fs = FeatureStore(config=RepoConfig(provider="gcp", registry="gs://my-fs/", project="my_fs_proj"))
283305
>>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5))
284306
"""
307+
self._tele.log("materialize_incremental")
308+
285309
feature_views_to_materialize = []
286310
if feature_views is None:
287311
feature_views_to_materialize = self._registry.list_feature_views(
@@ -335,6 +359,8 @@ def materialize(
335359
>>> start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10)
336360
>>> )
337361
"""
362+
self._tele.log("materialize")
363+
338364
feature_views_to_materialize = []
339365
if feature_views is None:
340366
feature_views_to_materialize = self._registry.list_feature_views(
@@ -424,6 +450,7 @@ def get_online_features(
424450
>>> print(online_response_dict)
425451
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
426452
"""
453+
self._tele.log("get_online_features")
427454

428455
response = self._get_online_features(
429456
feature_refs=feature_refs,

sdk/python/feast/telemetry.py

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,69 @@
1414

1515
import os
1616
import sys
17+
import uuid
1718
from datetime import datetime
18-
from typing import Dict
19+
from os.path import expanduser, join
1920

2021
import requests
2122

23+
from feast.version import get_version
24+
2225
TELEMETRY_ENDPOINT = (
2326
"https://us-central1-kf-feast.cloudfunctions.net/bq_telemetry_logger"
2427
)
2528

2629

27-
def log_usage(
28-
function_name: str,
29-
telemetry_id: str,
30-
timestamp: datetime,
31-
version: Dict[str, Dict[str, str]],
32-
):
33-
json = {
34-
"function_name": function_name,
35-
"telemetry_id": telemetry_id,
36-
"timestamp": timestamp.isoformat(),
37-
"version": version,
38-
"os": sys.platform,
39-
"is_test": os.getenv("FEAST_IS_TELEMETRY_TEST", "False"),
40-
}
41-
try:
42-
requests.post(TELEMETRY_ENDPOINT, json=json)
43-
except Exception:
44-
pass
45-
return
30+
class Telemetry:
31+
def __init__(self):
32+
telemetry_filepath = join(expanduser("~"), ".feast", "telemetry")
33+
self._telemetry_enabled = (
34+
os.getenv("FEAST_TELEMETRY", default="True") == "True"
35+
) # written this way to turn the env var string into a boolean
36+
37+
self._is_test = os.getenv("FEAST_IS_TELEMETRY_TEST", "False") == "True"
38+
39+
if self._telemetry_enabled:
40+
self._telemetry_counter = {"get_online_features": 0}
41+
if os.path.exists(telemetry_filepath):
42+
with open(telemetry_filepath, "r") as f:
43+
self._telemetry_id = f.read()
44+
else:
45+
self._telemetry_id = str(uuid.uuid4())
46+
with open(telemetry_filepath, "w") as f:
47+
f.write(self._telemetry_id)
48+
print(
49+
"Feast is an open source project that collects anonymized usage statistics. To opt out or learn"
50+
" more see https://docs.feast.dev/v/master/feast-on-kubernetes/advanced-1/telemetry"
51+
)
52+
53+
@property
54+
def telemetry_id(self):
55+
if os.environ["FEAST_FORCE_TELEMETRY_UUID"]:
56+
return os.environ["FEAST_FORCE_TELEMETRY_UUID"]
57+
return self._telemetry_id
58+
59+
def log(self, function_name: str):
60+
61+
if self._telemetry_enabled and self.telemetry_id:
62+
if function_name == "get_online_features":
63+
if self._telemetry_counter["get_online_features"] % 10000 != 0:
64+
self._telemetry_counter["get_online_features"] += 1
65+
return
66+
67+
json = {
68+
"function_name": function_name,
69+
"telemetry_id": self.telemetry_id,
70+
"timestamp": datetime.utcnow().isoformat(),
71+
"version": get_version(),
72+
"os": sys.platform,
73+
"is_test": self._is_test,
74+
}
75+
try:
76+
requests.post(TELEMETRY_ENDPOINT, json=json)
77+
except Exception as e:
78+
if self._is_test:
79+
raise e
80+
else:
81+
pass
82+
return

sdk/python/feast/version.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import pkg_resources
2+
3+
4+
def get_version():
5+
"""
6+
Returns version information of the Feast Python Package
7+
"""
8+
9+
try:
10+
sdk_version = pkg_resources.get_distribution("feast").version
11+
except pkg_resources.DistributionNotFound:
12+
sdk_version = "unknown"
13+
return sdk_version

sdk/python/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
"pytest-mock==1.10.4",
8484
"Sphinx",
8585
"sphinx-rtd-theme",
86+
"tenacity",
8687
"adlfs==0.5.9",
8788
"firebase-admin==4.5.2",
8889
"google-cloud-datastore==2.1.0",

0 commit comments

Comments
 (0)