|
20 | 20 | import math |
21 | 21 | from mock import patch, Mock |
22 | 22 | import os |
| 23 | +import weakref |
23 | 24 | import six |
24 | 25 | from six import BytesIO |
25 | 26 | from socket import error as socket_error |
26 | | -import sys |
27 | | -import time |
28 | 27 |
|
| 28 | +from cassandra.io.libevreactor import _cleanup as libev__cleanup |
29 | 29 | from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT, |
30 | 30 | ConnectionException, ProtocolError) |
31 | 31 |
|
32 | 32 | from cassandra.protocol import (write_stringmultimap, write_int, write_string, |
33 | 33 | SupportedMessage, ReadyMessage, ServerError) |
34 | 34 | from cassandra.marshal import uint8_pack, uint32_pack, int32_pack |
35 | | -from tests.unit.io.utils import TimerCallback |
36 | | -from tests.unit.io.utils import submit_and_wait_for_completion |
| 35 | + |
37 | 36 | from tests import is_monkey_patched |
38 | 37 |
|
39 | 38 |
|
40 | 39 | try: |
41 | | - from cassandra.io.libevreactor import LibevConnection |
| 40 | + from cassandra.io.libevreactor import LibevConnection, LibevLoop |
42 | 41 | except ImportError: |
43 | 42 | LibevConnection = None # noqa |
44 | 43 |
|
@@ -296,3 +295,37 @@ def test_partial_message_read(self, *args): |
296 | 295 |
|
297 | 296 | self.assertTrue(c.connected_event.is_set()) |
298 | 297 | self.assertFalse(c.is_defunct) |
| 298 | + |
| 299 | + def test_watchers_are_finished(self, *args): |
| 300 | + """ |
| 301 | + Test for asserting that watchers are closed in LibevConnection |
| 302 | +
|
| 303 | + This test simulates a process termination without calling cluster.shutdown(), which would trigger |
| 304 | + LibevConnection._libevloop._cleanup. It will check the watchers have been closed |
| 305 | + Finally it will restore the LibevConnection reactor so it doesn't affect |
| 306 | + the rest of the tests |
| 307 | +
|
| 308 | + @since 3.10 |
| 309 | + @jira_ticket PYTHON-747 |
| 310 | + @expected_result the watchers are closed |
| 311 | +
|
| 312 | + @test_category connection |
| 313 | + """ |
| 314 | + with patch.object(LibevConnection._libevloop, "_thread"), \ |
| 315 | + patch.object(LibevConnection._libevloop, "notify"): |
| 316 | + |
| 317 | + self.make_connection() |
| 318 | + |
| 319 | + # We have to make a copy because the connections shouldn't |
| 320 | + # be alive when we verify them |
| 321 | + live_connections = set(LibevConnection._libevloop._live_conns) |
| 322 | + |
| 323 | + # This simulates the process ending without cluster.shutdown() |
| 324 | + # being called, then with atexit _cleanup for libevreactor would |
| 325 | + # be called |
| 326 | + libev__cleanup(weakref.ref(LibevConnection._libevloop)) |
| 327 | + for conn in live_connections: |
| 328 | + for watcher in (conn._write_watcher, conn._read_watcher): |
| 329 | + self.assertTrue(watcher.stop.mock_calls) |
| 330 | + |
| 331 | + LibevConnection._libevloop._shutdown = False |
0 commit comments