Skip to content

Commit 3249b97

Browse files
authored
fix: Fix SQL Registry cache miss (#3482)
Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent fd91cda commit 3249b97

File tree

8 files changed: 160 additions and 48 deletions.

sdk/python/feast/feature_store.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -166,9 +166,9 @@ def __init__(
166166

167167
registry_config = self.config.get_registry_config()
168168
if registry_config.registry_type == "sql":
169-
self._registry = SqlRegistry(registry_config, None)
169+
self._registry = SqlRegistry(registry_config, self.config.project, None)
170170
else:
171-
r = Registry(registry_config, repo_path=self.repo_path)
171+
r = Registry(self.config.project, registry_config, repo_path=self.repo_path)
172172
r._initialize_registry(self.config.project)
173173
self._registry = r
174174

@@ -210,7 +210,9 @@ def refresh_registry(self):
210210
downloaded synchronously, which may increase latencies if the triggering method is get_online_features().
211211
"""
212212
registry_config = self.config.get_registry_config()
213-
registry = Registry(registry_config, repo_path=self.repo_path)
213+
registry = Registry(
214+
self.config.project, registry_config, repo_path=self.repo_path
215+
)
214216
registry.refresh(self.config.project)
215217

216218
self._registry = registry

sdk/python/feast/infra/registry/proto_registry_utils.py

Lines changed: 23 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,7 @@
1-
from typing import List
1+
import uuid
2+
from typing import List, Optional
23

4+
from feast import usage
35
from feast.data_source import DataSource
46
from feast.entity import Entity
57
from feast.errors import (
@@ -15,12 +17,32 @@
1517
from feast.feature_view import FeatureView
1618
from feast.on_demand_feature_view import OnDemandFeatureView
1719
from feast.project_metadata import ProjectMetadata
20+
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
1821
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
1922
from feast.request_feature_view import RequestFeatureView
2023
from feast.saved_dataset import SavedDataset, ValidationReference
2124
from feast.stream_feature_view import StreamFeatureView
2225

2326

27+
def init_project_metadata(cached_registry_proto: RegistryProto, project: str):
28+
new_project_uuid = f"{uuid.uuid4()}"
29+
usage.set_current_project_uuid(new_project_uuid)
30+
cached_registry_proto.project_metadata.append(
31+
ProjectMetadata(project_name=project, project_uuid=new_project_uuid).to_proto()
32+
)
33+
34+
35+
def get_project_metadata(
36+
registry_proto: Optional[RegistryProto], project: str
37+
) -> Optional[ProjectMetadataProto]:
38+
if not registry_proto:
39+
return None
40+
for pm in registry_proto.project_metadata:
41+
if pm.project == project:
42+
return pm
43+
return None
44+
45+
2446
def get_feature_service(
2547
registry_proto: RegistryProto, name: str, project: str
2648
) -> FeatureService:

sdk/python/feast/infra/registry/registry.py

Lines changed: 29 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import logging
15-
import uuid
1615
from datetime import datetime, timedelta
1716
from enum import Enum
1817
from pathlib import Path
@@ -44,7 +43,6 @@
4443
from feast.infra.registry.registry_store import NoopRegistryStore
4544
from feast.on_demand_feature_view import OnDemandFeatureView
4645
from feast.project_metadata import ProjectMetadata
47-
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
4846
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
4947
from feast.repo_config import RegistryConfig
5048
from feast.repo_contents import RepoContents
@@ -143,25 +141,6 @@ def get_registry_store_class_from_scheme(registry_path: str):
143141
return get_registry_store_class_from_type(registry_store_type)
144142

145143

146-
def _get_project_metadata(
147-
registry_proto: Optional[RegistryProto], project: str
148-
) -> Optional[ProjectMetadataProto]:
149-
if not registry_proto:
150-
return None
151-
for pm in registry_proto.project_metadata:
152-
if pm.project == project:
153-
return pm
154-
return None
155-
156-
157-
def _init_project_metadata(cached_registry_proto: RegistryProto, project: str):
158-
new_project_uuid = f"{uuid.uuid4()}"
159-
usage.set_current_project_uuid(new_project_uuid)
160-
cached_registry_proto.project_metadata.append(
161-
ProjectMetadata(project_name=project, project_uuid=new_project_uuid).to_proto()
162-
)
163-
164-
165144
class Registry(BaseRegistry):
166145
def apply_user_metadata(
167146
self,
@@ -184,19 +163,25 @@ def get_user_metadata(
184163
cached_registry_proto_ttl: timedelta
185164

186165
def __new__(
187-
cls, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
166+
cls,
167+
project: str,
168+
registry_config: Optional[RegistryConfig],
169+
repo_path: Optional[Path],
188170
):
189171
# We override __new__ so that we can inspect registry_config and create a SqlRegistry without callers
190172
# needing to make any changes.
191173
if registry_config and registry_config.registry_type == "sql":
192174
from feast.infra.registry.sql import SqlRegistry
193175

194-
return SqlRegistry(registry_config, repo_path)
176+
return SqlRegistry(registry_config, project, repo_path)
195177
else:
196178
return super(Registry, cls).__new__(cls)
197179

198180
def __init__(
199-
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
181+
self,
182+
project: str,
183+
registry_config: Optional[RegistryConfig],
184+
repo_path: Optional[Path],
200185
):
201186
"""
202187
Create the Registry object.
@@ -225,7 +210,7 @@ def __init__(
225210
)
226211

227212
def clone(self) -> "Registry":
228-
new_registry = Registry(None, None)
213+
new_registry = Registry("project", None, None)
229214
new_registry.cached_registry_proto_ttl = timedelta(seconds=0)
230215
new_registry.cached_registry_proto = (
231216
self.cached_registry_proto.__deepcopy__()
@@ -243,7 +228,7 @@ def _initialize_registry(self, project: str):
243228
except FileNotFoundError:
244229
registry_proto = RegistryProto()
245230
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
246-
_init_project_metadata(registry_proto, project)
231+
proto_registry_utils.init_project_metadata(registry_proto, project)
247232
self._registry_store.update_registry_proto(registry_proto)
248233

249234
def update_infra(self, infra: Infra, project: str, commit: bool = True):
@@ -791,7 +776,12 @@ def _prepare_registry_for_changes(self, project: str):
791776
"""Prepares the Registry for changes by refreshing the cache if necessary."""
792777
try:
793778
self._get_registry_proto(project=project, allow_cache=True)
794-
if _get_project_metadata(self.cached_registry_proto, project) is None:
779+
if (
780+
proto_registry_utils.get_project_metadata(
781+
self.cached_registry_proto, project
782+
)
783+
is None
784+
):
795785
# Project metadata not initialized yet. Try pulling without cache
796786
self._get_registry_proto(project=project, allow_cache=False)
797787
except FileNotFoundError:
@@ -802,8 +792,15 @@ def _prepare_registry_for_changes(self, project: str):
802792

803793
# Initialize project metadata if needed
804794
assert self.cached_registry_proto
805-
if _get_project_metadata(self.cached_registry_proto, project) is None:
806-
_init_project_metadata(self.cached_registry_proto, project)
795+
if (
796+
proto_registry_utils.get_project_metadata(
797+
self.cached_registry_proto, project
798+
)
799+
is None
800+
):
801+
proto_registry_utils.init_project_metadata(
802+
self.cached_registry_proto, project
803+
)
807804
self.commit()
808805

809806
return self.cached_registry_proto
@@ -836,7 +833,7 @@ def _get_registry_proto(
836833
)
837834

838835
if project:
839-
old_project_metadata = _get_project_metadata(
836+
old_project_metadata = proto_registry_utils.get_project_metadata(
840837
registry_proto=self.cached_registry_proto, project=project
841838
)
842839

@@ -854,13 +851,13 @@ def _get_registry_proto(
854851
if not project:
855852
return registry_proto
856853

857-
project_metadata = _get_project_metadata(
854+
project_metadata = proto_registry_utils.get_project_metadata(
858855
registry_proto=registry_proto, project=project
859856
)
860857
if project_metadata:
861858
usage.set_current_project_uuid(project_metadata.project_uuid)
862859
else:
863-
_init_project_metadata(registry_proto, project)
860+
proto_registry_utils.init_project_metadata(registry_proto, project)
864861
self.commit()
865862

866863
return registry_proto

sdk/python/feast/infra/registry/sql.py

Lines changed: 23 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -180,19 +180,24 @@ class FeastMetadataKeys(Enum):
180180

181181
class SqlRegistry(BaseRegistry):
182182
def __init__(
183-
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
183+
self,
184+
registry_config: Optional[RegistryConfig],
185+
project: str,
186+
repo_path: Optional[Path],
184187
):
185188
assert registry_config is not None, "SqlRegistry needs a valid registry_config"
186189
self.engine: Engine = create_engine(registry_config.path, echo=False)
187190
metadata.create_all(self.engine)
188191
self.cached_registry_proto = self.proto()
192+
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
189193
self.cached_registry_proto_created = datetime.utcnow()
190194
self._refresh_lock = Lock()
191195
self.cached_registry_proto_ttl = timedelta(
192196
seconds=registry_config.cache_ttl_seconds
193197
if registry_config.cache_ttl_seconds is not None
194198
else 0
195199
)
200+
self.project = project
196201

197202
def teardown(self):
198203
for t in {
@@ -210,6 +215,16 @@ def teardown(self):
210215
conn.execute(stmt)
211216

212217
def refresh(self, project: Optional[str] = None):
218+
if project:
219+
project_metadata = proto_registry_utils.get_project_metadata(
220+
registry_proto=self.cached_registry_proto, project=project
221+
)
222+
if project_metadata:
223+
usage.set_current_project_uuid(project_metadata.project_uuid)
224+
else:
225+
proto_registry_utils.init_project_metadata(
226+
self.cached_registry_proto, project
227+
)
213228
self.cached_registry_proto = self.proto()
214229
self.cached_registry_proto_created = datetime.utcnow()
215230

@@ -816,7 +831,13 @@ def proto(self) -> RegistryProto:
816831
]:
817832
objs: List[Any] = lister(project) # type: ignore
818833
if objs:
819-
registry_proto_field.extend([obj.to_proto() for obj in objs])
834+
obj_protos = [obj.to_proto() for obj in objs]
835+
for obj_proto in obj_protos:
836+
if "spec" in obj_proto.DESCRIPTOR.fields_by_name:
837+
obj_proto.spec.project = project
838+
else:
839+
obj_proto.project = project
840+
registry_proto_field.extend(obj_protos)
820841

821842
# This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783,
822843
# the registry proto only has a single infra field, which we're currently setting as the "last" project.

sdk/python/feast/repo_operations.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -349,7 +349,7 @@ def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str:
349349
"""For debugging only: output contents of the metadata registry"""
350350
registry_config = repo_config.get_registry_config()
351351
project = repo_config.project
352-
registry = Registry(registry_config=registry_config, repo_path=repo_path)
352+
registry = Registry(project, registry_config=registry_config, repo_path=repo_path)
353353
registry_dict = registry.to_dict(project=project)
354354
return json.dumps(registry_dict, indent=2, sort_keys=True)
355355

sdk/python/tests/integration/registration/test_registry.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,7 @@ def gcs_registry() -> Registry:
4545
registry_config = RegistryConfig(
4646
path=f"gs://{bucket_name}/registry.db", cache_ttl_seconds=600
4747
)
48-
return Registry(registry_config, None)
48+
return Registry("project", registry_config, None)
4949

5050

5151
@pytest.fixture
@@ -57,7 +57,7 @@ def s3_registry() -> Registry:
5757
path=f"{aws_registry_path}/{int(time.time() * 1000)}/registry.db",
5858
cache_ttl_seconds=600,
5959
)
60-
return Registry(registry_config, None)
60+
return Registry("project", registry_config, None)
6161

6262

6363
@pytest.mark.integration

sdk/python/tests/unit/infra/test_local_registry.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@
3939
def local_registry() -> Registry:
4040
fd, registry_path = mkstemp()
4141
registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
42-
return Registry(registry_config, None)
42+
return Registry("project", registry_config, None)
4343

4444

4545
@pytest.mark.parametrize(
@@ -443,7 +443,7 @@ def test_apply_data_source(test_registry: Registry):
443443
def test_commit():
444444
fd, registry_path = mkstemp()
445445
registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
446-
test_registry = Registry(registry_config, None)
446+
test_registry = Registry("project", registry_config, None)
447447

448448
entity = Entity(
449449
name="driver_car_id",
@@ -484,7 +484,7 @@ def test_commit():
484484
validate_project_uuid(project_uuid, test_registry)
485485

486486
# Create new registry that points to the same store
487-
registry_with_same_store = Registry(registry_config, None)
487+
registry_with_same_store = Registry("project", registry_config, None)
488488

489489
# Retrieving the entity should fail since the store is empty
490490
entities = registry_with_same_store.list_entities(project)
@@ -495,7 +495,7 @@ def test_commit():
495495
test_registry.commit()
496496

497497
# Reconstruct the new registry in order to read the newly written store
498-
registry_with_same_store = Registry(registry_config, None)
498+
registry_with_same_store = Registry("project", registry_config, None)
499499

500500
# Retrieving the entity should now succeed
501501
entities = registry_with_same_store.list_entities(project)

0 commit comments

Comments
 (0)