diff --git a/docs/reference/offline-stores/ray.md b/docs/reference/offline-stores/ray.md index 89040f05c94..063ffcb209a 100644 --- a/docs/reference/offline-stores/ray.md +++ b/docs/reference/offline-stores/ray.md @@ -27,7 +27,7 @@ The template provides a complete working example with sample datasets and demons The Ray offline store provides: - Ray-based data reading from file sources (Parquet, CSV, etc.) - Support for local, remote, and KubeRay (Kubernetes-managed) clusters -- Integration with various storage backends (local files, S3, GCS, HDFS) +- Integration with various storage backends (local files, S3, GCS, HDFS, Azure Blob) - Efficient data filtering and column selection - Timestamp-based data processing with timezone awareness - Enterprise-ready KubeRay cluster support via CodeFlare SDK @@ -463,6 +463,11 @@ job.persist(gcs_storage, allow_overwrite=True) # HDFS hdfs_storage = SavedDatasetFileStorage(path="hdfs://namenode:8020/datasets/driver_features.parquet") job.persist(hdfs_storage, allow_overwrite=True) + +# Azure Blob Storage / Azure Data Lake Storage Gen2 +# Setting AZURE_STORAGE_ANON=False makes it authenticate via DefaultAzureCredential +az_storage = SavedDatasetFileStorage(path="abfss://container@stcaccount.dfs.core.windows.net/datasets/driver_features.parquet") +job.persist(az_storage, allow_overwrite=True) ``` ### Using Ray Cluster diff --git a/sdk/python/feast/infra/compute_engines/ray/job.py b/sdk/python/feast/infra/compute_engines/ray/job.py index 06eea4e5d88..e818220baff 100644 --- a/sdk/python/feast/infra/compute_engines/ray/job.py +++ b/sdk/python/feast/infra/compute_engines/ray/job.py @@ -18,6 +18,9 @@ from feast.infra.compute_engines.dag.model import DAGFormat from feast.infra.compute_engines.dag.plan import ExecutionPlan from feast.infra.compute_engines.dag.value import DAGValue +from feast.infra.offline_stores.contrib.ray_offline_store.ray import ( +    REMOTE_STORAGE_SCHEMES, +) from feast.infra.offline_stores.file_source import SavedDatasetFileStorage 
from feast.infra.offline_stores.offline_store import RetrievalJob, RetrievalMetadata from feast.infra.ray_initializer import get_ray_wrapper @@ -205,7 +208,7 @@ def persist( destination_path = storage.file_options.uri # Check if destination already exists - if not destination_path.startswith(("s3://", "gs://", "hdfs://")): + if not destination_path.startswith(REMOTE_STORAGE_SCHEMES): import os if not allow_overwrite and os.path.exists(destination_path): diff --git a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py index b7b3619c15a..bc7c60733b4 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py +++ b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py @@ -62,6 +62,12 @@ from feast.utils import _get_column_names, make_df_tzaware, make_tzaware logger = logging.getLogger(__name__) +# Remote storage URI schemes supported by the Ray offline store +# S3: Amazon S3 +# GCS: Google Cloud Storage +# HDFS: Hadoop Distributed File System +# Azure: Azure Blob Storage / Azure Data Lake Storage Gen2 (abfs/abfss) +REMOTE_STORAGE_SCHEMES = ("s3://", "gs://", "hdfs://", "abfs://", "abfss://") def _get_data_schema_info( @@ -1160,13 +1166,13 @@ def persist( f"Ray offline store only supports SavedDatasetFileStorage, got {type(storage)}" ) destination_path = storage.file_options.uri - if not destination_path.startswith(("s3://", "gs://", "hdfs://")): + if not destination_path.startswith(REMOTE_STORAGE_SCHEMES): if not allow_overwrite and os.path.exists(destination_path): raise SavedDatasetLocationAlreadyExists(location=destination_path) try: ray_ds = self._get_ray_dataset() - if not destination_path.startswith(("s3://", "gs://", "hdfs://")): + if not destination_path.startswith(REMOTE_STORAGE_SCHEMES): os.makedirs(os.path.dirname(destination_path), exist_ok=True) ray_ds.write_parquet(destination_path) @@ -1956,12 +1962,19 @@ def normalize_timestamps(batch: pd.DataFrame) -> pd.DataFrame: filesystem, resolved_path = FileSource.create_filesystem_and_path( absolute_path, destination.s3_endpoint_override ) - path_obj = Path(resolved_path) - if path_obj.suffix == ".parquet": - path_obj = path_obj.with_suffix("") - if not absolute_path.startswith(("s3://", "gs://")): + if absolute_path.startswith(REMOTE_STORAGE_SCHEMES): + write_path = ( + absolute_path[:-8] + if absolute_path.endswith(".parquet") + else absolute_path + ) + else: + path_obj = Path(resolved_path) + if path_obj.suffix == ".parquet": + path_obj = path_obj.with_suffix("") path_obj.mkdir(parents=True, exist_ok=True) - ds.write_parquet(str(path_obj)) + write_path = str(path_obj) + ds.write_parquet(write_path) except Exception as e: raise RuntimeError(f"Failed to write logged features: {e}")