diff --git a/doc/changelog.rst b/doc/changelog.rst index ed6f23f86d..b3fd7e2f7a 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -1,6 +1,13 @@ Changelog ========= +Changes in Version 4.18.0 (2026/XX/XX) +-------------------------------------- +PyMongo 4.18 brings a number of changes including: + +- Improved connection pool throughput under concurrent load by replacing the + single pool lock with fine-grained locks to reduce lock contention. + Changes in Version 4.17.0 (2026/04/20) -------------------------------------- diff --git a/doc/contributors.rst b/doc/contributors.rst index 0bd815ce3f..d36731159e 100644 --- a/doc/contributors.rst +++ b/doc/contributors.rst @@ -108,3 +108,4 @@ The following is a list of people who have contributed to - Steven Silvester (blink1073) - Noah Stapp (NoahStapp) - Cal Jacobson (cj81499) +- Sophia Yang (sophiayangDB) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a5d5b28990..2c543cbe20 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -709,6 +709,11 @@ def __init__( :param options: a PoolOptions instance :param is_sdam: whether to call hello for each new AsyncConnection """ + # Main lock only used to protect updating attributes. + # Avoid any additional work while holding the lock. + # If looping over an attribute, copy the container and do not take the lock. + self.lock = _async_create_lock() + if options.pause_enabled: self.state = PoolState.PAUSED else: @@ -720,10 +725,9 @@ def __init__( # and returned to pool from the left side. Stale sockets removed # from the right side. self.conns: collections.deque[AsyncConnection] = collections.deque() - self.active_contexts: set[_CancellationContext] = set() - self.lock = _async_create_lock() - self._max_connecting_cond = _async_create_condition(self.lock) - self.active_sockets = 0 + # This lock should only be contended by threads adding/removing connections. + self._conns_lock = _async_create_lock() + # Monotonically increasing connection ID required for CMAP Events. self.next_connection_id = 1 # Track whether the sockets in this pool are writeable or not. @@ -748,16 +752,19 @@ def __init__( # The first portion of the wait queue. # Enforces: maxPoolSize # Also used for: clearing the wait queue - self.size_cond = _async_create_condition(self.lock) self.requests = 0 + # This lock should only be contended by threads adding/removing self.requests. + self.size_cond = _async_create_condition(_async_create_lock()) self.max_pool_size = self.opts.max_pool_size if not self.max_pool_size: self.max_pool_size = float("inf") + # The second portion of the wait queue. # Enforces: maxConnecting # Also used for: clearing the wait queue - self._max_connecting_cond = _async_create_condition(self.lock) self._pending = 0 + # This lock should only be contended by threads adding/removing self._pending. + self._max_connecting_cond = _async_create_condition(_async_create_lock()) self._max_connecting = self.opts.max_connecting self._client_id = client_id # Log before publishing event to prevent potential listener preemption in tests @@ -777,29 +784,41 @@ def __init__( ) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 + # This lock should be contended on every operation. + self._operation_count_lock = _async_create_lock() + + self.active_contexts: set[_CancellationContext] = set() + self.active_sockets = 0 # Retain references to pinned connections to prevent the CPython GC # from thinking that a cursor's pinned connection can be GC'd when the # cursor is GC'd (see PYTHON-2751). self.__pinned_sockets: set[AsyncConnection] = set() self.ncursors = 0 self.ntxns = 0 + # This lock protects self.active_contexts, self.active_sockets, + # self.__pinned_sockets, self.ncursors, self.next_connection_id, and self.ntxns. + self._active_contexts_lock = _async_create_lock() async def ready(self) -> None: # Take the lock to avoid the race condition described in PYTHON-2699. - async with self.lock: - if self.state != PoolState.READY: + state_changed = False + if self.state != PoolState.READY: + async with self.lock: self.state = PoolState.READY - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_READY, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - ) - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_pool_ready(self.address) + state_changed = True + if not state_changed: + return + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.POOL_READY, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + ) + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_pool_ready(self.address) @property def closed(self) -> bool: @@ -813,39 +832,49 @@ async def _reset( interrupt_connections: bool = False, ) -> None: old_state = self.state - async with self.size_cond: - if self.closed: - return + if self.closed: + return + is_fork = False + async with self.lock: if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) newpid = os.getpid() if self.pid != newpid: self.pid = newpid + is_fork = True + if is_fork: + async with self._active_contexts_lock: self.active_sockets = 0 + async with self._operation_count_lock: self.operation_count = 0 + async with self._conns_lock: if service_id is None: sockets, self.conns = self.conns, collections.deque() - else: - discard: collections.deque = collections.deque() # type: ignore[type-arg] - keep: collections.deque = collections.deque() # type: ignore[type-arg] - for conn in self.conns: - if conn.service_id == service_id: - discard.append(conn) - else: - keep.append(conn) - sockets = discard + if service_id is not None: + discard: collections.deque = collections.deque() # type: ignore[type-arg] + keep: collections.deque = collections.deque() # type: ignore[type-arg] + for conn in self.conns: + if conn.service_id == service_id: + discard.append(conn) + else: + keep.append(conn) + sockets = discard + async with self._conns_lock: self.conns = keep - if close: + if close: + async with self.lock: self.state = PoolState.CLOSED - # Clear the wait queue + # Clear the wait queue + async with self._max_connecting_cond: self._max_connecting_cond.notify_all() + async with self.size_cond: self.size_cond.notify_all() - if interrupt_connections: - for context in self.active_contexts: - context.cancel() + if interrupt_connections: + for context in self.active_contexts.copy(): + context.cancel() listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the @@ -903,9 +932,8 @@ async def update_is_writable(self, is_writable: Optional[bool]) -> None: Pool. """ self.is_writable = is_writable - async with self.lock: - for _socket in self.conns: - _socket.update_is_writable(self.is_writable) # type: ignore[arg-type] + for _socket in self.conns: + _socket.update_is_writable(self.is_writable) # type: ignore[arg-type] async def reset( self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False @@ -952,12 +980,12 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: await conn.close_conn(ConnectionClosedReason.IDLE) while True: + # There are enough sockets in the pool. + if len(self.conns) + self.active_sockets >= self.opts.min_pool_size: + return + if self.requests >= self.opts.min_pool_size: + return async with self.size_cond: - # There are enough sockets in the pool. - if len(self.conns) + self.active_sockets >= self.opts.min_pool_size: - return - if self.requests >= self.opts.min_pool_size: - return self.requests += 1 incremented = False try: @@ -970,13 +998,14 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: incremented = True conn = await self.connect() close_conn = False - async with self.lock: - # Close connection and return if the pool was reset during - # socket creation or while acquiring the pool lock. - if self.gen.get_overall() != reference_generation: - close_conn = True - if not close_conn: + # Close connection and return if the pool was reset during + # socket creation or while acquiring the pool lock. + if self.gen.get_overall() != reference_generation: + close_conn = True + if not close_conn: + async with self._conns_lock: self.conns.appendleft(conn) + async with self._active_contexts_lock: self.active_contexts.discard(conn.cancel_context) if close_conn: await conn.close_conn(ConnectionClosedReason.STALE) @@ -1015,11 +1044,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A Note that the pool does not keep a reference to the socket -- you must call checkin() when you're done with it. """ - async with self.lock: + # Use a temporary context so that interrupt_connections can cancel creating the socket. + tmp_context = _CancellationContext() + async with self._active_contexts_lock: conn_id = self.next_connection_id self.next_connection_id += 1 - # Use a temporary context so that interrupt_connections can cancel creating the socket. - tmp_context = _CancellationContext() self.active_contexts.add(tmp_context) listeners = self.opts._event_listeners @@ -1040,7 +1069,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A networking_interface = await _configured_protocol_interface(self.address, self.opts) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: - async with self.lock: + async with self._active_contexts_lock: self.active_contexts.discard(tmp_context) if self.enabled_for_cmap: assert listeners is not None @@ -1065,7 +1094,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A raise conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] - async with self.lock: + async with self._active_contexts_lock: self.active_contexts.add(conn.cancel_context) self.active_contexts.discard(tmp_context) if tmp_context.cancelled: @@ -1082,7 +1111,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A await conn.authenticate() # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as e: - async with self.lock: + async with self._active_contexts_lock: self.active_contexts.discard(conn.cancel_context) if not completed_hello: self._handle_connection_error(e) @@ -1144,7 +1173,7 @@ async def checkout( durationMS=duration, ) try: - async with self.lock: + async with self._active_contexts_lock: self.active_contexts.add(conn.cancel_context) yield conn # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. @@ -1163,11 +1192,11 @@ async def checkout( await self.checkin(conn) raise if conn.pinned_txn: - async with self.lock: + async with self._active_contexts_lock: self.__pinned_sockets.add(conn) self.ntxns += 1 elif conn.pinned_cursor: - async with self.lock: + async with self._active_contexts_lock: self.__pinned_sockets.add(conn) self.ncursors += 1 elif conn.active: @@ -1231,7 +1260,7 @@ async def _get_conn( "Attempted to check out a connection from closed connection pool" ) - async with self.lock: + async with self._operation_count_lock: self.operation_count += 1 # Get a free socket or create one. @@ -1260,9 +1289,9 @@ async def _get_conn( incremented = False emitted_event = False try: - async with self.lock: + async with self._active_contexts_lock: self.active_sockets += 1 - incremented = True + incremented = True while conn is None: # CMAP: we MUST wait for either maxConnecting OR for a socket # to be checked back into the pool. @@ -1280,7 +1309,8 @@ async def _get_conn( self._raise_if_not_ready(checkout_started_time, emit_event=False) try: - conn = self.conns.popleft() + async with self._conns_lock: + conn = self.conns.popleft() except IndexError: self._pending += 1 if conn: # We got a socket from the pool @@ -1301,9 +1331,10 @@ async def _get_conn( await conn.close_conn(ConnectionClosedReason.ERROR) async with self.size_cond: self.requests -= 1 - if incremented: - self.active_sockets -= 1 self.size_cond.notify() + if incremented: + async with self._active_contexts_lock: + self.active_sockets -= 1 if not emitted_event: duration = time.monotonic() - checkout_started_time @@ -1338,9 +1369,9 @@ async def checkin(self, conn: AsyncConnection) -> None: conn.active = False conn.pinned_txn = False conn.pinned_cursor = False - self.__pinned_sockets.discard(conn) listeners = self.opts._event_listeners - async with self.lock: + async with self._active_contexts_lock: + self.__pinned_sockets.discard(conn) self.active_contexts.discard(conn.cancel_context) if self.enabled_for_cmap: assert listeners is not None @@ -1379,28 +1410,29 @@ async def checkin(self, conn: AsyncConnection) -> None: ) else: close_conn = False - async with self.lock: - # Hold the lock to ensure this section does not race with - # Pool.reset(). - if self.stale_generation(conn.generation, conn.service_id): - close_conn = True - else: - conn.update_last_checkin_time() - conn.update_is_writable(bool(self.is_writable)) + if self.stale_generation(conn.generation, conn.service_id): + close_conn = True + else: + conn.update_last_checkin_time() + conn.update_is_writable(bool(self.is_writable)) + async with self._conns_lock: self.conns.appendleft(conn) + async with self._max_connecting_cond: # Notify any threads waiting to create a connection. self._max_connecting_cond.notify() if close_conn: await conn.close_conn(ConnectionClosedReason.STALE) - async with self.size_cond: + async with self._active_contexts_lock: if txn: self.ntxns -= 1 elif cursor: self.ncursors -= 1 - self.requests -= 1 self.active_sockets -= 1 + async with self._operation_count_lock: self.operation_count -= 1 + async with self.size_cond: + self.requests -= 1 self.size_cond.notify() async def _perished(self, conn: AsyncConnection) -> bool: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 25f2d08fe7..cb469e4270 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -707,6 +707,11 @@ def __init__( :param options: a PoolOptions instance :param is_sdam: whether to call hello for each new Connection """ + # Main lock only used to protect updating attributes. + # Avoid any additional work while holding the lock. + # If looping over an attribute, copy the container and do not take the lock. + self.lock = _create_lock() + if options.pause_enabled: self.state = PoolState.PAUSED else: @@ -718,10 +723,9 @@ def __init__( # and returned to pool from the left side. Stale sockets removed # from the right side. self.conns: collections.deque[Connection] = collections.deque() - self.active_contexts: set[_CancellationContext] = set() - self.lock = _create_lock() - self._max_connecting_cond = _create_condition(self.lock) - self.active_sockets = 0 + # This lock should only be contended by threads adding/removing connections. + self._conns_lock = _create_lock() + # Monotonically increasing connection ID required for CMAP Events. self.next_connection_id = 1 # Track whether the sockets in this pool are writeable or not. @@ -746,16 +750,19 @@ def __init__( # The first portion of the wait queue. # Enforces: maxPoolSize # Also used for: clearing the wait queue - self.size_cond = _create_condition(self.lock) self.requests = 0 + # This lock should only be contended by threads adding/removing self.requests. + self.size_cond = _create_condition(_create_lock()) self.max_pool_size = self.opts.max_pool_size if not self.max_pool_size: self.max_pool_size = float("inf") + # The second portion of the wait queue. # Enforces: maxConnecting # Also used for: clearing the wait queue - self._max_connecting_cond = _create_condition(self.lock) self._pending = 0 + # This lock should only be contended by threads adding/removing self._pending. + self._max_connecting_cond = _create_condition(_create_lock()) self._max_connecting = self.opts.max_connecting self._client_id = client_id # Log before publishing event to prevent potential listener preemption in tests @@ -775,29 +782,41 @@ def __init__( ) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 + # This lock should be contended on every operation. + self._operation_count_lock = _create_lock() + + self.active_contexts: set[_CancellationContext] = set() + self.active_sockets = 0 # Retain references to pinned connections to prevent the CPython GC # from thinking that a cursor's pinned connection can be GC'd when the # cursor is GC'd (see PYTHON-2751). self.__pinned_sockets: set[Connection] = set() self.ncursors = 0 self.ntxns = 0 + # This lock protects self.active_contexts, self.active_sockets, + # self.__pinned_sockets, self.ncursors, self.next_connection_id, and self.ntxns. + self._active_contexts_lock = _create_lock() def ready(self) -> None: # Take the lock to avoid the race condition described in PYTHON-2699. - with self.lock: - if self.state != PoolState.READY: + state_changed = False + if self.state != PoolState.READY: + with self.lock: self.state = PoolState.READY - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_READY, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - ) - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_pool_ready(self.address) + state_changed = True + if not state_changed: + return + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.POOL_READY, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + ) + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_pool_ready(self.address) @property def closed(self) -> bool: @@ -811,39 +830,49 @@ def _reset( interrupt_connections: bool = False, ) -> None: old_state = self.state - with self.size_cond: - if self.closed: - return + if self.closed: + return + is_fork = False + with self.lock: if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) newpid = os.getpid() if self.pid != newpid: self.pid = newpid + is_fork = True + if is_fork: + with self._active_contexts_lock: self.active_sockets = 0 + with self._operation_count_lock: self.operation_count = 0 + with self._conns_lock: if service_id is None: sockets, self.conns = self.conns, collections.deque() - else: - discard: collections.deque = collections.deque() # type: ignore[type-arg] - keep: collections.deque = collections.deque() # type: ignore[type-arg] - for conn in self.conns: - if conn.service_id == service_id: - discard.append(conn) - else: - keep.append(conn) - sockets = discard + if service_id is not None: + discard: collections.deque = collections.deque() # type: ignore[type-arg] + keep: collections.deque = collections.deque() # type: ignore[type-arg] + for conn in self.conns: + if conn.service_id == service_id: + discard.append(conn) + else: + keep.append(conn) + sockets = discard + with self._conns_lock: self.conns = keep - if close: + if close: + with self.lock: self.state = PoolState.CLOSED - # Clear the wait queue + # Clear the wait queue + with self._max_connecting_cond: self._max_connecting_cond.notify_all() + with self.size_cond: self.size_cond.notify_all() - if interrupt_connections: - for context in self.active_contexts: - context.cancel() + if interrupt_connections: + for context in self.active_contexts.copy(): + context.cancel() listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the @@ -901,9 +930,8 @@ def update_is_writable(self, is_writable: Optional[bool]) -> None: Pool. """ self.is_writable = is_writable - with self.lock: - for _socket in self.conns: - _socket.update_is_writable(self.is_writable) # type: ignore[arg-type] + for _socket in self.conns: + _socket.update_is_writable(self.is_writable) # type: ignore[arg-type] def reset( self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False @@ -948,12 +976,12 @@ def remove_stale_sockets(self, reference_generation: int) -> None: conn.close_conn(ConnectionClosedReason.IDLE) while True: + # There are enough sockets in the pool. + if len(self.conns) + self.active_sockets >= self.opts.min_pool_size: + return + if self.requests >= self.opts.min_pool_size: + return with self.size_cond: - # There are enough sockets in the pool. - if len(self.conns) + self.active_sockets >= self.opts.min_pool_size: - return - if self.requests >= self.opts.min_pool_size: - return self.requests += 1 incremented = False try: @@ -966,13 +994,14 @@ def remove_stale_sockets(self, reference_generation: int) -> None: incremented = True conn = self.connect() close_conn = False - with self.lock: - # Close connection and return if the pool was reset during - # socket creation or while acquiring the pool lock. - if self.gen.get_overall() != reference_generation: - close_conn = True - if not close_conn: + # Close connection and return if the pool was reset during + # socket creation or while acquiring the pool lock. + if self.gen.get_overall() != reference_generation: + close_conn = True + if not close_conn: + with self._conns_lock: self.conns.appendleft(conn) + with self._active_contexts_lock: self.active_contexts.discard(conn.cancel_context) if close_conn: conn.close_conn(ConnectionClosedReason.STALE) @@ -1011,11 +1040,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect Note that the pool does not keep a reference to the socket -- you must call checkin() when you're done with it. """ - with self.lock: + # Use a temporary context so that interrupt_connections can cancel creating the socket. + tmp_context = _CancellationContext() + with self._active_contexts_lock: conn_id = self.next_connection_id self.next_connection_id += 1 - # Use a temporary context so that interrupt_connections can cancel creating the socket. - tmp_context = _CancellationContext() self.active_contexts.add(tmp_context) listeners = self.opts._event_listeners @@ -1036,7 +1065,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect networking_interface = _configured_socket_interface(self.address, self.opts) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: - with self.lock: + with self._active_contexts_lock: self.active_contexts.discard(tmp_context) if self.enabled_for_cmap: assert listeners is not None @@ -1061,7 +1090,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect raise conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] - with self.lock: + with self._active_contexts_lock: self.active_contexts.add(conn.cancel_context) self.active_contexts.discard(tmp_context) if tmp_context.cancelled: @@ -1078,7 +1107,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect conn.authenticate() # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as e: - with self.lock: + with self._active_contexts_lock: self.active_contexts.discard(conn.cancel_context) if not completed_hello: self._handle_connection_error(e) @@ -1140,7 +1169,7 @@ def checkout( durationMS=duration, ) try: - with self.lock: + with self._active_contexts_lock: self.active_contexts.add(conn.cancel_context) yield conn # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. @@ -1159,11 +1188,11 @@ def checkout( self.checkin(conn) raise if conn.pinned_txn: - with self.lock: + with self._active_contexts_lock: self.__pinned_sockets.add(conn) self.ntxns += 1 elif conn.pinned_cursor: - with self.lock: + with self._active_contexts_lock: self.__pinned_sockets.add(conn) self.ncursors += 1 elif conn.active: @@ -1227,7 +1256,7 @@ def _get_conn( "Attempted to check out a connection from closed connection pool" ) - with self.lock: + with self._operation_count_lock: self.operation_count += 1 # Get a free socket or create one. @@ -1256,9 +1285,9 @@ def _get_conn( incremented = False emitted_event = False try: - with self.lock: + with self._active_contexts_lock: self.active_sockets += 1 - incremented = True + incremented = True while conn is None: # CMAP: we MUST wait for either maxConnecting OR for a socket # to be checked back into the pool. @@ -1276,7 +1305,8 @@ def _get_conn( self._raise_if_not_ready(checkout_started_time, emit_event=False) try: - conn = self.conns.popleft() + with self._conns_lock: + conn = self.conns.popleft() except IndexError: self._pending += 1 if conn: # We got a socket from the pool @@ -1297,9 +1327,10 @@ def _get_conn( conn.close_conn(ConnectionClosedReason.ERROR) with self.size_cond: self.requests -= 1 - if incremented: - self.active_sockets -= 1 self.size_cond.notify() + if incremented: + with self._active_contexts_lock: + self.active_sockets -= 1 if not emitted_event: duration = time.monotonic() - checkout_started_time @@ -1334,9 +1365,9 @@ def checkin(self, conn: Connection) -> None: conn.active = False conn.pinned_txn = False conn.pinned_cursor = False - self.__pinned_sockets.discard(conn) listeners = self.opts._event_listeners - with self.lock: + with self._active_contexts_lock: + self.__pinned_sockets.discard(conn) self.active_contexts.discard(conn.cancel_context) if self.enabled_for_cmap: assert listeners is not None @@ -1375,28 +1406,29 @@ def checkin(self, conn: Connection) -> None: ) else: close_conn = False - with self.lock: - # Hold the lock to ensure this section does not race with - # Pool.reset(). - if self.stale_generation(conn.generation, conn.service_id): - close_conn = True - else: - conn.update_last_checkin_time() - conn.update_is_writable(bool(self.is_writable)) + if self.stale_generation(conn.generation, conn.service_id): + close_conn = True + else: + conn.update_last_checkin_time() + conn.update_is_writable(bool(self.is_writable)) + with self._conns_lock: self.conns.appendleft(conn) + with self._max_connecting_cond: # Notify any threads waiting to create a connection. self._max_connecting_cond.notify() if close_conn: conn.close_conn(ConnectionClosedReason.STALE) - with self.size_cond: + with self._active_contexts_lock: if txn: self.ntxns -= 1 elif cursor: self.ncursors -= 1 - self.requests -= 1 self.active_sockets -= 1 + with self._operation_count_lock: self.operation_count -= 1 + with self.size_cond: + self.requests -= 1 self.size_cond.notify() def _perished(self, conn: Connection) -> bool: