Skip to content

Commit 369ca98

Browse files
authored
feat: Add materialization support to ibis/duckdb (#4173)
* add materialization support to ibis/duckdb Signed-off-by: tokoko <togurg14@freeuni.edu.ge> * remove unnecessary comments Signed-off-by: tokoko <togurg14@freeuni.edu.ge> * pin ibis versions Signed-off-by: tokoko <togurg14@freeuni.edu.ge> * refactor ibis into bunch of functions Signed-off-by: tokoko <togurg14@freeuni.edu.ge> * fix requirements conflicts Signed-off-by: tokoko <togurg14@freeuni.edu.ge> --------- Signed-off-by: tokoko <togurg14@freeuni.edu.ge>
1 parent e739745 commit 369ca98

File tree

5 files changed

+522
-335
lines changed

5 files changed

+522
-335
lines changed
Lines changed: 146 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,157 @@
1+
from datetime import datetime
2+
from pathlib import Path
3+
from typing import Any, Callable, List, Optional, Union
4+
15
import ibis
6+
import pandas as pd
7+
import pyarrow
8+
from ibis.expr.types import Table
29
from pydantic import StrictStr
310

4-
from feast.infra.offline_stores.ibis import IbisOfflineStore
5-
from feast.repo_config import FeastConfigBaseModel
11+
from feast.data_format import DeltaFormat, ParquetFormat
12+
from feast.data_source import DataSource
13+
from feast.feature_logging import LoggingConfig, LoggingSource
14+
from feast.feature_view import FeatureView
15+
from feast.infra.offline_stores.file_source import FileSource
16+
from feast.infra.offline_stores.ibis import (
17+
get_historical_features_ibis,
18+
offline_write_batch_ibis,
19+
pull_all_from_table_or_query_ibis,
20+
pull_latest_from_table_or_query_ibis,
21+
write_logged_features_ibis,
22+
)
23+
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
24+
from feast.infra.registry.base_registry import BaseRegistry
25+
from feast.repo_config import FeastConfigBaseModel, RepoConfig
26+
27+
28+
def _read_data_source(data_source: DataSource) -> Table:
    """Load the file behind a ``FileSource`` as an ibis ``Table`` expression.

    Args:
        data_source: Must be a ``FileSource`` backed by Parquet or Delta.

    Returns:
        An unexecuted ibis ``Table`` expression over the source file(s).

    Raises:
        ValueError: If the source's file format is neither Parquet nor Delta.
    """
    assert isinstance(data_source, FileSource)

    if isinstance(data_source.file_format, ParquetFormat):
        return ibis.read_parquet(data_source.path)
    elif isinstance(data_source.file_format, DeltaFormat):
        return ibis.read_delta(data_source.path)

    # Previously this fell through and implicitly returned None, which only
    # surfaced later as a confusing AttributeError in the caller; fail fast
    # with an explicit error instead.
    raise ValueError(
        f"Unsupported file format for DuckDB offline store: {data_source.file_format}"
    )
35+
36+
37+
def _write_data_source(table: pyarrow.Table, data_source: DataSource):
    """Append ``table`` to the file backing ``data_source`` (Parquet or Delta).

    When the incoming table's schema differs from the existing on-disk schema,
    the new rows are cast to the on-disk schema first, so an append can never
    silently change the stored schema.

    NOTE(review): for any other file format this function silently does
    nothing — confirm that is intended, or mirror the reader's handling.
    """
    assert isinstance(data_source, FileSource)

    file_options = data_source.file_options

    if isinstance(data_source.file_format, ParquetFormat):
        # Parquet has no native append: read the current contents, cast the
        # new rows to the existing schema, concatenate (new rows first), and
        # rewrite the whole file in place.
        prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
        if table.schema != prev_table.schema:
            table = table.cast(prev_table.schema)
        new_table = pyarrow.concat_tables([table, prev_table])
        ibis.memtable(new_table).to_parquet(file_options.uri)
    elif isinstance(data_source.file_format, DeltaFormat):
        # Imported lazily so the deltalake dependency is only required when a
        # Delta source is actually written.
        from deltalake import DeltaTable

        prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow()
        if table.schema != prev_schema:
            table = table.cast(prev_schema)
        # Delta supports true appends, so no read-concat-rewrite is needed.
        ibis.memtable(table).to_delta(file_options.uri, mode="append")
655

756

857
class DuckDBOfflineStoreConfig(FeastConfigBaseModel):
    """Offline store config for the DuckDB-backed (ibis) offline store."""

    # Offline store type selector; the commented-out field docstring the
    # original carried is restored here as a proper comment.
    type: StrictStr = "duckdb"
1160

1261

13-
class DuckDBOfflineStore(IbisOfflineStore):
62+
class DuckDBOfflineStore(OfflineStore):
63+
@staticmethod
64+
def pull_latest_from_table_or_query(
65+
config: RepoConfig,
66+
data_source: DataSource,
67+
join_key_columns: List[str],
68+
feature_name_columns: List[str],
69+
timestamp_field: str,
70+
created_timestamp_column: Optional[str],
71+
start_date: datetime,
72+
end_date: datetime,
73+
) -> RetrievalJob:
74+
return pull_latest_from_table_or_query_ibis(
75+
config=config,
76+
data_source=data_source,
77+
join_key_columns=join_key_columns,
78+
feature_name_columns=feature_name_columns,
79+
timestamp_field=timestamp_field,
80+
created_timestamp_column=created_timestamp_column,
81+
start_date=start_date,
82+
end_date=end_date,
83+
data_source_reader=_read_data_source,
84+
)
85+
86+
@staticmethod
87+
def get_historical_features(
88+
config: RepoConfig,
89+
feature_views: List[FeatureView],
90+
feature_refs: List[str],
91+
entity_df: Union[pd.DataFrame, str],
92+
registry: BaseRegistry,
93+
project: str,
94+
full_feature_names: bool = False,
95+
) -> RetrievalJob:
96+
return get_historical_features_ibis(
97+
config=config,
98+
feature_views=feature_views,
99+
feature_refs=feature_refs,
100+
entity_df=entity_df,
101+
registry=registry,
102+
project=project,
103+
full_feature_names=full_feature_names,
104+
data_source_reader=_read_data_source,
105+
)
106+
107+
@staticmethod
108+
def pull_all_from_table_or_query(
109+
config: RepoConfig,
110+
data_source: DataSource,
111+
join_key_columns: List[str],
112+
feature_name_columns: List[str],
113+
timestamp_field: str,
114+
start_date: datetime,
115+
end_date: datetime,
116+
) -> RetrievalJob:
117+
return pull_all_from_table_or_query_ibis(
118+
config=config,
119+
data_source=data_source,
120+
join_key_columns=join_key_columns,
121+
feature_name_columns=feature_name_columns,
122+
timestamp_field=timestamp_field,
123+
start_date=start_date,
124+
end_date=end_date,
125+
data_source_reader=_read_data_source,
126+
)
127+
128+
@staticmethod
129+
def offline_write_batch(
130+
config: RepoConfig,
131+
feature_view: FeatureView,
132+
table: pyarrow.Table,
133+
progress: Optional[Callable[[int], Any]],
134+
):
135+
offline_write_batch_ibis(
136+
config=config,
137+
feature_view=feature_view,
138+
table=table,
139+
progress=progress,
140+
data_source_writer=_write_data_source,
141+
)
142+
14143
@staticmethod
15-
def setup_ibis_backend():
16-
# there's no need to call setup as duckdb is default ibis backend
17-
ibis.set_backend("duckdb")
144+
def write_logged_features(
145+
config: RepoConfig,
146+
data: Union[pyarrow.Table, Path],
147+
source: LoggingSource,
148+
logging_config: LoggingConfig,
149+
registry: BaseRegistry,
150+
):
151+
write_logged_features_ibis(
152+
config=config,
153+
data=data,
154+
source=source,
155+
logging_config=logging_config,
156+
registry=registry,
157+
)

0 commit comments

Comments
 (0)