From 37783126faf9c12a9565fd1ad73a2111f0189f6b Mon Sep 17 00:00:00 2001 From: Jim Witschey Date: Thu, 4 Jan 2018 14:41:28 -0500 Subject: [PATCH] PYTHON-853: delay timeout --- CHANGELOG.rst | 1 + cassandra/cluster.py | 39 +++++++++++++++++++++++++++------------ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f91251b1f0..0105fca7ae 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,7 @@ Bug Fixes * Support retry_policy in PreparedStatement (PYTHON-861) * __del__ method in Session is throwing an exception (PYTHON-813) * LZ4 import issue with recent versions (PYTHON-897) +* ResponseFuture._connection can be None when returning request_id (PYTHON-853) Other ----- diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 87085d8aaa..126552b77a 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3495,22 +3495,37 @@ def _cancel_timer(self): if self._timer: self._timer.cancel() - def _on_timeout(self): + def _on_timeout(self, _attempts=0): + """ + Called when the request associated with this ResponseFuture times out. - try: - self._connection._requests.pop(self._req_id) - # This prevents the race condition of the - # event loop thread just receiving the waited message - # If it arrives after this, it will be ignored - except KeyError: + This function may reschedule itself. The ``_attempts`` parameter tracks + the number of times this has happened. This parameter should only be + set in those cases, where ``_on_timeout`` reschedules itself. + """ + # PYTHON-853: for short timeouts, we sometimes race with our __init__ + if self._connection is None and _attempts < 3: + self._timer = self.session.cluster.connection_class.create_timer( + 0.01, + partial(self._on_timeout, _attempts=_attempts + 1) + ) return - pool = self.session._pools.get(self._current_host) - if pool and not pool.is_shutdown: - with self._connection.lock: - self._connection.request_ids.append(self._req_id) + if self._connection is not None: + try: + self._connection._requests.pop(self._req_id) + # This prevents the race condition of the + # event loop thread just receiving the waited message + # If it arrives after this, it will be ignored + except KeyError: + return + + pool = self.session._pools.get(self._current_host) + if pool and not pool.is_shutdown: + with self._connection.lock: + self._connection.request_ids.append(self._req_id) - pool.return_connection(self._connection) + pool.return_connection(self._connection) errors = self._errors if not errors: