|
| 1 | +import base64 |
1 | 2 | import os |
2 | 3 | import uuid |
3 | 4 | from datetime import datetime |
4 | 5 | from pathlib import Path |
5 | 6 | from tempfile import TemporaryFile |
| 7 | +from typing import Sequence, Union |
6 | 8 | from urllib.parse import urlparse |
7 | 9 |
|
8 | 10 | from colorama import Fore, Style |
9 | 11 |
|
10 | | -import feast |
11 | | -from feast.constants import AWS_LAMBDA_FEATURE_SERVER_IMAGE |
12 | | -from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist |
| 12 | +from feast import __version__ |
| 13 | +from feast.constants import ( |
| 14 | + AWS_LAMBDA_FEATURE_SERVER_IMAGE, |
| 15 | + FEAST_USAGE, |
| 16 | + FEATURE_STORE_YAML_ENV_NAME, |
| 17 | +) |
| 18 | +from feast.entity import Entity |
| 19 | +from feast.errors import ( |
| 20 | + AwsAPIGatewayDoesNotExist, |
| 21 | + AwsLambdaDoesNotExist, |
| 22 | + RepoConfigPathDoesNotExist, |
| 23 | + S3RegistryBucketForbiddenAccess, |
| 24 | + S3RegistryBucketNotExist, |
| 25 | +) |
| 26 | +from feast.feature_table import FeatureTable |
| 27 | +from feast.feature_view import FeatureView |
13 | 28 | from feast.infra.passthrough_provider import PassthroughProvider |
| 29 | +from feast.infra.utils import aws_utils |
14 | 30 | from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto |
15 | 31 | from feast.registry_store import RegistryStore |
16 | 32 | from feast.repo_config import RegistryConfig |
17 | 33 |
|
| 34 | +try: |
| 35 | + import boto3 |
| 36 | +except ImportError as e: |
| 37 | + from feast.errors import FeastExtrasDependencyImportError |
| 38 | + |
| 39 | + raise FeastExtrasDependencyImportError("aws", str(e)) |
| 40 | + |
18 | 41 |
|
19 | 42 | class AwsProvider(PassthroughProvider): |
20 | | - def _upload_docker_image(self) -> None: |
| 43 | + def _get_lambda_name(self, project: str): |
| 44 | + return f"feast-python-server-{project}-{__version__.replace('+', '_').replace('.', '_')}" |
| 45 | + |
| 46 | + def update_infra( |
| 47 | + self, |
| 48 | + project: str, |
| 49 | + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], |
| 50 | + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], |
| 51 | + entities_to_delete: Sequence[Entity], |
| 52 | + entities_to_keep: Sequence[Entity], |
| 53 | + partial: bool, |
| 54 | + ): |
| 55 | + self.online_store.update( |
| 56 | + config=self.repo_config, |
| 57 | + tables_to_delete=tables_to_delete, |
| 58 | + tables_to_keep=tables_to_keep, |
| 59 | + entities_to_keep=entities_to_keep, |
| 60 | + entities_to_delete=entities_to_delete, |
| 61 | + partial=partial, |
| 62 | + ) |
| 63 | + |
| 64 | + if self.repo_config.feature_server and self.repo_config.feature_server.enabled: |
| 65 | + image_uri = self._upload_docker_image(project) |
| 66 | + print("Deploying feature server...") |
| 67 | + |
| 68 | + assert self.repo_config.repo_path |
| 69 | + if not self.repo_config.repo_path: |
| 70 | + raise RepoConfigPathDoesNotExist() |
| 71 | + with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f: |
| 72 | + config_bytes = f.read() |
| 73 | + config_base64 = base64.b64encode(config_bytes).decode() |
| 74 | + |
| 75 | + resource_name = self._get_lambda_name(project) |
| 76 | + lambda_client = boto3.client("lambda") |
| 77 | + api_gateway_client = boto3.client("apigatewayv2") |
| 78 | + function = aws_utils.get_lambda_function(lambda_client, resource_name) |
| 79 | + |
| 80 | + if function is None: |
| 81 | + # If the Lambda function does not exist, create it. |
| 82 | + print(" Creating AWS Lambda...") |
| 83 | + lambda_client.create_function( |
| 84 | + FunctionName=resource_name, |
| 85 | + Role=self.repo_config.feature_server.execution_role_name, |
| 86 | + Code={"ImageUri": image_uri}, |
| 87 | + PackageType="Image", |
| 88 | + MemorySize=1769, |
| 89 | + Environment={ |
| 90 | + "Variables": { |
| 91 | + FEATURE_STORE_YAML_ENV_NAME: config_base64, |
| 92 | + FEAST_USAGE: "False", |
| 93 | + } |
| 94 | + }, |
| 95 | + Tags={ |
| 96 | + "feast-owned": "True", |
| 97 | + "project": project, |
| 98 | + "feast-sdk-version": __version__.replace("+", "_").replace( |
| 99 | + ".", "_" |
| 100 | + ), |
| 101 | + }, |
| 102 | + ) |
| 103 | + function = aws_utils.get_lambda_function(lambda_client, resource_name) |
| 104 | + if not function: |
| 105 | + raise AwsLambdaDoesNotExist() |
| 106 | + else: |
| 107 | + # If the feature_store.yaml has changed, need to update the environment variable. |
| 108 | + env = function.get("Environment", {}).get("Variables", {}) |
| 109 | + if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64: |
| 110 | + # Note, that this does not update Lambda gracefully (e.g. no rolling deployment). |
| 111 | + # It's expected that feature_store.yaml is not regularly updated while the lambda |
| 112 | + # is serving production traffic. However, the update in registry (e.g. modifying |
| 113 | + # feature views, feature services, and other definitions does not update lambda). |
| 114 | + print(" Updating AWS Lambda...") |
| 115 | + |
| 116 | + lambda_client.update_function_configuration( |
| 117 | + FunctionName=resource_name, |
| 118 | + Environment={ |
| 119 | + "Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64} |
| 120 | + }, |
| 121 | + ) |
| 122 | + |
| 123 | + api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name) |
| 124 | + if not api: |
| 125 | + # If the API Gateway doesn't exist, create it |
| 126 | + print(" Creating AWS API Gateway...") |
| 127 | + api = api_gateway_client.create_api( |
| 128 | + Name=resource_name, |
| 129 | + ProtocolType="HTTP", |
| 130 | + Target=function["FunctionArn"], |
| 131 | + RouteKey="POST /get-online-features", |
| 132 | + Tags={ |
| 133 | + "feast-owned": "True", |
| 134 | + "project": project, |
| 135 | + "feast-sdk-version": __version__.replace("+", "_").replace( |
| 136 | + ".", "_" |
| 137 | + ), |
| 138 | + }, |
| 139 | + ) |
| 140 | + if not api: |
| 141 | + raise AwsAPIGatewayDoesNotExist() |
| 142 | + # Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway |
| 143 | + api_id = api["ApiId"] |
| 144 | + region = lambda_client.meta.region_name |
| 145 | + account_id = aws_utils.get_account_id() |
| 146 | + lambda_client.add_permission( |
| 147 | + FunctionName=function["FunctionArn"], |
| 148 | + StatementId=str(uuid.uuid4()), |
| 149 | + Action="lambda:InvokeFunction", |
| 150 | + Principal="apigateway.amazonaws.com", |
| 151 | + SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features", |
| 152 | + ) |
| 153 | + |
| 154 | + def teardown_infra( |
| 155 | + self, |
| 156 | + project: str, |
| 157 | + tables: Sequence[Union[FeatureTable, FeatureView]], |
| 158 | + entities: Sequence[Entity], |
| 159 | + ) -> None: |
| 160 | + self.online_store.teardown(self.repo_config, tables, entities) |
| 161 | + |
| 162 | + if ( |
| 163 | + self.repo_config.feature_server is not None |
| 164 | + and self.repo_config.feature_server.enabled |
| 165 | + ): |
| 166 | + print("Tearing down feature server...") |
| 167 | + resource_name = self._get_lambda_name(project) |
| 168 | + lambda_client = boto3.client("lambda") |
| 169 | + api_gateway_client = boto3.client("apigatewayv2") |
| 170 | + |
| 171 | + function = aws_utils.get_lambda_function(lambda_client, resource_name) |
| 172 | + |
| 173 | + if function is not None: |
| 174 | + print(" Tearing down AWS Lambda...") |
| 175 | + aws_utils.delete_lambda_function(lambda_client, resource_name) |
| 176 | + |
| 177 | + api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name) |
| 178 | + if api is not None: |
| 179 | + print(" Tearing down AWS API Gateway...") |
| 180 | + aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"]) |
| 181 | + |
| 182 | + def _upload_docker_image(self, project: str) -> str: |
| 183 | + """ |
| 184 | + Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR. |
| 185 | +
|
| 186 | + Args: |
| 187 | + project: Feast project name |
| 188 | +
|
| 189 | + Returns: |
| 190 | + The URI of the uploaded docker image. |
| 191 | + """ |
21 | 192 | import base64 |
22 | 193 |
|
23 | 194 | try: |
@@ -47,8 +218,8 @@ def _upload_docker_image(self) -> None: |
47 | 218 | ) |
48 | 219 | docker_client.images.pull(AWS_LAMBDA_FEATURE_SERVER_IMAGE) |
49 | 220 |
|
50 | | - version = ".".join(feast.__version__.split(".")[:3]) |
51 | | - repository_name = f"feast-python-server-{version}" |
| 221 | + version = __version__.replace("+", "_").replace(".", "_") |
| 222 | + repository_name = f"feast-python-server-{project}-{version}" |
52 | 223 | ecr_client = boto3.client("ecr") |
53 | 224 | try: |
54 | 225 | print( |
@@ -77,17 +248,12 @@ def _upload_docker_image(self) -> None: |
77 | 248 | ) |
78 | 249 | image.tag(image_remote_name) |
79 | 250 | docker_client.api.push(repository_uri, tag=version) |
| 251 | + return image_remote_name |
80 | 252 |
|
81 | 253 |
|
82 | 254 | class S3RegistryStore(RegistryStore): |
83 | 255 | def __init__(self, registry_config: RegistryConfig, repo_path: Path): |
84 | 256 | uri = registry_config.path |
85 | | - try: |
86 | | - import boto3 |
87 | | - except ImportError as e: |
88 | | - from feast.errors import FeastExtrasDependencyImportError |
89 | | - |
90 | | - raise FeastExtrasDependencyImportError("aws", str(e)) |
91 | 257 | self._uri = urlparse(uri) |
92 | 258 | self._bucket = self._uri.hostname |
93 | 259 | self._key = self._uri.path.lstrip("/") |
|
0 commit comments