8000 gh-131591: Add tests for _PdbClient by godlygeek · Pull Request #132976 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

gh-131591: Add tests for _PdbClient #132976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 30, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert accidental formatting changes
  • Loading branch information
godlygeek committed Apr 28, 2025
commit 102404c8f203072d8289ce3fdfa63d96d4fdf206
137 changes: 66 additions & 71 deletions Lib/test/test_remote_pdb.py
< 8000 td class="blob-num blob-num-deletion empty-cell">
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


if not sys.is_remote_debug_enabled():
raise unittest.SkipTest("remote debugging is disabled")
raise unittest.SkipTest('remote debugging is disabled')


@contextmanager
Expand Down Expand Up @@ -782,8 +782,8 @@ def setUp(self):
self.pdb.quitting = False

# Create a frame for testing
self.test_globals = {"a": 1, "b": 2, "__pdb_convenience_variables": {"x": 100}}
self.test_locals = {"c": 3, "d": 4}
self.test_globals = {'a': 1, 'b': 2, '__pdb_convenience_variables': {'x': 100}}
self.test_locals = {'c': 3, 'd': 4}

# Create a simple test frame
frame_info = unittest.mock.Mock()
Expand Down Expand Up @@ -825,13 +825,18 @@ def test_read_command_EOF(self):
def test_completion(self):
"""Test handling completion requests."""
# Mock completenames to return specific values
with unittest.mock.patch.object(
self.pdb, "completenames", return_value=["continue", "clear"]
):
with unittest.mock.patch.object(self.pdb, 'completenames',
return_value=["continue", "clear"]):

# Add a completion request
self.sockfile.add_input(
{"complete": {"text": "c", "line": "c", "begidx": 0, "endidx": 1}}
)
self.sockfile.add_input({
"complete": {
"text": "c",
"line": "c",
"begidx": 0,
"endidx": 1
}
})

# Add a regular command to break the loop
self.sockfile.add_input({"reply": "help"})
Expand Down Expand Up @@ -865,7 +870,7 @@ def test_interact_mode(self):
self.assertIsInstance(self.pdb._interact_state, dict)

# Test running code in interact mode
with unittest.mock.patch.object(self.pdb, "_error_exc") as mock_error:
with unittest.mock.patch.object(self.pdb, '_error_exc') as mock_error:
self.pdb._run_in_python_repl("print('test')")
mock_error.assert_not_called()

Expand All @@ -876,7 +881,7 @@ def test_interact_mode(self):
def test_registering_commands(self):
"""Test registering breakpoint commands."""
# Mock get_bpbynumber
with unittest.mock.patch.object(self.pdb, "get_bpbynumber"):
with unittest.mock.patch.object(self.pdb, 'get_bpbynumber'):
# Queue up some input to send
self.sockfile.add_input({"reply": "commands 1"})
self.sockfile.add_input({"reply": "silent"})
Expand All @@ -888,7 +893,7 @@ def test_registering_commands(self):
self.pdb.cmdloop()

outputs = self.sockfile.get_output()
self.assertIn("command_list", outputs[0])
self.assertIn('command_list', outputs[0])
self.assertEqual(outputs[1], {"prompt": "(Pdb) ", "state": "pdb"})
self.assertEqual(outputs[2], {"prompt": "(com) ", "state": "commands"})
self.assertEqual(outputs[3], {"prompt": "(com) ", "state": "commands"})
Expand All @@ -904,19 +909,17 @@ def test_registering_commands(self):

def test_detach(self):
"""Test the detach method."""
with unittest.mock.patch.object(self.sockfile, "close") as mock_close:
with unittest.mock.patch.object(self.sockfile, 'close') as mock_close:
self.pdb.detach()
mock_close.assert_called_once()
self.assertFalse(self.pdb.quitting)

def test_cmdloop(self):
"""Test the command loop with various commands."""
# Mock onecmd to track command execution
with unittest.mock.patch.object(
self.pdb, "onecmd", return_value=False
) as mock_onecmd:
with unittest.mock.patch.object(self.pdb, 'onecmd', return_value=False) as mock_onecmd:
# Add commands to the queue
self.pdb.cmdqueue = ["help", "list"]
self.pdb.cmdqueue = ['help', 'list']

# Add a command from the socket for when cmdqueue is empty
self.sockfile.add_input({"reply": "next"})
Expand All @@ -926,24 +929,23 @@ def test_cmdloop(self):

# Configure onecmd to exit the loop on "quit"
def side_effect(line):
return line == "quit"

return line == 'quit'
mock_onecmd.side_effect = side_effect

# Run the command loop
self.pdb.quitting = False # Set this by hand because we don't want to really call set_trace()
self.pdb.quitting = False # Set this by hand because we don't want to really call set_trace()
self.pdb.cmdloop()

# Should have processed 4 commands: 2 from cmdqueue, 2 from socket
self.assertEqual(mock_onecmd.call_count, 4)
mock_onecmd.assert_any_call("help")
mock_onecmd.assert_any_call("list")
mock_onecmd.assert_any_call("next")
mock_onecmd.assert_any_call("quit")
mock_onecmd.assert_any_call('help')
mock_onecmd.assert_any_call('list')
mock_onecmd.assert_any_call('next')
mock_onecmd.assert_any_call('quit')

# Check if prompt was sent to client
outputs = self.sockfile.get_output()
prompts = [o for o in outputs if "prompt" in o]
prompts = [o for o in outputs if 'prompt' in o]
self.assertEqual(len(prompts), 2) # Should have sent 2 prompts


Expand All @@ -954,7 +956,7 @@ class PdbConnectTestCase(unittest.TestCase):
def setUp(self):
# Create a server socket that will wait for the debugger to connect
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_sock.bind(("127.0.0.1", 0)) # Let OS assign port
self.server_sock.bind(('127.0.0.1', 0)) # Let OS assign port
self.server_sock.listen(1)
self.port = self.server_sock.getsockname()[1]

Expand Down Expand Up @@ -995,11 +997,10 @@ def dummy_function():
result = connect_to_debugger()
foo()
print(f"Function returned: {{result}}")
"""
)
""")

self.script_path = TESTFN + "_connect_test.py"
with open(self.script_path, "w") as f:
with open(self.script_path, 'w') as f:
f.write(script)

def tearDown(self):
Expand All @@ -1016,12 +1017,12 @@ def _connect_and_get_client_file(self):
[sys.executable, self.script_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
text=True
)

# Accept the connection from the subprocess
client_sock, _ = self.server_sock.accept()
client_file = client_sock.makefile("rwb")
client_file = client_sock.makefile('rwb')
self.addCleanup(client_file.close)
self.addCleanup(client_sock.close)

Expand All @@ -1036,7 +1037,7 @@ def _read_until_prompt(self, client_file):
break
msg = json.loads(data.decode())
messages.append(msg)
if "prompt" in msg:
if 'prompt' in msg:
break
return messages

Expand All @@ -1049,12 +1050,12 @@ def _send_interrupt(self, pid):
"""Helper to send an interrupt signal to the debugger."""
# with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
interrupt_script = TESTFN + "_interrupt_script.py"
with open(interrupt_script, "w") as f:
with open(interrupt_script, 'w') as f:
f.write(
"import pdb, sys\n"
'import pdb, sys\n'
'print("Hello, world!")\n'
"if inst := pdb.Pdb._last_pdb_instance:\n"
" inst.set_trace(sys._getframe(1))\n"
'if inst := pdb.Pdb._last_pdb_instance:\n'
' inst.set_trace(sys._getframe(1))\n'
)
self.addCleanup(unlink, interrupt_script)
try:
Expand All @@ -1071,19 +1072,19 @@ def test_connect_and_basic_commands(self):
# We should receive initial data from the debugger
data = client_file.readline()
initial_data = json.loads(data.decode())
self.assertIn("message", initial_data)
self.assertIn("pdb._connect", initial_data["message"])
self.assertIn('message', initial_data)
self.assertIn('pdb._connect', initial_data['message'])

# First, look for command_list message
data = client_file.readline()
command_list = json.loads(data.decode())
self.assertIn("command_list", command_list)
self.assertIn('command_list', command_list)

# Then, look for the first prompt
data = client_file.readline()
prompt_data = json.loads(data.decode())
self.assertIn("prompt", prompt_data)
self.assertEqual(prompt_data["state"], "pdb")
self.assertIn('prompt', prompt_data)
self.assertEqual(prompt_data['state'], 'pdb')

# Send 'bt' (backtrace) command
self._send_command(client_file, "bt")
Expand All @@ -1092,11 +1093,8 @@ def test_connect_and_basic_commands(self):
messages = self._read_until_prompt(client_file)

# Extract text messages containing stack info
text_msg = [
msg["message"]
for msg in messages
if "message" in msg and "connect_to_debugger" in msg["message"]
]
text_msg = [msg['message'] for msg in messages
if 'message' in msg and 'connect_to_debugger' in msg['message']]
got_stack_info = bool(text_msg)

expected_stacks = [
Expand All @@ -1107,9 +1105,7 @@ def test_connect_and_basic_commands(self):
for stack, msg in zip(expected_stacks, text_msg, strict=True):
self.assertIn(stack, msg)

self.assertTrue(
got_stack_info, "Should have received stack trace information"
)
self.assertTrue(got_stack_info, "Should have received stack trace information")

# Send 'c' (continue) command to let the program finish
self._send_command(client_file, "c")
Expand All @@ -1132,32 +1128,28 @@ def test_breakpoints(self):
# Set a breakpoint at the return statement
self._send_command(client_file, "break bar")
messages = self._read_until_prompt(client_file)
bp_msg = next(msg["message"] for msg in messages if "message" in msg)
bp_msg = next(msg['message'] for msg in messages if 'message' in msg)
self.assertIn("Breakpoint", bp_msg)

# Continue execution until breakpoint
self._send_command(client_file, "c")
messages = self._read_until_prompt(client_file)

# Verify we hit the breakpoint
hit_msg = next(msg["message"] for msg in messages if "message" in msg)
hit_msg = next(msg['message'] for msg in messages if 'message' in msg)
self.assertIn("bar()", hit_msg)

# Check breakpoint list
self._send_command(client_file, "b")
messages = self._read_until_prompt(client_file)
list_msg = next(
msg["message"] for msg in reversed(messages) if "message" in msg
)
list_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
self.assertIn("1 breakpoint", list_msg)
self.assertIn("breakpoint already hit 1 time", list_msg)

# Clear breakpoint
self._send_command(client_file, "clear 1")
messages = self._read_until_prompt(client_file)
clear_msg = next(
msg["message"] for msg in reversed(messages) if "message" in msg
)
clear_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
self.assertIn("Deleted breakpoint", clear_msg)

# Continue to end
Expand Down Expand Up @@ -1217,7 +1209,7 @@ def bar():
messages = self._read_until_prompt(client_file)

# Verify we got the keyboard interrupt message.
interrupt_msgs = [msg["message"] for msg in messages if "message" in msg]
interrupt_msgs = [msg['message'] for msg in messages if 'message' in msg]
expected_msg = [msg for msg in interrupt_msgs if "bar()" in msg]
self.assertGreater(len(expected_msg), 0)

Expand Down Expand Up @@ -1252,7 +1244,7 @@ def test_handle_eof(self):
def test_protocol_version(self):
"""Test that incompatible protocol versions are properly detected."""
# Create a script using an incompatible protocol version
script = textwrap.dedent(f"""
script = textwrap.dedent(f'''
import sys
import pdb

Expand All @@ -1277,7 +1269,7 @@ def run_test():

if __name__ == "__main__":
print("Test result:", run_test())
""")
''')
self._create_script(script=script)
process, client_file = self._connect_and_get_client_file()

Expand All @@ -1286,10 +1278,10 @@ def run_test():
data = client_file.readline()
message = json.loads(data.decode())

self.assertIn("message", message)
self.assertEqual(message["type"], "error")
self.assertIn("incompatible", message["message"])
self.assertIn("protocol version", message["message"])
self.assertIn('message', message)
self.assertEqual(message['type'], 'error')
self.assertIn('incompatible', message['message'])
self.assertIn('protocol version', message['message'])

# The process should complete normally
stdout, stderr = process.communicate(timeout=SHORT_TIMEOUT)
Expand Down Expand Up @@ -1318,15 +1310,15 @@ def test_help_system(self):
data = client_file.readline()
message = json.loads(data.decode())

self.assertIn("help", message)
self.assertIn('help', message)

if cmd == "help":
# Should just contain the command itself
self.assertEqual(message["help"], "")
self.assertEqual(message['help'], "")
else:
# Should contain the specific command we asked for help with
command = cmd.split()[1]
self.assertEqual(message["help"], command)
self.assertEqual(message['help'], command)

# Skip to the next prompt
self._read_until_prompt(client_file)
Expand All @@ -1351,14 +1343,18 @@ def test_multi_line_commands(self):
multi_line_commands = [
# Define a function
"def test_func():\n return 42",

# For loop
"for i in range(3):\n print(i)",

# If statement
"if True:\n x = 42\nelse:\n x = 0",

# Try/except
"try:\n result = 10/2\n print(result)\nexcept ZeroDivisionError:\n print('Error')",

# Class definition
"class TestClass:\n def __init__(self):\n self.value = 100\n def get_value(self):\n return self.value",
"class TestClass:\n def __init__(self):\n self.value = 100\n def get_value(self):\n return self.value"
]

for cmd in multi_line_commands:
Expand All @@ -1370,7 +1366,7 @@ def test_multi_line_commands(self):
messages = self._read_until_prompt(client_file)

# Find the result message
result_msg = next(msg["message"] for msg in messages if "message" in msg)
result_msg = next(msg['message'] for msg in messages if 'message' in msg)
self.assertIn("42", result_msg)

# Test creating an instance of the defined class
Expand All @@ -1382,7 +1378,7 @@ def test_multi_line_commands(self):
messages = self._read_until_prompt(client_file)

# Find the result message
result_msg = next(msg["message"] for msg in messages if "message" in msg)
result_msg = next(msg['message'] for msg in messages if 'message' in msg)
self.assertIn("100", result_msg)

# Continue execution to finish
Expand All @@ -1392,6 +1388,5 @@ def test_multi_line_commands(self):
self.assertIn("Function returned: 42", stdout)
self.assertEqual(process.returncode, 0)


if __name__ == "__main__":
unittest.main()
Loading
0