diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py index dd327d6990234c..38cf6b5a08e892 100644 --- a/Lib/_pyrepl/utils.py +++ b/Lib/_pyrepl/utils.py @@ -23,9 +23,9 @@ BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')} -def THEME(): +def THEME(**kwargs): # Not cached: the user can modify the theme inside the interactive session. - return _colorize.get_theme().syntax + return _colorize.get_theme(**kwargs).syntax class Span(NamedTuple): @@ -254,7 +254,10 @@ def is_soft_keyword_used(*tokens: TI | None) -> bool: def disp_str( - buffer: str, colors: list[ColorSpan] | None = None, start_index: int = 0 + buffer: str, + colors: list[ColorSpan] | None = None, + start_index: int = 0, + force_color: bool = False, ) -> tuple[CharBuffer, CharWidths]: r"""Decompose the input buffer into a printable variant with applied colors. @@ -295,7 +298,7 @@ def disp_str( # move past irrelevant spans colors.pop(0) - theme = THEME() + theme = THEME(force_color=force_color) pre_color = "" post_color = "" if colors and colors[0].span.start < start_index: diff --git a/Lib/pdb.py b/Lib/pdb.py index 4efda171b7a813..f89d104fcddb9a 100644 --- a/Lib/pdb.py +++ b/Lib/pdb.py @@ -1069,7 +1069,7 @@ def handle_command_def(self, line): def _colorize_code(self, code): if self.colorize: colors = list(_pyrepl.utils.gen_colors(code)) - chars, _ = _pyrepl.utils.disp_str(code, colors=colors) + chars, _ = _pyrepl.utils.disp_str(code, colors=colors, force_color=True) code = "".join(chars) return code diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py index 7c77bedb766c53..aef8a6b0129092 100644 --- a/Lib/test/test_remote_pdb.py +++ b/Lib/test/test_remote_pdb.py @@ -3,6 +3,7 @@ import itertools import json import os +import re import signal import socket import subprocess @@ -12,9 +13,9 @@ import threading import unittest import unittest.mock -from contextlib import closing, contextmanager, redirect_stdout, ExitStack +from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack from pathlib import Path -from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT +from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT from test.support.os_helper import temp_dir, TESTFN, unlink from typing import Dict, List, Optional, Tuple, Union, Any @@ -1431,5 +1432,152 @@ def test_multi_line_commands(self): self.assertIn("Function returned: 42", stdout) self.assertEqual(process.returncode, 0) + +def _supports_remote_attaching(): + from contextlib import suppress + PROCESS_VM_READV_SUPPORTED = False + + try: + from _remote_debugging import PROCESS_VM_READV_SUPPORTED + except ImportError: + pass + + return PROCESS_VM_READV_SUPPORTED + + +@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled") +@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32", + "Test only runs on Linux, Windows and MacOS") +@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(), + "Testing on Linux requires process_vm_readv support") +@cpython_only +@requires_subprocess() +class PdbAttachTestCase(unittest.TestCase): + def setUp(self): + # Create a server socket that will wait for the debugger to connect + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind(('127.0.0.1', 0)) # Let OS assign port + self.sock.listen(1) + self.port = self.sock.getsockname()[1] + self._create_script() + + def _create_script(self, script=None): + # Create a file for subprocess script + script = textwrap.dedent( + f""" + import socket + import time + + def foo(): + return bar() + + def bar(): + return baz() + + def baz(): + x = 1 + # Trigger attach + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('127.0.0.1', {self.port})) + sock.close() + count = 0 + while x == 1 and count < 100: + count += 1 + time.sleep(0.1) + return x + + result = foo() + print(f"Function returned: {{result}}") + """ + ) + + self.script_path = TESTFN + "_connect_test.py" + with open(self.script_path, 'w') as f: + f.write(script) + + def tearDown(self): + self.sock.close() + try: + unlink(self.script_path) + except OSError: + pass + + def do_integration_test(self, client_stdin): + process = subprocess.Popen( + [sys.executable, self.script_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + self.addCleanup(process.stdout.close) + self.addCleanup(process.stderr.close) + + # Wait for the process to reach our attachment point + self.sock.settimeout(10) + conn, _ = self.sock.accept() + conn.close() + + client_stdin = io.StringIO(client_stdin) + client_stdout = io.StringIO() + client_stderr = io.StringIO() + + self.addCleanup(client_stdin.close) + self.addCleanup(client_stdout.close) + self.addCleanup(client_stderr.close) + self.addCleanup(process.wait) + + with ( + unittest.mock.patch("sys.stdin", client_stdin), + redirect_stdout(client_stdout), + redirect_stderr(client_stderr), + unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]), + ): + try: + pdb.main() + except PermissionError: + self.skipTest("Insufficient permissions for remote execution") + + process.wait() + server_stdout = process.stdout.read() + server_stderr = process.stderr.read() + + if process.returncode != 0: + print("server failed") + print(f"server stdout:\n{server_stdout}") + print(f"server stderr:\n{server_stderr}") + + self.assertEqual(process.returncode, 0) + return { + "client": { + "stdout": client_stdout.getvalue(), + "stderr": client_stderr.getvalue(), + }, + "server": { + "stdout": server_stdout, + "stderr": server_stderr, + }, + } + + def test_attach_to_process_without_colors(self): + with force_color(False): + output = self.do_integration_test("ll\nx=42\n") + self.assertEqual(output["client"]["stderr"], "") + self.assertEqual(output["server"]["stderr"], "") + + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") + self.assertIn("while x == 1", output["client"]["stdout"]) + self.assertNotIn("\x1b", output["client"]["stdout"]) + + def test_attach_to_process_with_colors(self): + with force_color(True): + output = self.do_integration_test("ll\nx=42\n") + self.assertEqual(output["client"]["stderr"], "") + self.assertEqual(output["server"]["stderr"], "") + + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") + self.assertIn("\x1b", output["client"]["stdout"]) + self.assertNotIn("while x == 1", output["client"]["stdout"]) + self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"])) + if __name__ == "__main__": unittest.main()