|
3 | 3 | import telnetlib |
4 | 4 | import time |
5 | 5 | import queue |
| 6 | +import sys |
| 7 | +import io |
6 | 8 |
|
7 | 9 | from unittest import TestCase |
8 | 10 | from test import support |
@@ -304,6 +306,20 @@ def do_nego(self, sock, cmd, opt): |
304 | 306 | self.sb_seen += sb_data |
305 | 307 |
|
306 | 308 | tl = telnetlib |
| 309 | + |
| 310 | +class TelnetDebuglevel(tl.Telnet): |
| 311 | + ''' Telnet-alike that captures messages written to stdout when |
| 312 | + debuglevel > 0 |
| 313 | + ''' |
| 314 | + _messages = '' |
| 315 | + def msg(self, msg, *args): |
| 316 | + orig_stdout = sys.stdout |
| 317 | + sys.stdout = fake_stdout = io.StringIO() |
| 318 | + tl.Telnet.msg(self, msg, *args) |
| 319 | + self._messages += fake_stdout.getvalue() |
| 320 | + sys.stdout = orig_stdout |
| 321 | + return |
| 322 | + |
307 | 323 | class OptionTests(TestCase): |
308 | 324 | setUp = _read_setUp |
309 | 325 | tearDown = _read_tearDown |
@@ -363,6 +379,36 @@ def test_SB_commands(self): |
363 | 379 | self.assertEqual(b'', telnet.read_sb_data()) |
364 | 380 | nego.sb_getter = None # break the nego => telnet cycle |
365 | 381 |
|
| 382 | + def _test_debuglevel(self, data, expected_msg): |
| 383 | + """ helper for testing debuglevel messages """ |
| 384 | + self.setUp() |
| 385 | + self.dataq.put(data) |
| 386 | + telnet = TelnetDebuglevel(HOST, self.port) |
| 387 | + telnet.set_debuglevel(1) |
| 388 | + self.dataq.join() |
| 389 | + txt = telnet.read_all() |
| 390 | + self.assertTrue(expected_msg in telnet._messages, |
| 391 | + msg=(telnet._messages, expected_msg)) |
| 392 | + self.tearDown() |
| 393 | + |
| 394 | + def test_debuglevel(self): |
| 395 | + # test all the various places that self.msg(...) is called |
| 396 | + given_a_expect_b = [ |
| 397 | + # Telnet.fill_rawq |
| 398 | + (b'a', ": recv b''\n"), |
| 399 | + # Telnet.process_rawq |
| 400 | + (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"), |
| 401 | + (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"), |
| 402 | + (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"), |
| 403 | + (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"), |
| 404 | + (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), |
| 405 | + # Telnet.write |
| 406 | + # XXX, untested |
| 407 | + ] |
| 408 | + for a, b in given_a_expect_b: |
| 409 | + self._test_debuglevel([a, EOF_sigil], b) |
| 410 | + return |
| 411 | + |
366 | 412 | def test_main(verbose=None): |
367 | 413 | support.run_unittest(GeneralTests, ReadTests, OptionTests) |
368 | 414 |
|
|
0 commit comments