Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
parallelize with threads
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jul 14, 2022
commit 20f8a5422fe3ff72dc915bf8c2efbbb068f9e2ec
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ def teardown_infra(
if api is not None:
_logger.info(" Tearing down AWS API Gateway...")
aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"])
if self.batch_engine:
self.batch_engine.teardown_infra(project, tables, entities)

@log_exceptions_and_usage(provider="AwsProvider")
def get_feature_server_endpoint(self) -> Optional[str]:
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,17 @@ def handler(event, context):
for entity in feature_view.entity_columns
}

written_rows = 0

for batch in table.to_batches(DEFAULT_BATCH_SIZE):
Comment thread
achals marked this conversation as resolved.
Outdated
rows_to_write = _convert_arrow_to_proto(
batch, feature_view, join_key_to_value_type
)
store._provider.online_write_batch(
store.config, feature_view, rows_to_write, lambda x: None,
)
written_rows += len(rows_to_write)
return {"written_rows": written_rows}
except Exception as e:
print(f"Exception: {e}", flush=True)
print("Traceback:", flush=True)
Expand Down
39 changes: 29 additions & 10 deletions sdk/python/feast/infra/materialization/lambda/lambda_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import json
import logging
from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Literal, Optional, Sequence, Union
Expand Down Expand Up @@ -137,7 +138,6 @@ def __init__(
)

self.lambda_name = f"feast-materialize-{self.repo_config.project}"
# self.lambda_name = "feast-lambda-consumer"
self.lambda_client = boto3.client("lambda")

def materialize(
Expand Down Expand Up @@ -189,6 +189,9 @@ def _materialize_one(
)

paths = offline_job.to_remote_storage()
max_workers = len(paths) if len(paths) <= 20 else 20
executor = ThreadPoolExecutor(max_workers=max_workers)
futures = []

for path in paths:
payload = {
Expand All @@ -199,18 +202,34 @@ def _materialize_one(
}
# Invoke a lambda to materialize this file.

response = self.lambda_client.invoke(
FunctionName=self.lambda_name,
InvocationType="RequestResponse",
Payload=json.dumps(payload),
logger.info("Invoking materialization for %s", path)
futures.append(
executor.submit(
self.lambda_client.invoke,
FunctionName=self.lambda_name,
InvocationType="RequestResponse",
Payload=json.dumps(payload),
)
)

done, not_done = wait(futures)
logger.info("Done: %s Not Done: %s", done, not_done)
for f in done:
response = f.result()
output = json.loads(response["Payload"].read())

logger.info(
f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}"
)
print(
f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}"
f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, "
f"rows written: {output['written_rows']}"
)

for f in not_done:
response = f.result()
logger.error(f"Ingestion failed: {response}")

return LambdaMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
job_id=job_id,
status=MaterializationJobStatus.SUCCEEDED
if not not_done
else MaterializationJobStatus.ERROR,
)
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ def teardown_infra(
set_usage_attribute("provider", self.__class__.__name__)
if self.online_store:
self.online_store.teardown(self.repo_config, tables, entities)
if self.batch_engine:
self.batch_engine.teardown_infra(project, tables, entities)

def online_write_batch(
self,
Expand Down