From 85131e4dfce84a9fea40f35cf3651828e13b1214 Mon Sep 17 00:00:00 2001 From: hkuepers Date: Fri, 28 Mar 2025 12:19:36 +0100 Subject: [PATCH 1/5] Set read_timeout for lambda client Signed-off-by: hkuepers --- .../infra/materialization/aws_lambda/lambda_engine.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py index d686ba99394..cf55325a8e8 100644 --- a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py @@ -7,6 +7,7 @@ from typing import Callable, List, Literal, Optional, Sequence, Union import boto3 +from botocore.config import Config from pydantic import StrictStr from tqdm import tqdm @@ -33,6 +34,7 @@ from feast.version import get_version DEFAULT_BATCH_SIZE = 10_000 +DEFAULT_TIMEOUT = 600 logger = logging.getLogger(__name__) @@ -97,7 +99,7 @@ def update( PackageType="Image", Role=self.repo_config.batch_engine.lambda_role, Code={"ImageUri": self.repo_config.batch_engine.materialization_image}, - Timeout=600, + Timeout=DEFAULT_TIMEOUT, Tags={ "feast-owned": "True", "project": project, @@ -149,7 +151,8 @@ def __init__( self.lambda_name = f"feast-materialize-{self.repo_config.project}" if len(self.lambda_name) > 64: self.lambda_name = self.lambda_name[:64] - self.lambda_client = boto3.client("lambda") + config = Config(read_timeout=DEFAULT_TIMEOUT + 10) + self.lambda_client = boto3.client("lambda", config=config) def materialize( self, registry, tasks: List[MaterializationTask] From 276f8db4d978585568e38372f4675ec7a915e774 Mon Sep 17 00:00:00 2001 From: hkuepers Date: Wed, 30 Apr 2025 12:07:44 +0200 Subject: [PATCH 2/5] Handle empty return from offline store Signed-off-by: hkuepers --- .../aws_lambda/lambda_engine.py | 84 ++++++++++--------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py index cf55325a8e8..8dadf008488 100644 --- a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py @@ -203,47 +203,53 @@ def _materialize_one( ) paths = offline_job.to_remote_storage() - max_workers = len(paths) if len(paths) <= 20 else 20 - executor = ThreadPoolExecutor(max_workers=max_workers) - futures = [] - - for path in paths: - payload = { - FEATURE_STORE_YAML_ENV_NAME: self.feature_store_base64, - "view_name": feature_view.name, - "view_type": "batch", - "path": path, - } - # Invoke a lambda to materialize this file. - - logger.info("Invoking materialization for %s", path) - futures.append( - executor.submit( - self.lambda_client.invoke, - FunctionName=self.lambda_name, - InvocationType="RequestResponse", - Payload=json.dumps(payload), - ) + if (num_files := len(paths)) == 0: + logger.warning("No values to update for the given time range.") + return LambdaMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.SUCCEEDED ) + else: + max_workers = num_files if num_files <= 20 else 20 + executor = ThreadPoolExecutor(max_workers=max_workers) + futures = [] + + for path in paths: + payload = { + FEATURE_STORE_YAML_ENV_NAME: self.feature_store_base64, + "view_name": feature_view.name, + "view_type": "batch", + "path": path, + } + # Invoke a lambda to materialize this file. + + logger.info("Invoking materialization for %s", path) + futures.append( + executor.submit( + self.lambda_client.invoke, + FunctionName=self.lambda_name, + InvocationType="RequestResponse", + Payload=json.dumps(payload), + ) + ) - done, not_done = wait(futures) - logger.info("Done: %s Not Done: %s", done, not_done) - for f in done: - response = f.result() - output = json.loads(response["Payload"].read()) + done, not_done = wait(futures) + logger.info("Done: %s Not Done: %s", done, not_done) + for f in done: + response = f.result() + output = json.loads(response["Payload"].read()) - logger.info( - f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, " - f"Output: {output}" - ) + logger.info( + f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, " + f"Output: {output}" + ) - for f in not_done: - response = f.result() - logger.error(f"Ingestion failed: {response}") + for f in not_done: + response = f.result() + logger.error(f"Ingestion failed: {response}") - return LambdaMaterializationJob( - job_id=job_id, - status=MaterializationJobStatus.SUCCEEDED - if not not_done - else MaterializationJobStatus.ERROR, - ) + return LambdaMaterializationJob( + job_id=job_id, + status=MaterializationJobStatus.SUCCEEDED + if not not_done + else MaterializationJobStatus.ERROR, + ) From 7a217d25b74ce655001cc2a75658433428a1072b Mon Sep 17 00:00:00 2001 From: hkuepers Date: Wed, 30 Apr 2025 12:49:26 +0200 Subject: [PATCH 3/5] Add lambda read timeout retries Signed-off-by: hkuepers --- .../aws_lambda/lambda_engine.py | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py index 8dadf008488..dd0f141ef7a 100644 --- a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py @@ -35,6 +35,7 @@ DEFAULT_BATCH_SIZE = 10_000 DEFAULT_TIMEOUT = 600 +LAMBDA_TIMEOUT_RETRIES = 5 logger = logging.getLogger(__name__) @@ -225,7 +226,7 @@ def _materialize_one( logger.info("Invoking materialization for %s", path) futures.append( executor.submit( - self.lambda_client.invoke, + self.invoke_with_retries, FunctionName=self.lambda_name, InvocationType="RequestResponse", Payload=json.dumps(payload), @@ -253,3 +254,20 @@ def _materialize_one( if not not_done else MaterializationJobStatus.ERROR, ) + + def invoke_with_retries(self, **kwargs): + """Invoke the Lambda function and retry if it times out. + + The Lambda function may time out initially if many values are updated + and DynamoDB throttles requests. As soon as the DynamoDB tables + are scaled up, the Lambda function can succeed upon retry with higher + throughput. + + """ + retries = 0 + while retries < LAMBDA_TIMEOUT_RETRIES: + response = self.lambda_client.invoke(**kwargs) + if "Task timed out after" not in json.loads(response["Payload"].read()): + break + retries += 1 + return response From 9bb2f0ef86d35c336c753914e57fe991e3e87be4 Mon Sep 17 00:00:00 2001 From: hkuepers Date: Mon, 19 May 2025 16:19:59 +0200 Subject: [PATCH 4/5] Add error handling in Lambda materialization engine Signed-off-by: hkuepers --- .../infra/materialization/aws_lambda/lambda_engine.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py index dd0f141ef7a..14b59518ffa 100644 --- a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py @@ -235,14 +235,17 @@ def _materialize_one( done, not_done = wait(futures) logger.info("Done: %s Not Done: %s", done, not_done) + errors = False for f in done: response = f.result() - output = json.loads(response["Payload"].read()) + output: dict = json.loads(response["Payload"].read()) logger.info( f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, " f"Output: {output}" ) + if "errorMessage" in output.keys(): + errors = True for f in not_done: response = f.result() @@ -251,7 +254,7 @@ def _materialize_one( return LambdaMaterializationJob( job_id=job_id, status=MaterializationJobStatus.SUCCEEDED - if not not_done + if (not not_done and not errors) else MaterializationJobStatus.ERROR, ) From e32b84401f5bc97740a3747afb6150d1d9bfaf02 Mon Sep 17 00:00:00 2001 From: hkuepers Date: Tue, 20 May 2025 12:30:55 +0200 Subject: [PATCH 5/5] Fix error handling Signed-off-by: hkuepers --- .../aws_lambda/lambda_engine.py | 51 ++++++++++++------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py index 14b59518ffa..03eb51a2b66 100644 --- a/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py @@ -55,11 +55,16 @@ class LambdaMaterializationEngineConfig(FeastConfigBaseModel): @dataclass class LambdaMaterializationJob(MaterializationJob): - def __init__(self, job_id: str, status: MaterializationJobStatus) -> None: + def __init__( + self, + job_id: str, + status: MaterializationJobStatus, + error: Optional[BaseException] = None, + ) -> None: super().__init__() self._job_id: str = job_id self._status = status - self._error = None + self._error = error def status(self) -> MaterializationJobStatus: return self._status @@ -235,28 +240,33 @@ def _materialize_one( done, not_done = wait(futures) logger.info("Done: %s Not Done: %s", done, not_done) - errors = False + errors = [] for f in done: - response = f.result() - output: dict = json.loads(response["Payload"].read()) + response, payload = f.result() logger.info( f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, " - f"Output: {output}" + f"Output: {payload}" ) - if "errorMessage" in output.keys(): - errors = True + if "errorMessage" in payload.keys(): + errors.append(payload["errorMessage"]) for f in not_done: - response = f.result() - logger.error(f"Ingestion failed: {response}") + response, payload = f.result() + logger.error(f"Ingestion failed: {response=}, {payload=}") - return LambdaMaterializationJob( - job_id=job_id, - status=MaterializationJobStatus.SUCCEEDED - if (not not_done and not errors) - else MaterializationJobStatus.ERROR, - ) + if len(not_done) == 0 and len(errors) == 0: + return LambdaMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.SUCCEEDED + ) + else: + return LambdaMaterializationJob( + job_id=job_id, + status=MaterializationJobStatus.ERROR, + error=RuntimeError( + f"Lambda functions did not finish successfully: {errors}" + ), + ) def invoke_with_retries(self, **kwargs): """Invoke the Lambda function and retry if it times out. @@ -270,7 +280,12 @@ def invoke_with_retries(self, **kwargs): retries = 0 while retries < LAMBDA_TIMEOUT_RETRIES: response = self.lambda_client.invoke(**kwargs) - if "Task timed out after" not in json.loads(response["Payload"].read()): + payload = json.loads(response["Payload"].read()) or {} + if "Task timed out after" not in payload.get("errorMessage", ""): break retries += 1 - return response + logger.warning( + "Retrying lambda function after lambda timeout in request" + f"{response['ResponseMetadata']['RequestId']}" + ) + return response, payload