|
1 | 1 | import contextlib |
| 2 | +import logging |
2 | 3 | import re |
3 | 4 | from dataclasses import asdict |
4 | 5 | from datetime import datetime |
|
32 | 33 | from feast.infra.utils.clickhouse.connection_utils import get_client |
33 | 34 | from feast.saved_dataset import SavedDatasetStorage |
34 | 35 |
|
| 36 | +logger = logging.getLogger(__name__) |
| 37 | + |
35 | 38 |
|
36 | 39 | class ClickhouseOfflineStoreConfig(ClickhouseConfig): |
37 | 40 | type: Literal["clickhouse"] = "clickhouse" |
@@ -205,6 +208,29 @@ def pull_all_from_table_or_query( |
205 | 208 | assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig) |
206 | 209 | assert isinstance(data_source, ClickhouseSource) |
207 | 210 |
|
| 211 | + # https://github.com/feast-dev/feast/issues/5707 |
| 212 | + # Not an ideal solution, but least invasion into existing codebase |
| 213 | + if config.offline_store.deduplicate_pushdown: |
| 214 | + logger.warning( |
| 215 | + """ |
| 216 | + deduplicate_pushdown optimization is set to True in ClickhouseOfflineStoreConfig. |
| 217 | + Calling pull_latest_from_table_or_query instead of pull_all_from_table_or_query. |
| 218 | + This results in multiple times more efficient materialization jobs for large datasets. |
| 219 | + However, it comes with a caveat - can't compute historical features (just latest values available) with Feast compute engines |
| 220 | + Use with caution and ensure your use case aligns with this behavior. |
| 221 | + """ |
| 222 | + ) |
| 223 | + return ClickhouseOfflineStore.pull_latest_from_table_or_query( |
| 224 | + config=config, |
| 225 | + data_source=data_source, |
| 226 | + join_key_columns=join_key_columns, |
| 227 | + feature_name_columns=feature_name_columns, |
| 228 | + timestamp_field=timestamp_field, |
| 229 | + created_timestamp_column=created_timestamp_column, |
| 230 | + start_date=start_date, |
| 231 | + end_date=end_date, |
| 232 | + ) |
| 233 | + |
208 | 234 | from_expression = data_source.get_table_query_string() |
209 | 235 |
|
210 | 236 | timestamp_fields = [timestamp_field] |
|
0 commit comments