@@ -13,6 +13,7 @@
 from ibis.expr.types import Table
 from pytz import utc
 
+from feast.data_format import DeltaFormat, ParquetFormat
 from feast.data_source import DataSource
 from feast.errors import SavedDatasetLocationAlreadyExists
 from feast.feature_logging import LoggingConfig, LoggingSource
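The new import brings in the two format marker classes that the store dispatches on below. A minimal sketch of defining one source per format; the paths and field names are illustrative, not taken from this PR:

```python
from feast import FileSource
from feast.data_format import DeltaFormat, ParquetFormat

# Hypothetical source definitions; paths and column names are illustrative.
parquet_source = FileSource(
    path="data/driver_stats.parquet",
    file_format=ParquetFormat(),
    timestamp_field="event_timestamp",
)
delta_source = FileSource(
    path="data/driver_stats.delta",
    file_format=DeltaFormat(),
    timestamp_field="event_timestamp",
)
```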
@@ -105,6 +106,15 @@ def _generate_row_id(
 
         return entity_table
 
+    @staticmethod
+    def _read_data_source(data_source: DataSource) -> Table:
+        assert isinstance(data_source, FileSource)
+
+        if isinstance(data_source.file_format, ParquetFormat):
+            return ibis.read_parquet(data_source.path)
+        elif isinstance(data_source.file_format, DeltaFormat):
+            return ibis.read_delta(data_source.path)
+
     @staticmethod
     def get_historical_features(
         config: RepoConfig,
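The helper centralizes format dispatch: Parquet sources go through `ibis.read_parquet`, Delta sources through `ibis.read_delta`, and both return a lazy ibis `Table`. Note that any other file format falls through the `if`/`elif` and implicitly returns `None`. A quick sketch of what the Delta path yields, assuming a local Delta table at an illustrative path with an assumed `conv_rate` column:

```python
import ibis

# A minimal sketch, assuming a Delta table exists at this illustrative path.
t = ibis.read_delta("data/driver_stats.delta")
t = t.filter(t["conv_rate"] > 0.5)  # expressions stay lazy...
print(t.limit(5).to_pandas())       # ...until explicitly executed
```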
@@ -137,7 +147,9 @@ def get_historical_features(
         def read_fv(
             feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
         ) -> Tuple:
-            fv_table: Table = ibis.read_parquet(feature_view.batch_source.name)
+            fv_table: Table = IbisOfflineStore._read_data_source(
+                feature_view.batch_source
+            )
 
             for old_name, new_name in feature_view.batch_source.field_mapping.items():
                 if old_name in fv_table.columns:
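Besides routing through the new helper, this fixes a latent bug: the old call read `feature_view.batch_source.name` where the source's `path` was intended. The `field_mapping` loop that follows renames source columns to the names features expect; a hypothetical batch source exercising both the Delta reader and a mapping, with all names illustrative:

```python
from feast import FileSource
from feast.data_format import DeltaFormat

# Hypothetical source; field_mapping renames a source column to a feature name.
driver_stats = FileSource(
    path="data/driver_stats.delta",
    file_format=DeltaFormat(),
    timestamp_field="event_timestamp",
    field_mapping={"trips_today": "daily_trips"},  # source column -> feature name
)
```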
@@ -227,7 +239,7 @@ def pull_all_from_table_or_query(
         start_date = start_date.astimezone(tz=utc)
         end_date = end_date.astimezone(tz=utc)
 
-        table = ibis.read_parquet(data_source.path)
+        table = IbisOfflineStore._read_data_source(data_source)
 
         table = table.select(*fields)
 
@@ -260,10 +272,9 @@ def write_logged_features(
         destination = logging_config.destination
         assert isinstance(destination, FileLoggingDestination)
 
-        if isinstance(data, Path):
-            table = ibis.read_parquet(data)
-        else:
-            table = ibis.memtable(data)
+        table = (
+            ibis.read_parquet(data) if isinstance(data, Path) else ibis.memtable(data)
+        )
 
         if destination.partition_by:
             kwargs = {"partition_by": destination.partition_by}
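The logged-features path accepts either a parquet file on disk or an in-memory Arrow table; `ibis.memtable` wraps the latter into an ibis expression so both branches share the same write call downstream. A small self-contained sketch of that wrapping, with made-up contents and an illustrative destination:

```python
import ibis
import pyarrow as pa

logs = pa.table({"driver_id": [1001, 1002], "conv_rate": [0.45, 0.61]})
t = ibis.memtable(logs)             # wrap the in-memory Arrow table lazily
t.to_parquet("logs/batch.parquet")  # illustrative destination path
```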
@@ -294,12 +305,21 @@ def offline_write_batch(
             )
 
         file_options = feature_view.batch_source.file_options
-        prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
-        if table.schema != prev_table.schema:
-            table = table.cast(prev_table.schema)
-        new_table = pyarrow.concat_tables([table, prev_table])
 
-        ibis.memtable(new_table).to_parquet(file_options.uri)
+        if isinstance(feature_view.batch_source.file_format, ParquetFormat):
+            prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
+            if table.schema != prev_table.schema:
+                table = table.cast(prev_table.schema)
+            new_table = pyarrow.concat_tables([table, prev_table])
+
+            ibis.memtable(new_table).to_parquet(file_options.uri)
+        elif isinstance(feature_view.batch_source.file_format, DeltaFormat):
+            from deltalake import DeltaTable
+
+            prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow()
+            if table.schema != prev_schema:
+                table = table.cast(prev_schema)
+            ibis.memtable(table).to_delta(file_options.uri, mode="append")
 
 
 class IbisRetrievalJob(RetrievalJob):
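Both branches cast the incoming batch to the stored schema before writing. The parquet branch needs this because Arrow refuses to combine tables whose schemas differ; the Delta branch only reads the schema back, not the data, since `mode="append"` lets the Delta log handle the merge. The pyarrow behaviour the parquet branch relies on, in isolation:

```python
import pyarrow as pa

existing = pa.table({"x": pa.array([1, 2], type=pa.int64())})
incoming = pa.table({"x": pa.array([3], type=pa.int32())})

# pa.concat_tables raises ArrowInvalid on mismatched schemas,
# so the incoming batch is cast to the stored schema first.
incoming = incoming.cast(existing.schema)
merged = pa.concat_tables([incoming, existing])
```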
@@ -338,20 +358,28 @@ def persist(
         if not allow_overwrite and os.path.exists(storage.file_options.uri):
             raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)
 
-        filesystem, path = FileSource.create_filesystem_and_path(
-            storage.file_options.uri,
-            storage.file_options.s3_endpoint_override,
-        )
-
-        if path.endswith(".parquet"):
-            pyarrow.parquet.write_table(
-                self.to_arrow(), where=path, filesystem=filesystem
+        if isinstance(storage.file_options.file_format, ParquetFormat):
+            filesystem, path = FileSource.create_filesystem_and_path(
+                storage.file_options.uri,
+                storage.file_options.s3_endpoint_override,
             )
-        else:
-            # otherwise assume destination is directory
-            pyarrow.parquet.write_to_dataset(
-                self.to_arrow(), root_path=path, filesystem=filesystem
+
+            if path.endswith(".parquet"):
+                pyarrow.parquet.write_table(
+                    self.to_arrow(), where=path, filesystem=filesystem
+                )
+            else:
+                # otherwise assume destination is directory
+                pyarrow.parquet.write_to_dataset(
+                    self.to_arrow(), root_path=path, filesystem=filesystem
+                )
+        elif isinstance(storage.file_options.file_format, DeltaFormat):
+            mode = (
+                "overwrite"
+                if allow_overwrite and os.path.exists(storage.file_options.uri)
+                else "error"
             )
+            self.table.to_delta(storage.file_options.uri, mode=mode)
 
     @property
     def metadata(self) -> Optional[RetrievalMetadata]:
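With this change `persist` can target a Delta table directly, using `mode="overwrite"` only when the caller explicitly allowed overwriting an existing location and `mode="error"` otherwise. A hedged usage sketch of saving a dataset to Delta storage; `store` and `job` are assumed to exist, and the path is illustrative:

```python
from feast.data_format import DeltaFormat
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage

# Hypothetical call; `store` is a FeatureStore and `job` a retrieval job.
store.create_saved_dataset(
    from_=job,
    name="training_data",
    storage=SavedDatasetFileStorage(
        path="data/training.delta",
        file_format=DeltaFormat(),
    ),
)
```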