|
3 | 3 | import itertools
|
4 | 4 | import json
|
5 | 5 | import os
|
| 6 | +import re |
6 | 7 | import signal
|
7 | 8 | import socket
|
8 | 9 | import subprocess
|
|
12 | 13 | import threading
|
13 | 14 | import unittest
|
14 | 15 | import unittest.mock
|
15 |
| -from contextlib import closing, contextmanager, redirect_stdout, ExitStack |
| 16 | +from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack |
16 | 17 | from pathlib import Path
|
17 |
| -from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT |
| 18 | +from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT |
18 | 19 | from test.support.os_helper import temp_dir, TESTFN, unlink
|
19 | 20 | from typing import Dict, List, Optional, Tuple, Union, Any
|
20 | 21 |
|
@@ -1431,5 +1432,152 @@ def test_multi_line_commands(self):
|
1431 | 1432 | self.assertIn("Function returned: 42", stdout)
|
1432 | 1433 | self.assertEqual(process.returncode, 0)
|
1433 | 1434 |
|
| 1435 | + |
| 1436 | +def _supports_remote_attaching(): |
| 1437 | + from contextlib import suppress |
| 1438 | + PROCESS_VM_READV_SUPPORTED = False |
| 1439 | + |
| 1440 | + try: |
| 1441 | + from _remote_debugging import PROCESS_VM_READV_SUPPORTED |
| 1442 | + except ImportError: |
| 1443 | + pass |
| 1444 | + |
| 1445 | + return PROCESS_VM_READV_SUPPORTED |
| 1446 | + |
| 1447 | + |
| 1448 | +@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled") |
| 1449 | +@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32", |
| 1450 | + "Test only runs on Linux, Windows and MacOS") |
| 1451 | +@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(), |
| 1452 | + "Testing on Linux requires process_vm_readv support") |
| 1453 | +@cpython_only |
| 1454 | +@requires_subprocess() |
| 1455 | +class PdbAttachTestCase(unittest.TestCase): |
| 1456 | + def setUp(self): |
| 1457 | + # Create a server socket that will wait for the debugger to connect |
| 1458 | + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 1459 | + self.sock.bind(('127.0.0.1', 0)) # Let OS assign port |
| 1460 | + self.sock.listen(1) |
| 1461 | + self.port = self.sock.getsockname()[1] |
| 1462 | + self._create_script() |
| 1463 | + |
| 1464 | + def _create_script(self, script=None): |
| 1465 | + # Create a file for subprocess script |
| 1466 | + script = textwrap.dedent( |
| 1467 | + f""" |
| 1468 | + import socket |
| 1469 | + import time |
| 1470 | +
|
| 1471 | + def foo(): |
| 1472 | + return bar() |
| 1473 | +
|
| 1474 | + def bar(): |
| 1475 | + return baz() |
| 1476 | +
|
| 1477 | + def baz(): |
| 1478 | + x = 1 |
| 1479 | + # Trigger attach |
| 1480 | + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 1481 | + sock.connect(('127.0.0.1', {self.port})) |
| 1482 | + sock.close() |
| 1483 | + count = 0 |
| 1484 | + while x == 1 and count < 100: |
| 1485 | + count += 1 |
| 1486 | + time.sleep(0.1) |
| 1487 | + return x |
| 1488 | +
|
| 1489 | + result = foo() |
| 1490 | + print(f"Function returned: {{result}}") |
| 1491 | + """ |
| 1492 | + ) |
| 1493 | + |
| 1494 | + self.script_path = TESTFN + "_connect_test.py" |
| 1495 | + with open(self.script_path, 'w') as f: |
| 1496 | + f.write(script) |
| 1497 | + |
| 1498 | + def tearDown(self): |
| 1499 | + self.sock.close() |
| 1500 | + try: |
| 1501 | + unlink(self.script_path) |
| 1502 | + except OSError: |
| 1503 | + pass |
| 1504 | + |
| 1505 | + def do_integration_test(self, client_stdin): |
| 1506 | + process = subprocess.Popen( |
| 1507 | + [sys.executable, self.script_path], |
| 1508 | + stdout=subprocess.PIPE, |
| 1509 | + stderr=subprocess.PIPE, |
| 1510 | + text=True |
| 1511 | + ) |
| 1512 | + self.addCleanup(process.stdout.close) |
| 1513 | + self.addCleanup(process.stderr.close) |
| 1514 | + |
| 1515 | + # Wait for the process to reach our attachment point |
| 1516 | + self.sock.settimeout(10) |
| 1517 | + conn, _ = self.sock.accept() |
| 1518 | + conn.close() |
| 1519 | + |
| 1520 | + client_stdin = io.StringIO(client_stdin) |
| 1521 | + client_stdout = io.StringIO() |
| 1522 | + client_stderr = io.StringIO() |
| 1523 | + |
| 1524 | + self.addCleanup(client_stdin.close) |
| 1525 | + self.addCleanup(client_stdout.close) |
| 1526 | + self.addCleanup(client_stderr.close) |
| 1527 | + self.addCleanup(process.wait) |
| 1528 | + |
| 1529 | + with ( |
| 1530 | + unittest.mock.patch("sys.stdin", client_stdin), |
| 1531 | + redirect_stdout(client_stdout), |
| 1532 | + redirect_stderr(client_stderr), |
| 1533 | + unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]), |
| 1534 | + ): |
| 1535 | + try: |
| 1536 | + pdb.main() |
| 1537 | + except PermissionError: |
| 1538 | + self.skipTest("Insufficient permissions for remote execution") |
| 1539 | + |
| 1540 | + process.wait() |
| 1541 | + server_stdout = process.stdout.read() |
| 1542 | + server_stderr = process.stderr.read() |
| 1543 | + |
| 1544 | + if process.returncode != 0: |
| 1545 | + print("server failed") |
| 1546 | + print(f"server stdout:\n{server_stdout}") |
| 1547 | + print(f"server stderr:\n{server_stderr}") |
| 1548 | + |
| 1549 | + self.assertEqual(process.returncode, 0) |
| 1550 | + return { |
| 1551 | + "client": { |
| 1552 | + "stdout": client_stdout.getvalue(), |
| 1553 | + "stderr": client_stderr.getvalue(), |
| 1554 | + }, |
| 1555 | + "server": { |
| 1556 | + "stdout": server_stdout, |
| 1557 | + "stderr": server_stderr, |
| 1558 | + }, |
| 1559 | + } |
| 1560 | + |
| 1561 | + def test_attach_to_process_without_colors(self): |
| 1562 | + with force_color(False): |
| 1563 | + output = self.do_integration_test("ll\nx=42\n") |
| 1564 | + self.assertEqual(output["client"]["stderr"], "") |
| 1565 | + self.assertEqual(output["server"]["stderr"], "") |
| 1566 | + |
| 1567 | + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") |
| 1568 | + self.assertIn("while x == 1", output["client"]["stdout"]) |
| 1569 | + self.assertNotIn("\x1b", output["client"]["stdout"]) |
| 1570 | + |
| 1571 | + def test_attach_to_process_with_colors(self): |
| 1572 | + with force_color(True): |
| 1573 | + output = self.do_integration_test("ll\nx=42\n") |
| 1574 | + self.assertEqual(output["client"]["stderr"], "") |
| 1575 | + self.assertEqual(output["server"]["stderr"], "") |
| 1576 | + |
| 1577 | + self.assertEqual(output["server"]["stdout"], "Function returned: 42\n") |
| 1578 | + self.assertIn("\x1b", output["client"]["stdout"]) |
| 1579 | + self.assertNotIn("while x == 1", output["client"]["stdout"]) |
| 1580 | + self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"])) |
| 1581 | + |
1434 | 1582 | if __name__ == "__main__":
|
1435 | 1583 | unittest.main()
|
0 commit comments