Skip to content

Commit cb9e1d6

Browse files
Merge branch 'master' into INTPYTHON-297-MongoDB-Feast-Integration
2 parents 632e103 + 1858daf commit cb9e1d6

File tree

6 files changed

+239
-63
lines changed

6 files changed

+239
-63
lines changed

.github/workflows/registry-rest-api-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ jobs:
149149
- name: Setup and Run Registry Rest API tests
150150
run: |
151151
echo "Running Registry REST API tests..."
152-
uv run pytest -c sdk/python/pytest.ini sdk/python/tests/integration/rest_api/test_registry_rest_api.py --integration -s
152+
uv run pytest -c sdk/python/pytest.ini sdk/python/tests/integration/rest_api/test_registry_rest_api.py --integration -s --timeout=600
153153
154154
- name: Clean up docker images
155155
if: always()

.secrets.baseline

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,7 @@
13761376
"filename": "sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py",
13771377
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
13781378
"is_verified": false,
1379-
"line_number": 20
1379+
"line_number": 21
13801380
}
13811381
],
13821382
"sdk/python/tests/unit/infra/offline_stores/test_offline_store.py": [

sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig
3232
from feast.infra.utils.clickhouse.connection_utils import get_client
3333
from feast.saved_dataset import SavedDatasetStorage
34+
from feast.utils import _utc_now, make_tzaware
3435

3536

3637
class ClickhouseOfflineStoreConfig(ClickhouseConfig):
@@ -43,15 +44,26 @@ def get_historical_features(
4344
config: RepoConfig,
4445
feature_views: List[FeatureView],
4546
feature_refs: List[str],
46-
entity_df: Union[pd.DataFrame, str],
47+
entity_df: Optional[Union[pd.DataFrame, str]],
4748
registry: BaseRegistry,
4849
project: str,
4950
full_feature_names: bool = False,
51+
**kwargs,
5052
) -> RetrievalJob:
5153
assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig)
5254
for fv in feature_views:
5355
assert isinstance(fv.batch_source, ClickhouseSource)
5456

57+
# Handle non-entity retrieval mode
58+
if entity_df is None:
59+
end_date = kwargs.get("end_date", None)
60+
if end_date is None:
61+
end_date = _utc_now()
62+
else:
63+
end_date = make_tzaware(end_date)
64+
65+
entity_df = pd.DataFrame({"event_timestamp": [end_date]})
66+
5567
entity_schema = _get_entity_schema(entity_df, config)
5668

5769
entity_df_event_timestamp_col = (

sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,18 @@ def teardown(self):
118118
pass
119119

120120

121+
def _make_offline_store_config(clickhouse_container):
122+
"""Build a ClickhouseOfflineStoreConfig pointing at the test container."""
123+
return ClickhouseOfflineStoreConfig(
124+
type="clickhouse",
125+
host=clickhouse_container.get_container_host_ip(),
126+
port=clickhouse_container.get_exposed_port(8123),
127+
database=CLICKHOUSE_OFFLINE_DB,
128+
user=CLICKHOUSE_USER,
129+
password=CLICKHOUSE_PASSWORD,
130+
)
131+
132+
121133
def test_get_client_with_additional_params(clickhouse_container):
122134
"""
123135
Test that get_client works with a real ClickHouse container and properly passes
@@ -142,3 +154,71 @@ def test_get_client_with_additional_params(clickhouse_container):
142154

143155
# Verify the send_receive_timeout was applied
144156
assert client.timeout._read == 60
157+
158+
159+
def test_non_entity_retrieval(clickhouse_container):
160+
"""Integration test: get_historical_features with entity_df=None returns real data."""
161+
from datetime import datetime, timedelta, timezone
162+
from unittest.mock import MagicMock
163+
164+
from feast.feature_view import FeatureView, Field
165+
from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
166+
ClickhouseOfflineStore,
167+
df_to_clickhouse_table,
168+
)
169+
from feast.repo_config import RepoConfig
170+
from feast.types import Float32
171+
172+
offline_config = _make_offline_store_config(clickhouse_container)
173+
repo_config = RepoConfig(
174+
project="test_project",
175+
registry="test_registry",
176+
provider="local",
177+
offline_store=offline_config,
178+
)
179+
180+
# Seed a feature table with real data
181+
now = datetime.now(tz=timezone.utc)
182+
feature_df = pd.DataFrame(
183+
{
184+
"event_timestamp": [now - timedelta(hours=2), now - timedelta(hours=1)],
185+
"feature_value": [1.0, 2.0],
186+
}
187+
)
188+
table_name = "test_non_entity_features"
189+
client = get_client(offline_config)
190+
client.command(f"DROP TABLE IF EXISTS {table_name}")
191+
df_to_clickhouse_table(offline_config, feature_df, table_name, "event_timestamp")
192+
193+
source = ClickhouseSource(
194+
name=table_name,
195+
table=table_name,
196+
timestamp_field="event_timestamp",
197+
)
198+
fv = FeatureView(
199+
name="test_fv",
200+
entities=[],
201+
ttl=timedelta(days=1),
202+
source=source,
203+
schema=[Field(name="feature_value", dtype=Float32)],
204+
)
205+
206+
registry = MagicMock()
207+
registry.list_on_demand_feature_views.return_value = []
208+
209+
job = ClickhouseOfflineStore.get_historical_features(
210+
config=repo_config,
211+
feature_views=[fv],
212+
feature_refs=["test_fv:feature_value"],
213+
entity_df=None,
214+
registry=registry,
215+
project="test_project",
216+
end_date=now,
217+
)
218+
219+
result_df = job.to_df()
220+
assert len(result_df) > 0
221+
assert "feature_value" in result_df.columns
222+
223+
# Cleanup
224+
client.command(f"DROP TABLE IF EXISTS {table_name}")

sdk/python/tests/integration/rest_api/conftest.py

Lines changed: 37 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
deploy_and_validate_pod,
1616
execPodCommand,
1717
get_pod_name_by_prefix,
18-
run_kubectl_apply_with_sed,
1918
run_kubectl_command,
2019
validate_feature_store_cr_status,
2120
)
@@ -90,38 +89,42 @@ def feast_rest_client():
9089
namespace = "test-ns-feast-rest"
9190
credit_scoring = "credit-scoring"
9291
driver_ranking = "driver-ranking"
93-
service_name = "feast-test-s3-registry-rest"
92+
# Registry REST service name created by the operator for credit-scoring (kind and OpenShift)
93+
registry_rest_service = "feast-credit-scoring-registry-rest"
9494
run_on_openshift = os.getenv("RUN_ON_OPENSHIFT_CI", "false").lower() == "true"
9595

9696
# Create test namespace
9797
create_namespace(api_instance, namespace)
9898

9999
try:
100-
if not run_on_openshift:
101-
# Deploy dependencies
102-
deploy_and_validate_pod(
103-
namespace, str(resource_dir / "redis.yaml"), "app=redis"
104-
)
105-
deploy_and_validate_pod(
106-
namespace, str(resource_dir / "postgres.yaml"), "app=postgres"
107-
)
108-
109-
# Create and validate FeatureStore CRs
110-
create_feast_project(
111-
str(resource_dir / "feast_config_credit_scoring.yaml"),
112-
namespace,
113-
credit_scoring,
114-
)
115-
validate_feature_store_cr_status(namespace, credit_scoring)
116-
117-
create_feast_project(
118-
str(resource_dir / "feast_config_driver_ranking.yaml"),
119-
namespace,
120-
driver_ranking,
121-
)
122-
validate_feature_store_cr_status(namespace, driver_ranking)
123-
124-
# Deploy ingress and get route URL
100+
# Deploy dependencies (same for kind and OpenShift)
101+
deploy_and_validate_pod(
102+
namespace, str(resource_dir / "redis.yaml"), "app=redis"
103+
)
104+
deploy_and_validate_pod(
105+
namespace, str(resource_dir / "postgres.yaml"), "app=postgres"
106+
)
107+
108+
# Create and validate FeatureStore CRs (SQL registry, same as kind)
109+
create_feast_project(
110+
str(resource_dir / "feast_config_credit_scoring.yaml"),
111+
namespace,
112+
credit_scoring,
113+
)
114+
validate_feature_store_cr_status(namespace, credit_scoring)
115+
116+
create_feast_project(
117+
str(resource_dir / "feast_config_driver_ranking.yaml"),
118+
namespace,
119+
driver_ranking,
120+
)
121+
validate_feature_store_cr_status(namespace, driver_ranking)
122+
123+
if run_on_openshift:
124+
# OpenShift: expose registry REST via route (no nginx ingress)
125+
route_url = create_route(namespace, credit_scoring, registry_rest_service)
126+
else:
127+
# Kind: deploy nginx ingress and get route URL
125128
run_kubectl_command(
126129
[
127130
"apply",
@@ -144,40 +147,14 @@ def feast_rest_client():
144147
)
145148
route_url = f"http://{ingress_host}"
146149

147-
# Apply feast projects
150+
# Apply feast projects
151+
applyFeastProject(namespace, credit_scoring)
152+
applyFeastProject(namespace, driver_ranking)
148153

149-
applyFeastProject(namespace, credit_scoring)
150-
151-
applyFeastProject(namespace, driver_ranking)
152-
153-
# Create Saved Datasets and Permissions
154-
pod_name = get_pod_name_by_prefix(namespace, credit_scoring)
155-
156-
# Apply datasets
157-
execPodCommand(
158-
namespace, pod_name, ["python", "create_ui_visible_datasets.py"]
159-
)
160-
161-
# Apply permissions
162-
execPodCommand(namespace, pod_name, ["python", "permissions_apply.py"])
163-
164-
else:
165-
# OpenShift cluster setup using S3-based registry
166-
aws_access_key = os.getenv("AWS_ACCESS_KEY")
167-
aws_secret_key = os.getenv("AWS_SECRET_KEY")
168-
aws_bucket = os.getenv("AWS_BUCKET_NAME")
169-
registry_path = os.getenv("AWS_REGISTRY_FILE_PATH")
170-
171-
run_kubectl_apply_with_sed(
172-
aws_access_key,
173-
aws_secret_key,
174-
aws_bucket,
175-
registry_path,
176-
str(resource_dir / "feast_config_rhoai.yaml"),
177-
namespace,
178-
)
179-
validate_feature_store_cr_status(namespace, "test-s3")
180-
route_url = create_route(namespace, credit_scoring, service_name)
154+
# Create Saved Datasets and Permissions
155+
pod_name = get_pod_name_by_prefix(namespace, credit_scoring)
156+
execPodCommand(namespace, pod_name, ["python", "create_ui_visible_datasets.py"])
157+
execPodCommand(namespace, pod_name, ["python", "permissions_apply.py"])
181158
if not route_url:
182159
raise RuntimeError("Route URL could not be fetched.")
183160

sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import threading
3+
from datetime import datetime, timedelta, timezone
34
from unittest.mock import MagicMock, patch
45

56
import pytest
@@ -133,3 +134,109 @@ def test_clickhouse_config_handles_none_additional_client_args():
133134
config = ClickhouseConfig(**raw_config)
134135

135136
assert config.additional_client_args is None
137+
138+
139+
class TestNonEntityRetrieval:
140+
"""Test the non-entity retrieval logic (entity_df=None) for ClickHouse."""
141+
142+
_MODULE = "feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse"
143+
144+
def _call_get_historical_features(self, feature_views, **kwargs):
145+
"""Call get_historical_features with entity_df=None, mocking the pipeline."""
146+
from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
147+
ClickhouseOfflineStore,
148+
ClickhouseOfflineStoreConfig,
149+
)
150+
from feast.repo_config import RepoConfig
151+
152+
config = RepoConfig(
153+
project="test_project",
154+
registry="test_registry",
155+
provider="local",
156+
offline_store=ClickhouseOfflineStoreConfig(
157+
type="clickhouse",
158+
host="localhost",
159+
port=9000,
160+
database="test_db",
161+
user="default",
162+
password="password",
163+
),
164+
)
165+
166+
end = kwargs.get("end_date", datetime(2023, 1, 7, tzinfo=timezone.utc))
167+
168+
with (
169+
patch.multiple(
170+
self._MODULE,
171+
_upload_entity_df=MagicMock(),
172+
_get_entity_schema=MagicMock(
173+
return_value={"event_timestamp": "timestamp"}
174+
),
175+
_get_entity_df_event_timestamp_range=MagicMock(
176+
return_value=(end - timedelta(days=1), end)
177+
),
178+
),
179+
patch(
180+
f"{self._MODULE}.offline_utils.get_expected_join_keys",
181+
return_value=[],
182+
),
183+
patch(
184+
f"{self._MODULE}.offline_utils.assert_expected_columns_in_entity_df",
185+
),
186+
patch(
187+
f"{self._MODULE}.offline_utils.get_feature_view_query_context",
188+
return_value=[],
189+
),
190+
):
191+
refs = [f"{fv.name}:feature1" for fv in feature_views]
192+
return ClickhouseOfflineStore.get_historical_features(
193+
config=config,
194+
feature_views=feature_views,
195+
feature_refs=refs,
196+
entity_df=None,
197+
registry=MagicMock(),
198+
project="test_project",
199+
**kwargs,
200+
)
201+
202+
@staticmethod
203+
def _make_feature_view(name, ttl=None):
204+
from feast.entity import Entity
205+
from feast.feature_view import FeatureView, Field
206+
from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import (
207+
ClickhouseSource,
208+
)
209+
from feast.types import Float32
210+
211+
return FeatureView(
212+
name=name,
213+
entities=[Entity(name="driver_id", join_keys=["driver_id"])],
214+
ttl=ttl,
215+
source=ClickhouseSource(
216+
name=f"{name}_source",
217+
table=f"{name}_table",
218+
timestamp_field="event_timestamp",
219+
),
220+
schema=[
221+
Field(name="feature1", dtype=Float32),
222+
],
223+
)
224+
225+
def test_non_entity_mode_with_end_date(self):
226+
"""entity_df=None with explicit end_date produces a valid RetrievalJob."""
227+
from feast.infra.offline_stores.offline_store import RetrievalJob
228+
229+
fv = self._make_feature_view("test_fv")
230+
job = self._call_get_historical_features(
231+
[fv],
232+
end_date=datetime(2023, 1, 7, tzinfo=timezone.utc),
233+
)
234+
assert isinstance(job, RetrievalJob)
235+
236+
def test_non_entity_mode_defaults_end_date(self):
237+
"""entity_df=None without end_date defaults to now."""
238+
from feast.infra.offline_stores.offline_store import RetrievalJob
239+
240+
fv = self._make_feature_view("test_fv")
241+
job = self._call_get_historical_features([fv])
242+
assert isinstance(job, RetrievalJob)

0 commit comments

Comments (0)