Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
develop Run large materializations in batches of pods
Signed-off-by: James Crabtree <james.crabtree@sailpoint.com>
  • Loading branch information
james-crabtree-sp committed Oct 23, 2023
commit 65c039adde784ab8a16e218418e7af9836c9d469
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(
config: RepoConfig,
feature_view: FeatureView,
paths: List[str],
worker_index: int
worker_index: int,
):
self.config = config
self.feature_store = FeatureStore(config=config)
Expand All @@ -33,13 +33,7 @@ def __init__(
self._run_dataflow()

def process_path(self, path):
<<<<<<< HEAD
dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
=======
fs = s3fs.S3FileSystem()
logger.info(f"Processing path {path}")
dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
>>>>>>> 15c523a2 (SAASMLOPS-809 fix bytewax workers so they only process a single file (#6))
batches = []
for fragment in dataset.fragments:
for batch in fragment.to_table().to_batches():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import sys
import logging
import uuid
from datetime import datetime
from time import sleep
from typing import Callable, List, Literal, Sequence, Union

import yaml
from kubernetes import client
from kubernetes import config as k8s_config
from kubernetes import utils
from kubernetes.utils import FailToCreateError
from kubernetes.client.exceptions import ApiException
from kubernetes.utils import FailToCreateError
from pydantic import StrictStr
from tqdm import tqdm
import logging

from feast import FeatureView, RepoConfig
from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.infra.materialization.batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJobStatus,
MaterializationJob,
MaterializationJobStatus,
MaterializationTask,
)
from feast.infra.offline_stores.offline_store import OfflineStore
Expand All @@ -28,16 +28,11 @@
from feast.repo_config import FeastConfigBaseModel
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names, get_default_yaml_file_path
from time import sleep
import signal

from .bytewax_materialization_job import BytewaxMaterializationJob

logger = logging.getLogger(__name__)

def term_handler(signum, frame):
logger.info("Received SIGTERM. Shutting down")
sys.exit(0)

class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
"""Batch Materialization Engine config for Bytewax"""
Expand Down Expand Up @@ -90,6 +85,12 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
active_deadline_seconds: int = 86400
""" (optional) Maximum amount of time a materialization job is allowed to run"""

job_batch_size: int = 100
Comment thread
sudohainguyen marked this conversation as resolved.
""" (optional) Maximum number of pods to process per job. Only applies to synchronous materialization"""

print_pod_logs_on_failure: bool = True
"""(optional) Print pod logs on job failure. Only applies to synchronous materialization"""


class BytewaxMaterializationEngine(BatchMaterializationEngine):
def __init__(
Expand Down Expand Up @@ -119,8 +120,6 @@ def __init__(
self.batch_engine_config = repo_config.batch_engine
self.namespace = self.batch_engine_config.namespace

signal.signal(signal.SIGTERM, term_handler)

def update(
self,
project: str,
Expand Down Expand Up @@ -195,33 +194,85 @@ def _materialize_one(
)

paths = offline_job.to_remote_storage()
if self.batch_engine_config.synchronous:
offset = 0
total_pods = len(paths)
batch_size = self.batch_engine_config.job_batch_size
if batch_size < 1:
raise ValueError("job_batch_size must be a value greater than 0")

while True:
next_offset = min(offset + batch_size, total_pods)
job = self._await_path_materialization(
paths[offset:next_offset],
feature_view,
offset,
next_offset,
total_pods,
)
offset += batch_size
if offset >= total_pods:
break
else:
job_id = str(uuid.uuid4())
job = self._create_kubernetes_job(job_id, paths, feature_view)

return job

def _await_path_materialization(
self, paths, feature_view, batch_start, batch_end, total_pods
):
job_id = str(uuid.uuid4())
job = self._create_kubernetes_job(job_id, paths, feature_view)
if self.batch_engine_config.synchronous:

try:
while job.status() in (
MaterializationJobStatus.WAITING,
MaterializationJobStatus.RUNNING,
):
logger.info(
f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
f"(of {total_pods}) running..."
)
sleep(30)
Comment thread
sudohainguyen marked this conversation as resolved.
logger.info(
f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
f"(of {total_pods}) complete with status {job.status()}"
)
except BaseException as e:
if self.batch_engine_config.print_pod_logs_on_failure:
self._print_pod_logs(job.job_id(), feature_view, batch_start)

logger.info(f"Deleting job {job.job_id()}")
try:
while job.status() in (MaterializationJobStatus.WAITING, MaterializationJobStatus.RUNNING):
logger.info(f"{feature_view.name} materialization still running...")
sleep(30)
logger.info(f"{feature_view.name} materialization complete with status {job.status()}")
except BaseException as e:
logger.info(f"Killing job {job.job_id()}")
try:
self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
except ApiException as de:
logger.warning(f"Could not delete job due to API Error: {de.body}")
raise e
self._print_pod_logs(job.job_id(), feature_view)
self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
except ApiException as ae:
logger.warning(f"Could not delete job due to API Error: {ae.body}")
raise e
finally:
logger.info(f"Deleting configmap {self._configmap_name(job_id)}")
try:
self.v1.delete_namespaced_config_map(
self._configmap_name(job_id), self.namespace
)
except ApiException as ae:
logger.warning(
f"Could not delete configmap due to API Error: {ae.body}"
)

return job

def _print_pod_logs(self, job_id, feature_view):
def _print_pod_logs(self, job_id, feature_view, offset=0):
pods_list = self.v1.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={job_id}",
).items
for i, pod in enumerate(pods_list):
logger.info(f"Logging output for {feature_view.name} pod {i}")
logger.info(f"Logging output for {feature_view.name} pod {offset+i}")
try:
logger.info(self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace))
logger.info(
self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
)
except ApiException as e:
logger.warning(f"Could not retrieve pod logs due to: {e.body}")

Expand Down Expand Up @@ -259,7 +310,7 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
"kind": "ConfigMap",
"apiVersion": "v1",
"metadata": {
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
"labels": {**labels, **self.batch_engine_config.labels},
},
"data": {
Expand All @@ -272,7 +323,10 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
body=configmap_manifest,
)

def _create_job_definition(self, job_id, namespace, pods, env):
def _configmap_name(self, job_id):
return f"feast-{job_id}"

def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
"""Create a kubernetes job definition."""
job_env = [
{"name": "RUST_BACKTRACE", "value": "full"},
Expand Down Expand Up @@ -375,7 +429,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
},
{
"mountPath": "/var/feast/",
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
],
}
Expand Down Expand Up @@ -406,7 +460,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
{"mountPath": "/etc/bytewax", "name": "hostfile"},
{
"mountPath": "/var/feast/",
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
],
}
Expand All @@ -416,13 +470,13 @@ def _create_job_definition(self, job_id, namespace, pods, env):
{
"configMap": {
"defaultMode": 420,
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
"name": "python-files",
},
{
"configMap": {"name": f"feast-{job_id}"},
"name": f"feast-{job_id}",
"configMap": {"name": self._configmap_name(job_id)},
"name": self._configmap_name(job_id),
},
],
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os

import yaml

from feast import FeatureStore, RepoConfig
Expand All @@ -20,5 +21,5 @@
config,
store.get_feature_view(bytewax_config["feature_view"]),
bytewax_config["paths"],
int(os.environ["JOB_COMPLETION_INDEX"])
int(os.environ["JOB_COMPLETION_INDEX"]),
)