@@ -202,6 +202,10 @@ class SqlRegistryConfig(RegistryConfig):
202202 """ str: Path to metadata store.
203203 If registry_type is 'sql', then this is a database URL as expected by SQLAlchemy """
204204
205+ read_path : Optional [StrictStr ] = None
206+ """ str: Read Path to metadata store if different from path.
207+ If registry_type is 'sql', then this is a Read Endpoint for database URL. If not set, path will be used for read and write. """
208+
205209 sqlalchemy_config_kwargs : Dict [str , Any ] = {"echo" : False }
206210 """ Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """
207211
@@ -223,13 +227,20 @@ def __init__(
223227 registry_config , SqlRegistryConfig
224228 ), "SqlRegistry needs a valid registry_config"
225229
226- self .engine : Engine = create_engine (
230+ self .write_engine : Engine = create_engine (
227231 registry_config .path , ** registry_config .sqlalchemy_config_kwargs
228232 )
233+ if registry_config .read_path :
234+ self .read_engine : Engine = create_engine (
235+ registry_config .read_path ,
236+ ** registry_config .sqlalchemy_config_kwargs ,
237+ )
238+ else :
239+ self .read_engine = self .write_engine
240+ metadata .create_all (self .write_engine )
229241 self .thread_pool_executor_worker_count = (
230242 registry_config .thread_pool_executor_worker_count
231243 )
232- metadata .create_all (self .engine )
233244 self .purge_feast_metadata = registry_config .purge_feast_metadata
234245 # Sync feast_metadata to projects table
235246 # when purge_feast_metadata is set to True, Delete data from
@@ -246,7 +257,7 @@ def __init__(
246257 def _sync_feast_metadata_to_projects_table (self ):
247258 feast_metadata_projects : set = []
248259 projects_set : set = []
249- with self .engine .begin () as conn :
260+ with self .write_engine .begin () as conn :
250261 stmt = select (feast_metadata ).where (
251262 feast_metadata .c .metadata_key == FeastMetadataKeys .PROJECT_UUID .value
252263 )
@@ -255,7 +266,7 @@ def _sync_feast_metadata_to_projects_table(self):
255266 feast_metadata_projects .append (row ._mapping ["project_id" ])
256267
257268 if len (feast_metadata_projects ) > 0 :
258- with self .engine .begin () as conn :
269+ with self .write_engine .begin () as conn :
259270 stmt = select (projects )
260271 rows = conn .execute (stmt ).all ()
261272 for row in rows :
@@ -267,7 +278,7 @@ def _sync_feast_metadata_to_projects_table(self):
267278 self .apply_project (Project (name = project_name ), commit = True )
268279
269280 if self .purge_feast_metadata :
270- with self .engine .begin () as conn :
281+ with self .write_engine .begin () as conn :
271282 for project_name in feast_metadata_projects :
272283 stmt = delete (feast_metadata ).where (
273284 feast_metadata .c .project_id == project_name
@@ -285,7 +296,7 @@ def teardown(self):
285296 validation_references ,
286297 permissions ,
287298 }:
288- with self .engine .begin () as conn :
299+ with self .write_engine .begin () as conn :
289300 stmt = delete (t )
290301 conn .execute (stmt )
291302
@@ -549,7 +560,7 @@ def apply_feature_service(
549560 )
550561
551562 def delete_data_source (self , name : str , project : str , commit : bool = True ):
552- with self .engine .begin () as conn :
563+ with self .write_engine .begin () as conn :
553564 stmt = delete (data_sources ).where (
554565 data_sources .c .data_source_name == name ,
555566 data_sources .c .project_id == project ,
@@ -607,7 +618,7 @@ def _list_on_demand_feature_views(
607618 )
608619
609620 def _list_project_metadata (self , project : str ) -> List [ProjectMetadata ]:
610- with self .engine .begin () as conn :
621+ with self .read_engine .begin () as conn :
611622 stmt = select (feast_metadata ).where (
612623 feast_metadata .c .project_id == project ,
613624 )
@@ -726,7 +737,7 @@ def apply_user_metadata(
726737 table = self ._infer_fv_table (feature_view )
727738
728739 name = feature_view .name
729- with self .engine .begin () as conn :
740+ with self .write_engine .begin () as conn :
730741 stmt = select (table ).where (
731742 getattr (table .c , "feature_view_name" ) == name ,
732743 table .c .project_id == project ,
@@ -781,7 +792,7 @@ def get_user_metadata(
781792 table = self ._infer_fv_table (feature_view )
782793
783794 name = feature_view .name
784- with self .engine .begin () as conn :
795+ with self .read_engine .begin () as conn :
785796 stmt = select (table ).where (getattr (table .c , "feature_view_name" ) == name )
786797 row = conn .execute (stmt ).first ()
787798 if row :
@@ -885,7 +896,7 @@ def _apply_object(
885896 name = name or (obj .name if hasattr (obj , "name" ) else None )
886897 assert name , f"name needs to be provided for { obj } "
887898
888- with self .engine .begin () as conn :
899+ with self .write_engine .begin () as conn :
889900 update_datetime = _utc_now ()
890901 update_time = int (update_datetime .timestamp ())
891902 stmt = select (table ).where (
@@ -961,7 +972,7 @@ def _apply_object(
961972
962973 def _maybe_init_project_metadata (self , project ):
963974 # Initialize project metadata if needed
964- with self .engine .begin () as conn :
975+ with self .write_engine .begin () as conn :
965976 update_datetime = _utc_now ()
966977 update_time = int (update_datetime .timestamp ())
967978 stmt = select (feast_metadata ).where (
@@ -988,7 +999,7 @@ def _delete_object(
988999 id_field_name : str ,
9891000 not_found_exception : Optional [Callable ],
9901001 ):
991- with self .engine .begin () as conn :
1002+ with self .write_engine .begin () as conn :
9921003 stmt = delete (table ).where (
9931004 getattr (table .c , id_field_name ) == name , table .c .project_id == project
9941005 )
@@ -1014,7 +1025,7 @@ def _get_object(
10141025 proto_field_name : str ,
10151026 not_found_exception : Optional [Callable ],
10161027 ):
1017- with self .engine .begin () as conn :
1028+ with self .read_engine .begin () as conn :
10181029 stmt = select (table ).where (
10191030 getattr (table .c , id_field_name ) == name , table .c .project_id == project
10201031 )
@@ -1036,7 +1047,7 @@ def _list_objects(
10361047 proto_field_name : str ,
10371048 tags : Optional [dict [str , str ]] = None ,
10381049 ):
1039- with self .engine .begin () as conn :
1050+ with self .read_engine .begin () as conn :
10401051 stmt = select (table ).where (table .c .project_id == project )
10411052 rows = conn .execute (stmt ).all ()
10421053 if rows :
@@ -1051,7 +1062,7 @@ def _list_objects(
10511062 return []
10521063
10531064 def _set_last_updated_metadata (self , last_updated : datetime , project : str ):
1054- with self .engine .begin () as conn :
1065+ with self .write_engine .begin () as conn :
10551066 stmt = select (feast_metadata ).where (
10561067 feast_metadata .c .metadata_key
10571068 == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
@@ -1085,7 +1096,7 @@ def _set_last_updated_metadata(self, last_updated: datetime, project: str):
10851096 conn .execute (insert_stmt )
10861097
10871098 def _get_last_updated_metadata (self , project : str ):
1088- with self .engine .begin () as conn :
1099+ with self .read_engine .begin () as conn :
10891100 stmt = select (feast_metadata ).where (
10901101 feast_metadata .c .metadata_key
10911102 == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
@@ -1130,7 +1141,7 @@ def apply_permission(
11301141 )
11311142
11321143 def delete_permission (self , name : str , project : str , commit : bool = True ):
1133- with self .engine .begin () as conn :
1144+ with self .write_engine .begin () as conn :
11341145 stmt = delete (permissions ).where (
11351146 permissions .c .permission_name == name ,
11361147 permissions .c .project_id == project ,
@@ -1143,7 +1154,7 @@ def _list_projects(
11431154 self ,
11441155 tags : Optional [dict [str , str ]],
11451156 ) -> List [Project ]:
1146- with self .engine .begin () as conn :
1157+ with self .read_engine .begin () as conn :
11471158 stmt = select (projects )
11481159 rows = conn .execute (stmt ).all ()
11491160 if rows :
@@ -1188,7 +1199,7 @@ def delete_project(
11881199 ):
11891200 project = self .get_project (name , allow_cache = False )
11901201 if project :
1191- with self .engine .begin () as conn :
1202+ with self .write_engine .begin () as conn :
11921203 for t in {
11931204 managed_infra ,
11941205 saved_datasets ,
0 commit comments