Commit 2b6f1d0

feat: Add delta format to FileSource, add support for it in ibis/duckdb (feast-dev#4123)
1 parent: 2a6edea

11 files changed: 215 additions & 96 deletions

protos/feast/core/DataFormat.proto

Lines changed: 4 additions & 0 deletions

@@ -27,8 +27,12 @@ message FileFormat {
   // Defines options for the Parquet data format
   message ParquetFormat {}
 
+  // Defines options for delta data format
+  message DeltaFormat {}
+
   oneof format {
     ParquetFormat parquet_format = 1;
+    DeltaFormat delta_format = 2;
   }
 }
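
With DeltaFormat added to the format oneof, a FileFormat message can now carry either variant. A minimal sketch of driving the new field from Python, assuming the regenerated protobuf stubs in feast.protos.feast.core.DataFormat_pb2:

    from feast.protos.feast.core.DataFormat_pb2 import FileFormat

    # Populate the new oneof variant and check which field is set.
    fmt = FileFormat(delta_format=FileFormat.DeltaFormat())
    assert fmt.WhichOneof("format") == "delta_format"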

sdk/python/feast/data_format.py

Lines changed: 14 additions & 0 deletions

@@ -43,6 +43,8 @@ def from_proto(cls, proto):
         fmt = proto.WhichOneof("format")
         if fmt == "parquet_format":
             return ParquetFormat()
+        elif fmt == "delta_format":
+            return DeltaFormat()
         if fmt is None:
             return None
         raise NotImplementedError(f"FileFormat is unsupported: {fmt}")
@@ -66,6 +68,18 @@ def __str__(self):
         return "parquet"
 
 
+class DeltaFormat(FileFormat):
+    """
+    Defines delta data format
+    """
+
+    def to_proto(self):
+        return FileFormatProto(delta_format=FileFormatProto.DeltaFormat())
+
+    def __str__(self):
+        return "delta"
+
+
 class StreamFormat(ABC):
     """
     Defines an abstract streaming data format used to encode feature data in streams
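
A quick round-trip sketch of the new class, using only the methods shown above (to_proto, from_proto, __str__):

    from feast.data_format import DeltaFormat, FileFormat

    fmt = DeltaFormat()
    proto = fmt.to_proto()  # FileFormat proto with delta_format set
    assert isinstance(FileFormat.from_proto(proto), DeltaFormat)
    assert str(fmt) == "delta"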

sdk/python/feast/infra/offline_stores/file_source.py

Lines changed: 23 additions & 16 deletions

@@ -8,7 +8,7 @@
 from typeguard import typechecked
 
 from feast import type_map
-from feast.data_format import FileFormat, ParquetFormat
+from feast.data_format import DeltaFormat, FileFormat, ParquetFormat
 from feast.data_source import DataSource
 from feast.feature_logging import LoggingDestination
 from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
@@ -157,24 +157,31 @@ def get_table_column_names_and_types(
         filesystem, path = FileSource.create_filesystem_and_path(
             self.path, self.file_options.s3_endpoint_override
         )
-        # Adding support for different file format path
-        # based on S3 filesystem
-        if filesystem is None:
-            kwargs = (
-                {"use_legacy_dataset": False}
-                if version.parse(pyarrow.__version__) < version.parse("15.0.0")
-                else {}
-            )
-
-            schema = ParquetDataset(path, **kwargs).schema
-            if hasattr(schema, "names") and hasattr(schema, "types"):
-                # Newer versions of pyarrow doesn't have this method,
-                # but this field is good enough.
-                pass
+        # TODO why None check necessary
+        if self.file_format is None or isinstance(self.file_format, ParquetFormat):
+            if filesystem is None:
+                kwargs = (
+                    {"use_legacy_dataset": False}
+                    if version.parse(pyarrow.__version__) < version.parse("15.0.0")
+                    else {}
+                )
+
+                schema = ParquetDataset(path, **kwargs).schema
+                if hasattr(schema, "names") and hasattr(schema, "types"):
+                    # Newer versions of pyarrow don't have this method,
+                    # but this field is good enough.
+                    pass
+                else:
+                    schema = schema.to_arrow_schema()
             else:
-                schema = schema.to_arrow_schema()
+                schema = ParquetDataset(path, filesystem=filesystem).schema
+        elif isinstance(self.file_format, DeltaFormat):
+            from deltalake import DeltaTable
+
+            schema = DeltaTable(self.path).schema().to_pyarrow()
         else:
-            schema = ParquetDataset(path, filesystem=filesystem).schema
+            raise Exception(f"Unknown FileFormat -> {self.file_format}")
 
         return zip(schema.names, map(str, schema.types))
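
Schema inference now branches on file_format: Parquet (or None, which the TODO above flags) still goes through ParquetDataset, while Delta sources resolve their schema via deltalake. A usage sketch; the source name, path, and timestamp field are hypothetical, not from this commit:

    from feast import FileSource
    from feast.data_format import DeltaFormat

    driver_stats = FileSource(
        name="driver_hourly_stats",         # hypothetical source name
        path="data/driver_stats",           # hypothetical local Delta table directory
        file_format=DeltaFormat(),
        timestamp_field="event_timestamp",  # hypothetical column
    )
    # get_table_column_names_and_types() now reads the schema through
    # DeltaTable(path).schema().to_pyarrow() instead of ParquetDataset.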

sdk/python/feast/infra/offline_stores/ibis.py

Lines changed: 51 additions & 23 deletions

@@ -13,6 +13,7 @@
 from ibis.expr.types import Table
 from pytz import utc
 
+from feast.data_format import DeltaFormat, ParquetFormat
 from feast.data_source import DataSource
 from feast.errors import SavedDatasetLocationAlreadyExists
 from feast.feature_logging import LoggingConfig, LoggingSource
@@ -105,6 +106,15 @@ def _generate_row_id(
 
         return entity_table
 
+    @staticmethod
+    def _read_data_source(data_source: DataSource) -> Table:
+        assert isinstance(data_source, FileSource)
+
+        if isinstance(data_source.file_format, ParquetFormat):
+            return ibis.read_parquet(data_source.path)
+        elif isinstance(data_source.file_format, DeltaFormat):
+            return ibis.read_delta(data_source.path)
+
     @staticmethod
     def get_historical_features(
         config: RepoConfig,
@@ -137,7 +147,9 @@ def get_historical_features(
         def read_fv(
             feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
         ) -> Tuple:
-            fv_table: Table = ibis.read_parquet(feature_view.batch_source.name)
+            fv_table: Table = IbisOfflineStore._read_data_source(
+                feature_view.batch_source
+            )
 
             for old_name, new_name in feature_view.batch_source.field_mapping.items():
                 if old_name in fv_table.columns:
@@ -227,7 +239,7 @@ def pull_all_from_table_or_query(
         start_date = start_date.astimezone(tz=utc)
         end_date = end_date.astimezone(tz=utc)
 
-        table = ibis.read_parquet(data_source.path)
+        table = IbisOfflineStore._read_data_source(data_source)
 
         table = table.select(*fields)
 
@@ -260,10 +272,9 @@ def write_logged_features(
         destination = logging_config.destination
         assert isinstance(destination, FileLoggingDestination)
 
-        if isinstance(data, Path):
-            table = ibis.read_parquet(data)
-        else:
-            table = ibis.memtable(data)
+        table = (
+            ibis.read_parquet(data) if isinstance(data, Path) else ibis.memtable(data)
+        )
 
         if destination.partition_by:
             kwargs = {"partition_by": destination.partition_by}
@@ -294,12 +305,21 @@ def offline_write_batch(
         )
 
         file_options = feature_view.batch_source.file_options
-        prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
-        if table.schema != prev_table.schema:
-            table = table.cast(prev_table.schema)
-        new_table = pyarrow.concat_tables([table, prev_table])
 
-        ibis.memtable(new_table).to_parquet(file_options.uri)
+        if isinstance(feature_view.batch_source.file_format, ParquetFormat):
+            prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
+            if table.schema != prev_table.schema:
+                table = table.cast(prev_table.schema)
+            new_table = pyarrow.concat_tables([table, prev_table])
+
+            ibis.memtable(new_table).to_parquet(file_options.uri)
+        elif isinstance(feature_view.batch_source.file_format, DeltaFormat):
+            from deltalake import DeltaTable
+
+            prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow()
+            if table.schema != prev_schema:
+                table = table.cast(prev_schema)
+            ibis.memtable(table).to_delta(file_options.uri, mode="append")
 
 
 class IbisRetrievalJob(RetrievalJob):
@@ -338,20 +358,28 @@ def persist(
         if not allow_overwrite and os.path.exists(storage.file_options.uri):
             raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)
 
-        filesystem, path = FileSource.create_filesystem_and_path(
-            storage.file_options.uri,
-            storage.file_options.s3_endpoint_override,
-        )
-
-        if path.endswith(".parquet"):
-            pyarrow.parquet.write_table(
-                self.to_arrow(), where=path, filesystem=filesystem
-            )
-        else:
-            # otherwise assume destination is directory
-            pyarrow.parquet.write_to_dataset(
-                self.to_arrow(), root_path=path, filesystem=filesystem
-            )
+        if isinstance(storage.file_options.file_format, ParquetFormat):
+            filesystem, path = FileSource.create_filesystem_and_path(
+                storage.file_options.uri,
+                storage.file_options.s3_endpoint_override,
+            )
+
+            if path.endswith(".parquet"):
+                pyarrow.parquet.write_table(
+                    self.to_arrow(), where=path, filesystem=filesystem
+                )
+            else:
+                # otherwise assume destination is directory
+                pyarrow.parquet.write_to_dataset(
+                    self.to_arrow(), root_path=path, filesystem=filesystem
+                )
+        elif isinstance(storage.file_options.file_format, DeltaFormat):
+            mode = (
+                "overwrite"
+                if allow_overwrite and os.path.exists(storage.file_options.uri)
+                else "error"
+            )
+            self.table.to_delta(storage.file_options.uri, mode=mode)
 
     @property
     def metadata(self) -> Optional[RetrievalMetadata]:
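
Every read path in the ibis/duckdb offline store now funnels through _read_data_source, which dispatches on the source's file_format (and implicitly returns None for any other format). A minimal sketch of the Delta path, assuming the deltalake package and ibis's duckdb backend are installed; the path and data here are made up:

    import ibis
    import pyarrow as pa
    from deltalake import write_deltalake

    # Write a toy Delta table, then read it back the way _read_data_source does.
    write_deltalake(
        "data/driver_stats",
        pa.table({"driver_id": [1, 2], "conv_rate": [0.1, 0.2]}),
        mode="overwrite",
    )
    table = ibis.read_delta("data/driver_stats")
    print(table.execute())

On the write side, offline_write_batch casts the incoming batch to the existing Delta schema and appends with mode="append", while persist overwrites only when allow_overwrite is set and the destination already exists, and otherwise uses mode="error".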

sdk/python/requirements/py3.10-ci-requirements.txt

Lines changed: 26 additions & 21 deletions

@@ -59,11 +59,11 @@ bidict==0.23.1
     # via ibis-framework
 bleach==6.1.0
     # via nbconvert
-boto3==1.34.85
+boto3==1.34.88
     # via
     #   feast (setup.py)
     #   moto
-botocore==1.34.85
+botocore==1.34.88
     # via
     #   boto3
     #   moto
@@ -134,11 +134,11 @@ cryptography==42.0.5
     #   snowflake-connector-python
     #   types-pyopenssl
     #   types-redis
-dask[array,dataframe]==2024.4.1
+dask[array,dataframe]==2024.4.2
     # via
     #   dask-expr
     #   feast (setup.py)
-dask-expr==1.0.11
+dask-expr==1.0.12
     # via dask
 db-dtypes==1.2.0
     # via google-cloud-bigquery
@@ -148,6 +148,8 @@ decorator==5.1.1
     # via ipython
 defusedxml==0.7.1
     # via nbconvert
+deltalake==0.16.4
+    # via feast (setup.py)
 dill==0.3.8
     # via feast (setup.py)
 distlib==0.3.8
@@ -158,15 +160,15 @@ docker==7.0.0
     # via testcontainers
 docutils==0.19
     # via sphinx
-duckdb==0.10.1
+duckdb==0.10.2
     # via
     #   duckdb-engine
     #   ibis-framework
 duckdb-engine==0.11.5
     # via ibis-framework
 entrypoints==0.4
     # via altair
-exceptiongroup==1.2.0
+exceptiongroup==1.2.1
     # via
     #   anyio
     #   ipython
@@ -175,7 +177,7 @@ execnet==2.1.1
     # via pytest-xdist
 executing==2.0.1
     # via stack-data
-fastapi==0.110.1
+fastapi==0.110.2
     # via feast (setup.py)
 fastjsonschema==2.19.1
     # via nbformat
@@ -263,7 +265,7 @@ greenlet==3.0.3
     # via sqlalchemy
 grpc-google-iam-v1==0.13.0
     # via google-cloud-bigtable
-grpcio==1.62.1
+grpcio==1.62.2
     # via
     #   feast (setup.py)
     #   google-api-core
@@ -275,15 +277,15 @@ grpcio==1.62.1
     #   grpcio-status
     #   grpcio-testing
     #   grpcio-tools
-grpcio-health-checking==1.62.1
+grpcio-health-checking==1.62.2
     # via feast (setup.py)
-grpcio-reflection==1.62.1
+grpcio-reflection==1.62.2
     # via feast (setup.py)
-grpcio-status==1.62.1
+grpcio-status==1.62.2
     # via google-api-core
-grpcio-testing==1.62.1
+grpcio-testing==1.62.2
     # via feast (setup.py)
-grpcio-tools==1.62.1
+grpcio-tools==1.62.2
     # via feast (setup.py)
 gunicorn==22.0.0 ; platform_system != "Windows"
     # via feast (setup.py)
@@ -482,13 +484,13 @@ nest-asyncio==1.6.0
     # via ipykernel
 nodeenv==1.8.0
     # via pre-commit
-notebook==7.1.2
+notebook==7.1.3
     # via great-expectations
 notebook-shim==0.2.4
     # via
     #   jupyterlab
     #   notebook
-numpy==1.24.4
+numpy==1.26.4
     # via
     #   altair
     #   dask
@@ -615,12 +617,15 @@ pyarrow==15.0.2
     # via
     #   dask-expr
     #   db-dtypes
+    #   deltalake
     #   feast (setup.py)
     #   google-cloud-bigquery
     #   ibis-framework
     #   snowflake-connector-python
 pyarrow-hotfix==0.6
-    # via ibis-framework
+    # via
+    #   deltalake
+    #   ibis-framework
 pyasn1==0.6.0
     # via
     #   pyasn1-modules
@@ -692,7 +697,7 @@ pytest-ordering==0.6
     # via feast (setup.py)
 pytest-timeout==1.4.2
     # via feast (setup.py)
-pytest-xdist==3.5.0
+pytest-xdist==3.6.0
     # via feast (setup.py)
 python-dateutil==2.9.0.post0
     # via
@@ -728,7 +733,7 @@ pyyaml==6.0.1
     #   pre-commit
     #   responses
     #   uvicorn
-pyzmq==26.0.0
+pyzmq==26.0.2
     # via
     #   ipykernel
     #   jupyter-client
@@ -785,7 +790,7 @@ rsa==4.9
     # via google-auth
 ruamel-yaml==0.17.17
     # via great-expectations
-ruff==0.3.7
+ruff==0.4.1
     # via feast (setup.py)
 s3transfer==0.10.1
     # via boto3
@@ -812,7 +817,7 @@ sniffio==1.3.1
     #   httpx
 snowballstemmer==2.2.0
     # via sphinx
-snowflake-connector-python[pandas]==3.8.1
+snowflake-connector-python[pandas]==3.9.0
     # via feast (setup.py)
 sortedcontainers==2.4.0
     # via snowflake-connector-python
@@ -895,7 +900,7 @@ tqdm==4.66.2
     # via
     #   feast (setup.py)
     #   great-expectations
-traitlets==5.14.2
+traitlets==5.14.3
     # via
     #   comm
     #   ipykernel
