Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/reference/offline-stores/ray.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ The template provides a complete working example with sample datasets and demons
The Ray offline store provides:
- Ray-based data reading from file sources (Parquet, CSV, etc.)
- Support for local, remote, and KubeRay (Kubernetes-managed) clusters
- Integration with various storage backends (local files, S3, GCS, HDFS)
- Integration with various storage backends (local files, S3, GCS, HDFS, Azure Blob)
- Efficient data filtering and column selection
- Timestamp-based data processing with timezone awareness
- Enterprise-ready KubeRay cluster support via CodeFlare SDK
Expand Down Expand Up @@ -463,6 +463,11 @@ job.persist(gcs_storage, allow_overwrite=True)
# HDFS
hdfs_storage = SavedDatasetFileStorage(path="hdfs://namenode:8020/datasets/driver_features.parquet")
job.persist(hdfs_storage, allow_overwrite=True)

# Azure Blob Storage / Azure Data Lake Storage Gen2
# Setting AZURE_STORAGE_ANON=False makes the filesystem authenticate via DefaultAzureCredential
az_storage = SavedDatasetFileStorage(path="abfss://container@stcaccount.dfs.core.windows.net/datasets/driver_features.parquet")
job.persist(az_storage, allow_overwrite=True)
```

### Using Ray Cluster
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/compute_engines/ray/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from feast.infra.compute_engines.dag.model import DAGFormat
from feast.infra.compute_engines.dag.plan import ExecutionPlan
from feast.infra.compute_engines.dag.value import DAGValue
from feast.infra.offline_stores.contrib.ray_offline_store.ray import (
REMOTE_STORAGE_SCHEMES,
)
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage
from feast.infra.offline_stores.offline_store import RetrievalJob, RetrievalMetadata
from feast.infra.ray_initializer import get_ray_wrapper
Expand Down Expand Up @@ -205,7 +208,7 @@ def persist(
destination_path = storage.file_options.uri

# Check if destination already exists
if not destination_path.startswith(("s3://", "gs://", "hdfs://")):
if not destination_path.startswith(REMOTE_STORAGE_SCHEMES):
import os

if not allow_overwrite and os.path.exists(destination_path):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
from feast.utils import _get_column_names, make_df_tzaware, make_tzaware

logger = logging.getLogger(__name__)
# Remote storage URI schemes supported by the Ray offline store
# s3://            Amazon S3
# gs://            Google Cloud Storage
# hdfs://          Hadoop Distributed File System
# abfs://, abfss:// Azure Blob Storage / Azure Data Lake Storage Gen2
REMOTE_STORAGE_SCHEMES = ("s3://", "gs://", "hdfs://", "abfs://", "abfss://")


def _get_data_schema_info(
Expand Down Expand Up @@ -1160,13 +1166,13 @@ def persist(
f"Ray offline store only supports SavedDatasetFileStorage, got {type(storage)}"
)
destination_path = storage.file_options.uri
if not destination_path.startswith(("s3://", "gs://", "hdfs://")):
if not destination_path.startswith(REMOTE_STORAGE_SCHEMES):
if not allow_overwrite and os.path.exists(destination_path):
raise SavedDatasetLocationAlreadyExists(location=destination_path)
try:
ray_ds = self._get_ray_dataset()

if not destination_path.startswith(("s3://", "gs://", "hdfs://")):
if not destination_path.startswith(REMOTE_STORAGE_SCHEMES):
os.makedirs(os.path.dirname(destination_path), exist_ok=True)

ray_ds.write_parquet(destination_path)
Expand Down Expand Up @@ -1956,12 +1962,19 @@ def normalize_timestamps(batch: pd.DataFrame) -> pd.DataFrame:
filesystem, resolved_path = FileSource.create_filesystem_and_path(
absolute_path, destination.s3_endpoint_override
)
path_obj = Path(resolved_path)
if path_obj.suffix == ".parquet":
path_obj = path_obj.with_suffix("")
if not absolute_path.startswith(("s3://", "gs://")):
if absolute_path.startswith(REMOTE_STORAGE_SCHEMES):
write_path = (
absolute_path[:-8]
if absolute_path.endswith(".parquet")
else absolute_path
)
else:
path_obj = Path(resolved_path)
if path_obj.suffix == ".parquet":
path_obj = path_obj.with_suffix("")
path_obj.mkdir(parents=True, exist_ok=True)
ds.write_parquet(str(path_obj))
write_path = str(path_obj)
ds.write_parquet(write_path)
except Exception as e:
raise RuntimeError(f"Failed to write logged features: {e}")

Expand Down
Loading