From 2f5a1c1a22d4c4f503fefcc22c545c8094fe30e2 Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Mon, 22 Dec 2025 12:44:16 +0200 Subject: [PATCH 1/4] add changes + fix linting issues Signed-off-by: lukas.valatka --- .../utils/clickhouse/clickhouse_config.py | 3 ++ .../utils/clickhouse/connection_utils.py | 24 ++++++--- .../infra/offline_stores/test_clickhouse.py | 54 +++++++++++++++++++ 3 files changed, 74 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py b/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py index 1f163e0a81b..8a9a656817d 100644 --- a/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py +++ b/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py @@ -11,4 +11,7 @@ class ClickhouseConfig(FeastConfigBaseModel): password: StrictStr use_temporary_tables_for_entity_df: bool = True + # Set this to higher than default, for larger scale offline store jobs + send_receive_timeout: int | None = None + model_config = ConfigDict(frozen=True) diff --git a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py index 88f5334db14..ca9209f1c34 100644 --- a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py +++ b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py @@ -11,12 +11,22 @@ def get_client(config: ClickhouseConfig) -> Client: # Clickhouse client is not thread-safe, so we need to create a separate instance for each thread. if not hasattr(thread_local, "clickhouse_client"): - thread_local.clickhouse_client = clickhouse_connect.get_client( - host=config.host, - port=config.port, - user=config.user, - password=config.password, - database=config.database, - ) + if config.send_receive_timeout is not None: + thread_local.clickhouse_client = clickhouse_connect.get_client( + host=config.host, + port=config.port, + user=config.user, + password=config.password, + database=config.database, + send_receive_timeout=config.send_receive_timeout, + ) + else: + thread_local.clickhouse_client = clickhouse_connect.get_client( + host=config.host, + port=config.port, + user=config.user, + password=config.password, + database=config.database, + ) return thread_local.clickhouse_client diff --git a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py index 38c632a59a7..b1850d0597e 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py @@ -1,11 +1,16 @@ +import logging import threading from unittest.mock import MagicMock, patch import pytest +from testcontainers.clickhouse import ClickHouseContainer +from testcontainers.core.waiting_utils import wait_for_logs from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig from feast.infra.utils.clickhouse.connection_utils import get_client, thread_local +logger = logging.getLogger(__name__) + @pytest.fixture def clickhouse_config(): @@ -76,3 +81,52 @@ def thread_2_work(): assert client_1a is not client_2, ( "Different threads should get different client instances (not cached)" ) + + +@pytest.fixture(scope="module") +def clickhouse_container(): + """Start a ClickHouse container for integration testing.""" + container = ClickHouseContainer( + username="default", + password="password", + dbname="default", + ) + container.start() + + log_string_to_wait_for = "Logging errors to" + waited = wait_for_logs( + container=container, + predicate=log_string_to_wait_for, + timeout=30, + interval=10, + ) + logger.info("Waited for %s seconds until ClickHouse container was up", waited) + + yield container + container.stop() + + +def test_get_client_with_additional_params(clickhouse_container): + """ + Test that get_client works with a real ClickHouse container and properly passes + additional settings like send_receive_timeout. + """ + # Create config with custom send_receive_timeout + config = ClickhouseConfig( + host=clickhouse_container.get_container_host_ip(), + port=clickhouse_container.get_exposed_port(8123), + user="default", + password="password", + database="default", + send_receive_timeout=60, + ) + + # Get client and verify it works + client = get_client(config) + + # Verify client is connected and functional by running a simple query + result = client.query("SELECT 1 AS test_value") + assert result.result_rows == [(1,)] + + # Verify the send_receive_timeout was applied + assert client.timeout._read == 60 From 24f6f40fc8f65f10360603eef7e015c85196f4fa Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Mon, 22 Dec 2025 12:58:19 +0200 Subject: [PATCH 2/4] generalize based on pr suggestion Signed-off-by: lukas.valatka --- .../feast/infra/utils/clickhouse/clickhouse_config.py | 7 +++++-- .../feast/infra/utils/clickhouse/connection_utils.py | 6 ++++-- .../tests/unit/infra/offline_stores/test_clickhouse.py | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py b/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py index 8a9a656817d..75167f8a60e 100644 --- a/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py +++ b/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py @@ -1,3 +1,5 @@ +from typing import Any + from pydantic import ConfigDict, StrictStr from feast.repo_config import FeastConfigBaseModel @@ -11,7 +13,8 @@ class ClickhouseConfig(FeastConfigBaseModel): password: StrictStr use_temporary_tables_for_entity_df: bool = True - # Set this to higher than default, for larger scale offline store jobs - send_receive_timeout: int | None = None + # See https://github.com/ClickHouse/clickhouse-connect/blob/main/clickhouse_connect/driver/__init__.py#L51 + # Some typical ones e.g. send_receive_timeout (read_timeout), etc + additional_client_args: dict[str, Any] | None = None model_config = ConfigDict(frozen=True) diff --git a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py index ca9209f1c34..6d5f1b87052 100644 --- a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py +++ b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py @@ -11,14 +11,16 @@ def get_client(config: ClickhouseConfig) -> Client: # Clickhouse client is not thread-safe, so we need to create a separate instance for each thread. if not hasattr(thread_local, "clickhouse_client"): - if config.send_receive_timeout is not None: + additional_client_args = config.additional_client_args + + if additional_client_args: thread_local.clickhouse_client = clickhouse_connect.get_client( host=config.host, port=config.port, user=config.user, password=config.password, database=config.database, - send_receive_timeout=config.send_receive_timeout, + **additional_client_args, ) else: thread_local.clickhouse_client = clickhouse_connect.get_client( diff --git a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py index b1850d0597e..eadf7ae47bc 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py @@ -118,7 +118,7 @@ def test_get_client_with_additional_params(clickhouse_container): user="default", password="password", database="default", - send_receive_timeout=60, + additional_client_args={"send_receive_timeout": 60}, ) # Get client and verify it works From a05c6abbf2cf4beb2c2e43a7f0e997a0f8aa27f7 Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Mon, 22 Dec 2025 13:10:22 +0200 Subject: [PATCH 3/4] add test verifying config parsing Signed-off-by: lukas.valatka --- .../infra/offline_stores/test_clickhouse.py | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py index eadf7ae47bc..8dc82286c29 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py @@ -83,6 +83,60 @@ def thread_2_work(): ) +def test_clickhouse_config_parses_additional_client_args(): + """ + Test that ClickhouseConfig correctly parses additional_client_args from a dict, + simulating how it would be parsed from YAML by Pydantic. + """ + # This simulates the dict that would come from yaml.safe_load() + raw_config = { + "host": "localhost", + "port": 8123, + "database": "default", + "user": "default", + "password": "password", + "additional_client_args": { + "send_receive_timeout": 60, + "compress": True, + "client_name": "feast_test", + }, + } + + # Pydantic should parse this dict into a ClickhouseConfig object + config = ClickhouseConfig(**raw_config) + + # Verify all fields are correctly parsed + assert config.host == "localhost" + assert config.port == 8123 + assert config.database == "default" + assert config.user == "default" + assert config.password == "password" + + # Verify additional_client_args is correctly parsed as a dict + assert config.additional_client_args is not None + assert isinstance(config.additional_client_args, dict) + assert config.additional_client_args["send_receive_timeout"] == 60 + assert config.additional_client_args["compress"] is True + assert config.additional_client_args["client_name"] == "feast_test" + + +def test_clickhouse_config_handles_none_additional_client_args(): + """ + Test that ClickhouseConfig correctly handles when additional_client_args is not provided. + """ + raw_config = { + "host": "localhost", + "port": 8123, + "database": "default", + "user": "default", + "password": "password", + } + + config = ClickhouseConfig(**raw_config) + + assert config.additional_client_args is None + + @pytest.fixture(scope="module") def clickhouse_container(): """Start a ClickHouse container for integration testing.""" From a3b9977448234ffc026963fbf7c1480a3fd28ab3 Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Mon, 22 Dec 2025 13:54:47 +0200 Subject: [PATCH 4/4] rework test into integration, since CH instance is needed Signed-off-by: lukas.valatka --- .../tests/data_source.py | 28 ++++++++++ .../infra/offline_stores/test_clickhouse.py | 51 ------------------- 2 files changed, 28 insertions(+), 51 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py index 80fd1751dc5..4234c46eb3f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py @@ -15,6 +15,8 @@ from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import ( ClickhouseSource, ) +from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig +from feast.infra.utils.clickhouse.connection_utils import get_client from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) @@ -114,3 +116,29 @@ def create_saved_dataset_destination(self): def teardown(self): pass + + +def test_get_client_with_additional_params(clickhouse_container): + """ + Test that get_client works with a real ClickHouse container and properly passes + additional settings like send_receive_timeout. + """ + # Create config with custom send_receive_timeout + config = ClickhouseConfig( + host=clickhouse_container.get_container_host_ip(), + port=clickhouse_container.get_exposed_port(8123), + user=CLICKHOUSE_USER, + password=CLICKHOUSE_PASSWORD, + database=CLICKHOUSE_OFFLINE_DB, + additional_client_args={"send_receive_timeout": 60}, + ) + + # Get client and verify it works + client = get_client(config) + + # Verify client is connected and functional by running a simple query + result = client.query("SELECT 1 AS test_value") + assert result.result_rows == [(1,)] + + # Verify the send_receive_timeout was applied + assert client.timeout._read == 60 diff --git a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py index 8dc82286c29..f5440ed367d 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py @@ -3,8 +3,6 @@ from unittest.mock import MagicMock, patch import pytest -from testcontainers.clickhouse import ClickHouseContainer -from testcontainers.core.waiting_utils import wait_for_logs from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig from feast.infra.utils.clickhouse.connection_utils import get_client, thread_local @@ -135,52 +133,3 @@ def test_clickhouse_config_handles_none_additional_client_args(): config = ClickhouseConfig(**raw_config) assert config.additional_client_args is None - - -@pytest.fixture(scope="module") -def clickhouse_container(): - """Start a ClickHouse container for integration testing.""" - container = ClickHouseContainer( - username="default", - password="password", - dbname="default", - ) - container.start() - - log_string_to_wait_for = "Logging errors to" - waited = wait_for_logs( - container=container, - predicate=log_string_to_wait_for, - timeout=30, - interval=10, - ) - logger.info("Waited for %s seconds until ClickHouse container was up", waited) - - yield container - container.stop() - - -def test_get_client_with_additional_params(clickhouse_container): - """ - Test that get_client works with a real ClickHouse container and properly passes - additional settings like send_receive_timeout. - """ - # Create config with custom send_receive_timeout - config = ClickhouseConfig( - host=clickhouse_container.get_container_host_ip(), - port=clickhouse_container.get_exposed_port(8123), - user="default", - password="password", - database="default", - additional_client_args={"send_receive_timeout": 60}, - ) - - # Get client and verify it works - client = get_client(config) - - # Verify client is connected and functional by running a simple query - result = client.query("SELECT 1 AS test_value") - assert result.result_rows == [(1,)] - - # Verify the send_receive_timeout was applied - assert client.timeout._read == 60