Skip to content

Commit 2051fa2

Browse files
committed
Add new "exec_once_unless_exception" system; apply to dialect.initialize
Fixed an issue whereby if the dialect "initialize" process which occurs on first connect would encounter an unexpected exception, the initialize process would fail to complete and then no longer attempt on subsequent connection attempts, leaving the dialect in an un-initialized, or partially initialized state, within the scope of parameters that need to be established based on inspection of a live connection. The "invoke once" logic in the event system has been reworked to accommodate for this occurrence using new, private API features that establish an "exec once" hook that will continue to allow the initializer to fire off on subsequent connections, until it completes without raising an exception. This does not impact the behavior of the existing ``once=True`` flag within the event system. Fixes: sqlalchemy#4807 Change-Id: Iec32999b61b6af4b38b6719e0c2651454619078c
1 parent d1948bc commit 2051fa2

8 files changed

Lines changed: 272 additions & 15 deletions

File tree

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
.. change::
2+
:tags: bug, engine
3+
:tickets: 4807
4+
5+
Fixed an issue whereby if the dialect "initialize" process which occurs on
6+
first connect would encounter an unexpected exception, the initialize
7+
process would fail to complete and then no longer attempt on subsequent
8+
connection attempts, leaving the dialect in an un-initialized, or partially
9+
initialized state, within the scope of parameters that need to be
10+
established based on inspection of a live connection. The "invoke once"
11+
logic in the event system has been reworked to accommodate for this
12+
occurrence using new, private API features that establish an "exec once"
13+
hook that will continue to allow the initializer to fire off on subsequent
14+
connections, until it completes without raising an exception. This does not
15+
impact the behavior of the existing ``once=True`` flag within the event
16+
system.

lib/sqlalchemy/engine/create.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,9 @@ def first_connect(dbapi_connection, connection_record):
529529
dialect.initialize(c)
530530
dialect.do_rollback(c.connection)
531531

532-
event.listen(pool, "first_connect", first_connect, once=True)
532+
event.listen(
533+
pool, "first_connect", first_connect, _once_unless_exception=True
534+
)
533535

534536
dialect_cls.engine_created(engine)
535537
if entrypoint is not dialect_cls:

lib/sqlalchemy/event/attr.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,9 @@ def for_modify(self, obj):
250250
def _needs_modify(self, *args, **kw):
251251
raise NotImplementedError("need to call for_modify()")
252252

253-
exec_once = insert = append = remove = clear = _needs_modify
253+
exec_once = (
254+
exec_once_unless_exception
255+
) = insert = append = remove = clear = _needs_modify
254256

255257
def __call__(self, *args, **kw):
256258
"""Execute this event."""
@@ -276,17 +278,40 @@ class _CompoundListener(_InstanceLevelDispatch):
276278
def _memoized_attr__exec_once_mutex(self):
277279
return threading.Lock()
278280

281+
def _exec_once_impl(self, retry_on_exception, *args, **kw):
282+
with self._exec_once_mutex:
283+
if not self._exec_once:
284+
try:
285+
self(*args, **kw)
286+
exception = False
287+
except:
288+
exception = True
289+
raise
290+
finally:
291+
if not exception or not retry_on_exception:
292+
self._exec_once = True
293+
279294
def exec_once(self, *args, **kw):
280295
"""Execute this event, but only if it has not been
281296
executed already for this collection."""
282297

283298
if not self._exec_once:
284-
with self._exec_once_mutex:
285-
if not self._exec_once:
286-
try:
287-
self(*args, **kw)
288-
finally:
289-
self._exec_once = True
299+
self._exec_once_impl(False, *args, **kw)
300+
301+
def exec_once_unless_exception(self, *args, **kw):
302+
"""Execute this event, but only if it has not been
303+
executed already for this collection, or was called
304+
by a previous exec_once_unless_exception call and
305+
raised an exception.
306+
307+
If exec_once was already called, then this method will never run
308+
the callable regardless of whether it raised or not.
309+
310+
.. versionadded:: 1.3.8
311+
312+
"""
313+
if not self._exec_once:
314+
self._exec_once_impl(True, *args, **kw)
290315

291316
def __call__(self, *args, **kw):
292317
"""Execute this event."""

lib/sqlalchemy/event/registry.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ def with_dispatch_target(self, dispatch_target):
192192

193193
def listen(self, *args, **kw):
194194
once = kw.pop("once", False)
195+
once_unless_exception = kw.pop("_once_unless_exception", False)
195196
named = kw.pop("named", False)
196197

197198
target, identifier, fn = (
@@ -212,10 +213,12 @@ def listen(self, *args, **kw):
212213
if hasattr(stub_function, "_sa_warn"):
213214
stub_function._sa_warn()
214215

215-
if once:
216-
self.with_wrapper(util.only_once(self._listen_fn)).listen(
217-
*args, **kw
218-
)
216+
if once or once_unless_exception:
217+
self.with_wrapper(
218+
util.only_once(
219+
self._listen_fn, retry_on_exception=once_unless_exception
220+
)
221+
).listen(*args, **kw)
219222
else:
220223
self.dispatch_target.dispatch._listen(self, *args, **kw)
221224

lib/sqlalchemy/pool/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ def __connect(self, first_connect_check=False):
604604
if first_connect_check:
605605
pool.dispatch.first_connect.for_modify(
606606
pool.dispatch
607-
).exec_once(self.connection, self)
607+
).exec_once_unless_exception(self.connection, self)
608608
if pool.dispatch.connect:
609609
pool.dispatch.connect(self.connection, self)
610610

lib/sqlalchemy/util/langhelpers.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,7 +1487,7 @@ def warn_limited(msg, args):
14871487
warnings.warn(msg, exc.SAWarning, stacklevel=2)
14881488

14891489

1490-
def only_once(fn):
1490+
def only_once(fn, retry_on_exception):
14911491
"""Decorate the given function to be a no-op after it is called exactly
14921492
once."""
14931493

@@ -1499,7 +1499,12 @@ def go(*arg, **kw):
14991499
strong_fn = fn # noqa
15001500
if once:
15011501
once_fn = once.pop()
1502-
return once_fn(*arg, **kw)
1502+
try:
1503+
return once_fn(*arg, **kw)
1504+
except:
1505+
if retry_on_exception:
1506+
once.insert(0, once_fn)
1507+
raise
15031508

15041509
return go
15051510

test/base/test_events.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,78 @@ def listen_two(x, y):
171171
t2.dispatch.event_one,
172172
)
173173

174+
def test_exec_once(self):
175+
m1 = Mock()
176+
177+
event.listen(self.Target, "event_one", m1)
178+
179+
t1 = self.Target()
180+
t2 = self.Target()
181+
182+
t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(5, 6)
183+
184+
t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(7, 8)
185+
186+
t2.dispatch.event_one.for_modify(t2.dispatch).exec_once(9, 10)
187+
188+
eq_(m1.mock_calls, [call(5, 6), call(9, 10)])
189+
190+
def test_exec_once_exception(self):
191+
m1 = Mock()
192+
m1.side_effect = ValueError
193+
194+
event.listen(self.Target, "event_one", m1)
195+
196+
t1 = self.Target()
197+
198+
assert_raises(
199+
ValueError,
200+
t1.dispatch.event_one.for_modify(t1.dispatch).exec_once,
201+
5,
202+
6,
203+
)
204+
205+
t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(7, 8)
206+
207+
eq_(m1.mock_calls, [call(5, 6)])
208+
209+
def test_exec_once_unless_exception(self):
210+
m1 = Mock()
211+
m1.side_effect = ValueError
212+
213+
event.listen(self.Target, "event_one", m1)
214+
215+
t1 = self.Target()
216+
217+
assert_raises(
218+
ValueError,
219+
t1.dispatch.event_one.for_modify(
220+
t1.dispatch
221+
).exec_once_unless_exception,
222+
5,
223+
6,
224+
)
225+
226+
assert_raises(
227+
ValueError,
228+
t1.dispatch.event_one.for_modify(
229+
t1.dispatch
230+
).exec_once_unless_exception,
231+
7,
232+
8,
233+
)
234+
235+
m1.side_effect = None
236+
t1.dispatch.event_one.for_modify(
237+
t1.dispatch
238+
).exec_once_unless_exception(9, 10)
239+
240+
t1.dispatch.event_one.for_modify(
241+
t1.dispatch
242+
).exec_once_unless_exception(11, 12)
243+
244+
eq_(m1.mock_calls, [call(5, 6), call(7, 8), call(9, 10)])
245+
174246
def test_immutable_methods(self):
175247
t1 = self.Target()
176248
for meth in [
@@ -1146,6 +1218,70 @@ def test_once(self):
11461218
eq_(m3.mock_calls, [call("x")])
11471219
eq_(m4.mock_calls, [call("z")])
11481220

1221+
def test_once_unless_exception(self):
1222+
Target = self._fixture()
1223+
1224+
m1 = Mock()
1225+
m2 = Mock()
1226+
m3 = Mock()
1227+
m4 = Mock()
1228+
1229+
m1.side_effect = ValueError
1230+
m2.side_effect = ValueError
1231+
m3.side_effect = ValueError
1232+
1233+
event.listen(Target, "event_one", m1)
1234+
event.listen(Target, "event_one", m2, _once_unless_exception=True)
1235+
event.listen(Target, "event_one", m3, _once_unless_exception=True)
1236+
1237+
t1 = Target()
1238+
1239+
# only m1 is called, raises
1240+
assert_raises(ValueError, t1.dispatch.event_one, "x")
1241+
1242+
# now m1 and m2 can be called but not m3
1243+
m1.side_effect = None
1244+
1245+
assert_raises(ValueError, t1.dispatch.event_one, "y")
1246+
1247+
# now m3 can be called
1248+
m2.side_effect = None
1249+
1250+
event.listen(Target, "event_one", m4, _once_unless_exception=True)
1251+
assert_raises(ValueError, t1.dispatch.event_one, "z")
1252+
1253+
assert_raises(ValueError, t1.dispatch.event_one, "q")
1254+
1255+
eq_(m1.mock_calls, [call("x"), call("y"), call("z"), call("q")])
1256+
eq_(m2.mock_calls, [call("y"), call("z")])
1257+
eq_(m3.mock_calls, [call("z"), call("q")])
1258+
eq_(m4.mock_calls, []) # m4 never got called because m3 blocked it
1259+
1260+
# now m4 can be called
1261+
m3.side_effect = None
1262+
1263+
t1.dispatch.event_one("p")
1264+
eq_(
1265+
m1.mock_calls,
1266+
[call("x"), call("y"), call("z"), call("q"), call("p")],
1267+
)
1268+
1269+
# m2 already got called, so no "p"
1270+
eq_(m2.mock_calls, [call("y"), call("z")])
1271+
eq_(m3.mock_calls, [call("z"), call("q"), call("p")])
1272+
eq_(m4.mock_calls, [call("p")])
1273+
1274+
t1.dispatch.event_one("j")
1275+
eq_(
1276+
m1.mock_calls,
1277+
[call("x"), call("y"), call("z"), call("q"), call("p"), call("j")],
1278+
)
1279+
1280+
# nobody got "j" because they've all been successful
1281+
eq_(m2.mock_calls, [call("y"), call("z")])
1282+
eq_(m3.mock_calls, [call("z"), call("q"), call("p")])
1283+
eq_(m4.mock_calls, [call("p")])
1284+
11491285
def test_once_doesnt_dereference_listener(self):
11501286
# test for [ticket:4794]
11511287

test/engine/test_reconnect.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from sqlalchemy.testing import eq_
1919
from sqlalchemy.testing import expect_warnings
2020
from sqlalchemy.testing import fixtures
21+
from sqlalchemy.testing import is_false
22+
from sqlalchemy.testing import is_true
2123
from sqlalchemy.testing import mock
2224
from sqlalchemy.testing import ne_
2325
from sqlalchemy.testing.engines import testing_engine
@@ -550,10 +552,78 @@ class Dialect(DefaultDialect):
550552

551553
engine = create_engine(MyURL("foo://"), module=dbapi)
552554
engine.connect()
555+
556+
# note that the dispose() call replaces the old pool with a new one;
557+
# this is to test that even though a single pool is using
558+
# dispatch.exec_once(), by replacing the pool with a new one, the event
559+
# would normally fire again onless once=True is set on the original
560+
# listen as well.
561+
553562
engine.dispose()
554563
engine.connect()
555564
eq_(Dialect.initialize.call_count, 1)
556565

566+
def test_dialect_initialize_retry_if_exception(self):
567+
from sqlalchemy.engine.url import URL
568+
from sqlalchemy.engine.default import DefaultDialect
569+
570+
dbapi = self.dbapi
571+
572+
class MyURL(URL):
573+
def _get_entrypoint(self):
574+
return Dialect
575+
576+
def get_dialect(self):
577+
return Dialect
578+
579+
class Dialect(DefaultDialect):
580+
initialize = Mock()
581+
582+
# note that the first_connect hook is only invoked when the pool
583+
# makes a new DBAPI connection, and not when it checks out an existing
584+
# connection. So there is a dependency here that if the initializer
585+
# raises an exception, the pool-level connection attempt is also
586+
# failed, meaning no DBAPI connection is pooled. If the first_connect
587+
# exception raise did not prevent the connection from being pooled,
588+
# there could be the case where the pool could return that connection
589+
# on a subsequent attempt without initialization having proceeded.
590+
591+
Dialect.initialize.side_effect = TypeError
592+
engine = create_engine(MyURL("foo://"), module=dbapi)
593+
594+
assert_raises(TypeError, engine.connect)
595+
eq_(Dialect.initialize.call_count, 1)
596+
is_true(engine.pool._pool.empty())
597+
598+
assert_raises(TypeError, engine.connect)
599+
eq_(Dialect.initialize.call_count, 2)
600+
is_true(engine.pool._pool.empty())
601+
602+
engine.dispose()
603+
604+
assert_raises(TypeError, engine.connect)
605+
eq_(Dialect.initialize.call_count, 3)
606+
is_true(engine.pool._pool.empty())
607+
608+
Dialect.initialize.side_effect = None
609+
610+
conn = engine.connect()
611+
eq_(Dialect.initialize.call_count, 4)
612+
conn.close()
613+
is_false(engine.pool._pool.empty())
614+
615+
conn = engine.connect()
616+
eq_(Dialect.initialize.call_count, 4)
617+
conn.close()
618+
is_false(engine.pool._pool.empty())
619+
620+
engine.dispose()
621+
conn = engine.connect()
622+
623+
eq_(Dialect.initialize.call_count, 4)
624+
conn.close()
625+
is_false(engine.pool._pool.empty())
626+
557627
def test_invalidate_conn_w_contextmanager_interrupt(self):
558628
# test [ticket:3803]
559629
pool = self.db.pool

0 commit comments

Comments
 (0)