Skip to content

Commit 41535d0

Browse files
Tsotne Tabidzefelixwang9817
andauthored
Create & teardown Lambda & API Gateway resources for serverless feature server (#1900)
* Create & teardown Lambda & API Gateway resources for serverless feature server Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai> * Change AWS Lambda feature server to require only config Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Turn off telemetry for lambda Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Fix nits Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Fix naming Signed-off-by: Felix Wang <wangfelix98@gmail.com> Co-authored-by: Felix Wang <wangfelix98@gmail.com>
1 parent 982b7cd commit 41535d0

File tree

9 files changed

+291
-36
lines changed

9 files changed

+291
-36
lines changed

sdk/python/feast/constants.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,10 @@
1717
# Maximum interval(secs) to wait between retries for retry function
1818
MAX_WAIT_INTERVAL: str = "60"
1919

20-
AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server"
20+
AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server:aws"
21+
22+
# feature_store.yaml environment variable name for remote feature server
23+
FEATURE_STORE_YAML_ENV_NAME: str = "FEATURE_STORE_YAML_BASE64"
24+
25+
# Environment variable for toggling usage
26+
FEAST_USAGE = "FEAST_USAGE"

sdk/python/feast/errors.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,3 +274,18 @@ def __init__(self, feature_flag_name: str):
274274
f"You are attempting to use an experimental feature that is not enabled. Please run "
275275
f"`feast alpha enable {feature_flag_name}` "
276276
)
277+
278+
279+
class RepoConfigPathDoesNotExist(Exception):
280+
def __init__(self):
281+
super().__init__("The repo_path attribute does not exist for the repo_config.")
282+
283+
284+
class AwsLambdaDoesNotExist(Exception):
285+
def __init__(self):
286+
super().__init__("The created AWS Lambda function does not exist.")
287+
288+
289+
class AwsAPIGatewayDoesNotExist(Exception):
290+
def __init__(self):
291+
super().__init__("The created AWS API Gateway does not exist.")

sdk/python/feast/infra/aws.py

Lines changed: 178 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,194 @@
1+
import base64
12
import os
23
import uuid
34
from datetime import datetime
45
from pathlib import Path
56
from tempfile import TemporaryFile
7+
from typing import Sequence, Union
68
from urllib.parse import urlparse
79

810
from colorama import Fore, Style
911

10-
import feast
11-
from feast.constants import AWS_LAMBDA_FEATURE_SERVER_IMAGE
12-
from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist
12+
from feast import __version__
13+
from feast.constants import (
14+
AWS_LAMBDA_FEATURE_SERVER_IMAGE,
15+
FEAST_USAGE,
16+
FEATURE_STORE_YAML_ENV_NAME,
17+
)
18+
from feast.entity import Entity
19+
from feast.errors import (
20+
AwsAPIGatewayDoesNotExist,
21+
AwsLambdaDoesNotExist,
22+
RepoConfigPathDoesNotExist,
23+
S3RegistryBucketForbiddenAccess,
24+
S3RegistryBucketNotExist,
25+
)
26+
from feast.feature_table import FeatureTable
27+
from feast.feature_view import FeatureView
1328
from feast.infra.passthrough_provider import PassthroughProvider
29+
from feast.infra.utils import aws_utils
1430
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
1531
from feast.registry_store import RegistryStore
1632
from feast.repo_config import RegistryConfig
1733

34+
try:
35+
import boto3
36+
except ImportError as e:
37+
from feast.errors import FeastExtrasDependencyImportError
38+
39+
raise FeastExtrasDependencyImportError("aws", str(e))
40+
1841

1942
class AwsProvider(PassthroughProvider):
20-
def _upload_docker_image(self) -> None:
43+
def _get_lambda_name(self, project: str):
44+
return f"feast-python-server-{project}-{__version__.replace('+', '_').replace('.', '_')}"
45+
46+
def update_infra(
47+
self,
48+
project: str,
49+
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
50+
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
51+
entities_to_delete: Sequence[Entity],
52+
entities_to_keep: Sequence[Entity],
53+
partial: bool,
54+
):
55+
self.online_store.update(
56+
config=self.repo_config,
57+
tables_to_delete=tables_to_delete,
58+
tables_to_keep=tables_to_keep,
59+
entities_to_keep=entities_to_keep,
60+
entities_to_delete=entities_to_delete,
61+
partial=partial,
62+
)
63+
64+
if self.repo_config.feature_server and self.repo_config.feature_server.enabled:
65+
image_uri = self._upload_docker_image(project)
66+
print("Deploying feature server...")
67+
68+
assert self.repo_config.repo_path
69+
if not self.repo_config.repo_path:
70+
raise RepoConfigPathDoesNotExist()
71+
with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f:
72+
config_bytes = f.read()
73+
config_base64 = base64.b64encode(config_bytes).decode()
74+
75+
resource_name = self._get_lambda_name(project)
76+
lambda_client = boto3.client("lambda")
77+
api_gateway_client = boto3.client("apigatewayv2")
78+
function = aws_utils.get_lambda_function(lambda_client, resource_name)
79+
80+
if function is None:
81+
# If the Lambda function does not exist, create it.
82+
print(" Creating AWS Lambda...")
83+
lambda_client.create_function(
84+
FunctionName=resource_name,
85+
Role=self.repo_config.feature_server.execution_role_name,
86+
Code={"ImageUri": image_uri},
87+
PackageType="Image",
88+
MemorySize=1769,
89+
Environment={
90+
"Variables": {
91+
FEATURE_STORE_YAML_ENV_NAME: config_base64,
92+
FEAST_USAGE: "False",
93+
}
94+
},
95+
Tags={
96+
"feast-owned": "True",
97+
"project": project,
98+
"feast-sdk-version": __version__.replace("+", "_").replace(
99+
".", "_"
100+
),
101+
},
102+
)
103+
function = aws_utils.get_lambda_function(lambda_client, resource_name)
104+
if not function:
105+
raise AwsLambdaDoesNotExist()
106+
else:
107+
# If the feature_store.yaml has changed, need to update the environment variable.
108+
env = function.get("Environment", {}).get("Variables", {})
109+
if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64:
110+
# Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
111+
# It's expected that feature_store.yaml is not regularly updated while the lambda
112+
# is serving production traffic. However, the update in registry (e.g. modifying
113+
# feature views, feature services, and other definitions does not update lambda).
114+
print(" Updating AWS Lambda...")
115+
116+
lambda_client.update_function_configuration(
117+
FunctionName=resource_name,
118+
Environment={
119+
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
120+
},
121+
)
122+
123+
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
124+
if not api:
125+
# If the API Gateway doesn't exist, create it
126+
print(" Creating AWS API Gateway...")
127+
api = api_gateway_client.create_api(
128+
Name=resource_name,
129+
ProtocolType="HTTP",
130+
Target=function["FunctionArn"],
131+
RouteKey="POST /get-online-features",
132+
Tags={
133+
"feast-owned": "True",
134+
"project": project,
135+
"feast-sdk-version": __version__.replace("+", "_").replace(
136+
".", "_"
137+
),
138+
},
139+
)
140+
if not api:
141+
raise AwsAPIGatewayDoesNotExist()
142+
# Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
143+
api_id = api["ApiId"]
144+
region = lambda_client.meta.region_name
145+
account_id = aws_utils.get_account_id()
146+
lambda_client.add_permission(
147+
FunctionName=function["FunctionArn"],
148+
StatementId=str(uuid.uuid4()),
149+
Action="lambda:InvokeFunction",
150+
Principal="apigateway.amazonaws.com",
151+
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
152+
)
153+
154+
def teardown_infra(
155+
self,
156+
project: str,
157+
tables: Sequence[Union[FeatureTable, FeatureView]],
158+
entities: Sequence[Entity],
159+
) -> None:
160+
self.online_store.teardown(self.repo_config, tables, entities)
161+
162+
if (
163+
self.repo_config.feature_server is not None
164+
and self.repo_config.feature_server.enabled
165+
):
166+
print("Tearing down feature server...")
167+
resource_name = self._get_lambda_name(project)
168+
lambda_client = boto3.client("lambda")
169+
api_gateway_client = boto3.client("apigatewayv2")
170+
171+
function = aws_utils.get_lambda_function(lambda_client, resource_name)
172+
173+
if function is not None:
174+
print(" Tearing down AWS Lambda...")
175+
aws_utils.delete_lambda_function(lambda_client, resource_name)
176+
177+
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
178+
if api is not None:
179+
print(" Tearing down AWS API Gateway...")
180+
aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"])
181+
182+
def _upload_docker_image(self, project: str) -> str:
183+
"""
184+
Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR.
185+
186+
Args:
187+
project: Feast project name
188+
189+
Returns:
190+
The URI of the uploaded docker image.
191+
"""
21192
import base64
22193

23194
try:
@@ -47,8 +218,8 @@ def _upload_docker_image(self) -> None:
47218
)
48219
docker_client.images.pull(AWS_LAMBDA_FEATURE_SERVER_IMAGE)
49220

50-
version = ".".join(feast.__version__.split(".")[:3])
51-
repository_name = f"feast-python-server-{version}"
221+
version = __version__.replace("+", "_").replace(".", "_")
222+
repository_name = f"feast-python-server-{project}-{version}"
52223
ecr_client = boto3.client("ecr")
53224
try:
54225
print(
@@ -77,17 +248,12 @@ def _upload_docker_image(self) -> None:
77248
)
78249
image.tag(image_remote_name)
79250
docker_client.api.push(repository_uri, tag=version)
251+
return image_remote_name
80252

81253

82254
class S3RegistryStore(RegistryStore):
83255
def __init__(self, registry_config: RegistryConfig, repo_path: Path):
84256
uri = registry_config.path
85-
try:
86-
import boto3
87-
except ImportError as e:
88-
from feast.errors import FeastExtrasDependencyImportError
89-
90-
raise FeastExtrasDependencyImportError("aws", str(e))
91257
self._uri = urlparse(uri)
92258
self._bucket = self._uri.hostname
93259
self._key = self._uri.path.lstrip("/")

sdk/python/feast/infra/feature_servers/Dockerfile renamed to sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile

File renamed without changes.

sdk/python/feast/infra/feature_servers/app.py renamed to sdk/python/feast/infra/feature_servers/aws_lambda/app.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import tempfile
44
from pathlib import Path
55

6-
import yaml
76
from mangum import Mangum
87

98
from feast import FeatureStore
@@ -13,24 +12,12 @@
1312
config_base64 = os.environ["FEAST_CONFIG_BASE64"]
1413
config_bytes = base64.b64decode(config_base64)
1514

16-
# Override the registry path
17-
config_yaml = yaml.safe_load(config_bytes)
18-
config_yaml["registry"] = "registry.db"
19-
config_bytes = yaml.safe_dump(config_yaml).encode()
20-
21-
# Load Registry
22-
registry_base64 = os.environ["FEAST_REGISTRY_BASE64"]
23-
registry_bytes = base64.b64decode(registry_base64)
24-
25-
# Create a new unique directory for writing feature_store.yaml and registry.db files
15+
# Create a new unique directory for writing feature_store.yaml
2616
repo_path = Path(tempfile.mkdtemp())
2717

2818
with open(repo_path / "feature_store.yaml", "wb") as f:
2919
f.write(config_bytes)
3020

31-
with open(repo_path / "registry.db", "wb") as f:
32-
f.write(registry_bytes)
33-
3421
# Initialize the feature store
3522
store = FeatureStore(repo_path=str(repo_path.resolve()))
3623

sdk/python/feast/infra/feature_servers/requirements.txt renamed to sdk/python/feast/infra/feature_servers/aws_lambda/requirements.txt

File renamed without changes.

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@ def update_infra(
5252
partial=partial,
5353
)
5454

55-
if self.repo_config.feature_server and self.repo_config.feature_server.enabled:
56-
self._upload_docker_image()
57-
5855
def teardown_infra(
5956
self,
6057
project: str,
@@ -150,7 +147,3 @@ def get_historical_features(
150147
full_feature_names=full_feature_names,
151148
)
152149
return job
153-
154-
def _upload_docker_image(self) -> None:
155-
"""Upload the docker image for the feature server to the cloud."""
156-
pass

0 commit comments

Comments
 (0)