Skip to content

Commit c111de7

Browse files
author
Bhargav Dodla
committed
feat: Added support for reading from Reader Endpoints for AWS Aurora usecases
Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com>
1 parent 4a6b663 commit c111de7

File tree

2 files changed

+68
-20
lines changed

2 files changed

+68
-20
lines changed

sdk/python/feast/infra/registry/sql.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ class SqlRegistryConfig(RegistryConfig):
202202
""" str: Path to metadata store.
203203
If registry_type is 'sql', then this is a database URL as expected by SQLAlchemy """
204204

205+
read_path: Optional[StrictStr] = None
206+
""" str: Read Path to metadata store if different from path.
207+
If registry_type is 'sql', then this is a Read Endpoint for database URL. If not set, path will be used for read and write. """
208+
205209
sqlalchemy_config_kwargs: Dict[str, Any] = {"echo": False}
206210
""" Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """
207211

@@ -223,13 +227,20 @@ def __init__(
223227
registry_config, SqlRegistryConfig
224228
), "SqlRegistry needs a valid registry_config"
225229

226-
self.engine: Engine = create_engine(
230+
self.write_engine: Engine = create_engine(
227231
registry_config.path, **registry_config.sqlalchemy_config_kwargs
228232
)
233+
if registry_config.read_path:
234+
self.read_engine: Engine = create_engine(
235+
registry_config.read_path,
236+
**registry_config.sqlalchemy_config_kwargs,
237+
)
238+
else:
239+
self.read_engine = self.write_engine
240+
metadata.create_all(self.write_engine)
229241
self.thread_pool_executor_worker_count = (
230242
registry_config.thread_pool_executor_worker_count
231243
)
232-
metadata.create_all(self.engine)
233244
self.purge_feast_metadata = registry_config.purge_feast_metadata
234245
# Sync feast_metadata to projects table
235246
# when purge_feast_metadata is set to True, Delete data from
@@ -246,7 +257,7 @@ def __init__(
246257
def _sync_feast_metadata_to_projects_table(self):
247258
feast_metadata_projects: set = []
248259
projects_set: set = []
249-
with self.engine.begin() as conn:
260+
with self.write_engine.begin() as conn:
250261
stmt = select(feast_metadata).where(
251262
feast_metadata.c.metadata_key == FeastMetadataKeys.PROJECT_UUID.value
252263
)
@@ -255,7 +266,7 @@ def _sync_feast_metadata_to_projects_table(self):
255266
feast_metadata_projects.append(row._mapping["project_id"])
256267

257268
if len(feast_metadata_projects) > 0:
258-
with self.engine.begin() as conn:
269+
with self.write_engine.begin() as conn:
259270
stmt = select(projects)
260271
rows = conn.execute(stmt).all()
261272
for row in rows:
@@ -267,7 +278,7 @@ def _sync_feast_metadata_to_projects_table(self):
267278
self.apply_project(Project(name=project_name), commit=True)
268279

269280
if self.purge_feast_metadata:
270-
with self.engine.begin() as conn:
281+
with self.write_engine.begin() as conn:
271282
for project_name in feast_metadata_projects:
272283
stmt = delete(feast_metadata).where(
273284
feast_metadata.c.project_id == project_name
@@ -285,7 +296,7 @@ def teardown(self):
285296
validation_references,
286297
permissions,
287298
}:
288-
with self.engine.begin() as conn:
299+
with self.write_engine.begin() as conn:
289300
stmt = delete(t)
290301
conn.execute(stmt)
291302

@@ -494,7 +505,7 @@ def apply_feature_service(
494505
)
495506

496507
def delete_data_source(self, name: str, project: str, commit: bool = True):
497-
with self.engine.begin() as conn:
508+
with self.write_engine.begin() as conn:
498509
stmt = delete(data_sources).where(
499510
data_sources.c.data_source_name == name,
500511
data_sources.c.project_id == project,
@@ -552,7 +563,7 @@ def _list_on_demand_feature_views(
552563
)
553564

554565
def _list_project_metadata(self, project: str) -> List[ProjectMetadata]:
555-
with self.engine.begin() as conn:
566+
with self.read_engine.begin() as conn:
556567
stmt = select(feast_metadata).where(
557568
feast_metadata.c.project_id == project,
558569
)
@@ -671,7 +682,7 @@ def apply_user_metadata(
671682
table = self._infer_fv_table(feature_view)
672683

673684
name = feature_view.name
674-
with self.engine.begin() as conn:
685+
with self.write_engine.begin() as conn:
675686
stmt = select(table).where(
676687
getattr(table.c, "feature_view_name") == name,
677688
table.c.project_id == project,
@@ -726,7 +737,7 @@ def get_user_metadata(
726737
table = self._infer_fv_table(feature_view)
727738

728739
name = feature_view.name
729-
with self.engine.begin() as conn:
740+
with self.read_engine.begin() as conn:
730741
stmt = select(table).where(getattr(table.c, "feature_view_name") == name)
731742
row = conn.execute(stmt).first()
732743
if row:
@@ -830,7 +841,7 @@ def _apply_object(
830841
name = name or (obj.name if hasattr(obj, "name") else None)
831842
assert name, f"name needs to be provided for {obj}"
832843

833-
with self.engine.begin() as conn:
844+
with self.write_engine.begin() as conn:
834845
update_datetime = _utc_now()
835846
update_time = int(update_datetime.timestamp())
836847
stmt = select(table).where(
@@ -906,7 +917,7 @@ def _apply_object(
906917

907918
def _maybe_init_project_metadata(self, project):
908919
# Initialize project metadata if needed
909-
with self.engine.begin() as conn:
920+
with self.write_engine.begin() as conn:
910921
update_datetime = _utc_now()
911922
update_time = int(update_datetime.timestamp())
912923
stmt = select(feast_metadata).where(
@@ -933,7 +944,7 @@ def _delete_object(
933944
id_field_name: str,
934945
not_found_exception: Optional[Callable],
935946
):
936-
with self.engine.begin() as conn:
947+
with self.write_engine.begin() as conn:
937948
stmt = delete(table).where(
938949
getattr(table.c, id_field_name) == name, table.c.project_id == project
939950
)
@@ -959,7 +970,7 @@ def _get_object(
959970
proto_field_name: str,
960971
not_found_exception: Optional[Callable],
961972
):
962-
with self.engine.begin() as conn:
973+
with self.read_engine.begin() as conn:
963974
stmt = select(table).where(
964975
getattr(table.c, id_field_name) == name, table.c.project_id == project
965976
)
@@ -981,7 +992,7 @@ def _list_objects(
981992
proto_field_name: str,
982993
tags: Optional[dict[str, str]] = None,
983994
):
984-
with self.engine.begin() as conn:
995+
with self.read_engine.begin() as conn:
985996
stmt = select(table).where(table.c.project_id == project)
986997
rows = conn.execute(stmt).all()
987998
if rows:
@@ -996,7 +1007,7 @@ def _list_objects(
9961007
return []
9971008

9981009
def _set_last_updated_metadata(self, last_updated: datetime, project: str):
999-
with self.engine.begin() as conn:
1010+
with self.write_engine.begin() as conn:
10001011
stmt = select(feast_metadata).where(
10011012
feast_metadata.c.metadata_key
10021013
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
@@ -1030,7 +1041,7 @@ def _set_last_updated_metadata(self, last_updated: datetime, project: str):
10301041
conn.execute(insert_stmt)
10311042

10321043
def _get_last_updated_metadata(self, project: str):
1033-
with self.engine.begin() as conn:
1044+
with self.read_engine.begin() as conn:
10341045
stmt = select(feast_metadata).where(
10351046
feast_metadata.c.metadata_key
10361047
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
@@ -1075,7 +1086,7 @@ def apply_permission(
10751086
)
10761087

10771088
def delete_permission(self, name: str, project: str, commit: bool = True):
1078-
with self.engine.begin() as conn:
1089+
with self.write_engine.begin() as conn:
10791090
stmt = delete(permissions).where(
10801091
permissions.c.permission_name == name,
10811092
permissions.c.project_id == project,
@@ -1088,7 +1099,7 @@ def _list_projects(
10881099
self,
10891100
tags: Optional[dict[str, str]],
10901101
) -> List[Project]:
1091-
with self.engine.begin() as conn:
1102+
with self.read_engine.begin() as conn:
10921103
stmt = select(projects)
10931104
rows = conn.execute(stmt).all()
10941105
if rows:
@@ -1133,7 +1144,7 @@ def delete_project(
11331144
):
11341145
project = self.get_project(name, allow_cache=False)
11351146
if project:
1136-
with self.engine.begin() as conn:
1147+
with self.write_engine.begin() as conn:
11371148
for t in {
11381149
managed_infra,
11391150
saved_datasets,

sdk/python/tests/integration/registration/test_universal_registry.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ def minio_registry() -> Registry:
123123

124124
POSTGRES_USER = "test"
125125
POSTGRES_PASSWORD = "test"
126+
POSTGRES_READONLY_USER = "read_only_user"
127+
POSTGRES_READONLY_PASSWORD = "readonly_password"
126128
POSTGRES_DB = "test"
127129

128130
logger = logging.getLogger(__name__)
@@ -166,6 +168,38 @@ def pg_registry_async():
166168
container.stop()
167169

168170

171+
def add_pg_read_only_user(container_host, container_port):
172+
# Connect to PostgreSQL as an admin
173+
import psycopg
174+
175+
conn_string = f"dbname={POSTGRES_DB} user={POSTGRES_USER} password={POSTGRES_PASSWORD} host={container_host} port={container_port}"
176+
print(f"conn_string: {conn_string}")
177+
178+
with psycopg.connect(conn_string) as conn:
179+
# Create the read-only user (if not exists)
180+
conn.execute(
181+
f"CREATE USER {POSTGRES_READONLY_USER} WITH PASSWORD '{POSTGRES_READONLY_PASSWORD}';"
182+
)
183+
184+
# Revoke any privileges just in case (to ensure it's clean)
185+
conn.execute(
186+
f"REVOKE ALL PRIVILEGES ON DATABASE {POSTGRES_DB} FROM {POSTGRES_READONLY_USER};"
187+
)
188+
189+
# Grant connect and select privileges on all tables in the database
190+
conn.execute(
191+
f"GRANT CONNECT ON DATABASE {POSTGRES_DB} TO {POSTGRES_READONLY_USER};"
192+
)
193+
conn.execute(
194+
f"GRANT SELECT ON ALL TABLES IN SCHEMA public TO {POSTGRES_READONLY_USER};"
195+
)
196+
197+
# Optionally, ensure read-only access to any new tables added in the future
198+
conn.execute(
199+
f"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO {POSTGRES_READONLY_USER};"
200+
)
201+
202+
169203
def _given_registry_config_for_pg_sql(
170204
container,
171205
cache_ttl_seconds=2,
@@ -184,13 +218,16 @@ def _given_registry_config_for_pg_sql(
184218
container_port = container.get_exposed_port(5432)
185219
container_host = container.get_container_host_ip()
186220

221+
add_pg_read_only_user(container_host, container_port)
222+
187223
return SqlRegistryConfig(
188224
registry_type="sql",
189225
cache_ttl_seconds=cache_ttl_seconds,
190226
cache_mode=cache_mode,
191227
# The `path` must include `+psycopg` in order for `sqlalchemy.create_engine()`
192228
# to understand that we are using psycopg3.
193229
path=f"postgresql+psycopg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}",
230+
read_path=f"postgresql+psycopg://{POSTGRES_READONLY_USER}:{POSTGRES_READONLY_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}",
194231
sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True},
195232
thread_pool_executor_worker_count=thread_pool_executor_worker_count,
196233
purge_feast_metadata=purge_feast_metadata,

0 commit comments

Comments
 (0)