From 35c6ce5365345571832028d5bdb97efd1a4a0435 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 20 Apr 2015 15:58:44 -0500 Subject: [PATCH 1/2] Refactor redundant Connection factory methods to base --- cassandra/connection.py | 18 ++++++++++++++++++ cassandra/io/asyncorereactor.py | 13 ------------- cassandra/io/eventletreactor.py | 13 ------------- cassandra/io/geventreactor.py | 13 ------------- cassandra/io/libevreactor.py | 13 ------------- cassandra/io/twistedreactor.py | 18 ------------------ 6 files changed, 18 insertions(+), 70 deletions(-) diff --git a/cassandra/connection.py b/cassandra/connection.py index 2a020c0615..6cbd434650 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -239,6 +239,24 @@ def handle_fork(self): """ pass + @classmethod + def factory(cls, *args, **kwargs): + """ + A factory function which returns connections which have + succeeded in connecting and are ready for service (or + raises an exception otherwise). + """ + timeout = kwargs.pop('timeout', 5.0) + conn = cls(*args, **kwargs) + conn.connected_event.wait(timeout) + if conn.last_error: + raise conn.last_error + elif not conn.connected_event.is_set(): + conn.close() + raise OperationTimedOut("Timed out creating connection") + else: + return conn + def close(self): raise NotImplementedError() diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 2ac7156d61..ef687c388c 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -156,19 +156,6 @@ def handle_fork(cls): cls._loop._cleanup() cls._loop = None - @classmethod - def factory(cls, *args, **kwargs): - timeout = kwargs.pop('timeout', 5.0) - conn = cls(*args, **kwargs) - conn.connected_event.wait(timeout) - if conn.last_error: - raise conn.last_error - elif not conn.connected_event.is_set(): - conn.close() - raise OperationTimedOut("Timed out creating connection") - else: - return conn - def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) asyncore.dispatcher.__init__(self) diff --git a/cassandra/io/eventletreactor.py b/cassandra/io/eventletreactor.py index ceac6a951e..670d0f1865 100644 --- a/cassandra/io/eventletreactor.py +++ b/cassandra/io/eventletreactor.py @@ -57,19 +57,6 @@ class EventletConnection(Connection): def initialize_reactor(cls): eventlet.monkey_patch() - @classmethod - def factory(cls, *args, **kwargs): - timeout = kwargs.pop('timeout', 5.0) - conn = cls(*args, **kwargs) - conn.connected_event.wait(timeout) - if conn.last_error: - raise conn.last_error - elif not conn.connected_event.is_set(): - conn.close() - raise OperationTimedOut("Timed out creating connection") - else: - return conn - def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) diff --git a/cassandra/io/geventreactor.py b/cassandra/io/geventreactor.py index 4cd9c68109..6e9af0da4d 100644 --- a/cassandra/io/geventreactor.py +++ b/cassandra/io/geventreactor.py @@ -50,19 +50,6 @@ class GeventConnection(Connection): _write_watcher = None _socket = None - @classmethod - def factory(cls, *args, **kwargs): - timeout = kwargs.pop('timeout', 5.0) - conn = cls(*args, **kwargs) - conn.connected_event.wait(timeout) - if conn.last_error: - raise conn.last_error - elif not conn.connected_event.is_set(): - conn.close() - raise OperationTimedOut("Timed out creating connection") - else: - return conn - def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index db11eaf8be..93b4c97854 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -236,19 +236,6 @@ def handle_fork(cls): cls._libevloop._cleanup() cls._libevloop = None - @classmethod - def factory(cls, *args, **kwargs): - timeout = kwargs.pop('timeout', 5.0) - conn = cls(*args, **kwargs) - conn.connected_event.wait(timeout) - if conn.last_error: - raise conn.last_error - elif not conn.connected_event.is_set(): - conn.close() - raise OperationTimedOut("Timed out creating new connection") - else: - return conn - def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) diff --git a/cassandra/io/twistedreactor.py b/cassandra/io/twistedreactor.py index 1a5a64e796..ff81e5613f 100644 --- a/cassandra/io/twistedreactor.py +++ b/cassandra/io/twistedreactor.py @@ -148,24 +148,6 @@ def initialize_reactor(cls): if not cls._loop: cls._loop = TwistedLoop() - @classmethod - def factory(cls, *args, **kwargs): - """ - A factory function which returns connections which have - succeeded in connecting and are ready for service (or - raises an exception otherwise). - """ - timeout = kwargs.pop('timeout', 5.0) - conn = cls(*args, **kwargs) - conn.connected_event.wait(timeout) - if conn.last_error: - raise conn.last_error - elif not conn.connected_event.is_set(): - conn.close() - raise OperationTimedOut("Timed out creating connection") - else: - return conn - def __init__(self, *args, **kwargs): """ Initialization method. From b94e31be846c8366b3ab58dc8cb838fa5448db10 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 21 Apr 2015 14:55:37 -0500 Subject: [PATCH 2/2] Add connect_timeout to cluster and Connection.factory PYTHON-206 --- cassandra/cluster.py | 16 +++++++++++++--- cassandra/connection.py | 7 +++---- tests/integration/standard/test_connection.py | 2 +- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 9aa5912b64..7183df8903 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -427,6 +427,14 @@ def auth_provider(self, value): See :attr:`.schema_event_refresh_window` for discussion of rationale """ + connect_timeout = 5 + """ + Timeout, in seconds, for creating new connections. + + This timeout covers the entire connection negotiation, including TCP + establishment, options passing, and authentication. + """ + sessions = None control_connection = None scheduler = None @@ -465,7 +473,8 @@ def __init__(self, control_connection_timeout=2.0, idle_heartbeat_interval=30, schema_event_refresh_window=2, - topology_event_refresh_window=10): + topology_event_refresh_window=10, + connect_timeout=5): """ Any of the mutable Cluster attributes may be set as keyword arguments to the constructor. @@ -518,6 +527,7 @@ def __init__(self, self.idle_heartbeat_interval = idle_heartbeat_interval self.schema_event_refresh_window = schema_event_refresh_window self.topology_event_refresh_window = topology_event_refresh_window + self.connect_timeout = connect_timeout self._listeners = set() self._listener_lock = Lock() @@ -707,11 +717,11 @@ def connection_factory(self, address, *args, **kwargs): Intended for internal use only. """ kwargs = self._make_connection_kwargs(address, kwargs) - return self.connection_class.factory(address, *args, **kwargs) + return self.connection_class.factory(address, self.connect_timeout, *args, **kwargs) def _make_connection_factory(self, host, *args, **kwargs): kwargs = self._make_connection_kwargs(host.address, kwargs) - return partial(self.connection_class.factory, host.address, *args, **kwargs) + return partial(self.connection_class.factory, host.address, self.connect_timeout, *args, **kwargs) def _make_connection_kwargs(self, address, kwargs_dict): if self._auth_provider_callable: diff --git a/cassandra/connection.py b/cassandra/connection.py index 6cbd434650..c239313c45 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -240,20 +240,19 @@ def handle_fork(self): pass @classmethod - def factory(cls, *args, **kwargs): + def factory(cls, host, timeout, *args, **kwargs): """ A factory function which returns connections which have succeeded in connecting and are ready for service (or raises an exception otherwise). """ - timeout = kwargs.pop('timeout', 5.0) - conn = cls(*args, **kwargs) + conn = cls(host, *args, **kwargs) conn.connected_event.wait(timeout) if conn.last_error: raise conn.last_error elif not conn.connected_event.is_set(): conn.close() - raise OperationTimedOut("Timed out creating connection") + raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout) else: return conn diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index e52280642b..3261adc346 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -58,7 +58,7 @@ def get_connection(self): e = None for i in range(5): try: - conn = self.klass.factory(protocol_version=PROTOCOL_VERSION) + conn = self.klass.factory(host='127.0.0.1', timeout=5, protocol_version=PROTOCOL_VERSION) break except (OperationTimedOut, NoHostAvailable) as e: continue