Skip to content

Commit a9c9902

Browse files
committed
add changes + fix linting issues
Signed-off-by: lukas.valatka <lukas.valatka@cast.ai>
1 parent b601ef2 commit a9c9902

File tree

6 files changed

+89
-11
lines changed

6 files changed

+89
-11
lines changed

infra/website/src/pages/index.astro

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,6 @@ features = store.retrieve_online_documents(
114114
<div class="logo-item">
115115
<img src="/images/logos/seatgeek.svg" alt="SeatGeek" class="company-logo">
116116
</div>
117-
<div class="logo-item">
118-
<img src="/images/logos/castai.png" alt="Cast.ai" class="company-logo">
119-
</div>
120117
</div>
121118
</div>
122119
</section>

sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ class ClickhouseConfig(FeastConfigBaseModel):
1111
password: StrictStr
1212
use_temporary_tables_for_entity_df: bool = True
1313

14+
# Set this to higher than default, for larger scale offline store jobs
15+
send_receive_timeout: int | None = None
16+
1417
model_config = ConfigDict(frozen=True)

sdk/python/feast/infra/utils/clickhouse/connection_utils.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,22 @@
1111
def get_client(config: ClickhouseConfig) -> Client:
1212
# Clickhouse client is not thread-safe, so we need to create a separate instance for each thread.
1313
if not hasattr(thread_local, "clickhouse_client"):
14-
thread_local.clickhouse_client = clickhouse_connect.get_client(
15-
host=config.host,
16-
port=config.port,
17-
user=config.user,
18-
password=config.password,
19-
database=config.database,
20-
)
14+
if config.send_receive_timeout is not None:
15+
thread_local.clickhouse_client = clickhouse_connect.get_client(
16+
host=config.host,
17+
port=config.port,
18+
user=config.user,
19+
password=config.password,
20+
database=config.database,
21+
send_receive_timeout=config.send_receive_timeout,
22+
)
23+
else:
24+
thread_local.clickhouse_client = clickhouse_connect.get_client(
25+
host=config.host,
26+
port=config.port,
27+
user=config.user,
28+
password=config.password,
29+
database=config.database,
30+
)
2131

2232
return thread_local.clickhouse_client

sdk/python/feast/templates/postgres/feature_repo/feature_store.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ project: my_project
22
provider: local
33
registry:
44
registry_type: sql
5-
path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
5+
path: postgresql://DB_USERNAME:DB_PASSWORD@DB_HOST:DB_PORT/DB_NAME
66
cache_ttl_seconds: 60
77
sqlalchemy_config_kwargs:
88
echo: false

sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
import logging
12
import threading
23
from unittest.mock import MagicMock, patch
34

45
import pytest
6+
from testcontainers.clickhouse import ClickHouseContainer
7+
from testcontainers.core.waiting_utils import wait_for_logs
58

69
from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig
710
from feast.infra.utils.clickhouse.connection_utils import get_client, thread_local
811

12+
logger = logging.getLogger(__name__)
13+
914

1015
@pytest.fixture
1116
def clickhouse_config():
@@ -76,3 +81,52 @@ def thread_2_work():
7681
assert client_1a is not client_2, (
7782
"Different threads should get different client instances (not cached)"
7883
)
84+
85+
86+
@pytest.fixture(scope="module")
87+
def clickhouse_container():
88+
"""Start a ClickHouse container for integration testing."""
89+
container = ClickHouseContainer(
90+
username="default",
91+
password="password",
92+
dbname="default",
93+
)
94+
container.start()
95+
96+
log_string_to_wait_for = "Logging errors to"
97+
waited = wait_for_logs(
98+
container=container,
99+
predicate=log_string_to_wait_for,
100+
timeout=30,
101+
interval=10,
102+
)
103+
logger.info("Waited for %s seconds until ClickHouse container was up", waited)
104+
105+
yield container
106+
container.stop()
107+
108+
109+
def test_get_client_with_additional_params(clickhouse_container):
110+
"""
111+
Test that get_client works with a real ClickHouse container and properly passes
112+
additional settings like send_receive_timeout.
113+
"""
114+
# Create config with custom send_receive_timeout
115+
config = ClickhouseConfig(
116+
host=clickhouse_container.get_container_host_ip(),
117+
port=clickhouse_container.get_exposed_port(8123),
118+
user="default",
119+
password="password",
120+
database="default",
121+
send_receive_timeout=60,
122+
)
123+
124+
# Get client and verify it works
125+
client = get_client(config)
126+
127+
# Verify client is connected and functional by running a simple query
128+
result = client.query("SELECT 1 AS test_value")
129+
assert result.result_rows == [(1,)]
130+
131+
# Verify the send_receive_timeout was applied
132+
assert client.timeout._read == 60

sdk/python/tests/unit/local_feast_tests/test_init.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,17 @@ def test_repo_init_with_underscore_in_project_name() -> None:
6767
)
6868
result = runner.run(["apply"], cwd=repo_dir)
6969
assert result.returncode != 0
70+
71+
72+
def test_postgres_template_registry_path_is_parameterized() -> None:
73+
template_fs_yaml = (
74+
Path(__file__).resolve().parents[3]
75+
/ "feast"
76+
/ "templates"
77+
/ "postgres"
78+
/ "feature_repo"
79+
/ "feature_store.yaml"
80+
)
81+
contents = template_fs_yaml.read_text(encoding="utf-8")
82+
expected = "path: postgresql://DB_USERNAME:DB_PASSWORD@DB_HOST:DB_PORT/DB_NAME"
83+
assert expected in contents

0 commit comments

Comments
 (0)