Skip to content

Commit 69af21f

Browse files
fix: Prevent overwriting existing file during persist (feast-dev#3088)
* Prevent overwriting existing file for file offline store
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Add `from_data_source` method
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Remove unnecessary changes
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Format
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Make overwriting optional
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent f3b522b commit 69af21f

16 files changed

Lines changed: 165 additions & 22 deletions

File tree

sdk/python/feast/errors.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,11 @@ def __init__(
204204
)
205205

206206

207+
class SavedDatasetLocationAlreadyExists(Exception):
208+
def __init__(self, location: str):
209+
super().__init__(f"Saved dataset location {location} already exists.")
210+
211+
207212
class FeastOfflineStoreInvalidName(Exception):
208213
def __init__(self, offline_store_class_name: str):
209214
super().__init__(

sdk/python/feast/feature_store.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1155,6 +1155,7 @@ def create_saved_dataset(
11551155
storage: SavedDatasetStorage,
11561156
tags: Optional[Dict[str, str]] = None,
11571157
feature_service: Optional[FeatureService] = None,
1158+
allow_overwrite: bool = False,
11581159
) -> SavedDataset:
11591160
"""
11601161
Execute provided retrieval job and persist its outcome in given storage.
@@ -1163,6 +1164,14 @@ def create_saved_dataset(
11631164
Name for the saved dataset should be unique within project, since it's possible to overwrite previously stored dataset
11641165
with the same name.
11651166
1167+
Args:
1168+
from_: The retrieval job whose result should be persisted.
1169+
name: The name of the saved dataset.
1170+
storage: The saved dataset storage object indicating where the result should be persisted.
1171+
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
1172+
feature_service (optional): The feature service that should be associated with this saved dataset.
1173+
allow_overwrite (optional): If True, the persisted result can overwrite an existing table or file.
1174+
11661175
Returns:
11671176
SavedDataset object with attached RetrievalJob
11681177
@@ -1195,7 +1204,7 @@ def create_saved_dataset(
11951204
dataset.min_event_timestamp = from_.metadata.min_event_timestamp
11961205
dataset.max_event_timestamp = from_.metadata.max_event_timestamp
11971206

1198-
from_.persist(storage)
1207+
from_.persist(storage=storage, allow_overwrite=allow_overwrite)
11991208

12001209
dataset = dataset.with_retrieval_job(
12011210
self._get_provider().retrieve_saved_dataset(

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ def _execute_query(
493493
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
494494
return bq_job
495495

496-
def persist(self, storage: SavedDatasetStorage):
496+
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
497497
assert isinstance(storage, SavedDatasetBigQueryStorage)
498498

499499
self.to_bigquery(

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ def _to_arrow_internal(self) -> pa.Table:
402402
def metadata(self) -> Optional[RetrievalMetadata]:
403403
return self._metadata
404404

405-
def persist(self, storage: SavedDatasetStorage):
405+
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
406406
assert isinstance(storage, SavedDatasetAthenaStorage)
407407
self.to_athena(table_name=storage.athena_options.table)
408408

sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ def _to_arrow_internal(self) -> pa.Table:
297297
def metadata(self) -> Optional[RetrievalMetadata]:
298298
return self._metadata
299299

300-
def persist(self, storage: SavedDatasetStorage):
300+
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
301301
assert isinstance(storage, SavedDatasetPostgreSQLStorage)
302302

303303
df_to_postgres_table(

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ def _to_arrow_internal(self) -> pyarrow.Table:
337337
self.to_spark_df().write.parquet(temp_dir, mode="overwrite")
338338
return pq.read_table(temp_dir)
339339

340-
def persist(self, storage: SavedDatasetStorage):
340+
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
341341
"""
342342
Run the retrieval and persist the results in the same offline store used for read.
343343
Please note the persisting is done only within the scope of the spark session.

sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def to_trino(
126126
self._client.execute_query(query_text=query)
127127
return destination_table
128128

129-
def persist(self, storage: SavedDatasetStorage):
129+
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
130130
"""
131131
Run the retrieval and persist the results in the same offline store used for read.
132132
"""

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import uuid
23
from datetime import datetime
34
from pathlib import Path
@@ -11,13 +12,16 @@
1112
import pytz
1213
from pydantic.typing import Literal
1314

14-
from feast import FileSource, OnDemandFeatureView
1515
from feast.data_source import DataSource
16-
from feast.errors import FeastJoinKeysDuringMaterialization
16+
from feast.errors import (
17+
FeastJoinKeysDuringMaterialization,
18+
SavedDatasetLocationAlreadyExists,
19+
)
1720
from feast.feature_logging import LoggingConfig, LoggingSource
1821
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
1922
from feast.infra.offline_stores.file_source import (
2023
FileLoggingDestination,
24+
FileSource,
2125
SavedDatasetFileStorage,
2226
)
2327
from feast.infra.offline_stores.offline_store import (
@@ -30,6 +34,7 @@
3034
get_pyarrow_schema_from_batch_source,
3135
)
3236
from feast.infra.registry.base_registry import BaseRegistry
37+
from feast.on_demand_feature_view import OnDemandFeatureView
3338
from feast.repo_config import FeastConfigBaseModel, RepoConfig
3439
from feast.saved_dataset import SavedDatasetStorage
3540
from feast.usage import log_exceptions_and_usage
@@ -83,8 +88,13 @@ def _to_arrow_internal(self):
8388
df = self.evaluation_function().compute()
8489
return pyarrow.Table.from_pandas(df)
8590

86-
def persist(self, storage: SavedDatasetStorage):
91+
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
8792
assert isinstance(storage, SavedDatasetFileStorage)
93+
94+
# Check if the specified location already exists.
95+
if not allow_overwrite and os.path.exists(storage.file_options.uri):
96+
raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)
97+
8898
filesystem, path = FileSource.create_filesystem_and_path(
8999
storage.file_options.uri,
90100
storage.file_options.s3_endpoint_override,

sdk/python/feast/infra/offline_stores/file_source.py

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,20 @@ def __eq__(self, other):
9696
)
9797

9898
@property
99-
def path(self):
100-
"""
101-
Returns the path of this file data source.
102-
"""
99+
def path(self) -> str:
100+
"""Returns the path of this file data source."""
103101
return self.file_options.uri
104102

103+
@property
104+
def file_format(self) -> Optional[FileFormat]:
105+
"""Returns the file format of this file data source."""
106+
return self.file_options.file_format
107+
108+
@property
109+
def s3_endpoint_override(self) -> Optional[str]:
110+
"""Returns the s3 endpoint override of this file data source."""
111+
return self.file_options.s3_endpoint_override
112+
105113
@staticmethod
106114
def from_proto(data_source: DataSourceProto):
107115
return FileSource(
@@ -177,24 +185,33 @@ def get_table_query_string(self) -> str:
177185
class FileOptions:
178186
"""
179187
Configuration options for a file data source.
188+
189+
Attributes:
190+
uri: File source url, e.g. s3:// or local file.
191+
s3_endpoint_override: Custom s3 endpoint (used only with s3 uri).
192+
file_format: File source format, e.g. parquet.
180193
"""
181194

195+
uri: str
196+
file_format: Optional[FileFormat]
197+
s3_endpoint_override: str
198+
182199
def __init__(
183200
self,
201+
uri: str,
184202
file_format: Optional[FileFormat],
185203
s3_endpoint_override: Optional[str],
186-
uri: Optional[str],
187204
):
188205
"""
189206
Initializes a FileOptions object.
190207
191208
Args:
209+
uri: File source url, e.g. s3:// or local file.
192210
file_format (optional): File source format, e.g. parquet.
193211
s3_endpoint_override (optional): Custom s3 endpoint (used only with s3 uri).
194-
uri (optional): File source url, e.g. s3:// or local file.
195212
"""
213+
self.uri = uri
196214
self.file_format = file_format
197-
self.uri = uri or ""
198215
self.s3_endpoint_override = s3_endpoint_override or ""
199216

200217
@classmethod
@@ -269,6 +286,17 @@ def to_data_source(self) -> DataSource:
269286
s3_endpoint_override=self.file_options.s3_endpoint_override,
270287
)
271288

289+
@staticmethod
290+
def from_data_source(data_source: DataSource) -> "SavedDatasetStorage":
291+
assert isinstance(data_source, FileSource)
292+
return SavedDatasetFileStorage(
293+
path=data_source.path,
294+
file_format=data_source.file_format
295+
if data_source.file_format
296+
else ParquetFormat(),
297+
s3_endpoint_override=data_source.s3_endpoint_override,
298+
)
299+
272300

273301
class FileLoggingDestination(LoggingDestination):
274302
_proto_kind = "file_destination"

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,16 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
179179
pass
180180

181181
@abstractmethod
182-
def persist(self, storage: SavedDatasetStorage):
183-
"""Synchronously executes the underlying query and persists the result in the same offline store."""
182+
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
183+
"""
184+
Synchronously executes the underlying query and persists the result in the same offline store
185+
at the specified destination.
186+
187+
Args:
188+
storage: The saved dataset storage object specifying where the result should be persisted.
189+
allow_overwrite: If True, a pre-existing location (e.g. table or file) can be overwritten.
190+
Currently not all individual offline store implementations make use of this parameter.
191+
"""
184192
pass
185193

186194
@property

0 commit comments

Comments (0)