|
27 | 27 | from packaging.version import Version |
28 | 28 |
|
29 | 29 | import cassandra |
30 | | -from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT |
| 30 | +from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT, ControlConnection |
31 | 31 | from cassandra.concurrent import execute_concurrent |
32 | 32 | from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy, |
33 | 33 | RetryPolicy, SimpleConvictionPolicy, HostDistance, |
@@ -502,79 +502,70 @@ def test_refresh_schema_type(self): |
502 | 502 | @local |
503 | 503 | @notwindows |
504 | 504 | def test_refresh_schema_no_wait(self): |
505 | | - contact_points = [CASSANDRA_IP] |
506 | | - with Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=10, |
507 | | - contact_points=contact_points, |
508 | | - execution_profiles= |
509 | | - {EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy= |
510 | | - HostFilterPolicy( |
511 | | - RoundRobinPolicy(), lambda host: host.address == CASSANDRA_IP |
512 | | - ))}) as cluster: |
513 | | - session = cluster.connect() |
514 | | - |
515 | | - schema_ver = session.execute("SELECT schema_version FROM system.local WHERE key='local'")[0][0] |
516 | | - new_schema_ver = uuid4() |
517 | | - session.execute("UPDATE system.local SET schema_version=%s WHERE key='local'", (new_schema_ver,)) |
518 | | - |
519 | | - try: |
520 | | - agreement_timeout = 1 |
521 | | - |
522 | | - # cluster agreement wait exceeded |
523 | | - c = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=agreement_timeout) |
524 | | - c.connect() |
525 | | - self.assertTrue(c.metadata.keyspaces) |
526 | | - |
527 | | - # cluster agreement wait used for refresh |
528 | | - original_meta = c.metadata.keyspaces |
529 | | - start_time = time.time() |
530 | | - self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata) |
531 | | - end_time = time.time() |
532 | | - self.assertGreaterEqual(end_time - start_time, agreement_timeout) |
533 | | - self.assertIs(original_meta, c.metadata.keyspaces) |
534 | | - |
535 | | - # refresh wait overrides cluster value |
536 | | - original_meta = c.metadata.keyspaces |
537 | | - start_time = time.time() |
538 | | - c.refresh_schema_metadata(max_schema_agreement_wait=0) |
539 | | - end_time = time.time() |
540 | | - self.assertLess(end_time - start_time, agreement_timeout) |
541 | | - self.assertIsNot(original_meta, c.metadata.keyspaces) |
542 | | - self.assertEqual(original_meta, c.metadata.keyspaces) |
543 | | - |
544 | | - c.shutdown() |
545 | | - |
546 | | - refresh_threshold = 0.5 |
547 | | - # cluster agreement bypass |
548 | | - c = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=0) |
549 | | - start_time = time.time() |
550 | | - s = c.connect() |
551 | | - end_time = time.time() |
552 | | - self.assertLess(end_time - start_time, refresh_threshold) |
553 | | - self.assertTrue(c.metadata.keyspaces) |
554 | | - |
555 | | - # cluster agreement wait used for refresh |
556 | | - original_meta = c.metadata.keyspaces |
557 | | - start_time = time.time() |
558 | | - c.refresh_schema_metadata() |
559 | | - end_time = time.time() |
560 | | - self.assertLess(end_time - start_time, refresh_threshold) |
561 | | - self.assertIsNot(original_meta, c.metadata.keyspaces) |
562 | | - self.assertEqual(original_meta, c.metadata.keyspaces) |
563 | | - |
564 | | - # refresh wait overrides cluster value |
565 | | - original_meta = c.metadata.keyspaces |
566 | | - start_time = time.time() |
567 | | - self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata, |
568 | | - max_schema_agreement_wait=agreement_timeout) |
569 | | - end_time = time.time() |
570 | | - self.assertGreaterEqual(end_time - start_time, agreement_timeout) |
571 | | - self.assertIs(original_meta, c.metadata.keyspaces) |
572 | | - c.shutdown() |
573 | | - finally: |
574 | | - # TODO once fixed this connect call |
575 | | - session = cluster.connect() |
576 | | - session.execute("UPDATE system.local SET schema_version=%s WHERE key='local'", (schema_ver,)) |
577 | | - |
| 505 | + original_wait_for_responses = connection.Connection.wait_for_responses |
| 506 | + |
| 507 | + def patched_wait_for_responses(*args, **kwargs): |
| 508 | + # When selecting schema version, replace the real schema UUID with an unexpected UUID |
| 509 | + response = original_wait_for_responses(*args, **kwargs) |
| 510 | + if len(args) > 2 and hasattr(args[2], "query") and args[2].query == "SELECT schema_version FROM system.local WHERE key='local'": |
| 511 | + new_uuid = uuid4() |
| 512 | + response[1].parsed_rows[0] = (new_uuid,) |
| 513 | + return response |
| 514 | + |
| 515 | + with patch.object(connection.Connection, "wait_for_responses", patched_wait_for_responses): |
| 516 | + agreement_timeout = 1 |
| 517 | + |
| 518 | + # cluster agreement wait exceeded |
| 519 | + c = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=agreement_timeout) |
| 520 | + c.connect() |
| 521 | + self.assertTrue(c.metadata.keyspaces) |
| 522 | + |
| 523 | + # cluster agreement wait used for refresh |
| 524 | + original_meta = c.metadata.keyspaces |
| 525 | + start_time = time.time() |
| 526 | + self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata) |
| 527 | + end_time = time.time() |
| 528 | + self.assertGreaterEqual(end_time - start_time, agreement_timeout) |
| 529 | + self.assertIs(original_meta, c.metadata.keyspaces) |
| 530 | + |
| 531 | + # refresh wait overrides cluster value |
| 532 | + original_meta = c.metadata.keyspaces |
| 533 | + start_time = time.time() |
| 534 | + c.refresh_schema_metadata(max_schema_agreement_wait=0) |
| 535 | + end_time = time.time() |
| 536 | + self.assertLess(end_time - start_time, agreement_timeout) |
| 537 | + self.assertIsNot(original_meta, c.metadata.keyspaces) |
| 538 | + self.assertEqual(original_meta, c.metadata.keyspaces) |
| 539 | + |
| 540 | + c.shutdown() |
| 541 | + |
| 542 | + refresh_threshold = 0.5 |
| 543 | + # cluster agreement bypass |
| 544 | + c = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=0) |
| 545 | + start_time = time.time() |
| 546 | + s = c.connect() |
| 547 | + end_time = time.time() |
| 548 | + self.assertLess(end_time - start_time, refresh_threshold) |
| 549 | + self.assertTrue(c.metadata.keyspaces) |
| 550 | + |
| 551 | + # cluster agreement wait used for refresh |
| 552 | + original_meta = c.metadata.keyspaces |
| 553 | + start_time = time.time() |
| 554 | + c.refresh_schema_metadata() |
| 555 | + end_time = time.time() |
| 556 | + self.assertLess(end_time - start_time, refresh_threshold) |
| 557 | + self.assertIsNot(original_meta, c.metadata.keyspaces) |
| 558 | + self.assertEqual(original_meta, c.metadata.keyspaces) |
| 559 | + |
| 560 | + # refresh wait overrides cluster value |
| 561 | + original_meta = c.metadata.keyspaces |
| 562 | + start_time = time.time() |
| 563 | + self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata, |
| 564 | + max_schema_agreement_wait=agreement_timeout) |
| 565 | + end_time = time.time() |
| 566 | + self.assertGreaterEqual(end_time - start_time, agreement_timeout) |
| 567 | + self.assertIs(original_meta, c.metadata.keyspaces) |
| 568 | + c.shutdown() |
578 | 569 |
|
579 | 570 | def test_trace(self): |
580 | 571 | """ |
@@ -1480,52 +1471,6 @@ def test_prepare_on_ignored_hosts(self): |
1480 | 1471 | cluster.shutdown() |
1481 | 1472 |
|
1482 | 1473 |
|
1483 | | -@local |
1484 | | -class DuplicateRpcTest(unittest.TestCase): |
1485 | | - |
1486 | | - load_balancing_policy = HostFilterPolicy(RoundRobinPolicy(), |
1487 | | - lambda host: host.address == "127.0.0.1") |
1488 | | - |
1489 | | - def setUp(self): |
1490 | | - self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, |
1491 | | - execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=self.load_balancing_policy)}) |
1492 | | - self.session = self.cluster.connect() |
1493 | | - |
1494 | | - self.address_column = "native_transport_address" if DSE_VERSION and DSE_VERSION >= Version("6.0") else "rpc_address" |
1495 | | - self.session.execute("UPDATE system.peers SET {} = '127.0.0.1' WHERE peer='127.0.0.2'". |
1496 | | - format(self.address_column)) |
1497 | | - |
1498 | | - def tearDown(self): |
1499 | | - self.session.execute("UPDATE system.peers SET {} = '127.0.0.2' WHERE peer='127.0.0.2'". |
1500 | | - format(self.address_column)) |
1501 | | - self.cluster.shutdown() |
1502 | | - |
1503 | | - def test_duplicate(self): |
1504 | | - """ |
1505 | | - Test duplicate RPC addresses. |
1506 | | -
|
1507 | | - Modifies the system.peers table to make hosts have the same rpc address. Ensures such hosts are filtered out and a message is logged |
1508 | | -
|
1509 | | - @since 3.4 |
1510 | | - @jira_ticket PYTHON-366 |
1511 | | - @expected_result only one hosts' metadata will be populated |
1512 | | -
|
1513 | | - @test_category metadata |
1514 | | - """ |
1515 | | - mock_handler = MockLoggingHandler() |
1516 | | - logger = logging.getLogger(cassandra.cluster.__name__) |
1517 | | - logger.addHandler(mock_handler) |
1518 | | - test_cluster = Cluster(protocol_version=PROTOCOL_VERSION, |
1519 | | - execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=self.load_balancing_policy)}) |
1520 | | - |
1521 | | - test_cluster.connect() |
1522 | | - warnings = mock_handler.messages.get("warning") |
1523 | | - self.assertEqual(len(warnings), 1) |
1524 | | - self.assertTrue('multiple' in warnings[0]) |
1525 | | - logger.removeHandler(mock_handler) |
1526 | | - test_cluster.shutdown() |
1527 | | - |
1528 | | - |
1529 | 1474 | @protocolv5 |
1530 | 1475 | class BetaProtocolTest(unittest.TestCase): |
1531 | 1476 |
|
|
0 commit comments