Feedback from PR Review.
Signed-off-by: Jonas Bauer <jbauer@easy2parts.com>
p3s-jbauer authored and ntkathole committed Jan 30, 2026
commit a5dc52226124eb73e3f0e555baf5b0ebf7921519
6 changes: 5 additions & 1 deletion docs/reference/offline-stores/ray.md
@@ -27,7 +27,7 @@ The template provides a complete working example with sample datasets and demons
 The Ray offline store provides:
 - Ray-based data reading from file sources (Parquet, CSV, etc.)
 - Support for local, remote, and KubeRay (Kubernetes-managed) clusters
-- Integration with various storage backends (local files, S3, GCS, HDFS)
+- Integration with various storage backends (local files, S3, GCS, HDFS, Azure Blob)
 - Efficient data filtering and column selection
 - Timestamp-based data processing with timezone awareness
 - Enterprise-ready KubeRay cluster support via CodeFlare SDK
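The timezone-awareness bullet above boils down to normalizing naive timestamps to UTC before any comparison or join. A minimal stdlib sketch of the idea (the `make_tzaware_utc` helper is hypothetical, not Feast's `make_tzaware` implementation):

```python
from datetime import datetime, timezone

def make_tzaware_utc(ts: datetime) -> datetime:
    # Naive timestamps are assumed to already be UTC; aware ones are converted.
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)

naive = datetime(2026, 1, 30, 12, 0, 0)
aware = make_tzaware_utc(naive)
print(aware.isoformat())  # 2026-01-30T12:00:00+00:00
```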
@@ -463,6 +463,10 @@ job.persist(gcs_storage, allow_overwrite=True)
 # HDFS
 hdfs_storage = SavedDatasetFileStorage(path="hdfs://namenode:8020/datasets/driver_features.parquet")
 job.persist(hdfs_storage, allow_overwrite=True)
+
+# Azure Blob Storage / Azure Data Lake Storage Gen2
+az_storage = SavedDatasetFileStorage(path="abfss://container@stcaccount.dfs.core.windows.net/datasets/driver_features.parquet")
+job.persist(az_storage, allow_overwrite=True)
 ```
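The backends in the examples above are selected purely by the URI scheme of the destination path. A standalone sketch that classifies the paths used in this section (the `backend_for` helper is hypothetical, not part of Feast):

```python
from urllib.parse import urlparse

# Hypothetical mapping from URI scheme to the backend it targets.
BACKENDS = {
    "s3": "Amazon S3",
    "gs": "Google Cloud Storage",
    "hdfs": "HDFS",
    "abfs": "Azure Data Lake Storage Gen2",
    "abfss": "Azure Data Lake Storage Gen2",
    "": "local filesystem",
    "file": "local filesystem",
}

def backend_for(path: str) -> str:
    return BACKENDS.get(urlparse(path).scheme, "unknown")

print(backend_for("hdfs://namenode:8020/datasets/driver_features.parquet"))  # HDFS
print(backend_for("abfss://container@account.dfs.core.windows.net/x.parquet"))  # Azure Data Lake Storage Gen2
```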

### Using Ray Cluster
@@ -62,6 +62,12 @@
 from feast.utils import _get_column_names, make_df_tzaware, make_tzaware
 
 logger = logging.getLogger(__name__)
+# Remote storage URI schemes supported by the Ray offline store
+# S3: Amazon S3
+# GCS: Google Cloud Storage
+# HDFS: Hadoop Distributed File System
+# abfs/abfss: Azure Blob Storage / Azure Data Lake Storage Gen2
+REMOTE_STORAGE_SCHEMES = ("s3://", "gs://", "hdfs://", "abfs://", "abfss://")
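Because `str.startswith` accepts a tuple of prefixes and matches any of them, the single module-level constant can replace each repeated inline tuple. A standalone sketch of that check (the `is_remote` name is hypothetical; the tuple is taken verbatim from the diff):

```python
REMOTE_STORAGE_SCHEMES = ("s3://", "gs://", "hdfs://", "abfs://", "abfss://")

def is_remote(path: str) -> bool:
    # str.startswith with a tuple returns True if any prefix matches.
    return path.startswith(REMOTE_STORAGE_SCHEMES)

print(is_remote("s3://bucket/data.parquet"))  # True
print(is_remote("/tmp/data.parquet"))         # False
```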


def _get_data_schema_info(
@@ -1160,17 +1166,13 @@ def persist(
                 f"Ray offline store only supports SavedDatasetFileStorage, got {type(storage)}"
             )
         destination_path = storage.file_options.uri
-        if not destination_path.startswith(
-            ("s3://", "gs://", "hdfs://", "abfs://", "abfss://")
-        ):
+        if not destination_path.startswith(REMOTE_STORAGE_SCHEMES):
             if not allow_overwrite and os.path.exists(destination_path):
                 raise SavedDatasetLocationAlreadyExists(location=destination_path)
         try:
             ray_ds = self._get_ray_dataset()
 
-            if not destination_path.startswith(
-                ("s3://", "gs://", "hdfs://", "abfs://", "abfss://")
-            ):
+            if not destination_path.startswith(REMOTE_STORAGE_SCHEMES):
                 os.makedirs(os.path.dirname(destination_path), exist_ok=True)
 
             ray_ds.write_parquet(destination_path)
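The local-path branch of persist() above follows a common pattern: refuse to overwrite unless allowed, then create parent directories before writing. A self-contained sketch under stated assumptions (`persist_local` is hypothetical and writes raw bytes instead of a Ray dataset):

```python
import os
import tempfile

def persist_local(destination_path: str, data: bytes, allow_overwrite: bool = False) -> None:
    # Mirror the overwrite guard from persist(): fail fast if the target exists.
    if not allow_overwrite and os.path.exists(destination_path):
        raise FileExistsError(f"Saved dataset location already exists: {destination_path}")
    # Ensure the parent directory exists before writing.
    os.makedirs(os.path.dirname(destination_path), exist_ok=True)
    with open(destination_path, "wb") as f:
        f.write(data)

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "datasets", "driver_features.parquet")
    persist_local(target, b"parquet-bytes")
    assert os.path.exists(target)
```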
@@ -1963,7 +1965,7 @@ def normalize_timestamps(batch: pd.DataFrame) -> pd.DataFrame:
             path_obj = Path(resolved_path)
             if path_obj.suffix == ".parquet":
                 path_obj = path_obj.with_suffix("")
-            if not absolute_path.startswith(("s3://", "gs://", "abfs://", "abfss://")):
+            if not absolute_path.startswith(REMOTE_STORAGE_SCHEMES):
                 path_obj.mkdir(parents=True, exist_ok=True)
             ds.write_parquet(str(path_obj))
         except Exception as e:
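The suffix handling above reflects that Ray Data's write_parquet treats the destination as a directory of files, so a trailing .parquet suffix is dropped before creating it. A minimal sketch of just that path logic (`as_parquet_dir` is a hypothetical name):

```python
from pathlib import Path

def as_parquet_dir(resolved_path: str) -> Path:
    path_obj = Path(resolved_path)
    # A ".parquet" destination is really a directory name for write_parquet.
    if path_obj.suffix == ".parquet":
        path_obj = path_obj.with_suffix("")
    return path_obj

print(as_parquet_dir("/data/driver_features.parquet"))  # /data/driver_features
print(as_parquet_dir("/data/driver_features"))          # /data/driver_features
```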