@@ -202,6 +202,10 @@ class SqlRegistryConfig(RegistryConfig):
202202 """ str: Path to metadata store.
203203 If registry_type is 'sql', then this is a database URL as expected by SQLAlchemy """
204204
205+ read_path : Optional [StrictStr ] = None
206+ """ str: Read Path to metadata store if different from path.
207+ If registry_type is 'sql', then this is a Read Endpoint for database URL. If not set, path will be used for read and write. """
208+
205209 sqlalchemy_config_kwargs : Dict [str , Any ] = {"echo" : False }
206210 """ Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """
207211
@@ -223,13 +227,20 @@ def __init__(
223227 registry_config , SqlRegistryConfig
224228 ), "SqlRegistry needs a valid registry_config"
225229
226- self .engine : Engine = create_engine (
230+ self .write_engine : Engine = create_engine (
227231 registry_config .path , ** registry_config .sqlalchemy_config_kwargs
228232 )
233+ if registry_config .read_path :
234+ self .read_engine : Engine = create_engine (
235+ registry_config .read_path ,
236+ ** registry_config .sqlalchemy_config_kwargs ,
237+ )
238+ else :
239+ self .read_engine = self .write_engine
240+ metadata .create_all (self .write_engine )
229241 self .thread_pool_executor_worker_count = (
230242 registry_config .thread_pool_executor_worker_count
231243 )
232- metadata .create_all (self .engine )
233244 self .purge_feast_metadata = registry_config .purge_feast_metadata
234245 # Sync feast_metadata to projects table
235246 # when purge_feast_metadata is set to True, Delete data from
@@ -246,7 +257,7 @@ def __init__(
246257 def _sync_feast_metadata_to_projects_table (self ):
247258 feast_metadata_projects : set = []
248259 projects_set : set = []
249- with self .engine .begin () as conn :
260+ with self .write_engine .begin () as conn :
250261 stmt = select (feast_metadata ).where (
251262 feast_metadata .c .metadata_key == FeastMetadataKeys .PROJECT_UUID .value
252263 )
@@ -255,7 +266,7 @@ def _sync_feast_metadata_to_projects_table(self):
255266 feast_metadata_projects .append (row ._mapping ["project_id" ])
256267
257268 if len (feast_metadata_projects ) > 0 :
258- with self .engine .begin () as conn :
269+ with self .write_engine .begin () as conn :
259270 stmt = select (projects )
260271 rows = conn .execute (stmt ).all ()
261272 for row in rows :
@@ -267,7 +278,7 @@ def _sync_feast_metadata_to_projects_table(self):
267278 self .apply_project (Project (name = project_name ), commit = True )
268279
269280 if self .purge_feast_metadata :
270- with self .engine .begin () as conn :
281+ with self .write_engine .begin () as conn :
271282 for project_name in feast_metadata_projects :
272283 stmt = delete (feast_metadata ).where (
273284 feast_metadata .c .project_id == project_name
@@ -285,7 +296,7 @@ def teardown(self):
285296 validation_references ,
286297 permissions ,
287298 }:
288- with self .engine .begin () as conn :
299+ with self .write_engine .begin () as conn :
289300 stmt = delete (t )
290301 conn .execute (stmt )
291302
@@ -494,7 +505,7 @@ def apply_feature_service(
494505 )
495506
496507 def delete_data_source (self , name : str , project : str , commit : bool = True ):
497- with self .engine .begin () as conn :
508+ with self .write_engine .begin () as conn :
498509 stmt = delete (data_sources ).where (
499510 data_sources .c .data_source_name == name ,
500511 data_sources .c .project_id == project ,
@@ -552,7 +563,7 @@ def _list_on_demand_feature_views(
552563 )
553564
554565 def _list_project_metadata (self , project : str ) -> List [ProjectMetadata ]:
555- with self .engine .begin () as conn :
566+ with self .read_engine .begin () as conn :
556567 stmt = select (feast_metadata ).where (
557568 feast_metadata .c .project_id == project ,
558569 )
@@ -671,7 +682,7 @@ def apply_user_metadata(
671682 table = self ._infer_fv_table (feature_view )
672683
673684 name = feature_view .name
674- with self .engine .begin () as conn :
685+ with self .write_engine .begin () as conn :
675686 stmt = select (table ).where (
676687 getattr (table .c , "feature_view_name" ) == name ,
677688 table .c .project_id == project ,
@@ -726,7 +737,7 @@ def get_user_metadata(
726737 table = self ._infer_fv_table (feature_view )
727738
728739 name = feature_view .name
729- with self .engine .begin () as conn :
740+ with self .read_engine .begin () as conn :
730741 stmt = select (table ).where (getattr (table .c , "feature_view_name" ) == name )
731742 row = conn .execute (stmt ).first ()
732743 if row :
@@ -830,7 +841,7 @@ def _apply_object(
830841 name = name or (obj .name if hasattr (obj , "name" ) else None )
831842 assert name , f"name needs to be provided for { obj } "
832843
833- with self .engine .begin () as conn :
844+ with self .write_engine .begin () as conn :
834845 update_datetime = _utc_now ()
835846 update_time = int (update_datetime .timestamp ())
836847 stmt = select (table ).where (
@@ -906,7 +917,7 @@ def _apply_object(
906917
907918 def _maybe_init_project_metadata (self , project ):
908919 # Initialize project metadata if needed
909- with self .engine .begin () as conn :
920+ with self .write_engine .begin () as conn :
910921 update_datetime = _utc_now ()
911922 update_time = int (update_datetime .timestamp ())
912923 stmt = select (feast_metadata ).where (
@@ -933,7 +944,7 @@ def _delete_object(
933944 id_field_name : str ,
934945 not_found_exception : Optional [Callable ],
935946 ):
936- with self .engine .begin () as conn :
947+ with self .write_engine .begin () as conn :
937948 stmt = delete (table ).where (
938949 getattr (table .c , id_field_name ) == name , table .c .project_id == project
939950 )
@@ -959,7 +970,7 @@ def _get_object(
959970 proto_field_name : str ,
960971 not_found_exception : Optional [Callable ],
961972 ):
962- with self .engine .begin () as conn :
973+ with self .read_engine .begin () as conn :
963974 stmt = select (table ).where (
964975 getattr (table .c , id_field_name ) == name , table .c .project_id == project
965976 )
@@ -981,7 +992,7 @@ def _list_objects(
981992 proto_field_name : str ,
982993 tags : Optional [dict [str , str ]] = None ,
983994 ):
984- with self .engine .begin () as conn :
995+ with self .read_engine .begin () as conn :
985996 stmt = select (table ).where (table .c .project_id == project )
986997 rows = conn .execute (stmt ).all ()
987998 if rows :
@@ -996,7 +1007,7 @@ def _list_objects(
9961007 return []
9971008
9981009 def _set_last_updated_metadata (self , last_updated : datetime , project : str ):
999- with self .engine .begin () as conn :
1010+ with self .write_engine .begin () as conn :
10001011 stmt = select (feast_metadata ).where (
10011012 feast_metadata .c .metadata_key
10021013 == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
@@ -1030,7 +1041,7 @@ def _set_last_updated_metadata(self, last_updated: datetime, project: str):
10301041 conn .execute (insert_stmt )
10311042
10321043 def _get_last_updated_metadata (self , project : str ):
1033- with self .engine .begin () as conn :
1044+ with self .read_engine .begin () as conn :
10341045 stmt = select (feast_metadata ).where (
10351046 feast_metadata .c .metadata_key
10361047 == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
@@ -1075,7 +1086,7 @@ def apply_permission(
10751086 )
10761087
10771088 def delete_permission (self , name : str , project : str , commit : bool = True ):
1078- with self .engine .begin () as conn :
1089+ with self .write_engine .begin () as conn :
10791090 stmt = delete (permissions ).where (
10801091 permissions .c .permission_name == name ,
10811092 permissions .c .project_id == project ,
@@ -1088,7 +1099,7 @@ def _list_projects(
10881099 self ,
10891100 tags : Optional [dict [str , str ]],
10901101 ) -> List [Project ]:
1091- with self .engine .begin () as conn :
1102+ with self .read_engine .begin () as conn :
10921103 stmt = select (projects )
10931104 rows = conn .execute (stmt ).all ()
10941105 if rows :
@@ -1133,7 +1144,7 @@ def delete_project(
11331144 ):
11341145 project = self .get_project (name , allow_cache = False )
11351146 if project :
1136- with self .engine .begin () as conn :
1147+ with self .write_engine .begin () as conn :
11371148 for t in {
11381149 managed_infra ,
11391150 saved_datasets ,
0 commit comments