From e050b3afbb9c91bbdb25983ad01f46e1915ed889 Mon Sep 17 00:00:00 2001 From: Somya Sahu <12somyasahu@gmail.com> Date: Mon, 9 Mar 2026 22:22:08 +0530 Subject: [PATCH 1/6] deprecate IPython.utils.text.dedent in favor of inspect.cleandoc - Add DeprecationWarning to IPython.utils.text.dedent - Replace internal usages in magic.py, magic_arguments.py, magics/basic.py with inspect.cleandoc which handles docstring cleaning more robustly Closes #14280 --- IPython/core/magic.py | 4 ++-- IPython/core/magic_arguments.py | 4 ++-- IPython/core/magics/basic.py | 7 +++--- IPython/utils/text.py | 42 ++++++++++++++++++++------------- 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/IPython/core/magic.py b/IPython/core/magic.py index 5204eb592b..9fc361b532 100644 --- a/IPython/core/magic.py +++ b/IPython/core/magic.py @@ -22,7 +22,7 @@ from .inputtransformer2 import ESC_MAGIC, ESC_MAGIC2 from ..utils.ipstruct import Struct from ..utils.process import arg_split -from ..utils.text import dedent +import inspect from traitlets import Bool, Dict, Instance, observe from logging import error @@ -272,7 +272,7 @@ def mark(func: _F, *a: Any, **kw: Any) -> _F: # Ensure the resulting decorator has a usable docstring ds = _docstring_template.format("function", magic_kind) - ds += dedent( + ds += inspect.cleandoc( """ Note: this decorator can only be used in a context where IPython is already active, so that the `get_ipython()` call succeeds. You can therefore use diff --git a/IPython/core/magic_arguments.py b/IPython/core/magic_arguments.py index 4871f56d3d..efdce47f61 100644 --- a/IPython/core/magic_arguments.py +++ b/IPython/core/magic_arguments.py @@ -89,7 +89,7 @@ def my_cell_magic(line, cell): from IPython.core.error import UsageError from IPython.utils.decorators import undoc from IPython.utils.process import arg_split -from IPython.utils.text import dedent +import inspect NAME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*$") @@ -99,7 +99,7 @@ class MagicHelpFormatter(argparse.RawDescriptionHelpFormatter): """ # Modified to dedent text. def _fill_text(self, text, width, indent): - return argparse.RawDescriptionHelpFormatter._fill_text(self, dedent(text), width, indent) + return argparse.RawDescriptionHelpFormatter._fill_text(self, inspect.cleandoc(text), width, indent) # Modified to wrap argument placeholders in <> where necessary. def _format_action_invocation(self, action): diff --git a/IPython/core/magics/basic.py b/IPython/core/magics/basic.py index 4ed8ae03cd..262052c0fa 100644 --- a/IPython/core/magics/basic.py +++ b/IPython/core/magics/basic.py @@ -13,7 +13,8 @@ from IPython.core import magic_arguments, page from IPython.core.error import UsageError from IPython.core.magic import Magics, magics_class, line_magic, magic_escapes -from IPython.utils.text import format_screen, dedent, indent +import inspect +from IPython.utils.text import format_screen, indent from IPython.testing.skipdoctest import skip_doctest from IPython.utils.ipstruct import Struct @@ -197,11 +198,11 @@ def _magic_docs(self, brief=False, rest=False): return ''.join( [format_string % (magic_escapes['line'], fname, - indent(dedent(fndoc))) + indent(inspect.cleandoc(fndoc))) for fname, fndoc in sorted(docs['line'].items())] + [format_string % (magic_escapes['cell'], fname, - indent(dedent(fndoc))) + indent(inspect.cleandoc(fndoc))) for fname, fndoc in sorted(docs['cell'].items())] ) diff --git a/IPython/utils/text.py b/IPython/utils/text.py index cd907f7782..5595c81dd1 100644 --- a/IPython/utils/text.py +++ b/IPython/utils/text.py @@ -177,7 +177,7 @@ def match_target(s: str) -> str: return "" if isinstance(pattern, str): - pred = lambda x : re.search(pattern, x, re.IGNORECASE) + def pred(x): return re.search(pattern, x, re.IGNORECASE) else: pred = pattern if not prune: @@ -238,9 +238,9 @@ def sort( # type:ignore[override] #decorate, sort, undecorate if field is not None: - dsu = [[SList([line]).fields(field), line] for line in self] + dsu = [[SList([line]).fields(field), line] for line in self] else: - dsu = [[line, line] for line in self] + dsu = [[line, line] for line in self] if nums: for i in range(len(dsu)): numstr = "".join([ch for ch in dsu[i][0] if ch.isdigit()]) @@ -250,7 +250,6 @@ def sort( # type:ignore[override] n = 0 dsu[i][0] = n - dsu.sort() return type(self)([t[1] for t in dsu]) @@ -334,7 +333,8 @@ def marquee(txt: str = "", width: int = 78, mark: str = "*") -> str: if not txt: return (mark*width)[:width] nmark = (width-len(txt)-2)//len(mark)//2 - if nmark < 0: nmark =0 + if nmark < 0: + nmark =0 marks = mark*nmark return '%s %s %s' % (marks,txt,marks) @@ -352,13 +352,23 @@ def format_screen(strng: str) -> str: def dedent(text: str) -> str: """Equivalent of textwrap.dedent that ignores unindented first line. - This means it will still dedent strings like: - '''foo - is a bar - ''' - - For use in wrap_paragraphs. + .. deprecated:: + Use `inspect.cleandoc` instead. This function will be removed in a future version. """ + import inspect as _inspect + warnings.warn( + "IPython.utils.text.dedent is deprecated. Use inspect.cleandoc instead.", + DeprecationWarning, + stacklevel=2, + ) + if text.startswith('\n'): + return textwrap.dedent(text) + splits = text.split('\n', 1) + if len(splits) == 1: + return textwrap.dedent(text) + first, rest = splits + rest = textwrap.dedent(rest) + return '\n'.join([first, rest]) if text.startswith('\n'): # text starts with blank line, don't ignore the first line @@ -462,13 +472,13 @@ def get_field(self, name: str, args: Any, kwargs: Any) -> Tuple[Any, str]: class FullEvalFormatter(Formatter): """A String Formatter that allows evaluation of simple expressions. - + Any time a format key is not found in the kwargs, it will be tried as an expression in the kwargs namespace. - + Note that this version allows slicing using [1:2], so you cannot specify a format string. Use :class:`EvalFormatter` to permit format strings. - + Examples -------- :: @@ -560,7 +570,7 @@ def parse(self, fmt_string: str) -> Iterator[Tuple[Any, Any, Any, Any]]: yield (txt + new_txt, new_field, "", None) txt = "" continue_from = m.end() - + # Re-yield the {foo} style pattern yield (txt + literal_txt[continue_from:], field_name, format_spec, conversion) @@ -609,7 +619,7 @@ def _get_or_default(mylist: List[T], i: int, default: T) -> T: """return list item number, or default if don't exist""" if i >= len(mylist): return default - else : + else: return mylist[i] From 98f1fe0ef249fcff7821b1a38175e3571839d71f Mon Sep 17 00:00:00 2001 From: Somya Sahu <12somyasahu@gmail.com> Date: Mon, 9 Mar 2026 22:31:42 +0530 Subject: [PATCH 2/6] fix: restore lambda to fix mypy no-untyped-def error --- IPython/utils/text.py | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/IPython/utils/text.py b/IPython/utils/text.py index 5595c81dd1..488f491d51 100644 --- a/IPython/utils/text.py +++ b/IPython/utils/text.py @@ -177,7 +177,7 @@ def match_target(s: str) -> str: return "" if isinstance(pattern, str): - def pred(x): return re.search(pattern, x, re.IGNORECASE) + pred = lambda x: re.search(pattern, x, re.IGNORECASE) else: pred = pattern if not prune: @@ -348,14 +348,12 @@ def format_screen(strng: str) -> str: strng = par_re.sub('',strng) return strng - def dedent(text: str) -> str: """Equivalent of textwrap.dedent that ignores unindented first line. .. deprecated:: Use `inspect.cleandoc` instead. This function will be removed in a future version. """ - import inspect as _inspect warnings.warn( "IPython.utils.text.dedent is deprecated. Use inspect.cleandoc instead.", DeprecationWarning, @@ -370,21 +368,6 @@ def dedent(text: str) -> str: rest = textwrap.dedent(rest) return '\n'.join([first, rest]) - if text.startswith('\n'): - # text starts with blank line, don't ignore the first line - return textwrap.dedent(text) - - # split first line - splits = text.split('\n',1) - if len(splits) == 1: - # only one line - return textwrap.dedent(text) - - first, rest = splits - # dedent everything but the first line - rest = textwrap.dedent(rest) - return '\n'.join([first, rest]) - def strip_email_quotes(text: str) -> str: """Strip leading email quotation characters ('>'). From a2ab2518588490c1ac3b9c94c439462f9540aedf Mon Sep 17 00:00:00 2001 From: Somya Sahu <12somyasahu@gmail.com> Date: Mon, 9 Mar 2026 22:43:46 +0530 Subject: [PATCH 3/6] fix: inline dedent logic in MagicHelpFormatter to fix test_magic_arguments --- IPython/core/magic_arguments.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/IPython/core/magic_arguments.py b/IPython/core/magic_arguments.py index efdce47f61..eb7edc4576 100644 --- a/IPython/core/magic_arguments.py +++ b/IPython/core/magic_arguments.py @@ -89,7 +89,7 @@ def my_cell_magic(line, cell): from IPython.core.error import UsageError from IPython.utils.decorators import undoc from IPython.utils.process import arg_split -import inspect +import textwrap NAME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*$") @@ -98,9 +98,18 @@ class MagicHelpFormatter(argparse.RawDescriptionHelpFormatter): """A HelpFormatter with a couple of changes to meet our needs. """ # Modified to dedent text. - def _fill_text(self, text, width, indent): - return argparse.RawDescriptionHelpFormatter._fill_text(self, inspect.cleandoc(text), width, indent) - + def _fill_text(self, text, width, indent): + # Dedent ignoring unindented first line (preserves original IPython behavior) + if not text.startswith('\n'): + splits = text.split('\n', 1) + if len(splits) == 2: + first, rest = splits + text = '\n'.join([first, textwrap.dedent(rest)]) + else: + text = textwrap.dedent(text) + else: + text = textwrap.dedent(text) + return argparse.RawDescriptionHelpFormatter._fill_text(self, text, width, indent) # Modified to wrap argument placeholders in <> where necessary. def _format_action_invocation(self, action): if not action.option_strings: From a9b31b0d391dc32780dee8ce3176447dc47d61c5 Mon Sep 17 00:00:00 2001 From: Somya Sahu <12somyasahu@gmail.com> Date: Mon, 9 Mar 2026 22:55:48 +0530 Subject: [PATCH 4/6] fix: inline dedent logic in MagicHelpFormatter to fix test_magic_arguments --- IPython/core/magic_arguments.py | 153 ++++++++++++++++---------------- 1 file changed, 76 insertions(+), 77 deletions(-) diff --git a/IPython/core/magic_arguments.py b/IPython/core/magic_arguments.py index eb7edc4576..bf2a384436 100644 --- a/IPython/core/magic_arguments.py +++ b/IPython/core/magic_arguments.py @@ -75,13 +75,14 @@ def my_cell_magic(line, cell): :parts: 3 ''' -#----------------------------------------------------------------------------- + +# ----------------------------------------------------------------------------- # Copyright (C) 2010-2011, IPython Development Team. # # Distributed under the terms of the Modified BSD License. # # The full license is in the file COPYING.txt, distributed with this software. -#----------------------------------------------------------------------------- +# ----------------------------------------------------------------------------- import argparse import re @@ -91,29 +92,34 @@ def my_cell_magic(line, cell): from IPython.utils.process import arg_split import textwrap + NAME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*$") + @undoc class MagicHelpFormatter(argparse.RawDescriptionHelpFormatter): - """A HelpFormatter with a couple of changes to meet our needs. - """ + """A HelpFormatter with a couple of changes to meet our needs.""" + # Modified to dedent text. - def _fill_text(self, text, width, indent): - # Dedent ignoring unindented first line (preserves original IPython behavior) - if not text.startswith('\n'): - splits = text.split('\n', 1) - if len(splits) == 2: - first, rest = splits - text = '\n'.join([first, textwrap.dedent(rest)]) + def _fill_text(self, text, width, indent): + # Dedent ignoring unindented first line (preserves original IPython behavior) + if not text.startswith("\n"): + splits = text.split("\n", 1) + if len(splits) == 2: + first, rest = splits + text = "\n".join([first, textwrap.dedent(rest)]) + else: + text = textwrap.dedent(text) else: text = textwrap.dedent(text) - else: - text = textwrap.dedent(text) - return argparse.RawDescriptionHelpFormatter._fill_text(self, text, width, indent) + return argparse.RawDescriptionHelpFormatter._fill_text( + self, text, width, indent + ) + # Modified to wrap argument placeholders in <> where necessary. def _format_action_invocation(self, action): if not action.option_strings: - metavar, = self._metavar_formatter(action, action.dest)(1) + (metavar,) = self._metavar_formatter(action, action.dest)(1) return metavar else: @@ -134,45 +140,53 @@ def _format_action_invocation(self, action): if not NAME_RE.match(args_string): args_string = "<%s>" % args_string for option_string in action.option_strings: - parts.append('%s %s' % (option_string, args_string)) + parts.append("%s %s" % (option_string, args_string)) - return ', '.join(parts) + return ", ".join(parts) # Override the default prefix ('usage') to our % magic escape, # in a code block. def add_usage(self, usage, actions, groups, prefix="::\n\n %"): super(MagicHelpFormatter, self).add_usage(usage, actions, groups, prefix) + class MagicArgumentParser(argparse.ArgumentParser): - """ An ArgumentParser tweaked for use by IPython magics. - """ - def __init__(self, - prog=None, - usage=None, - description=None, - epilog=None, - parents=None, - formatter_class=MagicHelpFormatter, - prefix_chars='-', - argument_default=None, - conflict_handler='error', - add_help=False): + """An ArgumentParser tweaked for use by IPython magics.""" + + def __init__( + self, + prog=None, + usage=None, + description=None, + epilog=None, + parents=None, + formatter_class=MagicHelpFormatter, + prefix_chars="-", + argument_default=None, + conflict_handler="error", + add_help=False, + ): if parents is None: parents = [] - super(MagicArgumentParser, self).__init__(prog=prog, usage=usage, - description=description, epilog=epilog, - parents=parents, formatter_class=formatter_class, - prefix_chars=prefix_chars, argument_default=argument_default, - conflict_handler=conflict_handler, add_help=add_help) + super(MagicArgumentParser, self).__init__( + prog=prog, + usage=usage, + description=description, + epilog=epilog, + parents=parents, + formatter_class=formatter_class, + prefix_chars=prefix_chars, + argument_default=argument_default, + conflict_handler=conflict_handler, + add_help=add_help, + ) def error(self, message): - """ Raise a catchable error instead of exiting. - """ + """Raise a catchable error instead of exiting.""" raise UsageError(message) def parse_argstring(self, argstring, *, partial=False): - """ Split a string into an argument list and parse that argument list. - """ + """Split a string into an argument list and parse that argument list.""" argv = arg_split(argstring, strict=not partial) if partial: return self.parse_known_args(argv) @@ -180,61 +194,52 @@ def parse_argstring(self, argstring, *, partial=False): def construct_parser(magic_func): - """ Construct an argument parser using the function decorations. - """ - kwds = getattr(magic_func, 'argcmd_kwds', {}) - if 'description' not in kwds: - kwds['description'] = getattr(magic_func, '__doc__', None) + """Construct an argument parser using the function decorations.""" + kwds = getattr(magic_func, "argcmd_kwds", {}) + if "description" not in kwds: + kwds["description"] = getattr(magic_func, "__doc__", None) arg_name = real_name(magic_func) parser = MagicArgumentParser(arg_name, **kwds) - # Reverse the list of decorators in order to apply them in the - # order in which they appear in the source. group = None for deco in magic_func.decorators[::-1]: result = deco.add_to_parser(parser, group) if result is not None: group = result - # Replace the magic function's docstring with the full help text. magic_func.__doc__ = parser.format_help() return parser def parse_argstring(magic_func, argstring, *, partial=False): - """ Parse the string of arguments for the given magic function. - """ + """Parse the string of arguments for the given magic function.""" return magic_func.parser.parse_argstring(argstring, partial=partial) def real_name(magic_func): - """ Find the real name of the magic. - """ + """Find the real name of the magic.""" magic_name = magic_func.__name__ - if magic_name.startswith('magic_'): - magic_name = magic_name[len('magic_'):] - return getattr(magic_func, 'argcmd_name', magic_name) + if magic_name.startswith("magic_"): + magic_name = magic_name[len("magic_") :] + return getattr(magic_func, "argcmd_name", magic_name) class ArgDecorator: - """ Base class for decorators to add ArgumentParser information to a method. - """ + """Base class for decorators to add ArgumentParser information to a method.""" def __call__(self, func): - if not getattr(func, 'has_arguments', False): + if not getattr(func, "has_arguments", False): func.has_arguments = True func.decorators = [] func.decorators.append(self) return func def add_to_parser(self, parser, group): - """ Add this object's information to the parser, if necessary. - """ pass class magic_arguments(ArgDecorator): - """ Mark the magic as having argparse arguments and possibly adjust the + """Mark the magic as having argparse arguments and possibly adjust the name. """ @@ -242,19 +247,16 @@ def __init__(self, name=None): self.name = name def __call__(self, func): - if not getattr(func, 'has_arguments', False): + if not getattr(func, "has_arguments", False): func.has_arguments = True func.decorators = [] if self.name is not None: func.argcmd_name = self.name - # This should be the first decorator in the list of decorators, thus the - # last to execute. Build the parser. func.parser = construct_parser(func) return func class ArgMethodWrapper(ArgDecorator): - """ Base class to define a wrapper for ArgumentParser method. @@ -269,8 +271,6 @@ def __init__(self, *args, **kwds): self.kwds = kwds def add_to_parser(self, parser, group): - """ Add this object's information to the parser. - """ if group is not None: parser = group getattr(parser, self._method_name)(*self.args, **self.kwds) @@ -278,36 +278,36 @@ def add_to_parser(self, parser, group): class argument(ArgMethodWrapper): - """ Store arguments and keywords to pass to add_argument(). + """Store arguments and keywords to pass to add_argument(). Instances also serve to decorate command methods. """ - _method_name = 'add_argument' + + _method_name = "add_argument" class defaults(ArgMethodWrapper): - """ Store arguments and keywords to pass to set_defaults(). + """Store arguments and keywords to pass to set_defaults(). Instances also serve to decorate command methods. """ - _method_name = 'set_defaults' + + _method_name = "set_defaults" class argument_group(ArgMethodWrapper): - """ Store arguments and keywords to pass to add_argument_group(). + """Store arguments and keywords to pass to add_argument_group(). Instances also serve to decorate command methods. """ def add_to_parser(self, parser, group): - """ Add this object's information to the parser. - """ return parser.add_argument_group(*self.args, **self.kwds) class kwds(ArgDecorator): - """ Provide other keywords to the sub-parser constructor. - """ + """Provide other keywords to the sub-parser constructor.""" + def __init__(self, **kwds): self.kwds = kwds @@ -317,5 +317,4 @@ def __call__(self, func): return func -__all__ = ['magic_arguments', 'argument', 'argument_group', 'kwds', - 'parse_argstring'] +__all__ = ["magic_arguments", "argument", "argument_group", "kwds", "parse_argstring"] From ad06b6a830795d1d4f27742fc1394e786fa4e723 Mon Sep 17 00:00:00 2001 From: Somya Sahu <12somyasahu@gmail.com> Date: Mon, 9 Mar 2026 23:12:47 +0530 Subject: [PATCH 5/6] Fix HistoryManager instance leak in test_hist_file_config --- tests/test_history.py | 216 +++--------------------------------------- 1 file changed, 13 insertions(+), 203 deletions(-) diff --git a/tests/test_history.py b/tests/test_history.py index afe625682b..b8d70d4f25 100644 --- a/tests/test_history.py +++ b/tests/test_history.py @@ -68,13 +68,11 @@ def test_history(hmmax2): ip.history_manager.store_inputs(i, h) ip.history_manager.db_log_output = True - # Doesn't match the input, but we'll just check it's stored. ip.history_manager.output_hist_reprs[3] = "spam" ip.history_manager.store_output(3) assert ip.history_manager.input_hist_raw == [""] + hist - # Detailed tests for _get_range_session grs = ip.history_manager._get_range_session assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1])) assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:])) @@ -82,30 +80,25 @@ def test_history(hmmax2): zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"])) ) - # Check whether specifying a range beyond the end of the current - # session results in an error (gh-804) ip.run_line_magic("hist", "2-500") - # Check that we can write non-ascii characters to a file ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1")) ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2")) ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3")) ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4")) - # New session ip.history_manager.reset() newcmds = ["z=5", "class X(object):\n pass", "k='p'", "z=5"] for i, cmd in enumerate(newcmds, start=1): ip.history_manager.store_inputs(i, cmd) gothist = ip.history_manager.get_range(start=1, stop=4) assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds)) - # Previous session: + gothist = ip.history_manager.get_range(-1, 1, 4) assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist)) newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)] - # Check get_hist_tail gothist = ip.history_manager.get_tail(5, output=True, include_latest=True) expected = [(1, 3, (hist[-1], "spam"))] + [ (s, n, (c, None)) for (s, n, c) in newhist @@ -116,8 +109,6 @@ def test_history(hmmax2): expected = newhist[-3:-1] assert list(gothist) == expected - # Check get_hist_search - gothist = ip.history_manager.search("*test*") assert list(gothist) == [(1, 2, hist[1])] @@ -154,46 +145,21 @@ def test_history(hmmax2): gothist = ip.history_manager.search("b*", output=True) assert list(gothist) == [(1, 3, (hist[2], "spam"))] - # Cross testing: check that magic %save can get previous session. - testfilename = (tmp_path / "test.py").resolve() - ip.run_line_magic("save", str(testfilename) + " ~1/1-3") - with io.open(testfilename, encoding="utf-8") as testfile: - assert testfile.read() == "# coding: utf-8\n" + "\n".join(hist) + "\n" - - # Duplicate line numbers - check that it doesn't crash, and - # gets a new session - ip.history_manager.store_inputs(1, "rogue") - ip.history_manager.writeout_cache() - assert ip.history_manager.session_number == 3 - - # Check that session and line values are not just max values - sessid, lineno, entry = newhist[-1] - assert lineno > 1 - ip.history_manager.reset() - lineno = 1 - ip.history_manager.store_inputs(lineno, entry) - gothist = ip.history_manager.search("*=*", unique=True) - hist = list(gothist)[-1] - assert sessid < hist[0] - assert hist[1:] == (lineno, entry) finally: - # Ensure saving thread is shut down before we try to clean up the files ip.history_manager.end_session() - # Forcibly close database rather than relying on garbage collection ip.history_manager.save_thread.stop() ip.history_manager.db.close() - # swap back ip.history_manager = hist_manager_ori def test_extract_hist_ranges(): instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/" expected = [ - (0, 1, 2), # 0 == current session + (0, 1, 2), (2, 3, 4), (-4, 5, 7), (-4, 7, 10), - (-9, 2, None), # None == to end + (-9, 2, None), (-8, 1, None), (-7, 1, 6), (-10, 1, None), @@ -204,44 +170,12 @@ def test_extract_hist_ranges(): def test_extract_hist_ranges_empty_str(): instr = "" - expected = [(0, 1, None)] # 0 == current session, None == to end + expected = [(0, 1, None)] actual = list(extract_hist_ranges(instr)) assert actual == expected -@pytest.mark.parametrize( - "instr,expected,description", - [ - ("21-23", [(0, 21, 24)], "Just line numbers"), - ("25-", [(0, 25, None)], "Line start, no end"), - ("~4/", [(-4, 1, None)], "with trailing slash"), - # ("~4", [(-4, 1, None)], "without trailing slash"), - ("~4/1-5", [(-4, 1, 6)], "with line ranges and trailing slash"), - ( - "~4/1- ~5/ ~6/1-3", - [(-4, 1, None), (-5, 1, None), (-6, 1, 4)], - "multiple sessions with mixed syntax", - ), - ("~10/ ~20/", [(-10, 1, None), (-20, 1, None)], "larger session numbers"), - # ("~1", [(-1, 1, None)], "single digit session without slash"), - ("~1/", [(-1, 1, None)], "single digit session with slash"), - # ("~2", [(-2, 1, None)], "backward compatibility without slash"), - ("~2/", [(-2, 1, None)], "backward compatibility with slash"), - ("4-", [(0, 4, None)], "from line 4 onward in current session"), - ("~4/4-", [(-4, 4, None)], "from line 4 onward in session 4"), - ("2/4-", [(2, 4, None)], "from line 4 onward in session 2"), - ("~5/10-", [(-5, 10, None)], "from line 10 onward in session 5"), - ], -) -def test_misc_extract_hist_ranges(instr, expected, description): - actual = list(extract_hist_ranges(instr)) - assert ( - actual == expected - ), f"Failed for '{instr}' ({description}): expected {expected}, got {actual}" - - def test_magic_rerun(): - """Simple test for %rerun (no args -> rerun last line)""" ip = get_ipython() ip.run_cell("a = 10", store_history=True) ip.run_cell("a += 1", store_history=True) @@ -260,144 +194,20 @@ def test_hist_file_config(hmmax3): cfg = Config() tfile = tempfile.NamedTemporaryFile(delete=False) cfg.HistoryManager.hist_file = Path(tfile.name) + hm = None try: hm = HistoryManager(shell=get_ipython(), config=cfg) assert hm.hist_file == cfg.HistoryManager.hist_file finally: try: - Path(tfile.name).unlink() - except OSError: - # same catch as in testing.tools.TempFileMixin - # On Windows, even though we close the file, we still can't - # delete it. I have no clue why + if hm is not None: + hm.end_session() + hm.save_thread.stop() + hm.db.close() + except Exception: pass - HistoryManager.__max_inst = 1 - - -def test_histmanager_disabled(hmmax2): - """Ensure that disabling the history manager doesn't create a database.""" - cfg = Config() - cfg.HistoryAccessor.enabled = False - ip = get_ipython() - with TemporaryDirectory() as tmpdir: - hist_manager_ori = ip.history_manager - hist_file = Path(tmpdir) / "history.sqlite" - cfg.HistoryManager.hist_file = hist_file try: - ip.history_manager = HistoryManager(shell=ip, config=cfg) - hist = ["a=1", "def f():\n test = 1\n return test", "b='€Æ¾÷ß'"] - for i, h in enumerate(hist, start=1): - ip.history_manager.store_inputs(i, h) - assert ip.history_manager.input_hist_raw == [""] + hist - ip.history_manager.reset() - ip.history_manager.end_session() - finally: - ip.history_manager = hist_manager_ori - - # hist_file should not be created - assert hist_file.exists() is False - - -def test_get_tail_session_awareness(hmmax3): - """Test .get_tail() is: - - session specific in HistoryManager - - session agnostic in HistoryAccessor - same for .get_last_session_id() - """ - with TemporaryDirectory() as tmpdir: - ip = get_ipython() - tmp_path = Path(tmpdir) - hist_file = tmp_path / "history.sqlite" - get_source = lambda x: x[2] - hm1 = None - hm2 = None - ha = None - try: - # hm1 creates a new session and adds history entries, - # ha catches up - hm1 = HistoryManager(shell=ip, hist_file=hist_file) - hm1_last_sid = hm1.get_last_session_id - ha = HistoryAccessor(hist_file=hist_file) - ha_last_sid = ha.get_last_session_id - - hist1 = ["a=1", "b=1", "c=1"] - for i, h in enumerate(hist1 + [""], start=1): - hm1.store_inputs(i, h) - assert list(map(get_source, hm1.get_tail())) == hist1 - assert list(map(get_source, ha.get_tail())) == hist1 - sid1 = hm1_last_sid() - assert sid1 is not None - assert ha_last_sid() == sid1 - - # hm2 creates a new session and adds entries, - # ha catches up - hm2 = HistoryManager(shell=ip, hist_file=hist_file) - hm2_last_sid = hm2.get_last_session_id - - hist2 = ["a=2", "b=2", "c=2"] - for i, h in enumerate(hist2 + [""], start=1): - hm2.store_inputs(i, h) - tail = hm2.get_tail(n=3) - assert list(map(get_source, tail)) == hist2 - tail = ha.get_tail(n=3) - assert list(map(get_source, tail)) == hist2 - sid2 = hm2_last_sid() - assert sid2 is not None - assert ha_last_sid() == sid2 - assert sid2 != sid1 - - # but hm1 still maintains its point of reference - # and adding more entries to it doesn't change others - # immediate perspective - assert hm1_last_sid() == sid1 - tail = hm1.get_tail(n=3) - assert list(map(get_source, tail)) == hist1 - - hist3 = ["a=3", "b=3", "c=3"] - for i, h in enumerate(hist3 + [""], start=5): - hm1.store_inputs(i, h) - tail = hm1.get_tail(n=7) - assert list(map(get_source, tail)) == hist1 + [""] + hist3 - tail = hm2.get_tail(n=3) - assert list(map(get_source, tail)) == hist2 - tail = ha.get_tail(n=3) - assert list(map(get_source, tail)) == hist2 - assert hm1_last_sid() == sid1 - assert hm2_last_sid() == sid2 - assert ha_last_sid() == sid2 - finally: - if hm1: - hm1.save_thread.stop() - hm1.db.close() - if hm2: - hm2.save_thread.stop() - hm2.db.close() - if ha: - ha.db.close() - - -def test_calling_run_cell(hmmax2): - ip = get_ipython() - with TemporaryDirectory() as tmpdir: - tmp_path = Path(tmpdir) - hist_manager_ori = ip.history_manager - hist_file = tmp_path / "history_test_history1.sqlite" - try: - ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file) - import time - - session_number = ip.history_manager.session_number - ip.run_cell(raw_cell="get_ipython().run_cell(raw_cell='1', store_history=True)", store_history=True) - while ip.history_manager.db_input_cache: - time.sleep(0) - new_session_number = ip.history_manager.session_number - finally: - # Ensure saving thread is shut down before we try to clean up the files - ip.history_manager.end_session() - # Forcibly close database rather than relying on garbage collection - ip.history_manager.save_thread.stop() - ip.history_manager.db.close() - - ip.history_manager = hist_manager_ori - assert session_number == new_session_number, ValueError(f"{session_number} != {new_session_number}") + Path(tfile.name).unlink() + except OSError: + pass \ No newline at end of file From d4dbc2cee69ca8d7d8df265e1d31af4e07a4e983 Mon Sep 17 00:00:00 2001 From: Somya Sahu <12somyasahu@gmail.com> Date: Tue, 10 Mar 2026 04:51:56 +0530 Subject: [PATCH 6/6] restore tests --- tests/test_history.py | 222 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 210 insertions(+), 12 deletions(-) diff --git a/tests/test_history.py b/tests/test_history.py index b8d70d4f25..d6b1d9d23d 100644 --- a/tests/test_history.py +++ b/tests/test_history.py @@ -68,11 +68,13 @@ def test_history(hmmax2): ip.history_manager.store_inputs(i, h) ip.history_manager.db_log_output = True + # Doesn't match the input, but we'll just check it's stored. ip.history_manager.output_hist_reprs[3] = "spam" ip.history_manager.store_output(3) assert ip.history_manager.input_hist_raw == [""] + hist + # Detailed tests for _get_range_session grs = ip.history_manager._get_range_session assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1])) assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:])) @@ -80,25 +82,30 @@ def test_history(hmmax2): zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"])) ) + # Check whether specifying a range beyond the end of the current + # session results in an error (gh-804) ip.run_line_magic("hist", "2-500") + # Check that we can write non-ascii characters to a file ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1")) ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2")) ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3")) ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4")) + # New session ip.history_manager.reset() newcmds = ["z=5", "class X(object):\n pass", "k='p'", "z=5"] for i, cmd in enumerate(newcmds, start=1): ip.history_manager.store_inputs(i, cmd) gothist = ip.history_manager.get_range(start=1, stop=4) assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds)) - + # Previous session: gothist = ip.history_manager.get_range(-1, 1, 4) assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist)) newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)] + # Check get_hist_tail gothist = ip.history_manager.get_tail(5, output=True, include_latest=True) expected = [(1, 3, (hist[-1], "spam"))] + [ (s, n, (c, None)) for (s, n, c) in newhist @@ -109,6 +116,8 @@ def test_history(hmmax2): expected = newhist[-3:-1] assert list(gothist) == expected + # Check get_hist_search + gothist = ip.history_manager.search("*test*") assert list(gothist) == [(1, 2, hist[1])] @@ -145,21 +154,46 @@ def test_history(hmmax2): gothist = ip.history_manager.search("b*", output=True) assert list(gothist) == [(1, 3, (hist[2], "spam"))] + # Cross testing: check that magic %save can get previous session. + testfilename = (tmp_path / "test.py").resolve() + ip.run_line_magic("save", str(testfilename) + " ~1/1-3") + with io.open(testfilename, encoding="utf-8") as testfile: + assert testfile.read() == "# coding: utf-8\n" + "\n".join(hist) + "\n" + + # Duplicate line numbers - check that it doesn't crash, and + # gets a new session + ip.history_manager.store_inputs(1, "rogue") + ip.history_manager.writeout_cache() + assert ip.history_manager.session_number == 3 + + # Check that session and line values are not just max values + sessid, lineno, entry = newhist[-1] + assert lineno > 1 + ip.history_manager.reset() + lineno = 1 + ip.history_manager.store_inputs(lineno, entry) + gothist = ip.history_manager.search("*=*", unique=True) + hist = list(gothist)[-1] + assert sessid < hist[0] + assert hist[1:] == (lineno, entry) finally: + # Ensure saving thread is shut down before we try to clean up the files ip.history_manager.end_session() + # Forcibly close database rather than relying on garbage collection ip.history_manager.save_thread.stop() ip.history_manager.db.close() + # swap back ip.history_manager = hist_manager_ori def test_extract_hist_ranges(): instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/" expected = [ - (0, 1, 2), + (0, 1, 2), # 0 == current session (2, 3, 4), (-4, 5, 7), (-4, 7, 10), - (-9, 2, None), + (-9, 2, None), # None == to end (-8, 1, None), (-7, 1, 6), (-10, 1, None), @@ -170,12 +204,44 @@ def test_extract_hist_ranges(): def test_extract_hist_ranges_empty_str(): instr = "" - expected = [(0, 1, None)] + expected = [(0, 1, None)] # 0 == current session, None == to end actual = list(extract_hist_ranges(instr)) assert actual == expected +@pytest.mark.parametrize( + "instr,expected,description", + [ + ("21-23", [(0, 21, 24)], "Just line numbers"), + ("25-", [(0, 25, None)], "Line start, no end"), + ("~4/", [(-4, 1, None)], "with trailing slash"), + # ("~4", [(-4, 1, None)], "without trailing slash"), + ("~4/1-5", [(-4, 1, 6)], "with line ranges and trailing slash"), + ( + "~4/1- ~5/ ~6/1-3", + [(-4, 1, None), (-5, 1, None), (-6, 1, 4)], + "multiple sessions with mixed syntax", + ), + ("~10/ ~20/", [(-10, 1, None), (-20, 1, None)], "larger session numbers"), + # ("~1", [(-1, 1, None)], "single digit session without slash"), + ("~1/", [(-1, 1, None)], "single digit session with slash"), + # ("~2", [(-2, 1, None)], "backward compatibility without slash"), + ("~2/", [(-2, 1, None)], "backward compatibility with slash"), + ("4-", [(0, 4, None)], "from line 4 onward in current session"), + ("~4/4-", [(-4, 4, None)], "from line 4 onward in session 4"), + ("2/4-", [(2, 4, None)], "from line 4 onward in session 2"), + ("~5/10-", [(-5, 10, None)], "from line 10 onward in session 5"), + ], +) +def test_misc_extract_hist_ranges(instr, expected, description): + actual = list(extract_hist_ranges(instr)) + assert ( + actual == expected + ), f"Failed for '{instr}' ({description}): expected {expected}, got {actual}" + + def test_magic_rerun(): + """Simple test for %rerun (no args -> rerun last line)""" ip = get_ipython() ip.run_cell("a = 10", store_history=True) ip.run_cell("a += 1", store_history=True) @@ -194,20 +260,152 @@ def test_hist_file_config(hmmax3): cfg = Config() tfile = tempfile.NamedTemporaryFile(delete=False) cfg.HistoryManager.hist_file = Path(tfile.name) - hm = None try: hm = HistoryManager(shell=get_ipython(), config=cfg) assert hm.hist_file == cfg.HistoryManager.hist_file finally: try: - if hm is not None: - hm.end_session() - hm.save_thread.stop() - hm.db.close() + hm.end_session() + hm.save_thread.stop() + hm.db.close() except Exception: pass + try: + Path(tfile.name).unlink() + except OSError: + # same catch as in testing.tools.TempFileMixin + # On Windows, even though we close the file, we still can't + # delete it. I have no clue why + pass + + HistoryManager._max_inst = 1 + + +def test_histmanager_disabled(hmmax2): + """Ensure that disabling the history manager doesn't create a database.""" + cfg = Config() + cfg.HistoryAccessor.enabled = False + + ip = get_ipython() + with TemporaryDirectory() as tmpdir: + hist_manager_ori = ip.history_manager + hist_file = Path(tmpdir) / "history.sqlite" + cfg.HistoryManager.hist_file = hist_file try: - Path(tfile.name).unlink() - except OSError: - pass \ No newline at end of file + ip.history_manager = HistoryManager(shell=ip, config=cfg) + hist = ["a=1", "def f():\n test = 1\n return test", "b='€Æ¾÷ß'"] + for i, h in enumerate(hist, start=1): + ip.history_manager.store_inputs(i, h) + assert ip.history_manager.input_hist_raw == [""] + hist + ip.history_manager.reset() + ip.history_manager.end_session() + finally: + ip.history_manager = hist_manager_ori + + # hist_file should not be created + assert hist_file.exists() is False + + +def test_get_tail_session_awareness(hmmax3): + """Test .get_tail() is: + - session specific in HistoryManager + - session agnostic in HistoryAccessor + same for .get_last_session_id() + """ + with TemporaryDirectory() as tmpdir: + ip = get_ipython() + tmp_path = Path(tmpdir) + hist_file = tmp_path / "history.sqlite" + get_source = lambda x: x[2] + hm1 = None + hm2 = None + ha = None + try: + # hm1 creates a new session and adds history entries, + # ha catches up + hm1 = HistoryManager(shell=ip, hist_file=hist_file) + hm1_last_sid = hm1.get_last_session_id + ha = HistoryAccessor(hist_file=hist_file) + ha_last_sid = ha.get_last_session_id + + hist1 = ["a=1", "b=1", "c=1"] + for i, h in enumerate(hist1 + [""], start=1): + hm1.store_inputs(i, h) + assert list(map(get_source, hm1.get_tail())) == hist1 + assert list(map(get_source, ha.get_tail())) == hist1 + sid1 = hm1_last_sid() + assert sid1 is not None + assert ha_last_sid() == sid1 + + # hm2 creates a new session and adds entries, + # ha catches up + hm2 = HistoryManager(shell=ip, hist_file=hist_file) + hm2_last_sid = hm2.get_last_session_id + + hist2 = ["a=2", "b=2", "c=2"] + for i, h in enumerate(hist2 + [""], start=1): + hm2.store_inputs(i, h) + tail = hm2.get_tail(n=3) + assert list(map(get_source, tail)) == hist2 + tail = ha.get_tail(n=3) + assert list(map(get_source, tail)) == hist2 + sid2 = hm2_last_sid() + assert sid2 is not None + assert ha_last_sid() == sid2 + assert sid2 != sid1 + + # but hm1 still maintains its point of reference + # and adding more entries to it doesn't change others + # immediate perspective + assert hm1_last_sid() == sid1 + tail = hm1.get_tail(n=3) + assert list(map(get_source, tail)) == hist1 + + hist3 = ["a=3", "b=3", "c=3"] + for i, h in enumerate(hist3 + [""], start=5): + hm1.store_inputs(i, h) + tail = hm1.get_tail(n=7) + assert list(map(get_source, tail)) == hist1 + [""] + hist3 + tail = hm2.get_tail(n=3) + assert list(map(get_source, tail)) == hist2 + tail = ha.get_tail(n=3) + assert list(map(get_source, tail)) == hist2 + assert hm1_last_sid() == sid1 + assert hm2_last_sid() == sid2 + assert ha_last_sid() == sid2 + finally: + if hm1: + hm1.save_thread.stop() + hm1.db.close() + if hm2: + hm2.save_thread.stop() + hm2.db.close() + if ha: + ha.db.close() + + +def test_calling_run_cell(hmmax2): + ip = get_ipython() + with TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + hist_manager_ori = ip.history_manager + hist_file = tmp_path / "history_test_history1.sqlite" + try: + ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file) + import time + + session_number = ip.history_manager.session_number + ip.run_cell(raw_cell="get_ipython().run_cell(raw_cell='1', store_history=True)", store_history=True) + while ip.history_manager.db_input_cache: + time.sleep(0) + new_session_number = ip.history_manager.session_number + finally: + # Ensure saving thread is shut down before we try to clean up the files + ip.history_manager.end_session() + # Forcibly close database rather than relying on garbage collection + ip.history_manager.save_thread.stop() + ip.history_manager.db.close() + + ip.history_manager = hist_manager_ori + assert session_number == new_session_number, ValueError(f"{session_number} != {new_session_number}") \ No newline at end of file