Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3322,8 +3322,8 @@ def _on_speculative_execute(self):
if self._time_remaining <= 0:
self._on_timeout()
return
if not self.send_request(error_no_hosts=False):
self._start_timer()
self.send_request(error_no_hosts=False)
self._start_timer()


def _make_query_plan(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def execute_until_pass(session, query):
try:
return session.execute(query)
except (ConfigurationException, AlreadyExists):
log.warn("Recieved already exists from query {0} not exiting".format(query))
log.warn("Received already exists from query {0} not exiting".format(query))
# keyspace/table was already created/dropped
return
except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure):
Expand All @@ -414,7 +414,7 @@ def execute_with_long_wait_retry(session, query, timeout=30):
try:
return session.execute(query, timeout=timeout)
except (ConfigurationException, AlreadyExists):
log.warn("Recieved already exists from query {0} not exiting".format(query))
log.warn("Received already exists from query {0} not exiting".format(query))
# keyspace/table was already created/dropped
return
except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure):
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/cqlengine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def setup_package():
create_keyspace_simple(DEFAULT_KEYSPACE, 1)


def teardown_package():
connection.unregister_connection("default")


def is_prepend_reversed():
# do we have https://issues.apache.org/jira/browse/CASSANDRA-8733 ?
ver, _ = get_server_versions()
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/cqlengine/connections/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ConnectionTest(BaseCassEngTestCase):

@classmethod
def setUpClass(cls):
cls.original_cluster = connection.get_cluster()
connection.unregister_connection('default')
cls.keyspace1 = 'ctest1'
cls.keyspace2 = 'ctest2'
super(ConnectionTest, cls).setUpClass()
Expand All @@ -56,7 +56,6 @@ def tearDownClass(cls):
execute_with_long_wait_retry(cls.setup_session, "DROP KEYSPACE {0}".format(cls.keyspace1))
execute_with_long_wait_retry(cls.setup_session, "DROP KEYSPACE {0}".format(cls.keyspace2))
models.DEFAULT_KEYSPACE = DEFAULT_KEYSPACE
cls.original_cluster.shutdown()
cls.setup_cluster.shutdown()
setup_connection(DEFAULT_KEYSPACE)
models.DEFAULT_KEYSPACE
Expand Down
1 change: 0 additions & 1 deletion tests/integration/cqlengine/query/test_named.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
models.DEFAULT_KEYSPACE = cls.default_keyspace
setup_connection(models.DEFAULT_KEYSPACE)
super(TestNamedWithMV, cls).tearDownClass()

@greaterthanorequalcass30
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/cqlengine/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ def test_connection_param_validation(self):

@test_category object_mapper
"""
session = Cluster(['127.0.0.1']).connect()
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
with self.assertRaises(CQLEngineException):
conn.register_connection("bad_coonection1", session=session, consistency="not_null")
with self.assertRaises(CQLEngineException):
Expand All @@ -252,6 +253,7 @@ def test_connection_param_validation(self):
conn.register_connection("bad_coonection4", session=session, cluster_options="not_null")
with self.assertRaises(CQLEngineException):
conn.register_connection("bad_coonection5", hosts="not_null", session=session)
cluster.shutdown()


class BatchQueryConnectionTests(BaseCassEngTestCase):
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/long/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def test_for_schema_disagreements_same_keyspace(self):
execute_until_pass(session, "INSERT INTO test.cf (key, value) VALUES ({0}, {0})".format(j))

execute_until_pass(session, "DROP KEYSPACE test")
cluster.shutdown()

def test_for_schema_disagreement_attribute(self):
"""
Expand Down Expand Up @@ -149,6 +150,7 @@ def test_for_schema_disagreement_attribute(self):
self.check_and_wait_for_agreement(session, rs, True)
rs = session.execute("DROP KEYSPACE test_schema_disagreement")
self.check_and_wait_for_agreement(session, rs, True)
cluster.shutdown()

def check_and_wait_for_agreement(self, session, rs, exepected):
self.assertEqual(rs.response_future.is_schema_agreed, exepected)
Expand Down
24 changes: 21 additions & 3 deletions tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node,\
MockLoggingHandler, get_unsupported_lower_protocol, get_unsupported_upper_protocol, protocolv5
from tests.integration.util import assert_quiescent_pool_state
import sys


def setup_module():
Expand Down Expand Up @@ -74,6 +75,7 @@ def test_ignored_host_up(self):
self.assertTrue(host.is_up)
else:
self.assertIsNone(host.is_up)
cluster.shutdown()

def test_host_resolution(self):
"""
Expand Down Expand Up @@ -129,6 +131,7 @@ def test_raise_error_on_control_connection_timeout(self):

with self.assertRaisesRegexp(NoHostAvailable, "OperationTimedOut\('errors=Timed out creating connection \(1 seconds\)"):
cluster.connect()
cluster.shutdown()

get_node(1).resume()

Expand Down Expand Up @@ -431,7 +434,7 @@ def test_refresh_schema_type(self):
self.assertEqual(original_test1rf_meta.export_as_string(), current_test1rf_meta.export_as_string())
self.assertIsNot(original_type_meta, current_type_meta)
self.assertEqual(original_type_meta.as_cql_query(), current_type_meta.as_cql_query())
session.shutdown()
cluster.shutdown()

def test_refresh_schema_no_wait(self):

Expand Down Expand Up @@ -879,7 +882,17 @@ def test_add_profile_timeout(self):
self.assertEqual(set(h.address for h in pools), set(('127.0.0.1',)))

node2 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.2']))
self.assertRaises(cassandra.OperationTimedOut, cluster.add_execution_profile, 'node2', node2, pool_wait_timeout=0.000000001)

max_retry_count = 10
for i in range(max_retry_count):
start = time.time()
try:
self.assertRaises(cassandra.OperationTimedOut, cluster.add_execution_profile, 'node2',
node2, pool_wait_timeout=sys.float_info.min)
except Exception:
end = time.time()
self.assertAlmostEqual(start, end, 1)
break


class LocalHostAdressTranslator(AddressTranslator):
Expand Down Expand Up @@ -933,6 +946,7 @@ def test_address_translator_with_mixed_nodes(self):
c.connect()
for host in c.metadata.all_hosts():
self.assertEqual(adder_map.get(str(host)), host.broadcast_address)
c.shutdown()


class ContextManagementTest(unittest.TestCase):
Expand Down Expand Up @@ -1090,6 +1104,7 @@ def test_prepare_on_ignored_hosts(self):
# address
for c in cluster.connection_factory.mock_calls:
self.assertEqual(call(unignored_address), c)
cluster.shutdown()


class DuplicateRpcTest(unittest.TestCase):
Expand Down Expand Up @@ -1120,12 +1135,14 @@ def test_duplicate(self):
mock_handler = MockLoggingHandler()
logger = logging.getLogger(cassandra.cluster.__name__)
logger.addHandler(mock_handler)
test_cluster = self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy=self.load_balancing_policy)
test_cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy=self.load_balancing_policy)
test_cluster.connect()
warnings = mock_handler.messages.get("warning")
self.assertEqual(len(warnings), 1)
self.assertTrue('multiple' in warnings[0])
logger.removeHandler(mock_handler)
test_cluster.shutdown()



@protocolv5
Expand Down Expand Up @@ -1166,3 +1183,4 @@ def test_valid_protocol_version_beta_options_connect(self):
session = cluster.connect()
self.assertEqual(cluster.protocol_version, cassandra.ProtocolVersion.MAX_SUPPORTED)
self.assertTrue(session.execute("select release_version from system.local")[0])
cluster.shutdown()
3 changes: 2 additions & 1 deletion tests/integration/standard/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ def test_connect_timeout(self):
for i in range(max_retry_count):
start = time.time()
try:
self.get_connection(timeout=sys.float_info.min)
conn = self.get_connection(timeout=sys.float_info.min)
conn.close()
except Exception as e:
end = time.time()
self.assertAlmostEqual(start, end, 1)
Expand Down
10 changes: 6 additions & 4 deletions tests/integration/standard/test_custom_protocol_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def test_custom_raw_uuid_row_results(self):
"""

# Ensure that we get normal uuid back first
session = Cluster(protocol_version=PROTOCOL_VERSION).connect(keyspace="custserdes")
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
session = cluster.connect(keyspace="custserdes")
session.row_factory = tuple_factory
result = session.execute("SELECT schema_version FROM system.local")
uuid_type = result[0][0]
Expand All @@ -82,7 +83,7 @@ def test_custom_raw_uuid_row_results(self):
result_set = session.execute("SELECT schema_version FROM system.local")
uuid_type = result_set[0][0]
self.assertEqual(type(uuid_type), uuid.UUID)
session.shutdown()
cluster.shutdown()

def test_custom_raw_row_results_all_types(self):
"""
Expand All @@ -99,7 +100,8 @@ def test_custom_raw_row_results_all_types(self):
@test_category data_types:serialization
"""
# Connect using a custom protocol handler that tracks the various types the result message is used with.
session = Cluster(protocol_version=PROTOCOL_VERSION).connect(keyspace="custserdes")
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
session = cluster.connect(keyspace="custserdes")
session.client_protocol_handler = CustomProtocolHandlerResultMessageTracked
session.row_factory = tuple_factory

Expand All @@ -113,7 +115,7 @@ def test_custom_raw_row_results_all_types(self):
self.assertEqual(actual, expected)
# Ensure we have covered the various primitive types
self.assertEqual(len(CustomResultMessageTracked.checked_rev_row_set), len(PRIMITIVE_DATATYPES)-1)
session.shutdown()
cluster.shutdown()


class CustomResultMessageRaw(ResultMessage):
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/standard/test_cython_protocol_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
drop_keyspace_shutdown_cluster("testspace", cls.session, cls.session)
drop_keyspace_shutdown_cluster("testspace", cls.session, cls.cluster)

@cythontest
def test_cython_parser(self):
Expand Down Expand Up @@ -188,7 +188,7 @@ def get_data(protocol_handler):
session.row_factory = tuple_factory

results = session.execute("SELECT * FROM test_table")
session.shutdown()
cluster.shutdown()
return results


Expand Down
4 changes: 4 additions & 0 deletions tests/integration/standard/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,7 @@ def test_export_schema(self):
cluster.connect()

self.assertIsInstance(cluster.metadata.export_schema_as_string(), six.string_types)
cluster.shutdown()

def test_export_keyspace_schema(self):
"""
Expand Down Expand Up @@ -2114,6 +2115,9 @@ def setup_class(cls):
cls.session.set_keyspace(cls.keyspace_name)
connection = cls.cluster.control_connection._connection
cls.parser_class = get_schema_parser(connection, str(CASS_SERVER_VERSION[0]), timeout=20).__class__
cls.cluster.control_connection.reconnect = Mock()



@classmethod
def teardown_class(cls):
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/standard/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ def test_duplicate_metrics_per_cluster(self):
self.assertTrue("appcluster" in scales._Stats.stats.keys())
self.assertTrue("devops" in scales._Stats.stats.keys())

cluster2.shutdown()
cluster3.shutdown()


class RequestAnalyzer(object):
"""
Expand Down