Skip to content

Commit 963d3ac

Browse files
authored
Extend context for usage statistics collection & add latencies for performance analysis (feast-dev#1983)
* Extend context for usage statistics collection & add latencies for performance analysis Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * uncomment test marks Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * fix UTs Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * make lint happy Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * remove prow hook for usage tests Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * adding decorators through code Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * consistent naming Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * deterministic tests Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * linting Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * fix conflicts Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * fix redis span Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * minimize overhead of telemetry Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * all datetimes in utc Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * format Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * fix tests Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * consistent naming Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * consistent naming Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * add installation timestamp and environment signature to usage context Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * instantiate correct provider class Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * add timeout to exporter Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * generalize provider selection Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
1 parent 4d0b50f commit 963d3ac

20 files changed

Lines changed: 815 additions & 424 deletions

File tree

.prow.yaml

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -64,26 +64,6 @@ presubmits:
6464
branches:
6565
- ^v0\.(3|4)-branch$
6666

67-
- name: test-usage
68-
decorate: true
69-
run_if_changed: "sdk/python/.*"
70-
spec:
71-
containers:
72-
- image: python:3.7
73-
command: ["infra/scripts/test-usage.sh"]
74-
env:
75-
- name: GOOGLE_APPLICATION_CREDENTIALS
76-
value: /etc/gcloud/service-account.json
77-
volumeMounts:
78-
- mountPath: /etc/gcloud/service-account.json
79-
name: service-account
80-
readOnly: true
81-
subPath: service-account.json
82-
volumes:
83-
- name: service-account
84-
secret:
85-
secretName: feast-service-account
86-
8767
- name: test-golang-sdk
8868
decorate: true
8969
spec:

sdk/python/feast/feature_store.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
from feast.repo_config import RepoConfig, load_repo_config
6060
from feast.request_feature_view import RequestFeatureView
6161
from feast.type_map import python_value_to_proto_value
62-
from feast.usage import UsageEvent, log_event, log_exceptions, log_exceptions_and_usage
62+
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
6363
from feast.value_type import ValueType
6464
from feast.version import get_version
6565

@@ -464,8 +464,7 @@ def apply(
464464
):
465465
raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME)
466466

467-
if len(odfvs_to_update) > 0:
468-
log_event(UsageEvent.APPLY_WITH_ODFV)
467+
set_usage_attribute("odfv", bool(odfvs_to_update))
469468

470469
_validate_feature_views(
471470
[*views_to_update, *odfvs_to_update, *request_views_to_update]
@@ -678,10 +677,9 @@ def get_historical_features(
678677
feature_views = list(view for view, _ in fvs)
679678
on_demand_feature_views = list(view for view, _ in odfvs)
680679
request_feature_views = list(view for view, _ in request_fvs)
681-
if len(on_demand_feature_views) > 0:
682-
log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_ODFV)
683-
if len(request_feature_views) > 0:
684-
log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_REQUEST_FV)
680+
681+
set_usage_attribute("odfv", bool(on_demand_feature_views))
682+
set_usage_attribute("request_fv", bool(request_feature_views))
685683

686684
# Check that the right request data is present in the entity_df
687685
if type(entity_df) == pd.DataFrame:
@@ -973,10 +971,8 @@ def get_online_features(
973971
all_request_feature_views,
974972
all_on_demand_feature_views,
975973
)
976-
if len(grouped_odfv_refs) > 0:
977-
log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV)
978-
if len(grouped_request_fv_refs) > 0:
979-
log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_REQUEST_FV)
974+
set_usage_attribute("odfv", bool(grouped_odfv_refs))
975+
set_usage_attribute("request_fv", bool(grouped_request_fv_refs))
980976

981977
feature_views = list(view for view, _ in grouped_refs)
982978
entityless_case = DUMMY_ENTITY_NAME in [

sdk/python/feast/infra/aws.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from feast.registry import get_registry_store_class_from_scheme
3838
from feast.registry_store import RegistryStore
3939
from feast.repo_config import RegistryConfig
40+
from feast.usage import log_exceptions_and_usage
4041
from feast.version import get_version
4142

4243
try:
@@ -50,6 +51,7 @@
5051

5152

5253
class AwsProvider(PassthroughProvider):
54+
@log_exceptions_and_usage(provider="AwsProvider")
5355
def update_infra(
5456
self,
5557
project: str,
@@ -188,6 +190,7 @@ def _deploy_feature_server(self, project: str, image_uri: str):
188190
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
189191
)
190192

193+
@log_exceptions_and_usage(provider="AwsProvider")
191194
def teardown_infra(
192195
self,
193196
project: str,
@@ -216,6 +219,7 @@ def teardown_infra(
216219
_logger.info(" Tearing down AWS API Gateway...")
217220
aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"])
218221

222+
@log_exceptions_and_usage(provider="AwsProvider")
219223
def get_feature_server_endpoint(self) -> Optional[str]:
220224
project = self.repo_config.project
221225
resource_name = _get_lambda_name(project)
@@ -334,6 +338,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
334338
"s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL")
335339
)
336340

341+
@log_exceptions_and_usage(registry="s3")
337342
def get_registry_proto(self):
338343
file_obj = TemporaryFile()
339344
registry_proto = RegistryProto()
@@ -366,6 +371,7 @@ def get_registry_proto(self):
366371
f"Error while trying to locate Registry at path {self._uri.geturl()}"
367372
) from e
368373

374+
@log_exceptions_and_usage(registry="s3")
369375
def update_registry_proto(self, registry_proto: RegistryProto):
370376
self._write_registry(registry_proto)
371377

sdk/python/feast/infra/gcp.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
99
from feast.registry_store import RegistryStore
1010
from feast.repo_config import RegistryConfig
11+
from feast.usage import log_exceptions_and_usage
1112

1213

1314
class GcpProvider(PassthroughProvider):
@@ -33,6 +34,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
3334
self._bucket = self._uri.hostname
3435
self._blob = self._uri.path.lstrip("/")
3536

37+
@log_exceptions_and_usage(registry="gs")
3638
def get_registry_proto(self):
3739
import google.cloud.storage as storage
3840
from google.cloud.exceptions import NotFound
@@ -56,6 +58,7 @@ def get_registry_proto(self):
5658
f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
5759
)
5860

61+
@log_exceptions_and_usage(registry="gs")
5962
def update_registry_proto(self, registry_proto: RegistryProto):
6063
self._write_registry(registry_proto)
6164

sdk/python/feast/infra/local.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
1212
from feast.registry_store import RegistryStore
1313
from feast.repo_config import RegistryConfig
14+
from feast.usage import log_exceptions_and_usage
1415

1516

1617
class LocalProvider(PassthroughProvider):
@@ -40,6 +41,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
4041
else:
4142
self._filepath = repo_path.joinpath(registry_path)
4243

44+
@log_exceptions_and_usage(registry="local")
4345
def get_registry_proto(self):
4446
registry_proto = RegistryProto()
4547
if self._filepath.exists():
@@ -49,6 +51,7 @@ def get_registry_proto(self):
4951
f'Registry not found at path "{self._filepath}". Have you run "feast apply"?'
5052
)
5153

54+
@log_exceptions_and_usage(registry="local")
5255
def update_registry_proto(self, registry_proto: RegistryProto):
5356
self._write_registry(registry_proto)
5457

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from feast.registry import Registry
2727
from feast.repo_config import FeastConfigBaseModel, RepoConfig
2828

29+
from ...usage import log_exceptions_and_usage
2930
from .bigquery_source import BigQuerySource
3031

3132
try:
@@ -62,6 +63,7 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
6263

6364
class BigQueryOfflineStore(OfflineStore):
6465
@staticmethod
66+
@log_exceptions_and_usage(offline_store="bigquery")
6567
def pull_latest_from_table_or_query(
6668
config: RepoConfig,
6769
data_source: DataSource,
@@ -113,6 +115,7 @@ def pull_latest_from_table_or_query(
113115
)
114116

115117
@staticmethod
118+
@log_exceptions_and_usage(offline_store="bigquery")
116119
def get_historical_features(
117120
config: RepoConfig,
118121
feature_views: List[FeatureView],
@@ -221,7 +224,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
221224

222225
def _to_df_internal(self) -> pd.DataFrame:
223226
with self._query_generator() as query:
224-
df = self.client.query(query).to_dataframe(create_bqstorage_client=True)
227+
df = self._execute_query(query).to_dataframe(create_bqstorage_client=True)
225228
return df
226229

227230
def to_sql(self) -> str:
@@ -265,24 +268,29 @@ def to_bigquery(
265268
return str(job_config.destination)
266269

267270
with self._query_generator() as query:
268-
bq_job = self.client.query(query, job_config=job_config)
269-
270-
if job_config.dry_run:
271-
print(
272-
"This query will process {} bytes.".format(
273-
bq_job.total_bytes_processed
274-
)
275-
)
276-
return None
277-
278-
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
271+
self._execute_query(query, job_config, timeout)
279272

280273
print(f"Done writing to '{job_config.destination}'.")
281274
return str(job_config.destination)
282275

283276
def _to_arrow_internal(self) -> pyarrow.Table:
284277
with self._query_generator() as query:
285-
return self.client.query(query).to_arrow()
278+
return self._execute_query(query).to_arrow()
279+
280+
@log_exceptions_and_usage
281+
def _execute_query(
282+
self, query, job_config=None, timeout: int = 1800
283+
) -> bigquery.job.query.QueryJob:
284+
bq_job = self.client.query(query, job_config=job_config)
285+
286+
if job_config and job_config.dry_run:
287+
print(
288+
"This query will process {} bytes.".format(bq_job.total_bytes_processed)
289+
)
290+
return None
291+
292+
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
293+
return bq_job
286294

287295

288296
def block_until_done(

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from feast.registry import Registry
2222
from feast.repo_config import FeastConfigBaseModel, RepoConfig
23+
from feast.usage import log_exceptions_and_usage
2324

2425

2526
class FileOfflineStoreConfig(FeastConfigBaseModel):
@@ -51,11 +52,13 @@ def full_feature_names(self) -> bool:
5152
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
5253
return self._on_demand_feature_views
5354

55+
@log_exceptions_and_usage
5456
def _to_df_internal(self) -> pd.DataFrame:
5557
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
5658
df = self.evaluation_function()
5759
return df
5860

61+
@log_exceptions_and_usage
5962
def _to_arrow_internal(self):
6063
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
6164
df = self.evaluation_function()
@@ -64,6 +67,7 @@ def _to_arrow_internal(self):
6467

6568
class FileOfflineStore(OfflineStore):
6669
@staticmethod
70+
@log_exceptions_and_usage(offline_store="file")
6771
def get_historical_features(
6872
config: RepoConfig,
6973
feature_views: List[FeatureView],
@@ -264,6 +268,7 @@ def evaluate_historical_retrieval():
264268
return job
265269

266270
@staticmethod
271+
@log_exceptions_and_usage(offline_store="file")
267272
def pull_latest_from_table_or_query(
268273
config: RepoConfig,
269274
data_source: DataSource,

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from feast.infra.utils import aws_utils
1919
from feast.registry import Registry
2020
from feast.repo_config import FeastConfigBaseModel, RepoConfig
21+
from feast.usage import log_exceptions_and_usage
2122

2223

2324
class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
@@ -47,6 +48,7 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
4748

4849
class RedshiftOfflineStore(OfflineStore):
4950
@staticmethod
51+
@log_exceptions_and_usage(offline_store="redshift")
5052
def pull_latest_from_table_or_query(
5153
config: RepoConfig,
5254
data_source: DataSource,
@@ -103,6 +105,7 @@ def pull_latest_from_table_or_query(
103105
)
104106

105107
@staticmethod
108+
@log_exceptions_and_usage(offline_store="redshift")
106109
def get_historical_features(
107110
config: RepoConfig,
108111
feature_views: List[FeatureView],
@@ -227,6 +230,7 @@ def full_feature_names(self) -> bool:
227230
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
228231
return self._on_demand_feature_views
229232

233+
@log_exceptions_and_usage
230234
def _to_df_internal(self) -> pd.DataFrame:
231235
with self._query_generator() as query:
232236
return aws_utils.unload_redshift_query_to_df(
@@ -240,6 +244,7 @@ def _to_df_internal(self) -> pd.DataFrame:
240244
query,
241245
)
242246

247+
@log_exceptions_and_usage
243248
def _to_arrow_internal(self) -> pa.Table:
244249
with self._query_generator() as query:
245250
return aws_utils.unload_redshift_query_to_pa(
@@ -253,6 +258,7 @@ def _to_arrow_internal(self) -> pa.Table:
253258
query,
254259
)
255260

261+
@log_exceptions_and_usage
256262
def to_s3(self) -> str:
257263
""" Export dataset to S3 in Parquet format and return path """
258264
if self.on_demand_feature_views:
@@ -272,6 +278,7 @@ def to_s3(self) -> str:
272278
)
273279
return self._s3_path
274280

281+
@log_exceptions_and_usage
275282
def to_redshift(self, table_name: str) -> None:
276283
""" Save dataset as a new Redshift table """
277284
if self.on_demand_feature_views:

sdk/python/feast/infra/online_stores/datastore.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2727
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
2828
from feast.repo_config import FeastConfigBaseModel, RepoConfig
29+
from feast.usage import log_exceptions_and_usage, tracing_span
2930

3031
try:
3132
from google.auth.exceptions import DefaultCredentialsError
@@ -69,6 +70,7 @@ class DatastoreOnlineStore(OnlineStore):
6970

7071
_client: Optional[datastore.Client] = None
7172

73+
@log_exceptions_and_usage(online_store="datastore")
7274
def update(
7375
self,
7476
config: RepoConfig,
@@ -140,6 +142,7 @@ def _get_client(self, online_config: DatastoreOnlineStoreConfig):
140142
)
141143
return self._client
142144

145+
@log_exceptions_and_usage(online_store="datastore")
143146
def online_write_batch(
144147
self,
145148
config: RepoConfig,
@@ -220,6 +223,7 @@ def _write_minibatch(
220223
if progress:
221224
progress(len(entities))
222225

226+
@log_exceptions_and_usage(online_store="datastore")
223227
def online_read(
224228
self,
225229
config: RepoConfig,
@@ -245,7 +249,8 @@ def online_read(
245249

246250
# NOTE: get_multi doesn't return values in the same order as the keys in the request.
247251
# Also, len(values) can be less than len(keys) in the case of missing values.
248-
values = client.get_multi(keys)
252+
with tracing_span(name="remote_call"):
253+
values = client.get_multi(keys)
249254
values_dict = {v.key: v for v in values} if values is not None else {}
250255
for key in keys:
251256
if key in values_dict:

0 commit comments

Comments
 (0)