Skip to content

Commit 7d4369f

Browse files
authored
Change the feast serve endpoint to be sync rather than async. (feast-dev#2119)
When an endpoint is async, everything happens on the event loop thread. This is great for functionality that is IO heavy since the job can be awaited and other requests processed in the meantime. But if there are no awaits, then everything happens synchronously on one thread which means that while we're waiting for a response from a DB or API call, nothing else gets processed. To address this I've removed the async keyword from the endpoint. This makes FastAPI process each request on a thread in a thread pool. Signed-off-by: Gunnar Sv Sigurbjörnsson <gunnar.sigurbjornsson@gmail.com>
1 parent c5caeeb commit 7d4369f

File tree

4 files changed

+56
-35
lines changed

4 files changed

+56
-35
lines changed

sdk/python/feast/cli.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -508,14 +508,17 @@ def init_command(project_directory, minimal: bool, template: str):
508508
default=6566,
509509
help="Specify a port for the server [default: 6566]",
510510
)
511+
@click.option(
512+
"--no-access-log", is_flag=True, help="Disable the Uvicorn access log.",
513+
)
511514
@click.pass_context
512-
def serve_command(ctx: click.Context, host: str, port: int):
515+
def serve_command(ctx: click.Context, host: str, port: int, no_access_log: bool):
513516
"""[Experimental] Start a the feature consumption server locally on a given port."""
514517
repo = ctx.obj["CHDIR"]
515518
cli_check_repo(repo)
516519
store = FeatureStore(repo_path=str(repo))
517520

518-
store.serve(host, port)
521+
store.serve(host, port, no_access_log)
519522

520523

521524
@cli.command("serve_transformations")

sdk/python/feast/feature_server.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import uvicorn
33
from fastapi import FastAPI, HTTPException, Request
44
from fastapi.logger import logger
5+
from fastapi.params import Depends
56
from google.protobuf.json_format import MessageToDict, Parse
67

78
import feast
@@ -15,17 +16,21 @@ def get_app(store: "feast.FeatureStore"):
1516

1617
app = FastAPI()
1718

19+
async def get_body(request: Request):
20+
return await request.body()
21+
1822
@app.post("/get-online-features")
19-
async def get_online_features(request: Request):
23+
def get_online_features(body=Depends(get_body)):
2024
try:
2125
# Validate and parse the request data into GetOnlineFeaturesRequest Protobuf object
22-
body = await request.body()
2326
request_proto = GetOnlineFeaturesRequest()
2427
Parse(body, request_proto)
2528

2629
# Initialize parameters for FeatureStore.get_online_features(...) call
2730
if request_proto.HasField("feature_service"):
28-
features = store.get_feature_service(request_proto.feature_service)
31+
features = store.get_feature_service(
32+
request_proto.feature_service, allow_cache=True
33+
)
2934
else:
3035
features = list(request_proto.features.val)
3136

@@ -61,11 +66,13 @@ async def get_online_features(request: Request):
6166
return app
6267

6368

64-
def start_server(store: "feast.FeatureStore", host: str, port: int):
69+
def start_server(
70+
store: "feast.FeatureStore", host: str, port: int, no_access_log: bool
71+
):
6572
app = get_app(store)
6673
click.echo(
6774
"This is an "
6875
+ click.style("experimental", fg="yellow", bold=True, underline=True)
6976
+ " feature. It's intended for early testing and feedback, and could change without warnings in future releases."
7077
)
71-
uvicorn.run(app, host=host, port=port)
78+
uvicorn.run(app, host=host, port=port, access_log=(not no_access_log))

sdk/python/feast/feature_store.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,9 @@ def get_entity(self, name: str) -> Entity:
288288
return self._registry.get_entity(name, self.project)
289289

290290
@log_exceptions_and_usage
291-
def get_feature_service(self, name: str) -> FeatureService:
291+
def get_feature_service(
292+
self, name: str, allow_cache: bool = False
293+
) -> FeatureService:
292294
"""
293295
Retrieves a feature service.
294296
@@ -301,7 +303,7 @@ def get_feature_service(self, name: str) -> FeatureService:
301303
Raises:
302304
FeatureServiceNotFoundException: The feature service could not be found.
303305
"""
304-
return self._registry.get_feature_service(name, self.project)
306+
return self._registry.get_feature_service(name, self.project, allow_cache)
305307

306308
@log_exceptions_and_usage
307309
def get_feature_view(self, name: str) -> FeatureView:
@@ -369,14 +371,19 @@ def delete_feature_service(self, name: str):
369371
"""
370372
return self._registry.delete_feature_service(name, self.project)
371373

372-
def _get_features(self, features: Union[List[str], FeatureService],) -> List[str]:
374+
def _get_features(
375+
self, features: Union[List[str], FeatureService], allow_cache: bool = False,
376+
) -> List[str]:
373377
_features = features
378+
374379
if not _features:
375380
raise ValueError("No features specified for retrieval")
376381

377382
_feature_refs = []
378383
if isinstance(_features, FeatureService):
379-
feature_service_from_registry = self.get_feature_service(_features.name)
384+
feature_service_from_registry = self.get_feature_service(
385+
_features.name, allow_cache
386+
)
380387
if feature_service_from_registry != _features:
381388
warnings.warn(
382389
"The FeatureService object that has been passed in as an argument is"
@@ -1040,7 +1047,7 @@ def get_online_features(
10401047
... )
10411048
>>> online_response_dict = online_response.to_dict()
10421049
"""
1043-
_feature_refs = self._get_features(features)
1050+
_feature_refs = self._get_features(features, allow_cache=True)
10441051
(
10451052
requested_feature_views,
10461053
requested_request_feature_views,
@@ -1455,12 +1462,12 @@ def _get_feature_views_to_use(
14551462
return views_to_use
14561463

14571464
@log_exceptions_and_usage
1458-
def serve(self, host: str, port: int) -> None:
1465+
def serve(self, host: str, port: int, no_access_log: bool) -> None:
14591466
"""Start the feature consumption server locally on a given port."""
14601467
if not flags_helper.enable_python_feature_server(self.config):
14611468
raise ExperimentalFeatureNotEnabled(flags.FLAG_PYTHON_FEATURE_SERVER_NAME)
14621469

1463-
feature_server.start_server(self, host, port)
1470+
feature_server.start_server(self, host, port, no_access_log)
14641471

14651472
@log_exceptions_and_usage
14661473
def get_feature_server_endpoint(self) -> Optional[str]:

sdk/python/feast/registry.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from collections import defaultdict
1616
from datetime import datetime, timedelta
1717
from pathlib import Path
18+
from threading import Lock
1819
from typing import Any, Dict, List, Optional
1920
from urllib.parse import urlparse
2021

@@ -101,7 +102,6 @@ class Registry:
101102
cached_registry_proto: Optional[RegistryProto] = None
102103
cached_registry_proto_created: Optional[datetime] = None
103104
cached_registry_proto_ttl: timedelta
104-
cache_being_updated: bool = False
105105

106106
def __init__(
107107
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
@@ -115,6 +115,8 @@ def __init__(
115115
or where it will be created if it does not exist yet.
116116
"""
117117

118+
self._refresh_lock = Lock()
119+
118120
if registry_config:
119121
registry_store_type = registry_config.registry_store_type
120122
registry_path = registry_config.path
@@ -325,6 +327,7 @@ def get_feature_service(
325327
Args:
326328
name: Name of feature service
327329
project: Feast project that this feature service belongs to
330+
allow_cache: Whether to allow returning this feature service from a cached registry
328331
329332
Returns:
330333
Returns either the specified feature service, or raises an exception if
@@ -347,6 +350,7 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti
347350
Args:
348351
name: Name of entity
349352
project: Feast project that this entity belongs to
353+
allow_cache: Whether to allow returning this entity from a cached registry
350354
351355
Returns:
352356
Returns either the specified entity, or raises an exception if
@@ -419,8 +423,8 @@ def list_on_demand_feature_views(
419423
Retrieve a list of on demand feature views from the registry
420424
421425
Args:
422-
allow_cache: Whether to allow returning on demand feature views from a cached registry
423426
project: Filter on demand feature views based on project name
427+
allow_cache: Whether to allow returning on demand feature views from a cached registry
424428
425429
Returns:
426430
List of on demand feature views
@@ -733,31 +737,31 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto:
733737
734738
Returns: Returns a RegistryProto object which represents the state of the registry
735739
"""
736-
expired = (
737-
self.cached_registry_proto is None
738-
or self.cached_registry_proto_created is None
739-
) or (
740-
self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity
741-
and (
742-
datetime.now()
743-
> (self.cached_registry_proto_created + self.cached_registry_proto_ttl)
740+
with self._refresh_lock:
741+
expired = (
742+
self.cached_registry_proto is None
743+
or self.cached_registry_proto_created is None
744+
) or (
745+
self.cached_registry_proto_ttl.total_seconds()
746+
> 0 # 0 ttl means infinity
747+
and (
748+
datetime.now()
749+
> (
750+
self.cached_registry_proto_created
751+
+ self.cached_registry_proto_ttl
752+
)
753+
)
744754
)
745-
)
746755

747-
if allow_cache and (not expired or self.cache_being_updated):
748-
assert isinstance(self.cached_registry_proto, RegistryProto)
749-
return self.cached_registry_proto
756+
if allow_cache and not expired:
757+
assert isinstance(self.cached_registry_proto, RegistryProto)
758+
return self.cached_registry_proto
750759

751-
try:
752-
self.cache_being_updated = True
753760
registry_proto = self._registry_store.get_registry_proto()
754761
self.cached_registry_proto = registry_proto
755762
self.cached_registry_proto_created = datetime.now()
756-
except Exception as e:
757-
raise e
758-
finally:
759-
self.cache_being_updated = False
760-
return registry_proto
763+
764+
return registry_proto
761765

762766
def _check_conflicting_feature_view_names(self, feature_view: BaseFeatureView):
763767
name_to_fv_protos = self._existing_feature_view_names_to_fvs()

0 commit comments

Comments
 (0)