diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 762d53eeb2df1a..181612b4e171ab 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -99,7 +99,9 @@ PEP 768: Safe external debugger interface for CPython :pep:`768` introduces a zero-overhead debugging interface that allows debuggers and profilers to safely attach to running Python processes. This is a significant enhancement to Python's -debugging capabilities allowing debuggers to forego unsafe alternatives. +debugging capabilities allowing debuggers to forego unsafe alternatives. See +:ref:`below ` for how this feature is leveraged to +implement the new :mod:`pdb` module's remote attaching capabilities. The new interface provides safe execution points for attaching debugger code without modifying the interpreter's normal execution path or adding runtime overhead. This enables tools to @@ -149,6 +151,32 @@ See :pep:`768` for more details. (Contributed by Pablo Galindo Salgado, Matt Wozniski, and Ivona Stojanovic in :gh:`131591`.) + +.. _whatsnew314-remote-pdb: + +Remote attaching to a running Python process with PDB +----------------------------------------------------- + +The :mod:`pdb` module now supports remote attaching to a running Python process +using a new ``-p PID`` command-line option: + +.. code-block:: sh + + python -m pdb -p 1234 + +This will connect to the Python process with the given PID and allow you to +debug it interactively. Notice that due to how the Python interpreter works +attaching to a remote process that is blocked in a system call or waiting for +I/O will only work once the next bytecode instruction is executed or when the +process receives a signal. + +This feature leverages :pep:`768` and the :func:`sys.remote_exec` function +to attach to the remote process and send the PDB commands to it. + + +(Contributed by Matt Wozniski and Pablo Galindo in :gh:`131591`.) + + .. _whatsnew314-pep758: PEP 758 – Allow except and except* expressions without parentheses diff --git a/Lib/pdb.py b/Lib/pdb.py index 160a7043a30c55..328599d17e69d2 100644 --- a/Lib/pdb.py +++ b/Lib/pdb.py @@ -74,13 +74,19 @@ import dis import code import glob +import json import token import types import codeop import pprint import signal +import socket +import typing import asyncio import inspect +import weakref +import builtins +import tempfile import textwrap import tokenize import itertools @@ -88,6 +94,7 @@ import linecache import _colorize +from contextlib import closing from contextlib import contextmanager from rlcompleter import Completer from types import CodeType @@ -918,7 +925,7 @@ def handle_command_def(self, line): if cmd == 'end': return True # end of cmd list elif cmd == 'EOF': - print('') + self.message('') return True # end of cmd list cmdlist = self.commands[self.commands_bnum] if cmd == 'silent': @@ -1458,6 +1465,13 @@ def do_ignore(self, arg): complete_ignore = _complete_bpnumber + def _prompt_for_confirmation(self, prompt, default): + try: + reply = input(prompt) + except EOFError: + reply = default + return reply.strip().lower() + def do_clear(self, arg): """cl(ear) [filename:lineno | bpnumber ...] @@ -1467,11 +1481,10 @@ def do_clear(self, arg): clear all breaks at that line in that file. """ if not arg: - try: - reply = input('Clear all breaks? ') - except EOFError: - reply = 'no' - reply = reply.strip().lower() + reply = self._prompt_for_confirmation( + 'Clear all breaks? ', + default='no', + ) if reply in ('y', 'yes'): bplist = [bp for bp in bdb.Breakpoint.bpbynumber if bp] self.clear_all_breaks() @@ -1775,6 +1788,9 @@ def do_jump(self, arg): self.error('Jump failed: %s' % e) do_j = do_jump + def _create_recursive_debugger(self): + return Pdb(self.completekey, self.stdin, self.stdout) + def do_debug(self, arg): """debug code @@ -1788,7 +1804,7 @@ def do_debug(self, arg): self.stop_trace() globals = self.curframe.f_globals locals = self.curframe.f_locals - p = Pdb(self.completekey, self.stdin, self.stdout) + p = self._create_recursive_debugger() p.prompt = "(%s) " % self.prompt.strip() self.message("ENTERING RECURSIVE DEBUGGER") try: @@ -2491,6 +2507,581 @@ def set_trace(*, header=None, commands=None): pdb.message(header) pdb.set_trace(sys._getframe().f_back, commands=commands) +# Remote PDB + +class _PdbServer(Pdb): + def __init__(self, sockfile, owns_sockfile=True, **kwargs): + self._owns_sockfile = owns_sockfile + self._interact_state = None + self._sockfile = sockfile + self._command_name_cache = [] + self._write_failed = False + super().__init__(**kwargs) + + @staticmethod + def protocol_version(): + # By default, assume a client and server are compatible if they run + # the same Python major.minor version. We'll try to keep backwards + # compatibility between patch versions of a minor version if possible. + # If we do need to change the protocol in a patch version, we'll change + # `revision` to the patch version where the protocol changed. + # We can ignore compatibility for pre-release versions; sys.remote_exec + # can't attach to a pre-release version except from that same version. + v = sys.version_info + revision = 0 + return int(f"{v.major:02X}{v.minor:02X}{revision:02X}F0", 16) + + def _ensure_valid_message(self, msg): + # Ensure the message conforms to our protocol. + # If anything needs to be changed here for a patch release of Python, + # the 'revision' in protocol_version() should be updated. + match msg: + case {"message": str(), "type": str()}: + # Have the client show a message. The client chooses how to + # format the message based on its type. The currently defined + # types are "info" and "error". If a message has a type the + # client doesn't recognize, it must be treated as "info". + pass + case {"help": str()}: + # Have the client show the help for a given argument. + pass + case {"prompt": str(), "state": str()}: + # Have the client display the given prompt and wait for a reply + # from the user. If the client recognizes the state it may + # enable mode-specific features like multi-line editing. + # If it doesn't recognize the state it must prompt for a single + # line only and send it directly to the server. A server won't + # progress until it gets a "reply" or "signal" message, but can + # process "complete" requests while waiting for the reply. + pass + case { + "completions": list(completions) + } if all(isinstance(c, str) for c in completions): + # Return valid completions for a client's "complete" request. + pass + case { + "command_list": list(command_list) + } if all(isinstance(c, str) for c in command_list): + # Report the list of legal PDB commands to the client. + # Due to aliases this list is not static, but the client + # needs to know it for multi-line editing. + pass + case _: + raise AssertionError( + f"PDB message doesn't follow the schema! {msg}" + ) + + def _send(self, **kwargs): + self._ensure_valid_message(kwargs) + json_payload = json.dumps(kwargs) + try: + self._sockfile.write(json_payload.encode() + b"\n") + self._sockfile.flush() + except OSError: + # This means that the client has abruptly disconnected, but we'll + # handle that the next time we try to read from the client instead + # of trying to handle it from everywhere _send() may be called. + # Track this with a flag rather than assuming readline() will ever + # return an empty string because the socket may be half-closed. + self._write_failed = True + + @typing.override + def message(self, msg, end="\n"): + self._send(message=str(msg) + end, type="info") + + @typing.override + def error(self, msg): + self._send(message=str(msg), type="error") + + def _get_input(self, prompt, state) -> str: + # Before displaying a (Pdb) prompt, send the list of PDB commands + # unless we've already sent an up-to-date list. + if state == "pdb" and not self._command_name_cache: + self._command_name_cache = self.completenames("", "", 0, 0) + self._send(command_list=self._command_name_cache) + self._send(prompt=prompt, state=state) + return self._read_reply() + + def _read_reply(self): + # Loop until we get a 'reply' or 'signal' from the client, + # processing out-of-band 'complete' requests as they arrive. + while True: + if self._write_failed: + raise EOFError + + msg = self._sockfile.readline() + if not msg: + raise EOFError + + try: + payload = json.loads(msg) + except json.JSONDecodeError: + self.error(f"Disconnecting: client sent invalid JSON {msg}") + raise EOFError + + match payload: + case {"reply": str(reply)}: + return reply + case {"signal": str(signal)}: + if signal == "INT": + raise KeyboardInterrupt + elif signal == "EOF": + raise EOFError + else: + self.error( + f"Received unrecognized signal: {signal}" + ) + # Our best hope of recovering is to pretend we + # got an EOF to exit whatever mode we're in. + raise EOFError + case { + "complete": { + "text": str(text), + "line": str(line), + "begidx": int(begidx), + "endidx": int(endidx), + } + }: + items = self._complete_any(text, line, begidx, endidx) + self._send(completions=items) + continue + # Valid JSON, but doesn't meet the schema. + self.error(f"Ignoring invalid message from client: {msg}") + + def _complete_any(self, text, line, begidx, endidx): + if begidx == 0: + return self.completenames(text, line, begidx, endidx) + + cmd = self.parseline(line)[0] + if cmd: + compfunc = getattr(self, "complete_" + cmd, self.completedefault) + else: + compfunc = self.completedefault + return compfunc(text, line, begidx, endidx) + + def cmdloop(self, intro=None): + self.preloop() + if intro is not None: + self.intro = intro + if self.intro: + self.message(str(self.intro)) + stop = None + while not stop: + if self._interact_state is not None: + try: + reply = self._get_input(prompt=">>> ", state="interact") + except KeyboardInterrupt: + # Match how KeyboardInterrupt is handled in a REPL + self.message("\nKeyboardInterrupt") + except EOFError: + self.message("\n*exit from pdb interact command*") + self._interact_state = None + else: + self._run_in_python_repl(reply) + continue + + if not self.cmdqueue: + try: + state = "commands" if self.commands_defining else "pdb" + reply = self._get_input(prompt=self.prompt, state=state) + except EOFError: + reply = "EOF" + + self.cmdqueue.append(reply) + + line = self.cmdqueue.pop(0) + line = self.precmd(line) + stop = self.onecmd(line) + stop = self.postcmd(stop, line) + self.postloop() + + def postloop(self): + super().postloop() + if self.quitting: + self.detach() + + def detach(self): + # Detach the debugger and close the socket without raising BdbQuit + self.quitting = False + if self._owns_sockfile: + # Don't try to reuse this instance, it's not valid anymore. + Pdb._last_pdb_instance = None + try: + self._sockfile.close() + except OSError: + # close() can fail if the connection was broken unexpectedly. + pass + + def do_debug(self, arg): + # Clear our cached list of valid commands; the recursive debugger might + # send its own differing list, and so ours needs to be re-sent. + self._command_name_cache = [] + return super().do_debug(arg) + + def do_alias(self, arg): + # Clear our cached list of valid commands; one might be added. + self._command_name_cache = [] + return super().do_alias(arg) + + def do_unalias(self, arg): + # Clear our cached list of valid commands; one might be removed. + self._command_name_cache = [] + return super().do_unalias(arg) + + def do_help(self, arg): + # Tell the client to render the help, since it might need a pager. + self._send(help=arg) + + do_h = do_help + + def _interact_displayhook(self, obj): + # Like the default `sys.displayhook` except sending a socket message. + if obj is not None: + self.message(repr(obj)) + builtins._ = obj + + def _run_in_python_repl(self, lines): + # Run one 'interact' mode code block against an existing namespace. + assert self._interact_state + save_displayhook = sys.displayhook + try: + sys.displayhook = self._interact_displayhook + code_obj = self._interact_state["compiler"](lines + "\n") + if code_obj is None: + raise SyntaxError("Incomplete command") + exec(code_obj, self._interact_state["ns"]) + except: + self._error_exc() + finally: + sys.displayhook = save_displayhook + + def do_interact(self, arg): + # Prepare to run 'interact' mode code blocks, and trigger the client + # to start treating all input as Python commands, not PDB ones. + self.message("*pdb interact start*") + self._interact_state = dict( + compiler=codeop.CommandCompiler(), + ns={**self.curframe.f_globals, **self.curframe.f_locals}, + ) + + @typing.override + def _create_recursive_debugger(self): + return _PdbServer(self._sockfile, owns_sockfile=False) + + @typing.override + def _prompt_for_confirmation(self, prompt, default): + try: + return self._get_input(prompt=prompt, state="confirm") + except (EOFError, KeyboardInterrupt): + return default + + def do_run(self, arg): + self.error("remote PDB cannot restart the program") + + do_restart = do_run + + def _error_exc(self): + if self._interact_state and isinstance(sys.exception(), SystemExit): + # If we get a SystemExit in 'interact' mode, exit the REPL. + self._interact_state = None + ret = super()._error_exc() + self.message("*exit from pdb interact command*") + return ret + else: + return super()._error_exc() + + def default(self, line): + # Unlike Pdb, don't prompt for more lines of a multi-line command. + # The remote needs to send us the whole block in one go. + try: + candidate = line.removeprefix("!") + "\n" + if codeop.compile_command(candidate, "", "single") is None: + raise SyntaxError("Incomplete command") + return super().default(candidate) + except: + self._error_exc() + + +class _PdbClient: + def __init__(self, pid, sockfile, interrupt_script): + self.pid = pid + self.sockfile = sockfile + self.interrupt_script = interrupt_script + self.pdb_instance = Pdb() + self.pdb_commands = set() + self.completion_matches = [] + self.state = "dumb" + self.write_failed = False + + def _ensure_valid_message(self, msg): + # Ensure the message conforms to our protocol. + # If anything needs to be changed here for a patch release of Python, + # the 'revision' in protocol_version() should be updated. + match msg: + case {"reply": str()}: + # Send input typed by a user at a prompt to the remote PDB. + pass + case {"signal": "EOF"}: + # Tell the remote PDB that the user pressed ^D at a prompt. + pass + case {"signal": "INT"}: + # Tell the remote PDB that the user pressed ^C at a prompt. + pass + case { + "complete": { + "text": str(), + "line": str(), + "begidx": int(), + "endidx": int(), + } + }: + # Ask the remote PDB what completions are valid for the given + # parameters, using readline's completion protocol. + pass + case _: + raise AssertionError( + f"PDB message doesn't follow the schema! {msg}" + ) + + def _send(self, **kwargs): + self._ensure_valid_message(kwargs) + json_payload = json.dumps(kwargs) + try: + self.sockfile.write(json_payload.encode() + b"\n") + self.sockfile.flush() + except OSError: + # This means that the client has abruptly disconnected, but we'll + # handle that the next time we try to read from the client instead + # of trying to handle it from everywhere _send() may be called. + # Track this with a flag rather than assuming readline() will ever + # return an empty string because the socket may be half-closed. + self.write_failed = True + + def read_command(self, prompt): + reply = input(prompt) + + if self.state == "dumb": + # No logic applied whatsoever, just pass the raw reply back. + return reply + + prefix = "" + if self.state == "pdb": + # PDB command entry mode + cmd = self.pdb_instance.parseline(reply)[0] + if cmd in self.pdb_commands or reply.strip() == "": + # Recognized PDB command, or blank line repeating last command + return reply + + # Otherwise, explicit or implicit exec command + if reply.startswith("!"): + prefix = "!" + reply = reply.removeprefix(prefix).lstrip() + + if codeop.compile_command(reply + "\n", "", "single") is not None: + # Valid single-line statement + return prefix + reply + + # Otherwise, valid first line of a multi-line statement + continue_prompt = "...".ljust(len(prompt)) + while codeop.compile_command(reply, "", "single") is None: + reply += "\n" + input(continue_prompt) + + return prefix + reply + + @contextmanager + def readline_completion(self, completer): + try: + import readline + except ImportError: + yield + return + + old_completer = readline.get_completer() + try: + readline.set_completer(completer) + if readline.backend == "editline": + # libedit uses "^I" instead of "tab" + command_string = "bind ^I rl_complete" + else: + command_string = "tab: complete" + readline.parse_and_bind(command_string) + yield + finally: + readline.set_completer(old_completer) + + def cmdloop(self): + with self.readline_completion(self.complete): + while not self.write_failed: + try: + if not (payload_bytes := self.sockfile.readline()): + break + except KeyboardInterrupt: + self.send_interrupt() + continue + + try: + payload = json.loads(payload_bytes) + except json.JSONDecodeError: + print( + f"*** Invalid JSON from remote: {payload_bytes}", + flush=True, + ) + continue + + self.process_payload(payload) + + def send_interrupt(self): + print( + "\n*** Program will stop at the next bytecode instruction." + " (Use 'cont' to resume)." + ) + sys.remote_exec(self.pid, self.interrupt_script) + + def process_payload(self, payload): + match payload: + case { + "command_list": command_list + } if all(isinstance(c, str) for c in command_list): + self.pdb_commands = set(command_list) + case {"message": str(msg), "type": str(msg_type)}: + if msg_type == "error": + print("***", msg, flush=True) + else: + print(msg, end="", flush=True) + case {"help": str(arg)}: + self.pdb_instance.do_help(arg) + case {"prompt": str(prompt), "state": str(state)}: + if state not in ("pdb", "interact"): + state = "dumb" + self.state = state + self.prompt_for_reply(prompt) + case _: + raise RuntimeError(f"Unrecognized payload {payload}") + + def prompt_for_reply(self, prompt): + while True: + try: + payload = {"reply": self.read_command(prompt)} + except EOFError: + payload = {"signal": "EOF"} + except KeyboardInterrupt: + payload = {"signal": "INT"} + except Exception as exc: + msg = traceback.format_exception_only(exc)[-1].strip() + print("***", msg, flush=True) + continue + + self._send(**payload) + return + + def complete(self, text, state): + import readline + + if state == 0: + self.completion_matches = [] + if self.state not in ("pdb", "interact"): + return None + + origline = readline.get_line_buffer() + line = origline.lstrip() + stripped = len(origline) - len(line) + begidx = readline.get_begidx() - stripped + endidx = readline.get_endidx() - stripped + + msg = { + "complete": { + "text": text, + "line": line, + "begidx": begidx, + "endidx": endidx, + } + } + + self._send(**msg) + if self.write_failed: + return None + + payload = self.sockfile.readline() + if not payload: + return None + + payload = json.loads(payload) + if "completions" not in payload: + raise RuntimeError( + f"Failed to get valid completions. Got: {payload}" + ) + + self.completion_matches = payload["completions"] + try: + return self.completion_matches[state] + except IndexError: + return None + + +def _connect(host, port, frame, commands, version): + with closing(socket.create_connection((host, port))) as conn: + sockfile = conn.makefile("rwb") + + remote_pdb = _PdbServer(sockfile) + weakref.finalize(remote_pdb, sockfile.close) + + if Pdb._last_pdb_instance is not None: + remote_pdb.error("Another PDB instance is already attached.") + elif version != remote_pdb.protocol_version(): + target_ver = f"0x{remote_pdb.protocol_version():08X}" + attach_ver = f"0x{version:08X}" + remote_pdb.error( + f"The target process is running a Python version that is" + f" incompatible with this PDB module." + f"\nTarget process pdb protocol version: {target_ver}" + f"\nLocal pdb module's protocol version: {attach_ver}" + ) + else: + remote_pdb.rcLines.extend(commands.splitlines()) + remote_pdb.set_trace(frame=frame) + + +def attach(pid, commands=()): + """Attach to a running process with the given PID.""" + with closing(socket.create_server(("localhost", 0))) as server: + port = server.getsockname()[1] + + with tempfile.NamedTemporaryFile("w", delete_on_close=False) as connect_script: + connect_script.write( + textwrap.dedent( + f""" + import pdb, sys + pdb._connect( + host="localhost", + port={port}, + frame=sys._getframe(1), + commands={json.dumps("\n".join(commands))}, + version={_PdbServer.protocol_version()}, + ) + """ + ) + ) + connect_script.close() + sys.remote_exec(pid, connect_script.name) + + # TODO Add a timeout? Or don't bother since the user can ^C? + client_sock, _ = server.accept() + + with closing(client_sock): + sockfile = client_sock.makefile("rwb") + + with closing(sockfile): + with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script: + interrupt_script.write( + 'import pdb, sys\n' + 'if inst := pdb.Pdb._last_pdb_instance:\n' + ' inst.set_trace(sys._getframe(1))\n' + ) + interrupt_script.close() + + _PdbClient(pid, sockfile, interrupt_script.name).cmdloop() + + # Post-Mortem interface def post_mortem(t=None): @@ -2560,7 +3151,7 @@ def help(): def main(): import argparse - parser = argparse.ArgumentParser(usage="%(prog)s [-h] [-c command] (-m module | pyfile) [args ...]", + parser = argparse.ArgumentParser(usage="%(prog)s [-h] [-c command] (-m module | -p pid | pyfile) [args ...]", description=_usage, formatter_class=argparse.RawDescriptionHelpFormatter, allow_abbrev=False) @@ -2571,6 +3162,7 @@ def main(): parser.add_argument('-c', '--command', action='append', default=[], metavar='command', dest='commands', help='pdb commands to execute as if given in a .pdbrc file') parser.add_argument('-m', metavar='module', dest='module') + parser.add_argument('-p', '--pid', type=int, help="attach to the specified PID", default=None) if len(sys.argv) == 1: # If no arguments were given (python -m pdb), print the whole help message. @@ -2580,7 +3172,15 @@ def main(): opts, args = parser.parse_known_args() - if opts.module: + if opts.pid: + # If attaching to a remote pid, unrecognized arguments are not allowed. + # This will raise an error if there are extra unrecognized arguments. + opts = parser.parse_args() + if opts.module: + parser.error("argument -m: not allowed with argument --pid") + attach(opts.pid, opts.commands) + return + elif opts.module: # If a module is being debugged, we consider the arguments after "-m module" to # be potential arguments to the module itself. We need to parse the arguments # before "-m" to check if there is any invalid argument. diff --git a/Lib/test/test_pyclbr.py b/Lib/test/test_pyclbr.py index a9ac13395a8fac..df05cd07d7e249 100644 --- a/Lib/test/test_pyclbr.py +++ b/Lib/test/test_pyclbr.py @@ -253,7 +253,8 @@ def test_others(self): cm( 'pdb', # pyclbr does not handle elegantly `typing` or properties - ignore=('Union', '_ModuleTarget', '_ScriptTarget', '_ZipTarget', 'curframe_locals'), + ignore=('Union', '_ModuleTarget', '_ScriptTarget', '_ZipTarget', 'curframe_locals', + '_InteractState'), ) cm('pydoc', ignore=('input', 'output',)) # properties diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py new file mode 100644 index 00000000000000..cc0ada12814afd --- /dev/null +++ b/Lib/test/test_remote_pdb.py @@ -0,0 +1,687 @@ +import io +import json +import os +import signal +import socket +import subprocess +import sys +import tempfile +import textwrap +import threading +import unittest +import unittest.mock +from contextlib import contextmanager +from pathlib import Path +from test.support import is_wasi, os_helper +from test.support.os_helper import temp_dir, TESTFN, unlink +from typing import Dict, List, Optional, Tuple, Union, Any + +import pdb +from pdb import _PdbServer, _PdbClient + + +class MockSocketFile: + """Mock socket file for testing _PdbServer without actual socket connections.""" + + def __init__(self): + self.input_queue = [] + self.output_buffer = [] + + def write(self, data: bytes) -> None: + """Simulate write to socket.""" + self.output_buffer.append(data) + + def flush(self) -> None: + """No-op flush implementation.""" + pass + + def readline(self) -> bytes: + """Read a line from the prepared input queue.""" + if not self.input_queue: + return b"" + return self.input_queue.pop(0) + + def close(self) -> None: + """Close the mock socket file.""" + pass + + def add_input(self, data: dict) -> None: + """Add input that will be returned by readline.""" + self.input_queue.append(json.dumps(data).encode() + b"\n") + + def get_output(self) -> List[dict]: + """Get the output that was written by the object being tested.""" + results = [] + for data in self.output_buffer: + if isinstance(data, bytes) and data.endswith(b"\n"): + try: + results.append(json.loads(data.decode().strip())) + except json.JSONDecodeError: + pass # Ignore non-JSON output + self.output_buffer = [] + return results + + +class RemotePdbTestCase(unittest.TestCase): + """Tests for the _PdbServer class.""" + + def setUp(self): + self.sockfile = MockSocketFile() + self.pdb = _PdbServer(self.sockfile) + + # Mock some Bdb attributes that are lazily created when tracing starts + self.pdb.botframe = None + self.pdb.quitting = False + + # Create a frame for testing + self.test_globals = {'a': 1, 'b': 2, '__pdb_convenience_variables': {'x': 100}} + self.test_locals = {'c': 3, 'd': 4} + + # Create a simple test frame + frame_info = unittest.mock.Mock() + frame_info.f_globals = self.test_globals + frame_info.f_locals = self.test_locals + frame_info.f_lineno = 42 + frame_info.f_code = unittest.mock.Mock() + frame_info.f_code.co_filename = "test_file.py" + frame_info.f_code.co_name = "test_function" + + self.pdb.curframe = frame_info + + def test_message_and_error(self): + """Test message and error methods send correct JSON.""" + self.pdb.message("Test message") + self.pdb.error("Test error") + + outputs = self.sockfile.get_output() + self.assertEqual(len(outputs), 2) + self.assertEqual(outputs[0], {"message": "Test message\n", "type": "info"}) + self.assertEqual(outputs[1], {"message": "Test error", "type": "error"}) + + def test_read_command(self): + """Test reading commands from the socket.""" + # Add test input + self.sockfile.add_input({"reply": "help"}) + + # Read the command + cmd = self.pdb._read_reply() + self.assertEqual(cmd, "help") + + def test_read_command_EOF(self): + """Test reading EOF command.""" + # Simulate socket closure + self.pdb._write_failed = True + with self.assertRaises(EOFError): + self.pdb._read_reply() + + def test_completion(self): + """Test handling completion requests.""" + # Mock completenames to return specific values + with unittest.mock.patch.object(self.pdb, 'completenames', + return_value=["continue", "clear"]): + + # Add a completion request + self.sockfile.add_input({ + "complete": { + "text": "c", + "line": "c", + "begidx": 0, + "endidx": 1 + } + }) + + # Add a regular command to break the loop + self.sockfile.add_input({"reply": "help"}) + + # Read command - this should process the completion request first + cmd = self.pdb._read_reply() + + # Verify completion response was sent + outputs = self.sockfile.get_output() + self.assertEqual(len(outputs), 1) + self.assertEqual(outputs[0], {"completions": ["continue", "clear"]}) + + # The actual command should be returned + self.assertEqual(cmd, "help") + + def test_do_help(self): + """Test that do_help sends the help message.""" + self.pdb.do_help("break") + + outputs = self.sockfile.get_output() + self.assertEqual(len(outputs), 1) + self.assertEqual(outputs[0], {"help": "break"}) + + def test_interact_mode(self): + """Test interaction mode setup and execution.""" + # First set up interact mode + self.pdb.do_interact("") + + # Verify _interact_state is properly initialized + self.assertIsNotNone(self.pdb._interact_state) + self.assertIsInstance(self.pdb._interact_state, dict) + + # Test running code in interact mode + with unittest.mock.patch.object(self.pdb, '_error_exc') as mock_error: + self.pdb._run_in_python_repl("print('test')") + mock_error.assert_not_called() + + # Test with syntax error + self.pdb._run_in_python_repl("if:") + mock_error.assert_called_once() + + def test_registering_commands(self): + """Test registering breakpoint commands.""" + # Mock get_bpbynumber + with unittest.mock.patch.object(self.pdb, 'get_bpbynumber'): + # Queue up some input to send + self.sockfile.add_input({"reply": "commands 1"}) + self.sockfile.add_input({"reply": "silent"}) + self.sockfile.add_input({"reply": "print('hi')"}) + self.sockfile.add_input({"reply": "end"}) + self.sockfile.add_input({"signal": "EOF"}) + + # Run the PDB command loop + self.pdb.cmdloop() + + outputs = self.sockfile.get_output() + self.assertIn('command_list', outputs[0]) + self.assertEqual(outputs[1], {"prompt": "(Pdb) ", "state": "pdb"}) + self.assertEqual(outputs[2], {"prompt": "(com) ", "state": "commands"}) + self.assertEqual(outputs[3], {"prompt": "(com) ", "state": "commands"}) + self.assertEqual(outputs[4], {"prompt": "(com) ", "state": "commands"}) + self.assertEqual(outputs[5], {"prompt": "(Pdb) ", "state": "pdb"}) + self.assertEqual(outputs[6], {"message": "\n", "type": "info"}) + self.assertEqual(len(outputs), 7) + + self.assertEqual( + self.pdb.commands[1], + ["_pdbcmd_silence_frame_status", "print('hi')"], + ) + + def test_detach(self): + """Test the detach method.""" + with unittest.mock.patch.object(self.sockfile, 'close') as mock_close: + self.pdb.detach() + mock_close.assert_called_once() + self.assertFalse(self.pdb.quitting) + + def test_cmdloop(self): + """Test the command loop with various commands.""" + # Mock onecmd to track command execution + with unittest.mock.patch.object(self.pdb, 'onecmd', return_value=False) as mock_onecmd: + # Add commands to the queue + self.pdb.cmdqueue = ['help', 'list'] + + # Add a command from the socket for when cmdqueue is empty + self.sockfile.add_input({"reply": "next"}) + + # Add a second command to break the loop + self.sockfile.add_input({"reply": "quit"}) + + # Configure onecmd to exit the loop on "quit" + def side_effect(line): + return line == 'quit' + mock_onecmd.side_effect = side_effect + + # Run the command loop + self.pdb.quitting = False # Set this by hand because we don't want to really call set_trace() + self.pdb.cmdloop() + + # Should have processed 4 commands: 2 from cmdqueue, 2 from socket + self.assertEqual(mock_onecmd.call_count, 4) + mock_onecmd.assert_any_call('help') + mock_onecmd.assert_any_call('list') + mock_onecmd.assert_any_call('next') + mock_onecmd.assert_any_call('quit') + + # Check if prompt was sent to client + outputs = self.sockfile.get_output() + prompts = [o for o in outputs if 'prompt' in o] + self.assertEqual(len(prompts), 2) # Should have sent 2 prompts + + +@unittest.skipIf(is_wasi, "WASI does not support TCP sockets") +class PdbConnectTestCase(unittest.TestCase): + """Tests for the _connect mechanism using direct socket communication.""" + + def setUp(self): + # Create a server socket that will wait for the debugger to connect + self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_sock.bind(('127.0.0.1', 0)) # Let OS assign port + self.server_sock.listen(1) + self.port = self.server_sock.getsockname()[1] + + def _create_script(self, script=None): + # Create a file for subprocess script + if script is None: + script = textwrap.dedent( + f""" + import pdb + import sys + import time + + def foo(): + x = 42 + return bar() + + def bar(): + return 42 + + def connect_to_debugger(): + # Create a frame to debug + def dummy_function(): + x = 42 + # Call connect to establish connection + # with the test server + frame = sys._getframe() # Get the current frame + pdb._connect( + host='127.0.0.1', + port={self.port}, + frame=frame, + commands="", + version=pdb._PdbServer.protocol_version(), + ) + return x # This line won't be reached in debugging + + return dummy_function() + + result = connect_to_debugger() + foo() + print(f"Function returned: {{result}}") + """) + + self.script_path = TESTFN + "_connect_test.py" + with open(self.script_path, 'w') as f: + f.write(script) + + def tearDown(self): + self.server_sock.close() + try: + unlink(self.script_path) + except OSError: + pass + + def _connect_and_get_client_file(self): + """Helper to start subprocess and get connected client file.""" + # Start the subprocess that will connect to our socket + process = subprocess.Popen( + [sys.executable, self.script_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + # Accept the connection from the subprocess + client_sock, _ = self.server_sock.accept() + client_file = client_sock.makefile('rwb') + self.addCleanup(client_file.close) + self.addCleanup(client_sock.close) + + return process, client_file + + def _read_until_prompt(self, client_file): + """Helper to read messages until a prompt is received.""" + messages = [] + while True: + data = client_file.readline() + if not data: + break + msg = json.loads(data.decode()) + messages.append(msg) + if 'prompt' in msg: + break + return messages + + def _send_command(self, client_file, command): + """Helper to send a command to the debugger.""" + client_file.write(json.dumps({"reply": command}).encode() + b"\n") + client_file.flush() + + def _send_interrupt(self, pid): + """Helper to send an interrupt signal to the debugger.""" + # with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script: + interrupt_script = TESTFN + "_interrupt_script.py" + with open(interrupt_script, 'w') as f: + f.write( + 'import pdb, sys\n' + 'print("Hello, world!")\n' + 'if inst := pdb.Pdb._last_pdb_instance:\n' + ' inst.set_trace(sys._getframe(1))\n' + ) + self.addCleanup(unlink, interrupt_script) + try: + sys.remote_exec(pid, interrupt_script) + except PermissionError: + self.skipTest("Insufficient permissions to execute code in remote process") + + def test_connect_and_basic_commands(self): + """Test connecting to a remote debugger and sending basic commands.""" + self._create_script() + process, client_file = self._connect_and_get_client_file() + + with process: + # We should receive initial data from the debugger + data = client_file.readline() + initial_data = json.loads(data.decode()) + self.assertIn('message', initial_data) + self.assertIn('pdb._connect', initial_data['message']) + + # First, look for command_list message + data = client_file.readline() + command_list = json.loads(data.decode()) + self.assertIn('command_list', command_list) + + # Then, look for the first prompt + data = client_file.readline() + prompt_data = json.loads(data.decode()) + self.assertIn('prompt', prompt_data) + self.assertEqual(prompt_data['state'], 'pdb') + + # Send 'bt' (backtrace) command + self._send_command(client_file, "bt") + + # Check for response - we should get some stack frames + messages = self._read_until_prompt(client_file) + + # Extract text messages containing stack info + text_msg = [msg['message'] for msg in messages + if 'message' in msg and 'connect_to_debugger' in msg['message']] + got_stack_info = bool(text_msg) + + expected_stacks = [ + "", + "connect_to_debugger", + ] + + for stack, msg in zip(expected_stacks, text_msg, strict=True): + self.assertIn(stack, msg) + + self.assertTrue(got_stack_info, "Should have received stack trace information") + + # Send 'c' (continue) command to let the program finish + self._send_command(client_file, "c") + + # Wait for process to finish + stdout, _ = process.communicate(timeout=5) + + # Check if we got the expected output + self.assertIn("Function returned: 42", stdout) + self.assertEqual(process.returncode, 0) + + def test_breakpoints(self): + """Test setting and hitting breakpoints.""" + self._create_script() + process, client_file = self._connect_and_get_client_file() + with process: + # Skip initial messages until we get to the prompt + self._read_until_prompt(client_file) + + # Set a breakpoint at the return statement + self._send_command(client_file, "break bar") + messages = self._read_until_prompt(client_file) + bp_msg = next(msg['message'] for msg in messages if 'message' in msg) + self.assertIn("Breakpoint", bp_msg) + + # Continue execution until breakpoint + self._send_command(client_file, "c") + messages = self._read_until_prompt(client_file) + + # Verify we hit the breakpoint + hit_msg = next(msg['message'] for msg in messages if 'message' in msg) + self.assertIn("bar()", hit_msg) + + # Check breakpoint list + self._send_command(client_file, "b") + messages = self._read_until_prompt(client_file) + list_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg) + self.assertIn("1 breakpoint", list_msg) + self.assertIn("breakpoint already hit 1 time", list_msg) + + # Clear breakpoint + self._send_command(client_file, "clear 1") + messages = self._read_until_prompt(client_file) + clear_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg) + self.assertIn("Deleted breakpoint", clear_msg) + + # Continue to end + self._send_command(client_file, "c") + stdout, _ = process.communicate(timeout=5) + + self.assertIn("Function returned: 42", stdout) + self.assertEqual(process.returncode, 0) + + def test_keyboard_interrupt(self): + """Test that sending keyboard interrupt breaks into pdb.""" + synchronizer_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + synchronizer_sock.bind(('127.0.0.1', 0)) # Let OS assign port + synchronizer_sock.settimeout(5) + synchronizer_sock.listen(1) + self.addCleanup(synchronizer_sock.close) + sync_port = synchronizer_sock.getsockname()[1] + + script = textwrap.dedent(f""" + import time + import sys + import socket + import pdb + def bar(): + frame = sys._getframe() # Get the current frame + pdb._connect( + host='127.0.0.1', + port={self.port}, + frame=frame, + commands="", + version=pdb._PdbServer.protocol_version(), + ) + print("Connected to debugger") + iterations = 10 + socket.create_connection(('127.0.0.1', {sync_port})).close() + while iterations > 0: + print("Iteration", iterations) + time.sleep(1) + iterations -= 1 + return 42 + + if __name__ == "__main__": + print("Function returned:", bar()) + """) + self._create_script(script=script) + process, client_file = self._connect_and_get_client_file() + + with process: + + # Skip initial messages until we get to the prompt + self._read_until_prompt(client_file) + + # Continue execution + self._send_command(client_file, "c") + + # Wait until execution has continued + synchronizer_sock.accept()[0].close() + + # Inject a script to interrupt the running process + self._send_interrupt(process.pid) + messages = self._read_until_prompt(client_file) + + # Verify we got the keyboard interrupt message + interrupt_msg = next(msg['message'] for msg in messages if 'message' in msg) + self.assertIn("bar()", interrupt_msg) + + # Continue to end + self._send_command(client_file, "iterations = 0") + self._send_command(client_file, "c") + stdout, _ = process.communicate(timeout=5) + self.assertIn("Function returned: 42", stdout) + self.assertEqual(process.returncode, 0) + + def test_handle_eof(self): + """Test that EOF signal properly exits the debugger.""" + self._create_script() + process, client_file = self._connect_and_get_client_file() + + with process: + # Skip initial messages until we get to the prompt + self._read_until_prompt(client_file) + + # Send EOF signal to exit the debugger + client_file.write(json.dumps({"signal": "EOF"}).encode() + b"\n") + client_file.flush() + + # The process should complete normally after receiving EOF + stdout, stderr = process.communicate(timeout=5) + + # Verify process completed correctly + self.assertIn("Function returned: 42", stdout) + self.assertEqual(process.returncode, 0) + self.assertEqual(stderr, "") + + def test_protocol_version(self): + """Test that incompatible protocol versions are properly detected.""" + # Create a script using an incompatible protocol version + script = textwrap.dedent(f''' + import sys + import pdb + + def run_test(): + frame = sys._getframe() + + # Use a fake version number that's definitely incompatible + fake_version = 0x01010101 # A fake version that doesn't match any real Python version + + # Connect with the wrong version + pdb._connect( + host='127.0.0.1', + port={self.port}, + frame=frame, + commands="", + version=fake_version, + ) + + # This should print if the debugger detaches correctly + print("Debugger properly detected version mismatch") + return True + + if __name__ == "__main__": + print("Test result:", run_test()) + ''') + self._create_script(script=script) + process, client_file = self._connect_and_get_client_file() + + with process: + # First message should be an error about protocol version mismatch + data = client_file.readline() + message = json.loads(data.decode()) + + self.assertIn('message', message) + self.assertEqual(message['type'], 'error') + self.assertIn('incompatible', message['message']) + self.assertIn('protocol version', message['message']) + + # The process should complete normally + stdout, stderr = process.communicate(timeout=5) + + # Verify the process completed successfully + self.assertIn("Test result: True", stdout) + self.assertIn("Debugger properly detected version mismatch", stdout) + self.assertEqual(process.returncode, 0) + + def test_help_system(self): + """Test that the help system properly sends help text to the client.""" + self._create_script() + process, client_file = self._connect_and_get_client_file() + + with process: + # Skip initial messages until we get to the prompt + self._read_until_prompt(client_file) + + # Request help for different commands + help_commands = ["help", "help break", "help continue", "help pdb"] + + for cmd in help_commands: + self._send_command(client_file, cmd) + + # Look for help message + data = client_file.readline() + message = json.loads(data.decode()) + + self.assertIn('help', message) + + if cmd == "help": + # Should just contain the command itself + self.assertEqual(message['help'], "") + else: + # Should contain the specific command we asked for help with + command = cmd.split()[1] + self.assertEqual(message['help'], command) + + # Skip to the next prompt + self._read_until_prompt(client_file) + + # Continue execution to finish the program + self._send_command(client_file, "c") + + stdout, stderr = process.communicate(timeout=5) + self.assertIn("Function returned: 42", stdout) + self.assertEqual(process.returncode, 0) + + def test_multi_line_commands(self): + """Test that multi-line commands work properly over remote connection.""" + self._create_script() + process, client_file = self._connect_and_get_client_file() + + with process: + # Skip initial messages until we get to the prompt + self._read_until_prompt(client_file) + + # Send a multi-line command + multi_line_commands = [ + # Define a function + "def test_func():\n return 42", + + # For loop + "for i in range(3):\n print(i)", + + # If statement + "if True:\n x = 42\nelse:\n x = 0", + + # Try/except + "try:\n result = 10/2\n print(result)\nexcept ZeroDivisionError:\n print('Error')", + + # Class definition + "class TestClass:\n def __init__(self):\n self.value = 100\n def get_value(self):\n return self.value" + ] + + for cmd in multi_line_commands: + self._send_command(client_file, cmd) + self._read_until_prompt(client_file) + + # Test executing the defined function + self._send_command(client_file, "test_func()") + messages = self._read_until_prompt(client_file) + + # Find the result message + result_msg = next(msg['message'] for msg in messages if 'message' in msg) + self.assertIn("42", result_msg) + + # Test creating an instance of the defined class + self._send_command(client_file, "obj = TestClass()") + self._read_until_prompt(client_file) + + # Test calling a method on the instance + self._send_command(client_file, "obj.get_value()") + messages = self._read_until_prompt(client_file) + + # Find the result message + result_msg = next(msg['message'] for msg in messages if 'message' in msg) + self.assertIn("100", result_msg) + + # Continue execution to finish + self._send_command(client_file, "c") + + stdout, stderr = process.communicate(timeout=5) + self.assertIn("Function returned: 42", stdout) + self.assertEqual(process.returncode, 0) + +if __name__ == "__main__": + unittest.main() diff --git a/Misc/NEWS.d/next/Library/2025-04-22-19-45-46.gh-issue-132451.eIzMvE.rst b/Misc/NEWS.d/next/Library/2025-04-22-19-45-46.gh-issue-132451.eIzMvE.rst new file mode 100644 index 00000000000000..01ca64868531e6 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-04-22-19-45-46.gh-issue-132451.eIzMvE.rst @@ -0,0 +1,3 @@ +The CLI for the PDB debugger now accepts a ``-p PID`` argument to allow +attaching to a running process. The process must be running the same version +of Python as the one running PDB.