diff --git a/Lib/pty.py b/Lib/pty.py index 1d97994abe..4b25ac32c8 100644 --- a/Lib/pty.py +++ b/Lib/pty.py @@ -32,27 +32,18 @@ def openpty(): except (AttributeError, OSError): pass master_fd, slave_name = _open_terminal() - slave_fd = slave_open(slave_name) - return master_fd, slave_fd - -def master_open(): - """master_open() -> (master_fd, slave_name) - Open a pty master and return the fd, and the filename of the slave end. - Deprecated, use openpty() instead.""" - - import warnings - warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14 + slave_fd = os.open(slave_name, os.O_RDWR) try: - master_fd, slave_fd = os.openpty() - except (AttributeError, OSError): + from fcntl import ioctl, I_PUSH + except ImportError: + return master_fd, slave_fd + try: + ioctl(slave_fd, I_PUSH, "ptem") + ioctl(slave_fd, I_PUSH, "ldterm") + except OSError: pass - else: - slave_name = os.ttyname(slave_fd) - os.close(slave_fd) - return master_fd, slave_name - - return _open_terminal() + return master_fd, slave_fd def _open_terminal(): """Open pty master and return (master_fd, tty_name).""" @@ -66,26 +57,6 @@ def _open_terminal(): return (fd, '/dev/tty' + x + y) raise OSError('out of pty devices') -def slave_open(tty_name): - """slave_open(tty_name) -> slave_fd - Open the pty slave and acquire the controlling terminal, returning - opened filedescriptor. - Deprecated, use openpty() instead.""" - - import warnings - warnings.warn("Use pty.openpty() instead.", DeprecationWarning, stacklevel=2) # Remove API in 3.14 - - result = os.open(tty_name, os.O_RDWR) - try: - from fcntl import ioctl, I_PUSH - except ImportError: - return result - try: - ioctl(result, I_PUSH, "ptem") - ioctl(result, I_PUSH, "ldterm") - except OSError: - pass - return result def fork(): """fork() -> (pid, master_fd) diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py index 6d3c47195c..c12209db55 100644 --- a/Lib/test/test_pty.py +++ b/Lib/test/test_pty.py @@ -1,14 +1,13 @@ import unittest from test.support import ( - is_android, is_apple_mobile, is_emscripten, is_wasi, reap_children, verbose + is_android, is_apple_mobile, is_wasm32, reap_children, verbose ) from test.support.import_helper import import_module -from test.support.os_helper import TESTFN, unlink # Skip these tests if termios is not available import_module('termios') -if is_android or is_apple_mobile or is_emscripten or is_wasi: +if is_android or is_apple_mobile or is_wasm32: raise unittest.SkipTest("pty is not available on this platform") import errno @@ -109,8 +108,8 @@ def setUp(self): def handle_sighup(signum, frame): pass + @unittest.skip("TODO: RUSTPYTHON; \"Not runnable. tty.tcgetwinsize\" is required to setUp") @expectedFailureIfStdinIsTTY - @unittest.skip('TODO: RUSTPYTHON; "Not runnable. tty.tcgetwinsize" is required to setUp') def test_openpty(self): try: mode = tty.tcgetattr(pty.STDIN_FILENO) @@ -196,7 +195,7 @@ def test_openpty(self): s2 = _readline(master_fd) self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2)) - @unittest.skip('TODO: RUSTPYTHON; "Not runnable. tty.tcgetwinsize" is required to setUp') + @unittest.skip("TODO: RUSTPYTHON; \"Not runnable. tty.tcgetwinsize\" is required to setUp") def test_fork(self): debug("calling pty.fork()") pid, master_fd = pty.fork() @@ -279,7 +278,7 @@ def test_fork(self): ##else: ## raise TestFailed("Read from master_fd did not raise exception") - @unittest.skip('TODO: RUSTPYTHON; AttributeError: module "tty" has no attribute "tcgetwinsize"') + @unittest.skip("TODO: RUSTPYTHON; AttributeError: module \"tty\" has no attribute \"tcgetwinsize\"") def test_master_read(self): # XXX(nnorwitz): this test leaks fds when there is an error. debug("Calling pty.openpty()") @@ -299,28 +298,29 @@ def test_master_read(self): self.assertEqual(data, b"") - @unittest.skip('TODO: RUSTPYTHON; AttributeError: module "tty" has no attribute "tcgetwinsize"') + @unittest.skip("TODO: RUSTPYTHON; AttributeError: module \"tty\" has no attribute \"tcgetwinsize\"") def test_spawn_doesnt_hang(self): - self.addCleanup(unlink, TESTFN) - with open(TESTFN, 'wb') as f: - STDOUT_FILENO = 1 - dup_stdout = os.dup(STDOUT_FILENO) - os.dup2(f.fileno(), STDOUT_FILENO) - buf = b'' - def master_read(fd): - nonlocal buf - data = os.read(fd, 1024) - buf += data - return data + # gh-140482: Do the test in a pty.fork() child to avoid messing + # with the interactive test runner's terminal settings. + pid, fd = pty.fork() + if pid == pty.CHILD: + pty.spawn([sys.executable, '-c', 'print("hi there")']) + os._exit(0) + + try: + buf = bytearray() try: - pty.spawn([sys.executable, '-c', 'print("hi there")'], - master_read) - finally: - os.dup2(dup_stdout, STDOUT_FILENO) - os.close(dup_stdout) - self.assertEqual(buf, b'hi there\r\n') - with open(TESTFN, 'rb') as f: - self.assertEqual(f.read(), b'hi there\r\n') + while (data := os.read(fd, 1024)) != b'': + buf.extend(data) + except OSError as e: + if e.errno != errno.EIO: + raise + + (pid, status) = os.waitpid(pid, 0) + self.assertEqual(status, 0) + self.assertEqual(bytes(buf), b"hi there\r\n") + finally: + os.close(fd) class SmallPtyTests(unittest.TestCase): """These tests don't spawn children or hang."""