spark materialize engine not working on yarn #3331

@polaris707

Expected Behavior

The Spark materialization engine should work when spark.master is set to yarn (not only local[*]).

Current Behavior

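It does not work. The driver writes repo_config to a local temporary file and passes only the file path to the workers. On YARN the executors run on other nodes, which cannot read that file unless the path is on a common file system (HDFS, CIFS, ...).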
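A minimal sketch of the failure mode, using only the standard library (pickle stands in for dill, and the dict is a hypothetical stand-in for repo_config). Deleting the file is only a simulation of "the path does not exist on the executor's node"; it is not how Spark or YARN behave internally.

```python
import os
import pickle
import tempfile


def write_config_to_local_temp(config: dict) -> str:
    """Driver side: pickle the config into a local temp file, return only its path."""
    path = tempfile.NamedTemporaryFile(delete=False).name
    with open(path, "wb") as f:
        pickle.dump(config, f)
    return path


def load_config_from_path(path: str) -> dict:
    """Executor side: open the path that was shipped from the driver."""
    with open(path, "rb") as f:
        return pickle.load(f)


config = {"online_store": "redis"}  # hypothetical stand-in for repo_config
path = write_config_to_local_temp(config)

# Same machine (local[*]): the path resolves, so loading works.
same_host_result = load_config_from_path(path)

# Simulated remote executor: the driver's temp file is not present there.
os.remove(path)
try:
    load_config_from_path(path)
    other_host_error = None
except FileNotFoundError as exc:
    other_host_error = exc
```

With local[*] the driver and workers share one file system, which is presumably why the problem only shows up on a cluster.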

Steps to reproduce

Set the spark.master config to yarn and run materialization.

Specifications

  • Version: 0.26.0
  • Platform: centos 7
  • Subsystem: python 3.9 / spark 3.3.0

Possible Solution

AS-IS

class _SparkSerializedArtifacts:
    """Class to assist with serializing unpicklable artifacts to the spark workers"""

    feature_view_proto: str
    repo_config_file: str

    @classmethod
    def serialize(cls, feature_view, repo_config):

        # serialize to proto
        feature_view_proto = feature_view.to_proto().SerializeToString()

        # serialize repo_config to disk. Will be used to instantiate the online store
        repo_config_file = tempfile.NamedTemporaryFile(delete=False).name
        with open(repo_config_file, "wb") as f:
            dill.dump(repo_config, f)

        return _SparkSerializedArtifacts(
            feature_view_proto=feature_view_proto, repo_config_file=repo_config_file
        )

    def unserialize(self):
        # unserialize
        proto = FeatureViewProto()
        proto.ParseFromString(self.feature_view_proto)
        feature_view = FeatureView.from_proto(proto)

        # load
        with open(self.repo_config_file, "rb") as f:
            repo_config = dill.load(f)

        provider = PassthroughProvider(repo_config)
        online_store = provider.online_store
        return feature_view, online_store, repo_config

TO-BE

class _SparkSerializedArtifacts:
    """Class to assist with serializing unpicklable artifacts to the spark workers"""

    feature_view_proto: str
    repo_config_byte: bytes

    @classmethod
    def serialize(cls, feature_view, repo_config):

        # serialize to proto
        feature_view_proto = feature_view.to_proto().SerializeToString()

        # serialize repo_config to bytes in memory. Will be used to instantiate the online store
        repo_config_byte = dill.dumps(repo_config)

        return _SparkSerializedArtifacts(
            feature_view_proto=feature_view_proto, repo_config_byte=repo_config_byte
        )

    def unserialize(self):
        # unserialize
        proto = FeatureViewProto()
        proto.ParseFromString(self.feature_view_proto)
        feature_view = FeatureView.from_proto(proto)

        # load
        repo_config = dill.loads(self.repo_config_byte)

        provider = PassthroughProvider(repo_config) 
        online_store = provider.online_store 
        return feature_view, online_store, repo_config

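Use dill.dumps and dill.loads instead of dill.dump and dill.load, so that repo_config is carried as bytes in memory rather than written to a file.

Byte strings can be serialized and deserialized by the PySpark serializer, so the pickled repo_config travels to the workers together with the artifacts object. There is no need to pass a repo_config file path.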

This solution works for me.
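A small self-contained sketch of the bytes round trip, for anyone who wants to check the idea without a cluster. DemoSerializedArtifacts and demo_config are hypothetical stand-ins, not the real Feast classes, and the plain pickle round trip only approximates what Spark does when it ships data to a worker. The sketch falls back to pickle when dill is not installed, since both expose dumps and loads.

```python
import pickle
from dataclasses import dataclass
from typing import Any

try:
    import dill as serializer  # library used in the issue
except ImportError:
    import pickle as serializer  # fallback with the same dumps/loads interface


@dataclass
class DemoSerializedArtifacts:
    """Hypothetical stand-in: carries the config as bytes, not as a file path."""

    repo_config_byte: bytes

    @classmethod
    def serialize(cls, repo_config: Any) -> "DemoSerializedArtifacts":
        # dumps returns bytes; nothing is written to the local disk
        return cls(repo_config_byte=serializer.dumps(repo_config))

    def unserialize(self) -> Any:
        # loads takes bytes (load would expect an open file object instead)
        return serializer.loads(self.repo_config_byte)


demo_config = {"project": "demo", "online_store": {"type": "sqlite"}}
artifacts = DemoSerializedArtifacts.serialize(demo_config)

# Stand-in for the payload being moved to an executor on another node.
shipped_payload = pickle.loads(pickle.dumps(artifacts.repo_config_byte))
restored = DemoSerializedArtifacts(repo_config_byte=shipped_payload).unserialize()
```

Because the payload is plain bytes, the worker needs nothing from the driver's file system to rebuild the config.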
