Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next commit
Updates Bytewax materialization engine.
- Since no data is exchanged between workers, remove k8s service
  definition.
- Update bytewax dataflow to only use one worker.

Signed-off-by: Dan Herrera <whoahbot@bytewax.io>
  • Loading branch information
whoahbot committed Nov 30, 2022
commit 5290b0fd5ddef5252f02f82f2cf3f55473c48363
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,5 @@ def _run_dataflow(self):
flow.flat_map(self.process_path)
flow.capture()
cluster_main(
flow,
ManualInputConfig(self.input_builder),
self.output_builder,
**proc_env(),
flow, ManualInputConfig(self.input_builder), self.output_builder, [], 0
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names
from feast.utils import _get_column_names, get_default_yaml_file_path

from .bytewax_materialization_job import BytewaxMaterializationJob

Expand Down Expand Up @@ -157,9 +157,6 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
# Create a k8s configmap with information needed by bytewax
self._create_configuration_map(job_id, paths, feature_view, self.namespace)

# Create the k8s service definition, used for bytewax communication
self._create_service_definition(job_id, self.namespace)

# Create the k8s job definition
self._create_job_definition(
job_id,
Expand All @@ -175,14 +172,10 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
def _create_configuration_map(self, job_id, paths, feature_view, namespace):
"""Create a Kubernetes configmap for this job"""

feature_store_configuration = yaml.dump(
yaml.safe_load(
self.repo_config.json(
exclude={"repo_path"},
exclude_unset=True,
)
)
)
repo_path = self.repo_config.repo_path
assert repo_path
feature_store_path = get_default_yaml_file_path(repo_path)
feature_store_configuration = feature_store_path.read_text()

materialization_config = yaml.dump(
{"paths": paths, "feature_view": feature_view.name}
Expand All @@ -204,41 +197,6 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
body=configmap_manifest,
)

def _create_service_definition(self, job_id, namespace):
    """Build and submit the Kubernetes Service manifest for a dataflow job.

    The Service is created so that bytewax workers can communicate with
    each other. It is named ``dataflow-{job_id}``, selects pods whose
    ``job-name`` label has that same value, and exposes TCP port 9999
    under the port name ``worker``. ``clusterIP`` is ``"None"``, so the
    manifest describes a headless Service.

    Args:
        job_id: Identifier interpolated into the Service name and selector.
        namespace: Namespace written into the manifest metadata.
    """
    # The Service name and the pod selector value are the same string.
    resource_name = f"dataflow-{job_id}"

    worker_port = {
        "name": "worker",
        "port": 9999,
        "protocol": "TCP",
        "targetPort": 9999,
    }

    spec = {
        "clusterIP": "None",
        "clusterIPs": ["None"],
        "internalTrafficPolicy": "Cluster",
        "ipFamilies": ["IPv4"],
        "ipFamilyPolicy": "SingleStack",
        "ports": [worker_port],
        "selector": {"job-name": resource_name},
        "sessionAffinity": "None",
        "type": "ClusterIP",
    }

    manifest = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": resource_name, "namespace": namespace},
        "spec": spec,
    }

    # Hand the finished manifest to the cluster through the shared client.
    utils.create_from_dict(self.k8s_client, manifest)

def _create_job_definition(self, job_id, namespace, pods, env):
"""Create a kubernetes job definition."""
job_env = [
Expand Down Expand Up @@ -269,10 +227,6 @@ def _create_job_definition(self, job_id, namespace, pods, env):
"name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
"value": "false",
},
{
"name": "BYTEWAX_HOSTFILE_PATH",
"value": "/etc/bytewax/hostfile.txt",
},
{
"name": "BYTEWAX_STATEFULSET_NAME",
"value": f"dataflow-{job_id}",
Expand All @@ -299,11 +253,6 @@ def _create_job_definition(self, job_id, namespace, pods, env):
"subdomain": f"dataflow-{job_id}",
"initContainers": [
{
"command": [
"sh",
"-c",
f'set -ex\n# Generate hostfile.txt.\necho "dataflow-{job_id}-0.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" > /etc/bytewax/hostfile.txt\nreplicas=$(($BYTEWAX_REPLICAS-1))\nx=1\nwhile [ $x -le $replicas ]\ndo\n echo "dataflow-{job_id}-$x.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" >> /etc/bytewax/hostfile.txt\n x=$(( $x + 1 ))\ndone',
],
"env": [
{
"name": "BYTEWAX_REPLICAS",
Expand Down