From d07896c1e0e33b5d91e413ebad53c163f812e9d6 Mon Sep 17 00:00:00 2001 From: Jonas Bauer Date: Tue, 27 Jan 2026 10:32:07 +0000 Subject: [PATCH 1/8] feat: adjust ray offline store to support abfs(s) ADLS Azure Storage Signed-off-by: Jonas Bauer --- .../offline_stores/contrib/ray_offline_store/ray.py | 10 +++++++--- test_registry | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) create mode 100644 test_registry diff --git a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py index b7b3619c15a..f782cc98aa2 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py +++ b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py @@ -1160,13 +1160,17 @@ def persist( f"Ray offline store only supports SavedDatasetFileStorage, got {type(storage)}" ) destination_path = storage.file_options.uri - if not destination_path.startswith(("s3://", "gs://", "hdfs://")): + if not destination_path.startswith( + ("s3://", "gs://", "hdfs://", "abfs://", "abfss://") + ): if not allow_overwrite and os.path.exists(destination_path): raise SavedDatasetLocationAlreadyExists(location=destination_path) try: ray_ds = self._get_ray_dataset() - if not destination_path.startswith(("s3://", "gs://", "hdfs://")): + if not destination_path.startswith( + ("s3://", "gs://", "hdfs://", "abfs://", "abfss://") + ): os.makedirs(os.path.dirname(destination_path), exist_ok=True) ray_ds.write_parquet(destination_path) @@ -1959,7 +1963,7 @@ def normalize_timestamps(batch: pd.DataFrame) -> pd.DataFrame: path_obj = Path(resolved_path) if path_obj.suffix == ".parquet": path_obj = path_obj.with_suffix("") - if not absolute_path.startswith(("s3://", "gs://")): + if not absolute_path.startswith(("s3://", "gs://", "abfs://", "abfss://")): path_obj.mkdir(parents=True, exist_ok=True) ds.write_parquet(str(path_obj)) except Exception as e: diff --git a/test_registry b/test_registry new 
file mode 100644 index 00000000000..b70f8717e4f --- /dev/null +++ b/test_registry @@ -0,0 +1 @@ +1"$da30d864-b635-49a1-aa21-43386ad9101b* Ô©âËð‚îó \ No newline at end of file From 3de9dcda4646106e22f4d5f68a8378b657c2f882 Mon Sep 17 00:00:00 2001 From: Jonas Bauer Date: Tue, 27 Jan 2026 10:57:05 +0000 Subject: [PATCH 2/8] Delete test_registry file. Signed-off-by: Jonas Bauer --- test_registry | 1 - 1 file changed, 1 deletion(-) delete mode 100644 test_registry diff --git a/test_registry b/test_registry deleted file mode 100644 index b70f8717e4f..00000000000 --- a/test_registry +++ /dev/null @@ -1 +0,0 @@ -1"$da30d864-b635-49a1-aa21-43386ad9101b* Ô©âËð‚îó \ No newline at end of file From a5dc52226124eb73e3f0e555baf5b0ebf7921519 Mon Sep 17 00:00:00 2001 From: Jonas Bauer Date: Tue, 27 Jan 2026 14:21:35 +0000 Subject: [PATCH 3/8] Feedback from PR Review. Signed-off-by: Jonas Bauer --- docs/reference/offline-stores/ray.md | 6 +++++- .../contrib/ray_offline_store/ray.py | 16 +++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/docs/reference/offline-stores/ray.md b/docs/reference/offline-stores/ray.md index 89040f05c94..ef5021bf4d2 100644 --- a/docs/reference/offline-stores/ray.md +++ b/docs/reference/offline-stores/ray.md @@ -27,7 +27,7 @@ The template provides a complete working example with sample datasets and demons The Ray offline store provides: - Ray-based data reading from file sources (Parquet, CSV, etc.) 
- Support for local, remote, and KubeRay (Kubernetes-managed) clusters -- Integration with various storage backends (local files, S3, GCS, HDFS) +- Integration with various storage backends (local files, S3, GCS, HDFS, Azure Blob) - Efficient data filtering and column selection - Timestamp-based data processing with timezone awareness - Enterprise-ready KubeRay cluster support via CodeFlare SDK @@ -463,6 +463,10 @@ job.persist(gcs_storage, allow_overwrite=True) # HDFS hdfs_storage = SavedDatasetFileStorage(path="hdfs://namenode:8020/datasets/driver_features.parquet") job.persist(hdfs_storage, allow_overwrite=True) + +# Azure Blob Storage / Azure Data Lake Storage Gen2 +az_storage = SavedDatasetFileStorage(path="abfss://container@stc_account.dfs.core.windows.net/datasets/driver_features.parquet") +job.persist(az_storage, allow_overwrite=True) ``` ### Using Ray Cluster diff --git a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py index f782cc98aa2..bea20acd2e8 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py +++ b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py @@ -62,6 +62,12 @@ from feast.utils import _get_column_names, make_df_tzaware, make_tzaware logger = logging.getLogger(__name__) +# Remote storage URI schemes supported by the Ray offline store +# S3: Amazon S3 +# GCS: Google Cloud Storage +# HDFS: Hadoop Distributed File System +# Azure: Azure Storage Gen2 +REMOTE_STORAGE_SCHEMES = ("s3://", "gs://", "hdfs://", "abfs://", "abfss://") def _get_data_schema_info( @@ -1160,17 +1166,13 @@ def persist( f"Ray offline store only supports SavedDatasetFileStorage, got {type(storage)}" ) destination_path = storage.file_options.uri - if not destination_path.startswith( - ("s3://", "gs://", "hdfs://", "abfs://", "abfss://") - ): + if not destination_path.startswith(REMOTE_STORAGE_SCHEMES): if not allow_overwrite and 
os.path.exists(destination_path): raise SavedDatasetLocationAlreadyExists(location=destination_path) try: ray_ds = self._get_ray_dataset() - if not destination_path.startswith( - ("s3://", "gs://", "hdfs://", "abfs://", "abfss://") - ): + if not destination_path.startswith(REMOTE_STORAGE_SCHEMES): os.makedirs(os.path.dirname(destination_path), exist_ok=True) ray_ds.write_parquet(destination_path) @@ -1963,7 +1965,7 @@ def normalize_timestamps(batch: pd.DataFrame) -> pd.DataFrame: path_obj = Path(resolved_path) if path_obj.suffix == ".parquet": path_obj = path_obj.with_suffix("") - if not absolute_path.startswith(("s3://", "gs://", "abfs://", "abfss://")): + if not absolute_path.startswith(REMOTE_STORAGE_SCHEMES): path_obj.mkdir(parents=True, exist_ok=True) ds.write_parquet(str(path_obj)) except Exception as e: From 9c140c71386789c5f775e764d617e27f550458ab Mon Sep 17 00:00:00 2001 From: Jonas Bauer Date: Tue, 27 Jan 2026 14:37:14 +0000 Subject: [PATCH 4/8] Added additional comment about env var. Signed-off-by: Jonas Bauer --- docs/reference/offline-stores/ray.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/reference/offline-stores/ray.md b/docs/reference/offline-stores/ray.md index ef5021bf4d2..063ffcb209a 100644 --- a/docs/reference/offline-stores/ray.md +++ b/docs/reference/offline-stores/ray.md @@ -465,6 +465,7 @@ hdfs_storage = SavedDatasetFileStorage(path="hdfs://namenode:8020/datasets/drive job.persist(hdfs_storage, allow_overwrite=True) # Azure Blob Storage / Azure Data Lake Storage Gen2 +# Set the environment variable AZURE_STORAGE_ANON=False to authenticate via DefaultAzureCredential az_storage = SavedDatasetFileStorage(path="abfss://container@stc_account.dfs.core.windows.net/datasets/driver_features.parquet") job.persist(az_storage, allow_overwrite=True) ``` From f57b172be2340f34220d469de594dc6a8002229b Mon Sep 17 00:00:00 2001 From: Jonas Bauer Date: Wed, 28 Jan 2026 08:00:07 +0000 Subject: [PATCH 5/8] Fixed potential issue from PR. 
Signed-off-by: Jonas Bauer --- sdk/python/feast/infra/compute_engines/ray/job.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/compute_engines/ray/job.py b/sdk/python/feast/infra/compute_engines/ray/job.py index 06eea4e5d88..e818220baff 100644 --- a/sdk/python/feast/infra/compute_engines/ray/job.py +++ b/sdk/python/feast/infra/compute_engines/ray/job.py @@ -18,6 +18,9 @@ from feast.infra.compute_engines.dag.model import DAGFormat from feast.infra.compute_engines.dag.plan import ExecutionPlan from feast.infra.compute_engines.dag.value import DAGValue +from feast.infra.offline_stores.contrib.ray_offline_store.ray import ( + REMOTE_STORAGE_SCHEMES, +) from feast.infra.offline_stores.file_source import SavedDatasetFileStorage from feast.infra.offline_stores.offline_store import RetrievalJob, RetrievalMetadata from feast.infra.ray_initializer import get_ray_wrapper @@ -205,7 +208,7 @@ def persist( destination_path = storage.file_options.uri # Check if destination already exists - if not destination_path.startswith(("s3://", "gs://", "hdfs://")): + if not destination_path.startswith(REMOTE_STORAGE_SCHEMES): import os if not allow_overwrite and os.path.exists(destination_path): From ce9886634ca3438296921c5b76d926b600534280 Mon Sep 17 00:00:00 2001 From: Jonas Bauer Date: Thu, 29 Jan 2026 15:43:33 +0000 Subject: [PATCH 6/8] Fixed found issue by Devin from PR. 
Signed-off-by: Jonas Bauer --- .../offline_stores/contrib/ray_offline_store/ray.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py index bea20acd2e8..1d3f16d1783 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py +++ b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py @@ -1962,12 +1962,15 @@ def normalize_timestamps(batch: pd.DataFrame) -> pd.DataFrame: filesystem, resolved_path = FileSource.create_filesystem_and_path( absolute_path, destination.s3_endpoint_override ) - path_obj = Path(resolved_path) - if path_obj.suffix == ".parquet": - path_obj = path_obj.with_suffix("") - if not absolute_path.startswith(REMOTE_STORAGE_SCHEMES): + if absolute_path.startswith(REMOTE_STORAGE_SCHEMES): + write_path = absolute_path.rstrip('.parquet') if absolute_path.endswith('.parquet') else absolute_path + else: + path_obj = Path(resolved_path) + if path_obj.suffix == '.parquet': + path_obj = path_obj.with_suffix('') path_obj.mkdir(parents=True, exist_ok=True) - ds.write_parquet(str(path_obj)) + write_path = str(path_obj) + ds.write_parquet(write_path) except Exception as e: raise RuntimeError(f"Failed to write logged features: {e}") From 7f1d005fd826d1cc6abd015a3a6f78af50772d45 Mon Sep 17 00:00:00 2001 From: Jonas Bauer Date: Thu, 29 Jan 2026 15:45:57 +0000 Subject: [PATCH 7/8] Apply formatting Signed-off-by: Jonas Bauer --- .../offline_stores/contrib/ray_offline_store/ray.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py index 1d3f16d1783..82cad954c4b 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py +++ 
b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py @@ -1963,11 +1963,15 @@ def normalize_timestamps(batch: pd.DataFrame) -> pd.DataFrame: absolute_path, destination.s3_endpoint_override ) if absolute_path.startswith(REMOTE_STORAGE_SCHEMES): - write_path = absolute_path.rstrip('.parquet') if absolute_path.endswith('.parquet') else absolute_path + write_path = ( + absolute_path.rstrip(".parquet") + if absolute_path.endswith(".parquet") + else absolute_path + ) else: path_obj = Path(resolved_path) - if path_obj.suffix == '.parquet': - path_obj = path_obj.with_suffix('') + if path_obj.suffix == ".parquet": + path_obj = path_obj.with_suffix("") path_obj.mkdir(parents=True, exist_ok=True) write_path = str(path_obj) ds.write_parquet(write_path) From 74967bae79ea9d2059c6b0fc4b798afd57dbdac3 Mon Sep 17 00:00:00 2001 From: Jonas Bauer Date: Thu, 29 Jan 2026 15:54:12 +0000 Subject: [PATCH 8/8] Fixed found issue by Devin from PR. Signed-off-by: Jonas Bauer --- .../feast/infra/offline_stores/contrib/ray_offline_store/ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py index 82cad954c4b..bc7c60733b4 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py +++ b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py @@ -1964,7 +1964,7 @@ def normalize_timestamps(batch: pd.DataFrame) -> pd.DataFrame: ) if absolute_path.startswith(REMOTE_STORAGE_SCHEMES): write_path = ( - absolute_path.rstrip(".parquet") + absolute_path[:-8] if absolute_path.endswith(".parquet") else absolute_path )