Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,14 @@ def auth_provider(self, value):
See :attr:`.schema_event_refresh_window` for discussion of rationale
"""

connect_timeout = 5
"""
Timeout, in seconds, for creating new connections.

This timeout covers the entire connection negotiation, including TCP
establishment, options passing, and authentication.
"""

sessions = None
control_connection = None
scheduler = None
Expand Down Expand Up @@ -465,7 +473,8 @@ def __init__(self,
control_connection_timeout=2.0,
idle_heartbeat_interval=30,
schema_event_refresh_window=2,
topology_event_refresh_window=10):
topology_event_refresh_window=10,
connect_timeout=5):
"""
Any of the mutable Cluster attributes may be set as keyword arguments
to the constructor.
Expand Down Expand Up @@ -518,6 +527,7 @@ def __init__(self,
self.idle_heartbeat_interval = idle_heartbeat_interval
self.schema_event_refresh_window = schema_event_refresh_window
self.topology_event_refresh_window = topology_event_refresh_window
self.connect_timeout = connect_timeout

self._listeners = set()
self._listener_lock = Lock()
Expand Down Expand Up @@ -707,11 +717,11 @@ def connection_factory(self, address, *args, **kwargs):
Intended for internal use only.
"""
kwargs = self._make_connection_kwargs(address, kwargs)
return self.connection_class.factory(address, *args, **kwargs)
return self.connection_class.factory(address, self.connect_timeout, *args, **kwargs)

def _make_connection_factory(self, host, *args, **kwargs):
kwargs = self._make_connection_kwargs(host.address, kwargs)
return partial(self.connection_class.factory, host.address, *args, **kwargs)
return partial(self.connection_class.factory, host.address, self.connect_timeout, *args, **kwargs)

def _make_connection_kwargs(self, address, kwargs_dict):
if self._auth_provider_callable:
Expand Down
17 changes: 17 additions & 0 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,23 @@ def handle_fork(self):
"""
pass

@classmethod
def factory(cls, host, timeout, *args, **kwargs):
"""
A factory function which returns connections which have
succeeded in connecting and are ready for service (or
raises an exception otherwise).
"""
conn = cls(host, *args, **kwargs)
conn.connected_event.wait(timeout)
if conn.last_error:
raise conn.last_error
elif not conn.connected_event.is_set():
conn.close()
raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout)
else:
return conn

def close(self):
raise NotImplementedError()

Expand Down
13 changes: 0 additions & 13 deletions cassandra/io/asyncorereactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,6 @@ def handle_fork(cls):
cls._loop._cleanup()
cls._loop = None

@classmethod
def factory(cls, *args, **kwargs):
timeout = kwargs.pop('timeout', 5.0)
conn = cls(*args, **kwargs)
conn.connected_event.wait(timeout)
if conn.last_error:
raise conn.last_error
elif not conn.connected_event.is_set():
conn.close()
raise OperationTimedOut("Timed out creating connection")
else:
return conn

def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)
asyncore.dispatcher.__init__(self)
Expand Down
13 changes: 0 additions & 13 deletions cassandra/io/eventletreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,6 @@ class EventletConnection(Connection):
def initialize_reactor(cls):
eventlet.monkey_patch()

@classmethod
def factory(cls, *args, **kwargs):
timeout = kwargs.pop('timeout', 5.0)
conn = cls(*args, **kwargs)
conn.connected_event.wait(timeout)
if conn.last_error:
raise conn.last_error
elif not conn.connected_event.is_set():
conn.close()
raise OperationTimedOut("Timed out creating connection")
else:
return conn

def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)

Expand Down
13 changes: 0 additions & 13 deletions cassandra/io/geventreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,6 @@ class GeventConnection(Connection):
_write_watcher = None
_socket = None

@classmethod
def factory(cls, *args, **kwargs):
timeout = kwargs.pop('timeout', 5.0)
conn = cls(*args, **kwargs)
conn.connected_event.wait(timeout)
if conn.last_error:
raise conn.last_error
elif not conn.connected_event.is_set():
conn.close()
raise OperationTimedOut("Timed out creating connection")
else:
return conn

def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)

Expand Down
13 changes: 0 additions & 13 deletions cassandra/io/libevreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,19 +236,6 @@ def handle_fork(cls):
cls._libevloop._cleanup()
cls._libevloop = None

@classmethod
def factory(cls, *args, **kwargs):
timeout = kwargs.pop('timeout', 5.0)
conn = cls(*args, **kwargs)
conn.connected_event.wait(timeout)
if conn.last_error:
raise conn.last_error
elif not conn.connected_event.is_set():
conn.close()
raise OperationTimedOut("Timed out creating new connection")
else:
return conn

def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)

Expand Down
18 changes: 0 additions & 18 deletions cassandra/io/twistedreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,24 +148,6 @@ def initialize_reactor(cls):
if not cls._loop:
cls._loop = TwistedLoop()

@classmethod
def factory(cls, *args, **kwargs):
"""
A factory function which returns connections which have
succeeded in connecting and are ready for service (or
raises an exception otherwise).
"""
timeout = kwargs.pop('timeout', 5.0)
conn = cls(*args, **kwargs)
conn.connected_event.wait(timeout)
if conn.last_error:
raise conn.last_error
elif not conn.connected_event.is_set():
conn.close()
raise OperationTimedOut("Timed out creating connection")
else:
return conn

def __init__(self, *args, **kwargs):
"""
Initialization method.
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/standard/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def get_connection(self):
e = None
for i in range(5):
try:
conn = self.klass.factory(protocol_version=PROTOCOL_VERSION)
conn = self.klass.factory(host='127.0.0.1', timeout=5, protocol_version=PROTOCOL_VERSION)
break
except (OperationTimedOut, NoHostAvailable) as e:
continue
Expand Down