From 1752a4fc72f72ac5e7c059f028940e656db5b50b Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Wed, 15 Oct 2025 14:48:36 +0300 Subject: [PATCH 1/3] add pull_all_from_table_or_query for clickhouse, to align with new materialization logic (calling it) Signed-off-by: lukas.valatka --- .../clickhouse_offline_store/clickhouse.py | 37 +++++++++++++++++++ .../registration/test_universal_types.py | 1 - 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py index bca6339fb15..5e8cf3d9053 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py @@ -191,6 +191,43 @@ def pull_latest_from_table_or_query( on_demand_feature_views=None, ) + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> RetrievalJob: + assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig) + assert isinstance(data_source, ClickhouseSource) + + from_expression = data_source.get_table_query_string() + + timestamp_fields = [timestamp_field] + + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) + + field_string = ", ".join( + join_key_columns + feature_name_columns + timestamp_fields + ) + + query = f""" + SELECT {field_string} + FROM {from_expression} + WHERE {timestamp_field} BETWEEN parseDateTimeBestEffort('{start_date}') AND parseDateTimeBestEffort('{end_date}') + """ + + return ClickhouseRetrievalJob( + query=query, + config=config, + full_feature_names=False, + ) + class ClickhouseRetrievalJob(PostgreSQLRetrievalJob): def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 5ba99b9d7f1..b464cf2f766 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -343,7 +343,6 @@ def offline_types_test_fixtures(request, environment): if ( environment.data_source_creator.__class__.__name__ == "ClickhouseDataSourceCreator" - and config.feature_dtype in {"float", "datetime", "bool"} and config.feature_is_list and not config.has_empty_list ): From 7c7686940639a24b7a50da19a35a518a8ae64aea Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Tue, 4 Nov 2025 17:17:16 +0200 Subject: [PATCH 2/3] make sure get client is thread-local, since client is thread-unsafe Signed-off-by: lukas.valatka --- .../utils/clickhouse/connection_utils.py | 26 ++++--- .../infra/offline_stores/test_clickhouse.py | 78 +++++++++++++++++++ 2 files changed, 94 insertions(+), 10 deletions(-) create mode 100644 sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py diff --git a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py index e60922e478d..05d58732070 100644 --- a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py +++ b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py @@ -1,18 +1,24 @@ -from functools import cache +import threading import clickhouse_connect from clickhouse_connect.driver import Client from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig +thread_local = threading.local() -@cache + +# wildcall - shouldn't this be a generic decorator? none of {cache, lru_cache, functools.cache} support thread-local +# cache, whilst is useful for non thread-safe clients def get_client(config: ClickhouseConfig) -> Client: - client = clickhouse_connect.get_client( - host=config.host, - port=config.port, - user=config.user, - password=config.password, - database=config.database, - ) - return client + # Clickhouse client is not thread-safe, so we need to create a separate instance for each thread. + if not hasattr(thread_local, "clickhouse_client"): + thread_local.clickhouse_client = clickhouse_connect.get_client( + host=config.host, + port=config.port, + user=config.user, + password=config.password, + database=config.database, + ) + + return thread_local.clickhouse_client diff --git a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py new file mode 100644 index 00000000000..38c632a59a7 --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py @@ -0,0 +1,78 @@ +import threading +from unittest.mock import MagicMock, patch + +import pytest + +from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig +from feast.infra.utils.clickhouse.connection_utils import get_client, thread_local + + +@pytest.fixture +def clickhouse_config(): + """Create a test ClickHouse configuration.""" + return ClickhouseConfig( + host="localhost", + port=9000, + user="default", + password="password", + database="test_db", + ) + + +@pytest.fixture(autouse=True) +def cleanup_thread_local(): + """Clean up thread_local storage after each test.""" + yield + if hasattr(thread_local, "clickhouse_client"): + delattr(thread_local, "clickhouse_client") + + +@patch("feast.infra.utils.clickhouse.connection_utils.clickhouse_connect.get_client") +def test_get_client_returns_different_objects_for_separate_threads( + mock_get_client, clickhouse_config +): + """ + Clickhouse client is thread-unsafe and crashes if shared between threads. + This test ensures that get_client returns different client instances for different threads, while + reusing the same instance within the same thread. + """ + + def create_mock_client(*args, **kwargs): + """Create a unique mock client for each call.""" + return MagicMock() + + mock_get_client.side_effect = create_mock_client + + results = {} + + def thread_1_work(): + """Thread 1 makes 2 calls to get_client.""" + client_1a = get_client(clickhouse_config) + client_1b = get_client(clickhouse_config) + results["thread_1"] = (client_1a, client_1b) + + def thread_2_work(): + """Thread 2 makes 1 call to get_client.""" + client_2 = get_client(clickhouse_config) + results["thread_2"] = client_2 + + thread_1 = threading.Thread(target=thread_1_work) + thread_2 = threading.Thread(target=thread_2_work) + + thread_1.start() + thread_2.start() + + thread_1.join() + thread_2.join() + + # Thread 1's two calls should return the same client (thread-local reuse) + client_1a, client_1b = results["thread_1"] + assert client_1a is client_1b, ( + "Same thread should get same client instance (cached)" + ) + + # Thread 2's client should be different from thread 1's client + client_2 = results["thread_2"] + assert client_1a is not client_2, ( + "Different threads should get different client instances (not cached)" + ) From 50ad3d12f0020bf36b4a0edd72007f3ddc7d15e7 Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Tue, 4 Nov 2025 17:30:42 +0200 Subject: [PATCH 3/3] cleanup Signed-off-by: lukas.valatka --- sdk/python/feast/infra/utils/clickhouse/connection_utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py index 05d58732070..88f5334db14 100644 --- a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py +++ b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py @@ -8,8 +8,6 @@ thread_local = threading.local() -# wildcall - shouldn't this be a generic decorator? none of {cache, lru_cache, functools.cache} support thread-local -# cache, whilst is useful for non thread-safe clients def get_client(config: ClickhouseConfig) -> Client: # Clickhouse client is not thread-safe, so we need to create a separate instance for each thread. if not hasattr(thread_local, "clickhouse_client"):