From 8aa1f24ee5edf410141db36c122ef7d4d8adaf3d Mon Sep 17 00:00:00 2001 From: bjmb Date: Mon, 6 Feb 2017 12:09:44 -0500 Subject: [PATCH 1/2] Fixed the profile timeout --- tests/integration/__init__.py | 4 ++-- tests/integration/cqlengine/__init__.py | 4 ++++ .../cqlengine/connections/test_connection.py | 3 +-- .../integration/cqlengine/query/test_named.py | 1 - .../integration/cqlengine/test_connections.py | 4 +++- tests/integration/long/test_schema.py | 2 ++ tests/integration/standard/test_cluster.py | 24 ++++++++++++++++--- tests/integration/standard/test_connection.py | 3 ++- .../standard/test_custom_protocol_handler.py | 10 ++++---- .../standard/test_cython_protocol_handlers.py | 4 ++-- tests/integration/standard/test_metadata.py | 4 ++++ tests/integration/standard/test_metrics.py | 3 +++ 12 files changed, 50 insertions(+), 16 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 2b720f786c..4996af2356 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -396,7 +396,7 @@ def execute_until_pass(session, query): try: return session.execute(query) except (ConfigurationException, AlreadyExists): - log.warn("Recieved already exists from query {0} not exiting".format(query)) + log.warn("Received already exists from query {0} not exiting".format(query)) # keyspace/table was already created/dropped return except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure): @@ -414,7 +414,7 @@ def execute_with_long_wait_retry(session, query, timeout=30): try: return session.execute(query, timeout=timeout) except (ConfigurationException, AlreadyExists): - log.warn("Recieved already exists from query {0} not exiting".format(query)) + log.warn("Received already exists from query {0} not exiting".format(query)) # keyspace/table was already created/dropped return except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure): diff --git a/tests/integration/cqlengine/__init__.py b/tests/integration/cqlengine/__init__.py index 3f163ded64..ef61eed114 100644 --- a/tests/integration/cqlengine/__init__.py +++ b/tests/integration/cqlengine/__init__.py @@ -41,6 +41,10 @@ def setup_package(): create_keyspace_simple(DEFAULT_KEYSPACE, 1) +def teardown_package(): + connection.unregister_connection("default") + + def is_prepend_reversed(): # do we have https://issues.apache.org/jira/browse/CASSANDRA-8733 ? ver, _ = get_server_versions() diff --git a/tests/integration/cqlengine/connections/test_connection.py b/tests/integration/cqlengine/connections/test_connection.py index 659893bd0f..0352e726fb 100644 --- a/tests/integration/cqlengine/connections/test_connection.py +++ b/tests/integration/cqlengine/connections/test_connection.py @@ -40,7 +40,7 @@ class ConnectionTest(BaseCassEngTestCase): @classmethod def setUpClass(cls): - cls.original_cluster = connection.get_cluster() + connection.unregister_connection('default') cls.keyspace1 = 'ctest1' cls.keyspace2 = 'ctest2' super(ConnectionTest, cls).setUpClass() @@ -56,7 +56,6 @@ def tearDownClass(cls): execute_with_long_wait_retry(cls.setup_session, "DROP KEYSPACE {0}".format(cls.keyspace1)) execute_with_long_wait_retry(cls.setup_session, "DROP KEYSPACE {0}".format(cls.keyspace2)) models.DEFAULT_KEYSPACE = DEFAULT_KEYSPACE - cls.original_cluster.shutdown() cls.setup_cluster.shutdown() setup_connection(DEFAULT_KEYSPACE) models.DEFAULT_KEYSPACE diff --git a/tests/integration/cqlengine/query/test_named.py b/tests/integration/cqlengine/query/test_named.py index 55129cb985..a02e0a4bf4 100644 --- a/tests/integration/cqlengine/query/test_named.py +++ b/tests/integration/cqlengine/query/test_named.py @@ -291,7 +291,6 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): models.DEFAULT_KEYSPACE = cls.default_keyspace - setup_connection(models.DEFAULT_KEYSPACE) super(TestNamedWithMV, cls).tearDownClass() @greaterthanorequalcass30 diff --git a/tests/integration/cqlengine/test_connections.py b/tests/integration/cqlengine/test_connections.py index 1f30b0a972..ec3babcf14 100644 --- a/tests/integration/cqlengine/test_connections.py +++ b/tests/integration/cqlengine/test_connections.py @@ -241,7 +241,8 @@ def test_connection_param_validation(self): @test_category object_mapper """ - session = Cluster(['127.0.0.1']).connect() + cluster = Cluster(['127.0.0.1']) + session = cluster.connect() with self.assertRaises(CQLEngineException): conn.register_connection("bad_coonection1", session=session, consistency="not_null") with self.assertRaises(CQLEngineException): @@ -252,6 +253,7 @@ def test_connection_param_validation(self): conn.register_connection("bad_coonection4", session=session, cluster_options="not_null") with self.assertRaises(CQLEngineException): conn.register_connection("bad_coonection5", hosts="not_null", session=session) + cluster.shutdown() class BatchQueryConnectionTests(BaseCassEngTestCase): diff --git a/tests/integration/long/test_schema.py b/tests/integration/long/test_schema.py index 349a158af8..d2131ed8cf 100644 --- a/tests/integration/long/test_schema.py +++ b/tests/integration/long/test_schema.py @@ -113,6 +113,7 @@ def test_for_schema_disagreements_same_keyspace(self): execute_until_pass(session, "INSERT INTO test.cf (key, value) VALUES ({0}, {0})".format(j)) execute_until_pass(session, "DROP KEYSPACE test") + cluster.shutdown() def test_for_schema_disagreement_attribute(self): """ @@ -149,6 +150,7 @@ def test_for_schema_disagreement_attribute(self): self.check_and_wait_for_agreement(session, rs, True) rs = session.execute("DROP KEYSPACE test_schema_disagreement") self.check_and_wait_for_agreement(session, rs, True) + cluster.shutdown() def check_and_wait_for_agreement(self, session, rs, exepected): self.assertEqual(rs.response_future.is_schema_agreed, exepected) diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 2817bebec7..47e76b78fd 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -35,6 +35,7 @@ from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node,\ MockLoggingHandler, get_unsupported_lower_protocol, get_unsupported_upper_protocol, protocolv5 from tests.integration.util import assert_quiescent_pool_state +import sys def setup_module(): @@ -74,6 +75,7 @@ def test_ignored_host_up(self): self.assertTrue(host.is_up) else: self.assertIsNone(host.is_up) + cluster.shutdown() def test_host_resolution(self): """ @@ -129,6 +131,7 @@ def test_raise_error_on_control_connection_timeout(self): with self.assertRaisesRegexp(NoHostAvailable, "OperationTimedOut\('errors=Timed out creating connection \(1 seconds\)"): cluster.connect() + cluster.shutdown() get_node(1).resume() @@ -431,7 +434,7 @@ def test_refresh_schema_type(self): self.assertEqual(original_test1rf_meta.export_as_string(), current_test1rf_meta.export_as_string()) self.assertIsNot(original_type_meta, current_type_meta) self.assertEqual(original_type_meta.as_cql_query(), current_type_meta.as_cql_query()) - session.shutdown() + cluster.shutdown() def test_refresh_schema_no_wait(self): @@ -879,7 +882,17 @@ def test_add_profile_timeout(self): self.assertEqual(set(h.address for h in pools), set(('127.0.0.1',))) node2 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.2'])) - self.assertRaises(cassandra.OperationTimedOut, cluster.add_execution_profile, 'node2', node2, pool_wait_timeout=0.000000001) + + max_retry_count = 10 + for i in range(max_retry_count): + start = time.time() + try: + self.assertRaises(cassandra.OperationTimedOut, cluster.add_execution_profile, 'node2', + node2, pool_wait_timeout=sys.float_info.min) + except Exception: + end = time.time() + self.assertAlmostEqual(start, end, 1) + break class LocalHostAdressTranslator(AddressTranslator): @@ -933,6 +946,7 @@ def test_address_translator_with_mixed_nodes(self): c.connect() for host in c.metadata.all_hosts(): self.assertEqual(adder_map.get(str(host)), host.broadcast_address) + c.shutdown() class ContextManagementTest(unittest.TestCase): @@ -1090,6 +1104,7 @@ def test_prepare_on_ignored_hosts(self): # address for c in cluster.connection_factory.mock_calls: self.assertEqual(call(unignored_address), c) + cluster.shutdown() class DuplicateRpcTest(unittest.TestCase): @@ -1120,12 +1135,14 @@ def test_duplicate(self): mock_handler = MockLoggingHandler() logger = logging.getLogger(cassandra.cluster.__name__) logger.addHandler(mock_handler) - test_cluster = self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy=self.load_balancing_policy) + test_cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy=self.load_balancing_policy) test_cluster.connect() warnings = mock_handler.messages.get("warning") self.assertEqual(len(warnings), 1) self.assertTrue('multiple' in warnings[0]) logger.removeHandler(mock_handler) + test_cluster.shutdown() + @protocolv5 @@ -1166,3 +1183,4 @@ def test_valid_protocol_version_beta_options_connect(self): session = cluster.connect() self.assertEqual(cluster.protocol_version, cassandra.ProtocolVersion.MAX_SUPPORTED) self.assertTrue(session.execute("select release_version from system.local")[0]) + cluster.shutdown() diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index 4beededeb7..538cf406f0 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -364,7 +364,8 @@ def test_connect_timeout(self): for i in range(max_retry_count): start = time.time() try: - self.get_connection(timeout=sys.float_info.min) + conn = self.get_connection(timeout=sys.float_info.min) + conn.close() except Exception as e: end = time.time() self.assertAlmostEqual(start, end, 1) diff --git a/tests/integration/standard/test_custom_protocol_handler.py b/tests/integration/standard/test_custom_protocol_handler.py index 89b1b2fd1e..79b9d84636 100644 --- a/tests/integration/standard/test_custom_protocol_handler.py +++ b/tests/integration/standard/test_custom_protocol_handler.py @@ -62,7 +62,8 @@ def test_custom_raw_uuid_row_results(self): """ # Ensure that we get normal uuid back first - session = Cluster(protocol_version=PROTOCOL_VERSION).connect(keyspace="custserdes") + cluster = Cluster(protocol_version=PROTOCOL_VERSION) + session = cluster.connect(keyspace="custserdes") session.row_factory = tuple_factory result = session.execute("SELECT schema_version FROM system.local") uuid_type = result[0][0] @@ -82,7 +83,7 @@ def test_custom_raw_uuid_row_results(self): result_set = session.execute("SELECT schema_version FROM system.local") uuid_type = result_set[0][0] self.assertEqual(type(uuid_type), uuid.UUID) - session.shutdown() + cluster.shutdown() def test_custom_raw_row_results_all_types(self): """ @@ -99,7 +100,8 @@ def test_custom_raw_row_results_all_types(self): @test_category data_types:serialization """ # Connect using a custom protocol handler that tracks the various types the result message is used with. - session = Cluster(protocol_version=PROTOCOL_VERSION).connect(keyspace="custserdes") + cluster = Cluster(protocol_version=PROTOCOL_VERSION) + session = cluster.connect(keyspace="custserdes") session.client_protocol_handler = CustomProtocolHandlerResultMessageTracked session.row_factory = tuple_factory @@ -113,7 +115,7 @@ def test_custom_raw_row_results_all_types(self): self.assertEqual(actual, expected) # Ensure we have covered the various primitive types self.assertEqual(len(CustomResultMessageTracked.checked_rev_row_set), len(PRIMITIVE_DATATYPES)-1) - session.shutdown() + cluster.shutdown() class CustomResultMessageRaw(ResultMessage): diff --git a/tests/integration/standard/test_cython_protocol_handlers.py b/tests/integration/standard/test_cython_protocol_handlers.py index 7dc3db300e..28c3b0f2d8 100644 --- a/tests/integration/standard/test_cython_protocol_handlers.py +++ b/tests/integration/standard/test_cython_protocol_handlers.py @@ -42,7 +42,7 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): - drop_keyspace_shutdown_cluster("testspace", cls.session, cls.session) + drop_keyspace_shutdown_cluster("testspace", cls.session, cls.cluster) @cythontest def test_cython_parser(self): @@ -188,7 +188,7 @@ def get_data(protocol_handler): session.row_factory = tuple_factory results = session.execute("SELECT * FROM test_table") - session.shutdown() + cluster.shutdown() return results diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index cebe8a3ad0..c54fc1ff22 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -967,6 +967,7 @@ def test_export_schema(self): cluster.connect() self.assertIsInstance(cluster.metadata.export_schema_as_string(), six.string_types) + cluster.shutdown() def test_export_keyspace_schema(self): """ @@ -2114,6 +2115,9 @@ def setup_class(cls): cls.session.set_keyspace(cls.keyspace_name) connection = cls.cluster.control_connection._connection cls.parser_class = get_schema_parser(connection, str(CASS_SERVER_VERSION[0]), timeout=20).__class__ + cls.cluster.control_connection.reconnect = Mock() + + @classmethod def teardown_class(cls): diff --git a/tests/integration/standard/test_metrics.py b/tests/integration/standard/test_metrics.py index 45e5256fb3..1f133a431d 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -271,6 +271,9 @@ def test_duplicate_metrics_per_cluster(self): self.assertTrue("appcluster" in scales._Stats.stats.keys()) self.assertTrue("devops" in scales._Stats.stats.keys()) + cluster2.shutdown() + cluster3.shutdown() + class RequestAnalyzer(object): """ From 00ca39714b4543a8a50850e70e016d1d8b70cff4 Mon Sep 17 00:00:00 2001 From: bjmb Date: Thu, 9 Feb 2017 17:48:27 -0500 Subject: [PATCH 2/2] Fixed timer logic to behave properly with ConstantSpeculativeExecutionPolicy --- cassandra/cluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 5d037a1eac..6088c17831 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3322,8 +3322,8 @@ def _on_speculative_execute(self): if self._time_remaining <= 0: self._on_timeout() return - if not self.send_request(error_no_hosts=False): - self._start_timer() + self.send_request(error_no_hosts=False) + self._start_timer() def _make_query_plan(self):