Skip to content

Commit 95b6c13

Browse files
committed
Merge pull request apache#285 from datastax/PYTHON-206
PYTHON-206 - Connect timeouts
2 parents 10fc41f + b94e31b commit 95b6c13

8 files changed

Lines changed: 31 additions & 74 deletions

File tree

cassandra/cluster.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,14 @@ def auth_provider(self, value):
426426
See :attr:`.schema_event_refresh_window` for discussion of rationale
427427
"""
428428

429+
connect_timeout = 5
430+
"""
431+
Timeout, in seconds, for creating new connections.
432+
433+
This timeout covers the entire connection negotiation, including TCP
434+
establishment, options passing, and authentication.
435+
"""
436+
429437
sessions = None
430438
control_connection = None
431439
scheduler = None
@@ -464,7 +472,8 @@ def __init__(self,
464472
control_connection_timeout=2.0,
465473
idle_heartbeat_interval=30,
466474
schema_event_refresh_window=2,
467-
topology_event_refresh_window=10):
475+
topology_event_refresh_window=10,
476+
connect_timeout=5):
468477
"""
469478
Any of the mutable Cluster attributes may be set as keyword arguments
470479
to the constructor.
@@ -517,6 +526,7 @@ def __init__(self,
517526
self.idle_heartbeat_interval = idle_heartbeat_interval
518527
self.schema_event_refresh_window = schema_event_refresh_window
519528
self.topology_event_refresh_window = topology_event_refresh_window
529+
self.connect_timeout = connect_timeout
520530

521531
self._listeners = set()
522532
self._listener_lock = Lock()
@@ -706,11 +716,11 @@ def connection_factory(self, address, *args, **kwargs):
706716
Intended for internal use only.
707717
"""
708718
kwargs = self._make_connection_kwargs(address, kwargs)
709-
return self.connection_class.factory(address, *args, **kwargs)
719+
return self.connection_class.factory(address, self.connect_timeout, *args, **kwargs)
710720

711721
def _make_connection_factory(self, host, *args, **kwargs):
712722
kwargs = self._make_connection_kwargs(host.address, kwargs)
713-
return partial(self.connection_class.factory, host.address, *args, **kwargs)
723+
return partial(self.connection_class.factory, host.address, self.connect_timeout, *args, **kwargs)
714724

715725
def _make_connection_kwargs(self, address, kwargs_dict):
716726
if self._auth_provider_callable:

cassandra/connection.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,23 @@ def handle_fork(self):
239239
"""
240240
pass
241241

242+
@classmethod
243+
def factory(cls, host, timeout, *args, **kwargs):
244+
"""
245+
A factory function which returns connections which have
246+
succeeded in connecting and are ready for service (or
247+
raises an exception otherwise).
248+
"""
249+
conn = cls(host, *args, **kwargs)
250+
conn.connected_event.wait(timeout)
251+
if conn.last_error:
252+
raise conn.last_error
253+
elif not conn.connected_event.is_set():
254+
conn.close()
255+
raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout)
256+
else:
257+
return conn
258+
242259
def close(self):
243260
raise NotImplementedError()
244261

cassandra/io/asyncorereactor.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -156,19 +156,6 @@ def handle_fork(cls):
156156
cls._loop._cleanup()
157157
cls._loop = None
158158

159-
@classmethod
160-
def factory(cls, *args, **kwargs):
161-
timeout = kwargs.pop('timeout', 5.0)
162-
conn = cls(*args, **kwargs)
163-
conn.connected_event.wait(timeout)
164-
if conn.last_error:
165-
raise conn.last_error
166-
elif not conn.connected_event.is_set():
167-
conn.close()
168-
raise OperationTimedOut("Timed out creating connection")
169-
else:
170-
return conn
171-
172159
def __init__(self, *args, **kwargs):
173160
Connection.__init__(self, *args, **kwargs)
174161
asyncore.dispatcher.__init__(self)

cassandra/io/eventletreactor.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,19 +57,6 @@ class EventletConnection(Connection):
5757
def initialize_reactor(cls):
5858
eventlet.monkey_patch()
5959

60-
@classmethod
61-
def factory(cls, *args, **kwargs):
62-
timeout = kwargs.pop('timeout', 5.0)
63-
conn = cls(*args, **kwargs)
64-
conn.connected_event.wait(timeout)
65-
if conn.last_error:
66-
raise conn.last_error
67-
elif not conn.connected_event.is_set():
68-
conn.close()
69-
raise OperationTimedOut("Timed out creating connection")
70-
else:
71-
return conn
72-
7360
def __init__(self, *args, **kwargs):
7461
Connection.__init__(self, *args, **kwargs)
7562

cassandra/io/geventreactor.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,6 @@ class GeventConnection(Connection):
5050
_write_watcher = None
5151
_socket = None
5252

53-
@classmethod
54-
def factory(cls, *args, **kwargs):
55-
timeout = kwargs.pop('timeout', 5.0)
56-
conn = cls(*args, **kwargs)
57-
conn.connected_event.wait(timeout)
58-
if conn.last_error:
59-
raise conn.last_error
60-
elif not conn.connected_event.is_set():
61-
conn.close()
62-
raise OperationTimedOut("Timed out creating connection")
63-
else:
64-
return conn
65-
6653
def __init__(self, *args, **kwargs):
6754
Connection.__init__(self, *args, **kwargs)
6855

cassandra/io/libevreactor.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -236,19 +236,6 @@ def handle_fork(cls):
236236
cls._libevloop._cleanup()
237237
cls._libevloop = None
238238

239-
@classmethod
240-
def factory(cls, *args, **kwargs):
241-
timeout = kwargs.pop('timeout', 5.0)
242-
conn = cls(*args, **kwargs)
243-
conn.connected_event.wait(timeout)
244-
if conn.last_error:
245-
raise conn.last_error
246-
elif not conn.connected_event.is_set():
247-
conn.close()
248-
raise OperationTimedOut("Timed out creating new connection")
249-
else:
250-
return conn
251-
252239
def __init__(self, *args, **kwargs):
253240
Connection.__init__(self, *args, **kwargs)
254241

cassandra/io/twistedreactor.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -148,24 +148,6 @@ def initialize_reactor(cls):
148148
if not cls._loop:
149149
cls._loop = TwistedLoop()
150150

151-
@classmethod
152-
def factory(cls, *args, **kwargs):
153-
"""
154-
A factory function which returns connections which have
155-
succeeded in connecting and are ready for service (or
156-
raises an exception otherwise).
157-
"""
158-
timeout = kwargs.pop('timeout', 5.0)
159-
conn = cls(*args, **kwargs)
160-
conn.connected_event.wait(timeout)
161-
if conn.last_error:
162-
raise conn.last_error
163-
elif not conn.connected_event.is_set():
164-
conn.close()
165-
raise OperationTimedOut("Timed out creating connection")
166-
else:
167-
return conn
168-
169151
def __init__(self, *args, **kwargs):
170152
"""
171153
Initialization method.

tests/integration/standard/test_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def get_connection(self):
5858
e = None
5959
for i in range(5):
6060
try:
61-
conn = self.klass.factory(protocol_version=PROTOCOL_VERSION)
61+
conn = self.klass.factory(host='127.0.0.1', timeout=5, protocol_version=PROTOCOL_VERSION)
6262
break
6363
except (OperationTimedOut, NoHostAvailable) as e:
6464
continue

0 commit comments

Comments
 (0)