Skip to content

Commit d61846f

Browse files
GWealecopybara-github
authored andcommitted
fix: Optimize row-level locking in append_event
Only acquire FOR UPDATE locks on app and user state rows when the event's state_delta contains changes for those specific scopes. This avoids unnecessary locking on state rows that are not being modified, improving concurrency. Close #4655 Co-authored-by: George Weale <gweale@google.com> PiperOrigin-RevId: 878108562
1 parent 4e3e2cb commit d61846f

2 files changed

Lines changed: 113 additions & 18 deletions

File tree

src/google/adk/sessions/database_session_service.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,16 @@ async def append_event(self, session: Session, event: Event) -> Event:
531531
schema = self._get_schema_classes()
532532
is_sqlite = self.db_engine.dialect.name == _SQLITE_DIALECT
533533
use_row_level_locking = self._supports_row_level_locking()
534+
535+
state_delta = (
536+
event.actions.state_delta
537+
if event.actions and event.actions.state_delta
538+
else {}
539+
)
540+
state_deltas = _session_util.extract_state_delta(state_delta)
541+
has_app_delta = bool(state_deltas["app"])
542+
has_user_delta = bool(state_deltas["user"])
543+
534544
async with self._with_session_lock(
535545
app_name=session.app_name,
536546
user_id=session.user_id,
@@ -554,7 +564,7 @@ async def append_event(self, session: Session, event: Event) -> Event:
554564
sql_session=sql_session,
555565
state_model=schema.StorageAppState,
556566
predicates=(schema.StorageAppState.app_name == session.app_name,),
557-
use_row_level_locking=use_row_level_locking,
567+
use_row_level_locking=use_row_level_locking and has_app_delta,
558568
missing_message=(
559569
"App state missing for app_name="
560570
f"{session.app_name!r}. Session state tables should be "
@@ -568,7 +578,7 @@ async def append_event(self, session: Session, event: Event) -> Event:
568578
schema.StorageUserState.app_name == session.app_name,
569579
schema.StorageUserState.user_id == session.user_id,
570580
),
571-
use_row_level_locking=use_row_level_locking,
581+
use_row_level_locking=use_row_level_locking and has_user_delta,
572582
missing_message=(
573583
"User state missing for app_name="
574584
f"{session.app_name!r}, user_id={session.user_id!r}. "
@@ -599,23 +609,19 @@ async def append_event(self, session: Session, event: Event) -> Event:
599609
storage_events = [e async for e in result]
600610
session.events = [e.to_event() for e in storage_events]
601611

602-
# Extract state delta
603-
if event.actions and event.actions.state_delta:
604-
state_deltas = _session_util.extract_state_delta(
605-
event.actions.state_delta
612+
# Merge pre-extracted state deltas into storage.
613+
if has_app_delta:
614+
storage_app_state.state = (
615+
storage_app_state.state | state_deltas["app"]
616+
)
617+
if has_user_delta:
618+
storage_user_state.state = (
619+
storage_user_state.state | state_deltas["user"]
620+
)
621+
if state_deltas["session"]:
622+
storage_session.state = (
623+
storage_session.state | state_deltas["session"]
606624
)
607-
app_state_delta = state_deltas["app"]
608-
user_state_delta = state_deltas["user"]
609-
session_state_delta = state_deltas["session"]
610-
# Merge state and update storage
611-
if app_state_delta:
612-
storage_app_state.state = storage_app_state.state | app_state_delta
613-
if user_state_delta:
614-
storage_user_state.state = (
615-
storage_user_state.state | user_state_delta
616-
)
617-
if session_state_delta:
618-
storage_session.state = storage_session.state | session_state_delta
619625

620626
if is_sqlite:
621627
update_time = datetime.fromtimestamp(

tests/unittests/sessions/test_session_service.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,3 +1153,92 @@ async def test_prepare_tables_idempotent_after_creation():
11531153
assert session.id == 's1'
11541154
finally:
11551155
await service.close()
1156+
1157+
1158+
@pytest.mark.asyncio
1159+
@pytest.mark.parametrize(
1160+
'state_delta, expect_app_lock, expect_user_lock',
1161+
[
1162+
pytest.param(
1163+
None,
1164+
False,
1165+
False,
1166+
id='no_state_delta',
1167+
),
1168+
pytest.param(
1169+
{'session_key': 'v'},
1170+
False,
1171+
False,
1172+
id='session_only_delta',
1173+
),
1174+
pytest.param(
1175+
{'app:key': 'v'},
1176+
True,
1177+
False,
1178+
id='app_delta_only',
1179+
),
1180+
pytest.param(
1181+
{'user:key': 'v'},
1182+
False,
1183+
True,
1184+
id='user_delta_only',
1185+
),
1186+
pytest.param(
1187+
{'app:a': '1', 'user:b': '2', 'sk': '3'},
1188+
True,
1189+
True,
1190+
id='all_scopes',
1191+
),
1192+
],
1193+
)
1194+
async def test_append_event_locks_only_scopes_with_deltas(
1195+
state_delta, expect_app_lock, expect_user_lock
1196+
):
1197+
"""FOR UPDATE should only be requested for state scopes that have deltas."""
1198+
service = DatabaseSessionService('sqlite+aiosqlite:///:memory:')
1199+
1200+
lock_requests = []
1201+
original_fn = database_session_service._select_required_state
1202+
1203+
async def tracking_fn(**kwargs):
1204+
lock_requests.append({
1205+
'model': kwargs['state_model'].__tablename__,
1206+
'use_row_level_locking': kwargs['use_row_level_locking'],
1207+
})
1208+
return await original_fn(**kwargs)
1209+
1210+
try:
1211+
session = await service.create_session(
1212+
app_name='app', user_id='user', session_id='s1'
1213+
)
1214+
1215+
database_session_service._select_required_state = tracking_fn
1216+
lock_requests.clear()
1217+
1218+
event_kwargs = {'invocation_id': 'inv', 'author': 'user'}
1219+
if state_delta is not None:
1220+
event_kwargs['actions'] = EventActions(state_delta=state_delta)
1221+
event = Event(**event_kwargs)
1222+
await service.append_event(session, event)
1223+
1224+
app_req = next(
1225+
(r for r in lock_requests if r['model'] == 'app_states'), None
1226+
)
1227+
user_req = next(
1228+
(r for r in lock_requests if r['model'] == 'user_states'), None
1229+
)
1230+
1231+
# SQLite doesn't support row-level locking so use_row_level_locking is
1232+
# always False. The important check is that locking is not requested
1233+
# when there is no delta (it must never be True without a delta).
1234+
if not expect_app_lock:
1235+
assert (
1236+
app_req is None or not app_req['use_row_level_locking']
1237+
), 'app_states should not be locked without an app: delta'
1238+
if not expect_user_lock:
1239+
assert (
1240+
user_req is None or not user_req['use_row_level_locking']
1241+
), 'user_states should not be locked without a user: delta'
1242+
finally:
1243+
database_session_service._select_required_state = original_fn
1244+
await service.close()

0 commit comments

Comments
 (0)