diff --git a/Lib/test/test_repl.py b/Lib/test/test_repl.py index 03bf8d8b548..211d1783842 100644 --- a/Lib/test/test_repl.py +++ b/Lib/test/test_repl.py @@ -1,14 +1,35 @@ """Test the interactive interpreter.""" -import sys import os -import unittest +import select import subprocess +import sys +import unittest +from contextlib import contextmanager +from functools import partial from textwrap import dedent -from test.support import cpython_only, SuppressCrashReport +from test import support +from test.support import ( + cpython_only, + has_subprocess_support, + os_helper, + SuppressCrashReport, + SHORT_TIMEOUT, +) from test.support.script_helper import kill_python +from test.support.import_helper import import_module + +try: + import pty +except ImportError: + pty = None -def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): + +if not has_subprocess_support: + raise unittest.SkipTest("test module requires subprocess") + + +def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, custom=False, isolated=True, **kw): """Run the Python REPL with the given arguments. kw is extra keyword args to pass to subprocess.Popen. Returns a Popen @@ -22,7 +43,14 @@ def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): # path may be used by Py_GetPath() to build the default module search # path. stdin_fname = os.path.join(os.path.dirname(sys.executable), "") - cmd_line = [stdin_fname, '-E', '-i'] + cmd_line = [stdin_fname] + # Isolated mode implies -EPs and ignores PYTHON* variables. + if isolated: + cmd_line.append('-I') + # Don't re-run the built-in REPL from interactive mode + # if we're testing a custom REPL (such as the asyncio REPL). + if not custom: + cmd_line.append('-i') cmd_line.extend(args) # Set TERM=vt100, for the rationale see the comments in spawn_python() of @@ -36,10 +64,47 @@ def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): stdout=stdout, stderr=stderr, **kw) + +spawn_asyncio_repl = partial(spawn_repl, "-m", "asyncio", custom=True) + + +@contextmanager +def temp_pythonstartup(*, source: str, histfile: str = ".pythonhist"): + """Create environment variables for a PYTHONSTARTUP script in a temporary directory.""" + with os_helper.temp_dir() as tmpdir: + filename = os.path.join(tmpdir, "pythonstartup.py") + with open(filename, "w") as f: + f.write(source) + yield { + "PYTHONSTARTUP": filename, + "PYTHON_HISTORY": os.path.join(tmpdir, histfile) + } + + +def run_on_interactive_mode(source): + """Spawn a new Python interpreter, pass the given + input source code from the stdin and return the + result back. If the interpreter exits non-zero, it + raises a ValueError.""" + + process = spawn_repl() + process.stdin.write(source) + output = kill_python(process) + + if process.returncode != 0: + raise ValueError("Process didn't exit properly.") + return output + + +@support.force_not_colorized_test_class class TestInteractiveInterpreter(unittest.TestCase): @cpython_only + # Python built with Py_TRACE_REFS fail with a fatal error in + # _PyRefchain_Trace() on memory allocation error. + @unittest.skipIf(support.Py_TRACE_REFS, 'cannot test Py_TRACE_REFS build') def test_no_memory(self): + import_module("_testcapi") # Issue #30696: Fix the interactive interpreter looping endlessly when # no memory. Check also that the fix does not break the interactive # loop when an exception is raised. @@ -92,6 +157,23 @@ def test_multiline_string_parsing(self): output = kill_python(p) self.assertEqual(p.returncode, 0) + @cpython_only + def test_lexer_buffer_realloc_with_null_start(self): + # gh-144759: NULL pointer arithmetic in the lexer when start and + # multi_line_start are NULL (uninitialized in tok_mode_stack[0]) + # and the lexer buffer is reallocated while parsing long input. + long_value = "a" * 2000 + user_input = dedent(f"""\ + x = f'{{{long_value!r}}}' + print(x) + """) + p = spawn_repl() + p.stdin.write(user_input) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + self.assertIn(long_value, output) + + @unittest.expectedFailureIf(sys.platform == "android", "TODO: RUSTPYTHON; AssertionError: 101 != 0") def test_close_stdin(self): user_input = dedent(''' import os @@ -107,6 +189,306 @@ def test_close_stdin(self): self.assertEqual(process.returncode, 0) self.assertIn('before close', output) + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: 101 != 0 + def test_interactive_traceback_reporting(self): + user_input = "1 / 0 / 3 / 4" + p = spawn_repl() + p.stdin.write(user_input) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + + traceback_lines = output.splitlines()[-6:-1] + expected_lines = [ + "Traceback (most recent call last):", + " File \"\", line 1, in ", + " 1 / 0 / 3 / 4", + " ~~^~~", + "ZeroDivisionError: division by zero", + ] + self.assertEqual(traceback_lines, expected_lines) + + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: 101 != 0 + def test_interactive_traceback_reporting_multiple_input(self): + user_input1 = dedent(""" + def foo(x): + 1 / x + + """) + p = spawn_repl() + p.stdin.write(user_input1) + user_input2 = "foo(0)" + p.stdin.write(user_input2) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + + traceback_lines = output.splitlines()[-8:-1] + expected_lines = [ + ' File "", line 1, in ', + ' foo(0)', + ' ~~~^^^', + ' File "", line 2, in foo', + ' 1 / x', + ' ~~^~~', + 'ZeroDivisionError: division by zero' + ] + self.assertEqual(traceback_lines, expected_lines) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_pythonstartup_error_reporting(self): + # errors based on https://github.com/python/cpython/issues/137576 + + def make_repl(env): + return subprocess.Popen( + [os.path.join(os.path.dirname(sys.executable), ''), "-i"], + executable=sys.executable, + text=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + ) + + # case 1: error in user input, but PYTHONSTARTUP is fine + with os_helper.temp_dir() as tmpdir: + script = os.path.join(tmpdir, "pythonstartup.py") + with open(script, "w") as f: + f.write("print('from pythonstartup')\n") + + env = os.environ.copy() + env['PYTHONSTARTUP'] = script + env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".pythonhist") + p = make_repl(env) + p.stdin.write("1/0") + output = kill_python(p) + expected = dedent(""" + Traceback (most recent call last): + File "", line 1, in + 1/0 + ~^~ + ZeroDivisionError: division by zero + """) + self.assertIn("from pythonstartup", output) + self.assertIn(expected, output) + + # case 2: error in PYTHONSTARTUP triggered by user input + with os_helper.temp_dir() as tmpdir: + script = os.path.join(tmpdir, "pythonstartup.py") + with open(script, "w") as f: + f.write("def foo():\n 1/0\n") + + env = os.environ.copy() + env['PYTHONSTARTUP'] = script + env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".pythonhist") + p = make_repl(env) + p.stdin.write('foo()') + output = kill_python(p) + expected = dedent(""" + Traceback (most recent call last): + File "", line 1, in + foo() + ~~~^^ + File "%s", line 2, in foo + 1/0 + ~^~ + ZeroDivisionError: division by zero + """) % script + self.assertIn(expected, output) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_runsource_show_syntax_error_location(self): + user_input = dedent("""def f(x, x): ... + """) + p = spawn_repl() + p.stdin.write(user_input) + output = kill_python(p) + expected_lines = [ + ' def f(x, x): ...', + ' ^', + "SyntaxError: duplicate argument 'x' in function definition" + ] + self.assertEqual(output.splitlines()[4:-1], expected_lines) + + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: 101 != 0 + def test_interactive_source_is_in_linecache(self): + user_input = dedent(""" + def foo(x): + return x + 1 + + def bar(x): + return foo(x) + 2 + """) + p = spawn_repl() + p.stdin.write(user_input) + user_input2 = dedent(""" + import linecache + print(linecache._interactive_cache[linecache._make_key(foo.__code__)]) + """) + p.stdin.write(user_input2) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + expected = "(30, None, [\'def foo(x):\\n\', \' return x + 1\\n\', \'\\n\'], \'\')" + self.assertIn(expected, output, expected) + + def test_asyncio_repl_reaches_python_startup_script(self): + with os_helper.temp_dir() as tmpdir: + script = os.path.join(tmpdir, "pythonstartup.py") + with open(script, "w") as f: + f.write("print('pythonstartup done!')\n") + env = os.environ.copy() + env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".asyncio_history") + env["PYTHONSTARTUP"] = script + p = spawn_asyncio_repl(isolated=False, env=env) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + self.assertIn("pythonstartup done!", output) + + def test_asyncio_repl_respects_isolated_mode(self): + with os_helper.temp_dir() as tmpdir: + script = os.path.join(tmpdir, "pythonstartup.py") + with open(script, "w") as f: + f.write("print('should not print')\n") + env = os.environ.copy() + env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".asyncio_history") + env["PYTHONSTARTUP"] = script + p = spawn_asyncio_repl(isolated=True, env=env) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + self.assertNotIn("should not print", output) + + @unittest.skipUnless(pty, "requires pty") + def test_asyncio_repl_is_ok(self): + m, s = pty.openpty() + cmd = [sys.executable, "-I", "-m", "asyncio"] + env = os.environ.copy() + proc = subprocess.Popen( + cmd, + stdin=s, + stdout=s, + stderr=s, + text=True, + close_fds=True, + env=env, + ) + os.close(s) + os.write(m, b"await asyncio.sleep(0)\n") + os.write(m, b"exit()\n") + output = [] + while select.select([m], [], [], SHORT_TIMEOUT)[0]: + try: + data = os.read(m, 1024).decode("utf-8") + if not data: + break + except OSError: + break + output.append(data) + os.close(m) + try: + exit_code = proc.wait(timeout=SHORT_TIMEOUT) + except subprocess.TimeoutExpired: + proc.kill() + exit_code = proc.wait() + + self.assertEqual(exit_code, 0, "".join(output)) + + +@support.force_not_colorized_test_class +class TestInteractiveModeSyntaxErrors(unittest.TestCase): + + @unittest.expectedFailure # TODO: RUSTPYTHON; ValueError: Process didn't exit properly. + def test_interactive_syntax_error_correct_line(self): + output = run_on_interactive_mode(dedent("""\ + def f(): + print(0) + return yield 42 + """)) + + traceback_lines = output.splitlines()[-4:-1] + expected_lines = [ + ' return yield 42', + ' ^^^^^', + 'SyntaxError: invalid syntax' + ] + self.assertEqual(traceback_lines, expected_lines) + + +class TestAsyncioREPL(unittest.TestCase): + def test_multiple_statements_fail_early(self): + user_input = "1 / 0; print(f'afterwards: {1+1}')" + p = spawn_asyncio_repl() + p.stdin.write(user_input) + output = kill_python(p) + self.assertIn("ZeroDivisionError", output) + self.assertNotIn("afterwards: 2", output) + + def test_toplevel_contextvars_sync(self): + user_input = dedent("""\ + from contextvars import ContextVar + var = ContextVar("var", default="failed") + var.set("ok") + """) + p = spawn_asyncio_repl() + p.stdin.write(user_input) + user_input2 = dedent(""" + print(f"toplevel contextvar test: {var.get()}") + """) + p.stdin.write(user_input2) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + expected = "toplevel contextvar test: ok" + self.assertIn(expected, output, expected) + + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: 101 != 0 + def test_toplevel_contextvars_async(self): + user_input = dedent("""\ + from contextvars import ContextVar + var = ContextVar('var', default='failed') + """) + p = spawn_asyncio_repl() + p.stdin.write(user_input) + user_input2 = "async def set_var(): var.set('ok')\n" + p.stdin.write(user_input2) + user_input3 = "await set_var()\n" + p.stdin.write(user_input3) + user_input4 = "print(f'toplevel contextvar test: {var.get()}')\n" + p.stdin.write(user_input4) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + expected = "toplevel contextvar test: ok" + self.assertIn(expected, output, expected) + + def test_quiet_mode(self): + p = spawn_repl("-q", "-m", "asyncio", custom=True) + output = kill_python(p) + self.assertEqual(p.returncode, 0) + self.assertEqual(output[:3], ">>>") + + @support.force_not_colorized + @support.subTests( + ("startup_code", "expected_error"), + [ + ("some invalid syntax\n", "SyntaxError: invalid syntax"), + ("1/0\n", "ZeroDivisionError: division by zero"), + ], + ) + def test_pythonstartup_failure(self, startup_code, expected_error): + startup_env = self.enterContext( + temp_pythonstartup(source=startup_code, histfile=".asyncio_history")) + + p = spawn_repl( + "-qm", "asyncio", + env=os.environ | startup_env, + isolated=False, + custom=True) + p.stdin.write("print('user code', 'executed')\n") + output = kill_python(p) + self.assertEqual(p.returncode, 0) + + tb_hint = f'File "{startup_env["PYTHONSTARTUP"]}", line 1' + self.assertIn(tb_hint, output) + self.assertIn(expected_error, output) + + self.assertIn("user code executed", output) + if __name__ == "__main__": unittest.main()