diff --git a/CHANGES b/CHANGES index 5e8bf5cdcd..707359f3ac 100644 --- a/CHANGES +++ b/CHANGES @@ -27,7 +27,7 @@ $ pipx install --suffix=@next 'tmuxp' --pip-args '\--pre' --force // Usage: tmuxp@next load yoursession ``` -## tmuxp 1.67.0 (Yet to be released) +## tmuxp 1.68.0 (Yet to be released) @@ -35,6 +35,24 @@ $ pipx install --suffix=@next 'tmuxp' --pip-args '\--pre' --force _Notes on the upcoming release will go here._ +## tmuxp 1.67.0 (2026-03-08) + +### What's new + +#### Animated progress spinner for `tmuxp load` (#1020) + +The `load` command now shows an animated spinner with real-time build progress +as windows and panes are created. Five built-in presets control the display +format (`default`, `minimal`, `window`, `pane`, `verbose`), and custom format +strings are supported via `--progress-format` or `TMUXP_PROGRESS_FORMAT`. + +- `--progress-lines N` / `TMUXP_PROGRESS_LINES`: Control how many lines of + `before_script` output appear in the spinner panel (default: 3). Use `0` to + hide the panel, `-1` for unlimited (capped to terminal height). +- `--no-progress` / `TMUXP_PROGRESS=0`: Disable the spinner entirely. +- During `before_script` execution, the progress bar shows a marching animation + and ⏸ icon. + ## tmuxp 1.66.0 (2026-03-08) ### Bug fixes diff --git a/docs/api/cli/index.md b/docs/api/cli/index.md index 9289503905..1381fbc90f 100644 --- a/docs/api/cli/index.md +++ b/docs/api/cli/index.md @@ -16,6 +16,7 @@ freeze import_config load ls +progress search shell utils diff --git a/docs/api/cli/progress.md b/docs/api/cli/progress.md new file mode 100644 index 0000000000..3b092349cf --- /dev/null +++ b/docs/api/cli/progress.md @@ -0,0 +1,8 @@ +# tmuxp progress - `tmuxp.cli._progress` + +```{eval-rst} +.. automodule:: tmuxp.cli._progress + :members: + :show-inheritance: + :undoc-members: +``` diff --git a/docs/cli/load.md b/docs/cli/load.md index a362888e15..8be9178f29 100644 --- a/docs/cli/load.md +++ b/docs/cli/load.md @@ -152,3 +152,104 @@ $ tmuxp load [filename] --log-file [log_filename] ```console $ tmuxp --log-level [LEVEL] load [filename] --log-file [log_filename] ``` + +## Progress display + +When loading a workspace, tmuxp shows an animated spinner with build progress. The spinner updates as windows and panes are created, giving real-time feedback during session builds. + +### Presets + +Five built-in presets control the spinner format: + +| Preset | Format | +|--------|--------| +| `default` | `Loading workspace: {session} {bar} {progress} {window}` | +| `minimal` | `Loading workspace: {session} [{window_progress}]` | +| `window` | `Loading workspace: {session} {window_bar} {window_progress_rel}` | +| `pane` | `Loading workspace: {session} {pane_bar} {session_pane_progress}` | +| `verbose` | `Loading workspace: {session} [window {window_index} of {window_total} · pane {pane_index} of {pane_total}] {window}` | + +Select a preset with `--progress-format`: + +```console +$ tmuxp load --progress-format minimal myproject +``` + +Or via environment variable: + +```console +$ TMUXP_PROGRESS_FORMAT=verbose tmuxp load myproject +``` + +### Custom format tokens + +Use a custom format string with any of the available tokens: + +| Token | Description | +|-------|-------------| +| `{session}` | Session name | +| `{window}` | Current window name | +| `{window_index}` | Current window number (1-based) | +| `{window_total}` | Total number of windows | +| `{window_progress}` | Window fraction (e.g. `1/3`) | +| `{window_progress_rel}` | Completed windows fraction (e.g. `1/3`) | +| `{windows_done}` | Number of completed windows | +| `{windows_remaining}` | Number of remaining windows | +| `{pane_index}` | Current pane number in the window | +| `{pane_total}` | Total panes in the current window | +| `{pane_progress}` | Pane fraction (e.g. `2/4`) | +| `{progress}` | Combined progress (e.g. `1/3 win · 2/4 pane`) | +| `{session_pane_progress}` | Panes completed across the session (e.g. `5/10`) | +| `{overall_percent}` | Pane-based completion percentage (0–100) | +| `{bar}` | Composite progress bar | +| `{pane_bar}` | Pane-based progress bar | +| `{window_bar}` | Window-based progress bar | +| `{status_icon}` | Status icon (⏸ during before_script) | + +Example: + +```console +$ tmuxp load --progress-format "{session} {bar} {overall_percent}%" myproject +``` + +### Panel lines + +The spinner shows script output in a panel below the spinner line. Control the panel height with `--progress-lines`: + +Hide the panel entirely (script output goes to stdout): + +```console +$ tmuxp load --progress-lines 0 myproject +``` + +Show unlimited lines (capped to terminal height): + +```console +$ tmuxp load --progress-lines -1 myproject +``` + +Set a custom height (default is 3): + +```console +$ tmuxp load --progress-lines 5 myproject +``` + +### Disabling progress + +Disable the animated spinner entirely: + +```console +$ tmuxp load --no-progress myproject +``` + +Or via environment variable: + +```console +$ TMUXP_PROGRESS=0 tmuxp load myproject +``` + +When progress is disabled, logging flows normally to the terminal and no spinner is rendered. + +### Before-script behavior + +During `before_script` execution, the progress bar shows a marching animation and a ⏸ status icon, indicating that tmuxp is waiting for the script to finish before continuing with pane creation. diff --git a/docs/configuration/environmental-variables.md b/docs/configuration/environmental-variables.md index d485da4574..73c0d1e373 100644 --- a/docs/configuration/environmental-variables.md +++ b/docs/configuration/environmental-variables.md @@ -24,3 +24,54 @@ building sessions. For this case you can override it here. ```console $ env LIBTMUX_TMUX_FORMAT_SEPARATOR='__SEP__' tmuxp load [session] ``` + +(TMUXP_PROGRESS)= + +## `TMUXP_PROGRESS` + +Master on/off switch for the animated progress spinner during `tmuxp load`. +Defaults to `1` (enabled). Set to `0` to disable: + +```console +$ TMUXP_PROGRESS=0 tmuxp load myproject +``` + +Equivalent to the `--no-progress` CLI flag. + +(TMUXP_PROGRESS_FORMAT)= + +## `TMUXP_PROGRESS_FORMAT` + +Set the spinner line format. Accepts a preset name (`default`, `minimal`, `window`, `pane`, `verbose`) or a custom format string with tokens like `{session}`, `{bar}`, `{progress}`: + +```console +$ TMUXP_PROGRESS_FORMAT=minimal tmuxp load myproject +``` + +Custom format example: + +```console +$ TMUXP_PROGRESS_FORMAT="{session} {bar} {overall_percent}%" tmuxp load myproject +``` + +Equivalent to the `--progress-format` CLI flag. + +(TMUXP_PROGRESS_LINES)= + +## `TMUXP_PROGRESS_LINES` + +Number of script-output lines shown in the spinner panel. Defaults to `3`. + +Set to `0` to hide the panel entirely (script output goes to stdout): + +```console +$ TMUXP_PROGRESS_LINES=0 tmuxp load myproject +``` + +Set to `-1` for unlimited lines (capped to terminal height): + +```console +$ TMUXP_PROGRESS_LINES=-1 tmuxp load myproject +``` + +Equivalent to the `--progress-lines` CLI flag. diff --git a/pyproject.toml b/pyproject.toml index ceb1aae0c9..e394750acc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "tmuxp" -version = "1.66.0" +version = "1.67.0" description = "Session manager for tmux, which allows users to save and load tmux sessions through simple configuration files." requires-python = ">=3.10,<4.0" authors = [ diff --git a/src/tmuxp/__about__.py b/src/tmuxp/__about__.py index f1cfd86085..48ad3629a1 100644 --- a/src/tmuxp/__about__.py +++ b/src/tmuxp/__about__.py @@ -8,7 +8,7 @@ __title__ = "tmuxp" __package_name__ = "tmuxp" -__version__ = "1.66.0" +__version__ = "1.67.0" __description__ = "tmux session manager" __email__ = "tony@git-pull.com" __author__ = "Tony Narlock" diff --git a/src/tmuxp/_internal/colors.py b/src/tmuxp/_internal/colors.py index 73db6062fa..a9b350016f 100644 --- a/src/tmuxp/_internal/colors.py +++ b/src/tmuxp/_internal/colors.py @@ -610,6 +610,7 @@ def get_color_mode(color_arg: str | None = None) -> ColorMode: # ANSI styling utilities (originally from click, via utils.py) _ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]") +ANSI_SEQ_RE = _ansi_re def strip_ansi(value: str) -> str: diff --git a/src/tmuxp/cli/_colors.py b/src/tmuxp/cli/_colors.py index 2513d118a8..31dab82076 100644 --- a/src/tmuxp/cli/_colors.py +++ b/src/tmuxp/cli/_colors.py @@ -12,6 +12,7 @@ import logging from tmuxp._internal.colors import ( + ANSI_SEQ_RE, ColorMode, Colors, UnknownStyleColor, @@ -25,6 +26,7 @@ logger = logging.getLogger(__name__) __all__ = [ + "ANSI_SEQ_RE", "ColorMode", "Colors", "UnknownStyleColor", diff --git a/src/tmuxp/cli/_progress.py b/src/tmuxp/cli/_progress.py new file mode 100644 index 0000000000..01d31d7272 --- /dev/null +++ b/src/tmuxp/cli/_progress.py @@ -0,0 +1,1131 @@ +"""Progress indicators for tmuxp CLI. + +This module provides a threaded spinner for long-running operations, +using only standard library and ANSI escape sequences. +""" + +from __future__ import annotations + +import atexit +import collections +import dataclasses +import itertools +import logging +import shutil +import sys +import threading +import time +import typing as t + +from ._colors import ANSI_SEQ_RE, ColorMode, Colors, strip_ansi + +logger = logging.getLogger(__name__) + + +if t.TYPE_CHECKING: + import types + + +# ANSI Escape Sequences +HIDE_CURSOR = "\033[?25l" +SHOW_CURSOR = "\033[?25h" +ERASE_LINE = "\033[2K" +CURSOR_TO_COL0 = "\r" +CURSOR_UP_1 = "\x1b[1A" +SYNC_START = "\x1b[?2026h" # synchronized output: buffer until SYNC_END +SYNC_END = "\x1b[?2026l" # flush — prevents multi-line flicker + +# Spinner frames (braille pattern) +SPINNER_FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + +BAR_WIDTH = 10 # inner fill character count +DEFAULT_OUTPUT_LINES = 3 # default spinner panel height (lines of script output) + + +def _visible_len(s: str) -> int: + r"""Return visible length of *s*, ignoring ANSI escapes. + + Examples + -------- + >>> _visible_len("hello") + 5 + >>> _visible_len("\033[32mgreen\033[0m") + 5 + >>> _visible_len("") + 0 + """ + return len(strip_ansi(s)) + + +def _truncate_visible(text: str, max_visible: int, suffix: str = "...") -> str: + r"""Truncate *text* to *max_visible* visible characters, preserving ANSI sequences. + + If the visible length of *text* is already within *max_visible*, it is + returned unchanged. Otherwise the text is cut so that exactly + *max_visible* visible characters remain, a ``\x1b[0m`` reset is appended + (to prevent color bleed), followed by *suffix*. + + Parameters + ---------- + text : str + Input string, possibly containing ANSI escape sequences. + max_visible : int + Maximum number of visible (non-ANSI) characters to keep. + suffix : str + Appended after the reset when truncation occurs. Default ``"..."``. + + Returns + ------- + str + Truncated string with ANSI sequences intact. + + Examples + -------- + Plain text truncation: + + >>> _truncate_visible("hello world", 5) + 'hello\x1b[0m...' + + ANSI sequences are preserved whole: + + >>> _truncate_visible("\033[32mgreen\033[0m", 3) + '\x1b[32mgre\x1b[0m...' + + No truncation needed: + + >>> _truncate_visible("short", 10) + 'short' + + Empty string: + + >>> _truncate_visible("", 5) + '' + """ + if max_visible <= 0: + return "" + if _visible_len(text) <= max_visible: + return text + + result: list[str] = [] + visible = 0 + i = 0 + while i < len(text) and visible < max_visible: + m = ANSI_SEQ_RE.match(text, i) + if m: + result.append(m.group()) + i = m.end() + else: + result.append(text[i]) + visible += 1 + i += 1 + return "".join(result) + "\x1b[0m" + suffix + + +SUCCESS_TEMPLATE = "Loaded workspace: {session} ({workspace_path}) {summary}" + +PROGRESS_PRESETS: dict[str, str] = { + "default": "Loading workspace: {session} {bar} {progress} {window}", + "minimal": "Loading workspace: {session} [{window_progress}]", + "window": "Loading workspace: {session} {window_bar} {window_progress_rel}", + "pane": "Loading workspace: {session} {pane_bar} {session_pane_progress}", + "verbose": ( + "Loading workspace: {session} [window {window_index} of {window_total}" + " · pane {pane_index} of {pane_total}] {window}" + ), +} + + +def render_bar(done: int, total: int, width: int = BAR_WIDTH) -> str: + """Render a plain-text ASCII progress bar without color. + + Parameters + ---------- + done : int + Completed units. + total : int + Total units. When ``<= 0``, returns ``""``. + width : int + Inner fill character count; default :data:`BAR_WIDTH`. + + Returns + ------- + str + A bar like ``"█████░░░░░"``. + Returns ``""`` when *total* <= 0 or *width* <= 0. + + Examples + -------- + >>> render_bar(0, 10) + '░░░░░░░░░░' + >>> render_bar(5, 10) + '█████░░░░░' + >>> render_bar(10, 10) + '██████████' + >>> render_bar(0, 0) + '' + >>> render_bar(3, 10, width=5) + '█░░░░' + """ + if total <= 0 or width <= 0: + return "" + filled = min(width, int(done / total * width)) + return "█" * filled + "░" * (width - filled) + + +class _SafeFormatMap(dict): # type: ignore[type-arg] + """dict subclass that returns ``{key}`` for missing keys in format_map.""" + + def __missing__(self, key: str) -> str: + return "{" + key + "}" + + +def resolve_progress_format(fmt: str) -> str: + """Return the format string for *fmt*, resolving preset names. + + If *fmt* is a key in :data:`PROGRESS_PRESETS` the corresponding + format string is returned; otherwise *fmt* is returned as-is. + + Examples + -------- + >>> resolve_progress_format("minimal") == PROGRESS_PRESETS["minimal"] + True + >>> resolve_progress_format("{session} w{window_progress}") + '{session} w{window_progress}' + >>> resolve_progress_format("unknown-preset") + 'unknown-preset' + """ + return PROGRESS_PRESETS.get(fmt, fmt) + + +@dataclasses.dataclass +class _WindowStatus: + """State for a single window in the build tree.""" + + name: str + done: bool = False + pane_num: int | None = None + pane_total: int | None = None + pane_done: int = 0 # panes completed in this window (set on window_done) + + +class BuildTree: + """Tracks session/window/pane build state; renders a structural progress tree. + + **Template Token Lifecycle** + + Each token is first available at the event listed in its column. + ``—`` means the value does not change at that phase. + + .. list-table:: + :header-rows: 1 + + * - Token + - Pre-``session_created`` + - After ``session_created`` + - After ``window_started`` + - After ``pane_creating`` + - After ``window_done`` + * - ``{session}`` + - ``""`` + - session name + - — + - — + - — + * - ``{window}`` + - ``""`` + - ``""`` + - window name + - — + - last window name + * - ``{window_index}`` + - ``0`` + - ``0`` + - N (1-based started count) + - — + - — + * - ``{window_total}`` + - ``0`` + - total + - — + - — + - — + * - ``{window_progress}`` + - ``""`` + - ``""`` + - ``"N/M"`` when > 0 + - — + - — + * - ``{windows_done}`` + - ``0`` + - ``0`` + - ``0`` + - ``0`` + - increments + * - ``{windows_remaining}`` + - ``0`` + - total + - total + - total + - decrements + * - ``{window_progress_rel}`` + - ``""`` + - ``"0/M"`` + - ``"0/M"`` + - — + - ``"N/M"`` + * - ``{pane_index}`` + - ``0`` + - ``0`` + - ``0`` + - pane_num + - ``0`` + * - ``{pane_total}`` + - ``0`` + - ``0`` + - window's pane total + - — + - window's pane total + * - ``{pane_progress}`` + - ``""`` + - ``""`` + - ``""`` + - ``"N/M"`` + - ``""`` + * - ``{pane_done}`` + - ``0`` + - ``0`` + - ``0`` + - pane_num + - pane_total + * - ``{pane_remaining}`` + - ``0`` + - ``0`` + - pane_total + - decrements + - ``0`` + * - ``{pane_progress_rel}`` + - ``""`` + - ``""`` + - ``"0/M"`` + - ``"N/M"`` + - ``"M/M"`` + * - ``{progress}`` + - ``""`` + - ``""`` + - ``"N/M win"`` + - ``"N/M win · P/Q pane"`` + - — + * - ``{session_pane_total}`` + - ``0`` + - total + - — + - — + - — + * - ``{session_panes_done}`` + - ``0`` + - ``0`` + - ``0`` + - ``0`` + - accumulated + * - ``{session_panes_remaining}`` + - ``0`` + - total + - total + - total + - decrements + * - ``{session_pane_progress}`` + - ``""`` + - ``"0/T"`` + - — + - — + - ``"N/T"`` + * - ``{overall_percent}`` + - ``0`` + - ``0`` + - ``0`` + - ``0`` + - updates + * - ``{summary}`` + - ``""`` + - ``""`` + - ``""`` + - ``""`` + - ``"[N win, M panes]"`` + * - ``{bar}`` (spinner) + - ``[░░…]`` + - ``[░░…]`` + - starts filling + - fractional + - jumps + * - ``{pane_bar}`` (spinner) + - ``""`` + - ``[░░…]`` + - — + - — + - updates + * - ``{window_bar}`` (spinner) + - ``""`` + - ``[░░…]`` + - — + - — + - updates + * - ``{status_icon}`` (spinner) + - ``""`` + - ``""`` + - ``""`` + - ``""`` + - ``""`` + + During ``before_script``: ``{bar}``, ``{pane_bar}``, ``{window_bar}`` show a + marching animation; ``{status_icon}`` = ``⏸``. + + Examples + -------- + Empty tree renders nothing: + + >>> from tmuxp.cli._colors import ColorMode, Colors + >>> colors = Colors(ColorMode.NEVER) + >>> tree = BuildTree() + >>> tree.render(colors, 80) + [] + + After session_created event the header appears: + + >>> tree.on_event({"event": "session_created", "name": "my-session"}) + >>> tree.render(colors, 80) + ['Session'] + + After window_started and pane_creating: + + >>> tree.on_event({"event": "window_started", "name": "editor", "pane_total": 2}) + >>> tree.on_event({"event": "pane_creating", "pane_num": 1, "pane_total": 2}) + >>> lines = tree.render(colors, 80) + >>> lines[1] + '- editor, pane (1 of 2)' + + After window_done the window gets a checkmark: + + >>> tree.on_event({"event": "window_done"}) + >>> lines = tree.render(colors, 80) + >>> lines[1] + '- ✓ editor' + + **Inline status format:** + + >>> tree2 = BuildTree() + >>> tree2.format_inline("Building projects...") + 'Building projects...' + >>> tree2.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + >>> tree2.format_inline("Building projects...") + 'Building projects... cihai' + >>> tree2.on_event({"event": "window_started", "name": "gp-libs", "pane_total": 2}) + >>> tree2.on_event({"event": "pane_creating", "pane_num": 1, "pane_total": 2}) + >>> tree2.format_inline("Building projects...") + 'Building projects... cihai [1 of 3 windows, 1 of 2 panes] gp-libs' + """ + + def __init__(self, workspace_path: str = "") -> None: + self.workspace_path: str = workspace_path + self.session_name: str | None = None + self.windows: list[_WindowStatus] = [] + self.window_total: int | None = None + self.session_pane_total: int | None = None + self.session_panes_done: int = 0 + self.windows_done: int = 0 + self._before_script_event: threading.Event = threading.Event() + + def on_event(self, event: dict[str, t.Any]) -> None: + """Update tree state from a build event dict. + + Examples + -------- + >>> tree = BuildTree() + >>> tree.on_event({ + ... "event": "session_created", "name": "dev", "window_total": 2, + ... }) + >>> tree.session_name + 'dev' + >>> tree.window_total + 2 + >>> tree.on_event({ + ... "event": "window_started", "name": "editor", "pane_total": 3, + ... }) + >>> len(tree.windows) + 1 + >>> tree.windows[0].name + 'editor' + """ + kind = event["event"] + if kind == "session_created": + self.session_name = event["name"] + self.window_total = event.get("window_total") + self.session_pane_total = event.get("session_pane_total") + elif kind == "before_script_started": + self._before_script_event.set() + elif kind == "before_script_done": + self._before_script_event.clear() + elif kind == "window_started": + self.windows.append( + _WindowStatus(name=event["name"], pane_total=event["pane_total"]) + ) + elif kind == "pane_creating": + if self.windows: + w = self.windows[-1] + w.pane_num = event["pane_num"] + w.pane_total = event["pane_total"] + elif kind == "window_done": + if self.windows: + w = self.windows[-1] + w.done = True + w.pane_num = None + w.pane_done = w.pane_total or 0 + self.session_panes_done += w.pane_done + self.windows_done += 1 + elif kind == "workspace_built": + for w in self.windows: + w.done = True + + def render(self, colors: Colors, width: int) -> list[str]: + """Render the current tree state to a list of display strings. + + Parameters + ---------- + colors : Colors + Colors instance for ANSI styling. + width : int + Terminal width; window lines are truncated to ``width - 1``. + + Returns + ------- + list[str] + Lines to display; empty list if no session has been created yet. + """ + if self.session_name is None: + return [] + lines: list[str] = [colors.heading("Session")] + for w in self.windows: + if w.done: + line = f"- {colors.success('✓')} {colors.highlight(w.name)}" + elif w.pane_num is not None and w.pane_total is not None: + line = ( + f"- {colors.highlight(w.name)}" + f"{colors.muted(f', pane ({w.pane_num} of {w.pane_total})')}" + ) + else: + line = f"- {colors.highlight(w.name)}" + lines.append(_truncate_visible(line, width - 1, suffix="")) + return lines + + def _context(self) -> dict[str, t.Any]: + """Return the current build-state token dict for template rendering. + + Examples + -------- + Zero-state before any events: + + >>> tree = BuildTree(workspace_path="~/.tmuxp/myapp.yaml") + >>> ev = { + ... "event": "session_created", + ... "name": "myapp", + ... "window_total": 5, + ... "session_pane_total": 10, + ... } + >>> tree.on_event(ev) + >>> ctx = tree._context() + >>> ctx["workspace_path"] + '~/.tmuxp/myapp.yaml' + >>> ctx["session"] + 'myapp' + >>> ctx["window_total"] + 5 + >>> ctx["window_index"] + 0 + >>> ctx["progress"] + '' + >>> ctx["windows_done"] + 0 + >>> ctx["windows_remaining"] + 5 + >>> ctx["window_progress_rel"] + '0/5' + >>> ctx["session_pane_total"] + 10 + >>> ctx["session_panes_remaining"] + 10 + >>> ctx["session_pane_progress"] + '0/10' + >>> ctx["summary"] + '' + + After windows complete, summary shows counts: + + >>> tree.on_event({"event": "window_started", "name": "w1", "pane_total": 3}) + >>> tree.on_event({"event": "window_done"}) + >>> tree.on_event({"event": "window_started", "name": "w2", "pane_total": 5}) + >>> tree.on_event({"event": "window_done"}) + >>> tree._context()["summary"] + '[2 win, 8 panes]' + """ + w = self.windows[-1] if self.windows else None + window_idx = len(self.windows) + win_tot = self.window_total or 0 + pane_idx = (w.pane_num or 0) if w else 0 + pane_tot = (w.pane_total or 0) if w else 0 + + win_progress = f"{window_idx}/{win_tot}" if win_tot and window_idx > 0 else "" + pane_progress = f"{pane_idx}/{pane_tot}" if pane_tot and pane_idx > 0 else "" + progress_parts = [ + f"{win_progress} win" if win_progress else "", + f"{pane_progress} pane" if pane_progress else "", + ] + progress = " · ".join(p for p in progress_parts if p) + + win_done = self.windows_done + win_progress_rel = f"{win_done}/{win_tot}" if win_tot else "" + + pane_done_cur = ( + (w.pane_num or 0) if w and not w.done else (w.pane_done if w else 0) + ) + pane_remaining = max(0, pane_tot - pane_done_cur) + pane_progress_rel = f"{pane_done_cur}/{pane_tot}" if pane_tot else "" + + spt = self.session_pane_total or 0 + session_pane_progress = f"{self.session_panes_done}/{spt}" if spt else "" + overall_percent = int(self.session_panes_done / spt * 100) if spt else 0 + + summary_parts: list[str] = [] + if self.windows_done: + summary_parts.append(f"{self.windows_done} win") + if self.session_panes_done: + summary_parts.append(f"{self.session_panes_done} panes") + summary = f"[{', '.join(summary_parts)}]" if summary_parts else "" + + return { + "workspace_path": self.workspace_path, + "session": self.session_name or "", + "window": w.name if w else "", + "window_index": window_idx, + "window_total": win_tot, + "window_progress": win_progress, + "pane_index": pane_idx, + "pane_total": pane_tot, + "pane_progress": pane_progress, + "progress": progress, + "windows_done": win_done, + "windows_remaining": max(0, win_tot - win_done), + "window_progress_rel": win_progress_rel, + "pane_done": pane_done_cur, + "pane_remaining": pane_remaining, + "pane_progress_rel": pane_progress_rel, + "session_pane_total": spt, + "session_panes_done": self.session_panes_done, + "session_panes_remaining": max(0, spt - self.session_panes_done), + "session_pane_progress": session_pane_progress, + "overall_percent": overall_percent, + "summary": summary, + } + + def format_template( + self, + fmt: str, + extra: dict[str, t.Any] | None = None, + ) -> str: + """Render *fmt* with the current build state. + + Returns ``""`` before ``session_created`` fires so callers can + fall back to a pre-build message. Unknown ``{tokens}`` are left + as-is (not dropped silently). + + The optional *extra* dict is merged on top of :meth:`_context` so + callers (e.g. :class:`Spinner`) can inject ANSI-colored tokens like + ``{bar}`` without adding color concerns to :class:`BuildTree`. + + Examples + -------- + >>> tree = BuildTree() + >>> tree.format_template("{session} [{progress}] {window}") + '' + >>> ev = {"event": "session_created", "name": "cihai", "window_total": 3} + >>> tree.on_event(ev) + >>> tree.format_template("{session} [{progress}] {window}") + 'cihai [] ' + >>> ev = {"event": "window_started", "name": "editor", "pane_total": 4} + >>> tree.on_event(ev) + >>> tree.format_template("{session} [{progress}] {window}") + 'cihai [1/3 win] editor' + >>> tree.on_event({"event": "pane_creating", "pane_num": 2, "pane_total": 4}) + >>> tree.format_template("{session} [{progress}] {window}") + 'cihai [1/3 win · 2/4 pane] editor' + >>> tree.format_template("minimal: {session} [{window_progress}]") + 'minimal: cihai [1/3]' + >>> tree.format_template("{session} {unknown_token}") + 'cihai {unknown_token}' + >>> tree.format_template("{session}", extra={"custom": "value"}) + 'cihai' + """ + if self.session_name is None: + return "" + ctx: dict[str, t.Any] = self._context() + if extra: + ctx = {**ctx, **extra} + return fmt.format_map(_SafeFormatMap(ctx)) + + def format_inline(self, base: str) -> str: + """Return base message with current build state appended inline. + + Parameters + ---------- + base : str + The original spinner message to start from. + + Returns + ------- + str + ``base`` alone if no session has been created yet; otherwise + ``"base session_name [W of N windows, P of M panes] window_name"``, + omitting the bracket section when there is no current window, and + omitting individual parts when their totals are not known. + """ + if self.session_name is None: + return base + parts = [base, self.session_name] + if self.windows: + w = self.windows[-1] + window_idx = len(self.windows) + bracket_parts: list[str] = [] + if self.window_total is not None: + bracket_parts.append(f"{window_idx} of {self.window_total} windows") + if w.pane_num is not None and w.pane_total is not None: + bracket_parts.append(f"{w.pane_num} of {w.pane_total} panes") + if bracket_parts: + parts.append(f"[{', '.join(bracket_parts)}]") + parts.append(w.name) + return " ".join(parts) + + +class Spinner: + """A threaded spinner for CLI progress. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> with Spinner("Build...", color_mode=ColorMode.NEVER, stream=stream) as spinner: + ... spinner.add_output_line("Session created: test") + ... spinner.update_message("Creating window: editor") + """ + + def __init__( + self, + message: str = "Loading...", + color_mode: ColorMode = ColorMode.AUTO, + stream: t.TextIO = sys.stderr, + interval: float = 0.1, + output_lines: int = DEFAULT_OUTPUT_LINES, + progress_format: str | None = None, + workspace_path: str = "", + ) -> None: + """Initialize spinner. + + Parameters + ---------- + message : str + Text displayed next to the spinner animation. + color_mode : ColorMode + ANSI color mode for styled output. + stream : t.TextIO + Output stream (default ``sys.stderr``). + interval : float + Seconds between animation frames. + output_lines : int + Max lines in the scrolling output panel. ``0`` hides the panel, + ``-1`` means unlimited. + progress_format : str | None + Format string for progress output. Tokens are documented in + :class:`BuildTree`. ``None`` uses the built-in default. + workspace_path : str + Absolute path to the workspace config file, shown in success + output. + """ + self.message = message + self._base_message = message + self.colors = Colors(color_mode) + self.stream = stream + self.interval = interval + + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + self._enabled = self._should_enable() + self._panel_hidden = output_lines == 0 + if output_lines < 0: + self._output_lines: collections.deque[str] = ( + collections.deque() + ) # unlimited + elif output_lines == 0: + self._output_lines = collections.deque(maxlen=1) # drop, never render + else: + self._output_lines = collections.deque(maxlen=output_lines) + self._prev_height: int = 0 + self._build_tree: BuildTree = BuildTree(workspace_path=workspace_path) + self._progress_format: str | None = ( + resolve_progress_format(progress_format) + if progress_format is not None + else None + ) + + def _should_enable(self) -> bool: + """Check if spinner should be enabled (TTY check).""" + return self.stream.isatty() + + def _restore_cursor(self) -> None: + """Unconditionally restore cursor — called by atexit on abnormal exit.""" + self.stream.write(SHOW_CURSOR) + self.stream.flush() + + def _spin(self) -> None: + """Spin in background thread.""" + frames = itertools.cycle(SPINNER_FRAMES) + march_pos = 0 # marching bar position counter (local to _spin) + + self.stream.write(HIDE_CURSOR) + self.stream.flush() + + try: + while not self._stop_event.is_set(): + frame = next(frames) + term_width = shutil.get_terminal_size(fallback=(80, 24)).columns + if self._panel_hidden: + panel: list[str] = [] + else: + term_height = shutil.get_terminal_size( + fallback=(80, 24), + ).lines + raw_panel = list(self._output_lines) + max_panel = term_height - 2 + if len(raw_panel) > max_panel: + raw_panel = raw_panel[-max_panel:] + panel = [ + _truncate_visible(line, term_width - 1, suffix="") + for line in raw_panel + ] + new_height = len(panel) + 1 # panel lines + spinner line + + parts: list[str] = [] + + # Erase previous render (cursor is at end of previous spinner line) + if self._prev_height > 0: + parts.append(f"{CURSOR_TO_COL0}{ERASE_LINE}") + parts.extend( + f"{CURSOR_UP_1}{ERASE_LINE}" + for _ in range(self._prev_height - 1) + ) + + # Write panel lines (tree lines already constrained by render()) + parts.extend(f"{output_line}\n" for output_line in panel) + + # Determine final spinner message + if ( + self._progress_format is not None + and self._build_tree._before_script_event.is_set() + ): + # Marching bar: sweep a 2-cell highlight across the bar + p = march_pos % max(1, BAR_WIDTH - 1) + march_bar = ( + self.colors.muted("░" * p) + + self.colors.warning("░░") + + self.colors.muted("░" * max(0, BAR_WIDTH - p - 2)) + ) + tree = self._build_tree + extra: dict[str, t.Any] = { + "session": self.colors.highlight( + tree.session_name or "", + ), + "bar": march_bar, + "pane_bar": march_bar, + "window_bar": march_bar, + "status_icon": self.colors.warning("⏸"), + } + rendered = self._build_tree.format_template( + self._progress_format, extra=extra + ) + msg = rendered if rendered else self._base_message + march_pos += 1 + else: + msg = self.message + march_pos = 0 # reset when not in before_script + + # Write spinner line (no trailing newline — cursor stays here) + spinner_text = f"{self.colors.info(frame)} {msg}" + if _visible_len(spinner_text) > term_width - 1: + spinner_text = _truncate_visible(spinner_text, term_width - 4) + parts.append(f"{CURSOR_TO_COL0}{spinner_text}") + + # Wrap entire frame in synchronized output to prevent flicker. + # Terminals that don't support it safely ignore the sequences. + self.stream.write(SYNC_START + "".join(parts) + SYNC_END) + self.stream.flush() + self._prev_height = new_height + time.sleep(self.interval) + finally: + # Erase the whole block and show cursor + if self._prev_height > 0: + self.stream.write(f"{CURSOR_TO_COL0}{ERASE_LINE}") + for _ in range(self._prev_height - 1): + self.stream.write(f"{CURSOR_UP_1}{ERASE_LINE}") + self.stream.write(SHOW_CURSOR) + self.stream.flush() + self._prev_height = 0 + + def add_output_line(self, line: str) -> None: + r"""Append a line to the live output panel (thread-safe via GIL). + + When the spinner is disabled (non-TTY), writes directly to the stream + so output is not silently swallowed. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> spinner = Spinner("test", color_mode=ColorMode.NEVER, stream=stream) + >>> spinner.add_output_line("hello world") + >>> stream.getvalue() + 'hello world\n' + """ + stripped = line.rstrip("\n\r") + if stripped: + if self._enabled: + self._output_lines.append(stripped) + else: + self.stream.write(stripped + "\n") + self.stream.flush() + + def update_message(self, message: str) -> None: + """Update the message displayed next to the spinner. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> spinner = Spinner("initial", color_mode=ColorMode.NEVER, stream=stream) + >>> spinner.message + 'initial' + >>> spinner.update_message("updated") + >>> spinner.message + 'updated' + """ + self.message = message + + def _build_extra(self) -> dict[str, t.Any]: + """Return spinner-owned template tokens (colored bar, status_icon). + + These are separated from :meth:`BuildTree._context` to keep ANSI/color + concerns out of :class:`BuildTree`, which is also used in tests without + colors. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> spinner = Spinner("x", color_mode=ColorMode.NEVER, stream=stream) + >>> spinner._build_tree.on_event( + ... { + ... "event": "session_created", + ... "name": "s", + ... "window_total": 4, + ... "session_pane_total": 8, + ... } + ... ) + >>> extra = spinner._build_extra() + >>> extra["bar"] + '░░░░░░░░░░' + >>> extra["status_icon"] + '' + """ + tree = self._build_tree + win_tot = tree.window_total or 0 + spt = tree.session_pane_total or 0 + + # Composite fraction: (windows_done + pane_frac) / window_total + if win_tot > 0: + cw = tree.windows[-1] if tree.windows else None + pane_frac = 0.0 + if cw and not cw.done and cw.pane_total: + pane_frac = (cw.pane_num or 0) / cw.pane_total + composite_done = tree.windows_done + pane_frac + composite_bar = render_bar(int(composite_done * 100), win_tot * 100) + else: + composite_bar = render_bar(0, 0) + + pane_bar = render_bar(tree.session_panes_done, spt) + window_bar = render_bar(tree.windows_done, win_tot) + + def _color_bar(plain: str) -> str: + if not plain: + return plain + filled = plain.count("█") + empty = plain.count("░") + return self.colors.success("█" * filled) + self.colors.muted("░" * empty) + + return { + "session": self.colors.highlight(tree.session_name or ""), + "bar": _color_bar(composite_bar), + "pane_bar": _color_bar(pane_bar), + "window_bar": _color_bar(window_bar), + "status_icon": "", + } + + def on_build_event(self, event: dict[str, t.Any]) -> None: + """Forward build event to BuildTree and update spinner message inline. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> spinner = Spinner("Loading", color_mode=ColorMode.NEVER, stream=stream) + >>> spinner.on_build_event({ + ... "event": "session_created", "name": "myapp", + ... "window_total": 2, "session_pane_total": 3, + ... }) + >>> spinner._build_tree.session_name + 'myapp' + """ + self._build_tree.on_event(event) + if self._progress_format is not None: + extra = self._build_extra() + rendered = self._build_tree.format_template( + self._progress_format, extra=extra + ) + # Only switch to template output once a window has started so that + # the session_created → window_started gap doesn't show empty brackets. + self.message = ( + rendered + if (rendered and self._build_tree.windows) + else self._base_message + ) + else: + self.message = self._build_tree.format_inline(self._base_message) + + def start(self) -> None: + """Start the spinner thread. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> spinner = Spinner("test", color_mode=ColorMode.NEVER, stream=stream) + >>> spinner.start() + >>> spinner.stop() + """ + if not self._enabled: + return + + atexit.register(self._restore_cursor) + self._stop_event.clear() + self._thread = threading.Thread(target=self._spin, daemon=True) + self._thread.start() + + def stop(self) -> None: + """Stop the spinner thread. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> spinner = Spinner("test", color_mode=ColorMode.NEVER, stream=stream) + >>> spinner.start() + >>> spinner.stop() + >>> spinner._thread is None + True + """ + if self._thread and self._thread.is_alive(): + self._stop_event.set() + self._thread.join() + self._thread = None + atexit.unregister(self._restore_cursor) + + def format_success(self) -> str: + """Render the success template with current build state. + + Uses :data:`SUCCESS_TEMPLATE` with colored ``{session}`` + (``highlight()``), ``{workspace_path}`` (``info()``), and + ``{summary}`` (``muted()``) from :meth:`BuildTree._context`. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> spinner = Spinner("x", color_mode=ColorMode.NEVER, stream=stream, + ... workspace_path="~/.tmuxp/myapp.yaml") + >>> spinner._build_tree.on_event({ + ... "event": "session_created", "name": "myapp", + ... "window_total": 2, "session_pane_total": 4, + ... }) + >>> spinner._build_tree.on_event( + ... {"event": "window_started", "name": "w1", "pane_total": 2}) + >>> spinner._build_tree.on_event({"event": "window_done"}) + >>> spinner._build_tree.on_event( + ... {"event": "window_started", "name": "w2", "pane_total": 2}) + >>> spinner._build_tree.on_event({"event": "window_done"}) + >>> spinner.format_success() + 'Loaded workspace: myapp (~/.tmuxp/myapp.yaml) [2 win, 4 panes]' + """ + tree = self._build_tree + ctx = tree._context() + extra: dict[str, t.Any] = { + "session": self.colors.highlight(tree.session_name or ""), + "workspace_path": self.colors.info(ctx.get("workspace_path", "")), + "summary": self.colors.muted(ctx.get("summary", "")) + if ctx.get("summary") + else "", + } + return SUCCESS_TEMPLATE.format_map(_SafeFormatMap({**ctx, **extra})) + + def success(self, text: str | None = None) -> None: + """Stop the spinner and print a success line. + + Parameters + ---------- + text : str | None + The success message to display after the checkmark. + When ``None``, uses :meth:`format_success` if a progress format + is configured, otherwise falls back to ``_base_message``. + + Examples + -------- + >>> import io + >>> stream = io.StringIO() + >>> spinner = Spinner("x", color_mode=ColorMode.NEVER, stream=stream) + >>> spinner.success("done") + >>> "✓ done" in stream.getvalue() + True + + With no args and no progress format, falls back to base message: + + >>> stream2 = io.StringIO() + >>> spinner2 = Spinner("Loading...", color_mode=ColorMode.NEVER, stream=stream2) + >>> spinner2.success() + >>> "✓ Loading..." in stream2.getvalue() + True + """ + self.stop() + if text is None and self._progress_format is not None: + text = self.format_success() + elif text is None: + text = self._base_message + checkmark = self.colors.success("\u2713") + msg = f"{checkmark} {text}" + self.stream.write(f"{msg}\n") + self.stream.flush() + + def __enter__(self) -> Spinner: + self.start() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: types.TracebackType | None, + ) -> t.Literal[False]: + self.stop() + return False diff --git a/src/tmuxp/cli/load.py b/src/tmuxp/cli/load.py index 4673c4cde6..375cdb1b22 100644 --- a/src/tmuxp/cli/load.py +++ b/src/tmuxp/cli/load.py @@ -3,6 +3,7 @@ from __future__ import annotations import argparse +import contextlib import importlib import logging import os @@ -21,10 +22,41 @@ from tmuxp.workspace.finders import find_workspace_file, get_workspace_dir from ._colors import ColorMode, Colors, build_description, get_color_mode +from ._progress import ( + DEFAULT_OUTPUT_LINES, + SUCCESS_TEMPLATE, + Spinner, + _SafeFormatMap, + resolve_progress_format, +) from .utils import prompt_choices, prompt_yes_no, tmuxp_echo logger = logging.getLogger(__name__) + +@contextlib.contextmanager +def _silence_stream_handlers(logger_name: str = "tmuxp") -> t.Iterator[None]: + """Temporarily raise StreamHandler level to WARNING while spinner is active. + + INFO/DEBUG log records are diagnostics for aggregators, not user-facing output; + the spinner is the user-facing progress channel. Restores original levels on exit. + """ + _log = logging.getLogger(logger_name) + saved: list[tuple[logging.StreamHandler[t.Any], int]] = [ + (h, h.level) + for h in _log.handlers + if isinstance(h, logging.StreamHandler) + and not isinstance(h, logging.FileHandler) + ] + for h, _ in saved: + h.setLevel(logging.WARNING) + try: + yield + finally: + for h, level in saved: + h.setLevel(level) + + LOAD_DESCRIPTION = build_description( """ Load tmuxp workspace file(s) and create or attach to a tmux session. @@ -77,6 +109,9 @@ class CLILoadNamespace(argparse.Namespace): color: CLIColorModeLiteral log_file: str | None log_level: str + progress_format: str | None + panel_lines: int | None + no_progress: bool def load_plugins( @@ -196,7 +231,11 @@ def _reattach(builder: WorkspaceBuilder, colors: Colors | None = None) -> None: builder.session.attach() -def _load_attached(builder: WorkspaceBuilder, detached: bool) -> None: +def _load_attached( + builder: WorkspaceBuilder, + detached: bool, + pre_attach_hook: t.Callable[[], None] | None = None, +) -> None: """ Load workspace in new session. @@ -204,10 +243,16 @@ def _load_attached(builder: WorkspaceBuilder, detached: bool) -> None: ---------- builder: :class:`workspace.builder.WorkspaceBuilder` detached : bool + pre_attach_hook : callable, optional + called after build, before attach/switch_client; use to stop the spinner + so its cleanup sequences don't appear inside the tmux pane. """ builder.build() assert builder.session is not None + if pre_attach_hook is not None: + pre_attach_hook() + if "TMUX" in os.environ: # tmuxp ran from inside tmux # unset TMUX, save it, e.g. '/tmp/tmux-1000/default,30668,0' tmux_env = os.environ.pop("TMUX") @@ -219,7 +264,11 @@ def _load_attached(builder: WorkspaceBuilder, detached: bool) -> None: builder.session.attach() -def _load_detached(builder: WorkspaceBuilder, colors: Colors | None = None) -> None: +def _load_detached( + builder: WorkspaceBuilder, + colors: Colors | None = None, + pre_output_hook: t.Callable[[], None] | None = None, +) -> None: """ Load workspace in new session but don't attach. @@ -228,11 +277,16 @@ def _load_detached(builder: WorkspaceBuilder, colors: Colors | None = None) -> N builder: :class:`workspace.builder.WorkspaceBuilder` colors : Colors | None Optional Colors instance for styled output. + pre_output_hook : Callable | None + Called after build but before printing, e.g. to stop a spinner. """ builder.build() assert builder.session is not None + if pre_output_hook is not None: + pre_output_hook() + msg = "Session created in detached state." tmuxp_echo(colors.info(msg) if colors else msg) logger.info("session created in detached state") @@ -265,6 +319,123 @@ def _setup_plugins(builder: WorkspaceBuilder) -> Session: return builder.session +def _dispatch_build( + builder: WorkspaceBuilder, + detached: bool, + append: bool, + answer_yes: bool, + cli_colors: Colors, + pre_attach_hook: t.Callable[[], None] | None = None, + on_error_hook: t.Callable[[], None] | None = None, + pre_prompt_hook: t.Callable[[], None] | None = None, +) -> Session | None: + """Dispatch the build to the correct load path and handle errors. + + Handles the detached/attached/append switching logic and the + ``TmuxpException`` error-recovery prompt. Extracted so the + spinner-enabled and spinner-disabled paths share one implementation. + + Parameters + ---------- + builder : WorkspaceBuilder + Configured workspace builder. + detached : bool + Load session in detached state. + append : bool + Append windows to the current session. + answer_yes : bool + Skip interactive prompts. + cli_colors : Colors + Colors instance for styled output. + pre_attach_hook : callable, optional + Called before attach/switch_client (e.g. stop spinner). + on_error_hook : callable, optional + Called before showing the error-recovery prompt (e.g. stop spinner). + pre_prompt_hook : callable, optional + Called before any interactive prompt (e.g. stop spinner so ANSI + escape sequences don't garble the terminal during user input). + + Returns + ------- + Session | None + The built session, or ``None`` if the user killed it on error. + + Examples + -------- + >>> from tmuxp.cli.load import _dispatch_build + >>> callable(_dispatch_build) + True + """ + try: + if detached: + _load_detached(builder, cli_colors, pre_output_hook=pre_attach_hook) + return _setup_plugins(builder) + + if append: + if "TMUX" in os.environ: # tmuxp ran from inside tmux + _load_append_windows_to_current_session(builder) + else: + _load_attached(builder, detached, pre_attach_hook=pre_attach_hook) + + return _setup_plugins(builder) + + # append and answer_yes have no meaning if specified together + if answer_yes: + _load_attached(builder, detached, pre_attach_hook=pre_attach_hook) + return _setup_plugins(builder) + + if "TMUX" in os.environ: # tmuxp ran from inside tmux + if pre_prompt_hook is not None: + pre_prompt_hook() + msg = ( + "Already inside TMUX, switch to session? yes/no\n" + "Or (a)ppend windows in the current active session?\n[y/n/a]" + ) + options = ["y", "n", "a"] + choice = prompt_choices(msg, choices=options, color_mode=cli_colors.mode) + + if choice == "y": + _load_attached(builder, detached, pre_attach_hook=pre_attach_hook) + elif choice == "a": + _load_append_windows_to_current_session(builder) + else: + _load_detached(builder, cli_colors) + else: + _load_attached(builder, detached, pre_attach_hook=pre_attach_hook) + + except exc.TmuxpException as e: + if on_error_hook is not None: + on_error_hook() + logger.debug("workspace build failed", exc_info=True) + tmuxp_echo(cli_colors.error("[Error]") + f" {e}") + + choice = prompt_choices( + cli_colors.error("Error loading workspace.") + + " (k)ill, (a)ttach, (d)etach?", + choices=["k", "a", "d"], + default="k", + color_mode=cli_colors.mode, + ) + + if choice == "k": + if builder.session is not None: + builder.session.kill() + tmuxp_echo(cli_colors.muted("Session killed.")) + logger.info("session killed by user after build error") + elif choice == "a": + _reattach(builder, cli_colors) + else: + sys.exit() + return None + finally: + builder.on_progress = None + builder.on_before_script = None + builder.on_script_output = None + builder.on_build_event = None + + return _setup_plugins(builder) + + def load_workspace( workspace_file: StrPath, socket_name: str | None = None, @@ -276,6 +447,9 @@ def load_workspace( answer_yes: bool = False, append: bool = False, cli_colors: Colors | None = None, + progress_format: str | None = None, + panel_lines: int | None = None, + no_progress: bool = False, ) -> Session | None: """Entrypoint for ``tmuxp load``, load a tmuxp "workspace" session via config file. @@ -301,6 +475,15 @@ def load_workspace( Default False. cli_colors : Colors, optional Colors instance for CLI output formatting. If None, uses AUTO mode. + progress_format : str, optional + Spinner format preset name or custom format string with tokens. + panel_lines : int, optional + Number of script-output lines shown in the spinner panel. + Defaults to the :class:`~tmuxp.cli._progress.Spinner` default (3). + Override via ``TMUXP_PROGRESS_LINES`` environment variable. + no_progress : bool + Disable the progress spinner entirely. Default False. + Also disabled when ``TMUXP_PROGRESS=0``. Notes ----- @@ -361,11 +544,13 @@ def load_workspace( "loading workspace", extra={"tmux_config_path": str(workspace_file)}, ) - tmuxp_echo( - cli_colors.info("[Loading]") - + " " - + cli_colors.highlight(str(PrivatePath(workspace_file))), - ) + _progress_disabled = no_progress or os.getenv("TMUXP_PROGRESS", "1") == "0" + if _progress_disabled: + tmuxp_echo( + cli_colors.info("[Loading]") + + " " + + cli_colors.highlight(str(PrivatePath(workspace_file))), + ) # ConfigReader allows us to open a yaml or json file as a dict raw_workspace = config_reader.ConfigReader._from_file(workspace_file) or {} @@ -425,64 +610,96 @@ def load_workspace( _reattach(builder, cli_colors) return None - try: - if detached: - _load_detached(builder, cli_colors) - return _setup_plugins(builder) - - if append: - if "TMUX" in os.environ: # tmuxp ran from inside tmux - _load_append_windows_to_current_session(builder) - else: - _load_attached(builder, detached) - - return _setup_plugins(builder) - - # append and answer_yes have no meaning if specified together - if answer_yes: - _load_attached(builder, detached) - return _setup_plugins(builder) - - if "TMUX" in os.environ: # tmuxp ran from inside tmux - msg = ( - "Already inside TMUX, switch to session? yes/no\n" - "Or (a)ppend windows in the current active session?\n[y/n/a]" + if _progress_disabled: + _private_path = str(PrivatePath(workspace_file)) + result = _dispatch_build( + builder, + detached, + append, + answer_yes, + cli_colors, + ) + if result is not None: + summary = "" + try: + win_count = len(result.windows) + pane_count = sum(len(w.panes) for w in result.windows) + summary_parts: list[str] = [] + if win_count: + summary_parts.append(f"{win_count} win") + if pane_count: + summary_parts.append(f"{pane_count} panes") + summary = f"[{', '.join(summary_parts)}]" if summary_parts else "" + except Exception: + logger.debug("session gone before summary", exc_info=True) + ctx = { + "session": cli_colors.highlight(session_name), + "workspace_path": cli_colors.info(_private_path), + "summary": cli_colors.muted(summary) if summary else "", + } + checkmark = cli_colors.success("\u2713") + tmuxp_echo( + f"{checkmark} {SUCCESS_TEMPLATE.format_map(_SafeFormatMap(ctx))}" ) - options = ["y", "n", "a"] - choice = prompt_choices(msg, choices=options, color_mode=cli_colors.mode) + return result - if choice == "y": - _load_attached(builder, detached) - elif choice == "a": - _load_append_windows_to_current_session(builder) - else: - _load_detached(builder, cli_colors) - else: - _load_attached(builder, detached) - - except exc.TmuxpException as e: - logger.debug("workspace build failed", exc_info=True) - tmuxp_echo(cli_colors.error("[Error]") + f" {e}") - - choice = prompt_choices( - cli_colors.error("Error loading workspace.") - + " (k)ill, (a)ttach, (d)etach?", - choices=["k", "a", "d"], - default="k", - color_mode=cli_colors.mode, + # Spinner wraps only the actual build phase + _progress_fmt = resolve_progress_format( + progress_format + if progress_format is not None + else os.getenv("TMUXP_PROGRESS_FORMAT", "default") + ) + _panel_lines_env = os.getenv("TMUXP_PROGRESS_LINES") + if _panel_lines_env: + try: + _panel_lines_env_int: int | None = int(_panel_lines_env) + except ValueError: + _panel_lines_env_int = None + else: + _panel_lines_env_int = None + _panel_lines = panel_lines if panel_lines is not None else _panel_lines_env_int + _private_path = str(PrivatePath(workspace_file)) + _spinner = Spinner( + message=( + f"Loading workspace: {cli_colors.highlight(session_name)} ({_private_path})" + ), + color_mode=cli_colors.mode, + progress_format=_progress_fmt, + output_lines=_panel_lines if _panel_lines is not None else DEFAULT_OUTPUT_LINES, + workspace_path=_private_path, + ) + _success_emitted = False + + def _emit_success() -> None: + nonlocal _success_emitted + if _success_emitted: + return + _success_emitted = True + _spinner.success() + + with ( + _silence_stream_handlers(), + _spinner as spinner, + ): + builder.on_build_event = spinner.on_build_event + _resolved_panel = ( + _panel_lines if _panel_lines is not None else DEFAULT_OUTPUT_LINES ) - - if choice == "k": - if builder.session is not None: - builder.session.kill() - tmuxp_echo(cli_colors.muted("Session killed.")) - logger.info("session killed by user after build error") - elif choice == "a": - _reattach(builder, cli_colors) - else: - sys.exit() - - return _setup_plugins(builder) + if _resolved_panel != 0: + builder.on_script_output = spinner.add_output_line + result = _dispatch_build( + builder, + detached, + append, + answer_yes, + cli_colors, + pre_attach_hook=_emit_success, + on_error_hook=spinner.stop, + pre_prompt_hook=spinner.stop, + ) + if result is not None: + _emit_success() + return result def create_load_subparser(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: @@ -567,6 +784,42 @@ def create_load_subparser(parser: argparse.ArgumentParser) -> argparse.ArgumentP help="file to log errors/output to", ) + parser.add_argument( + "--progress-format", + metavar="FORMAT", + dest="progress_format", + default=None, + help=( + "Spinner line format: preset name " + "(default, minimal, window, pane, verbose) " + "or a format string with tokens " + "{session}, {window}, {progress}, {window_progress}, {pane_progress}, etc. " + "Env: TMUXP_PROGRESS_FORMAT" + ), + ) + + parser.add_argument( + "--progress-lines", + metavar="N", + dest="panel_lines", + type=int, + default=None, + help=( + "Number of script-output lines shown in the spinner panel (default: 3). " + "0 hides the panel entirely (script output goes to stdout). " + "-1 shows unlimited lines (capped to terminal height). " + "Env: TMUXP_PROGRESS_LINES" + ), + ) + + parser.add_argument( + "--no-progress", + dest="no_progress", + action="store_true", + default=False, + help=("Disable the animated progress spinner. Env: TMUXP_PROGRESS=0"), + ) + try: import shtab @@ -648,4 +901,7 @@ def command_load( answer_yes=args.answer_yes or False, append=args.append or False, cli_colors=cli_colors, + progress_format=args.progress_format, + panel_lines=args.panel_lines, + no_progress=args.no_progress, ) diff --git a/src/tmuxp/util.py b/src/tmuxp/util.py index d63a6c4182..152b1f6c06 100644 --- a/src/tmuxp/util.py +++ b/src/tmuxp/util.py @@ -28,8 +28,12 @@ def run_before_script( script_file: str | pathlib.Path, cwd: pathlib.Path | None = None, + on_line: t.Callable[[str], None] | None = None, ) -> int: - """Execute shell script, ``tee``-ing output to both terminal (if TTY) and buffer.""" + """Execute shell script, streaming output to callback or terminal (if TTY). + + Output is buffered and optionally forwarded via the ``on_line`` callback. + """ script_cmd = shlex.split(str(script_file)) try: @@ -68,13 +72,17 @@ def run_before_script( if line_out and line_out.strip(): out_buffer.append(line_out) - if is_out_tty: + if on_line is not None: + on_line(line_out) + elif is_out_tty: sys.stdout.write(line_out) sys.stdout.flush() if line_err and line_err.strip(): err_buffer.append(line_err) - if is_err_tty: + if on_line is not None: + on_line(line_err) + elif is_err_tty: sys.stderr.write(line_err) sys.stderr.flush() diff --git a/src/tmuxp/workspace/builder.py b/src/tmuxp/workspace/builder.py index 57d494c01f..728b477963 100644 --- a/src/tmuxp/workspace/builder.py +++ b/src/tmuxp/workspace/builder.py @@ -169,6 +169,101 @@ class WorkspaceBuilder: >>> sorted([window.name for window in session.windows]) ['editor', 'logging', 'test'] + **Progress callback:** + + >>> calls: list[str] = [] + >>> progress_cfg = { + ... "session_name": "progress-demo", + ... "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + ... } + >>> builder = WorkspaceBuilder( + ... session_config=progress_cfg, + ... server=server, + ... on_progress=calls.append, + ... ) + >>> builder.build() + >>> "Workspace built" in calls + True + + **Before-script hook:** + + >>> hook_calls: list[bool] = [] + >>> no_script_cfg = { + ... "session_name": "hook-demo", + ... "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + ... } + >>> builder = WorkspaceBuilder( + ... session_config=no_script_cfg, + ... server=server, + ... on_before_script=lambda: hook_calls.append(True), + ... ) + >>> builder.build() + >>> hook_calls # no before_script in config, callback not fired + [] + + **Script output hook:** + + >>> script_lines: list[str] = [] + >>> no_script_cfg2 = { + ... "session_name": "script-output-demo", + ... "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + ... } + >>> builder = WorkspaceBuilder( + ... session_config=no_script_cfg2, + ... server=server, + ... on_script_output=script_lines.append, + ... ) + >>> builder.build() + >>> script_lines # no before_script in config, callback not fired + [] + + **Build events hook:** + + >>> events: list[dict] = [] + >>> event_cfg = { + ... "session_name": "events-demo", + ... "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + ... } + >>> builder = WorkspaceBuilder( + ... session_config=event_cfg, + ... server=server, + ... on_build_event=events.append, + ... ) + >>> builder.build() + >>> [e["event"] for e in events] + ['session_created', 'window_started', 'pane_creating', + 'window_done', 'workspace_built'] + >>> next(e for e in events if e["event"] == "session_created")["session_pane_total"] + 1 + + **Build events with before_script:** + + ``before_script_started`` fires before the script runs; + ``before_script_done`` fires in ``finally`` (success or failure). + + >>> script_events: list[dict] = [] + >>> script_event_cfg = { + ... "session_name": "script-events-demo", + ... "before_script": "echo hello", + ... "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + ... } + >>> builder = WorkspaceBuilder( + ... session_config=script_event_cfg, + ... server=server, + ... on_build_event=script_events.append, + ... ) + >>> builder.build() + >>> event_names = [e["event"] for e in script_events] + >>> "before_script_started" in event_names + True + >>> "before_script_done" in event_names + True + >>> bs_start = event_names.index("before_script_started") + >>> bs_done = event_names.index("before_script_done") + >>> win_start = event_names.index("window_started") + >>> bs_start < bs_done < win_start + True + The normal phase of loading is: 1. Load JSON / YAML file via :class:`pathlib.Path`:: @@ -211,12 +306,20 @@ class WorkspaceBuilder: server: Server _session: Session | None session_name: str + on_progress: t.Callable[[str], None] | None + on_before_script: t.Callable[[], None] | None + on_script_output: t.Callable[[str], None] | None + on_build_event: t.Callable[[dict[str, t.Any]], None] | None def __init__( self, session_config: dict[str, t.Any], server: Server, plugins: list[t.Any] | None = None, + on_progress: t.Callable[[str], None] | None = None, + on_before_script: t.Callable[[], None] | None = None, + on_script_output: t.Callable[[str], None] | None = None, + on_build_event: t.Callable[[dict[str, t.Any]], None] | None = None, ) -> None: """Initialize workspace loading. @@ -231,6 +334,23 @@ def __init__( server : :class:`libtmux.Server` tmux server to build session in + on_progress : callable, optional + callback for progress updates during building + + on_before_script : callable, optional + called just before ``before_script`` runs; use to clear the terminal + (e.g. stop a spinner) so script output is not interleaved + + on_script_output : callable, optional + called with each output line from ``before_script`` subprocess; when + set, raw TTY tee is suppressed so the caller can route lines to a + live panel instead + + on_build_event : callable, optional + called with a dict event at each structural build milestone (session + created, window started/done, pane creating, workspace built); used + by the CLI to render a live session tree + Notes ----- TODO: Initialize :class:`libtmux.Session` from here, in @@ -248,6 +368,10 @@ def __init__( self.session_config = session_config self.plugins = plugins + self.on_progress = on_progress + self.on_before_script = on_before_script + self.on_script_output = on_script_output + self.on_build_event = on_build_event if self.server is not None and self.session_exists( session_name=self.session_config["session_name"], @@ -332,7 +456,21 @@ def build(self, session: Session | None = None, append: bool = False) -> None: assert session is not None assert session.name is not None + if self.on_progress: + self.on_progress(f"Session created: {session.name}") + self._session = session + if self.on_build_event: + self.on_build_event( + { + "event": "session_created", + "name": session.name, + "window_total": len(self.session_config["windows"]), + "session_pane_total": sum( + len(w.get("panes", [])) for w in self.session_config["windows"] + ), + } + ) _log = TmuxpLoggerAdapter( logger, {"tmux_session": self.session_config["session_name"]}, @@ -354,6 +492,10 @@ def build(self, session: Session | None = None, append: bool = False) -> None: focus = None if "before_script" in self.session_config: + if self.on_before_script: + self.on_before_script() + if self.on_build_event: + self.on_build_event({"event": "before_script_started"}) try: cwd = None @@ -364,7 +506,11 @@ def build(self, session: Session | None = None, append: bool = False) -> None: _log.debug( "running before script", ) - run_before_script(self.session_config["before_script"], cwd=cwd) + run_before_script( + self.session_config["before_script"], + cwd=cwd, + on_line=self.on_script_output, + ) except Exception: _log.error( "before script failed", @@ -376,6 +522,9 @@ def build(self, session: Session | None = None, append: bool = False) -> None: ) self.session.kill() raise + finally: + if self.on_build_event: + self.on_build_event({"event": "before_script_done"}) if "options" in self.session_config: for option, value in self.session_config["options"].items(): @@ -414,10 +563,17 @@ def build(self, session: Session | None = None, append: bool = False) -> None: if focus_pane: focus_pane.select() + if self.on_build_event: + self.on_build_event({"event": "window_done"}) + if focus: focus.select() + if self.on_progress: + self.on_progress("Workspace built") _log.info("workspace built") + if self.on_build_event: + self.on_build_event({"event": "workspace_built"}) def iter_create_windows( self, @@ -450,6 +606,17 @@ def iter_create_windows( ): window_name = window_config.get("window_name", None) + if self.on_progress: + self.on_progress(f"Creating window: {window_name or window_iterator}") + if self.on_build_event: + self.on_build_event( + { + "event": "window_started", + "name": window_name or str(window_iterator), + "pane_total": len(window_config["panes"]), + } + ) + is_first_window_pass = self.first_window_pass( window_iterator, session, @@ -545,6 +712,17 @@ def iter_create_panes( window_config["panes"], start=pane_base_index, ): + if self.on_progress: + self.on_progress(f"Creating pane: {pane_index}") + if self.on_build_event: + self.on_build_event( + { + "event": "pane_creating", + "pane_num": pane_index - int(pane_base_index) + 1, + "pane_total": len(window_config["panes"]), + } + ) + if pane_index == int(pane_base_index): pane = window.active_pane else: diff --git a/tests/cli/test_load.py b/tests/cli/test_load.py index 60c2c446ed..ec045dcf3c 100644 --- a/tests/cli/test_load.py +++ b/tests/cli/test_load.py @@ -807,6 +807,86 @@ def test_load_no_ansi_in_nontty_stderr( assert "\x1b[" not in captured.err, "ANSI codes leaked into non-TTY stderr" +class ProgressDisableFixture(t.NamedTuple): + """Test fixture for progress disable logic.""" + + test_id: str + env_value: str | None + no_progress_flag: bool + expected_disabled: bool + + +PROGRESS_DISABLE_FIXTURES: list[ProgressDisableFixture] = [ + ProgressDisableFixture("default_enabled", None, False, False), + ProgressDisableFixture("env_disabled", "0", False, True), + ProgressDisableFixture("flag_disabled", None, True, True), + ProgressDisableFixture("env_enabled_explicit", "1", False, False), + ProgressDisableFixture("flag_overrides_env", "1", True, True), +] + + +@pytest.mark.parametrize( + list(ProgressDisableFixture._fields), + PROGRESS_DISABLE_FIXTURES, + ids=[f.test_id for f in PROGRESS_DISABLE_FIXTURES], +) +def test_progress_disable_logic( + test_id: str, + env_value: str | None, + no_progress_flag: bool, + expected_disabled: bool, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Progress disable expression matches expected behavior.""" + if env_value is not None: + monkeypatch.setenv("TMUXP_PROGRESS", env_value) + else: + monkeypatch.delenv("TMUXP_PROGRESS", raising=False) + + import os + + result = no_progress_flag or os.getenv("TMUXP_PROGRESS", "1") == "0" + assert result is expected_disabled + + +def test_load_workspace_no_progress( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """load_workspace with no_progress=True creates session without spinner.""" + monkeypatch.delenv("TMUX", raising=False) + session_file = FIXTURE_PATH / "workspace/builder" / "two_pane.yaml" + + session = load_workspace( + session_file, + socket_name=server.socket_name, + detached=True, + no_progress=True, + ) + + assert isinstance(session, Session) + assert session.name == "sample workspace" + + +def test_load_workspace_env_progress_disabled( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """load_workspace with TMUXP_PROGRESS=0 creates session without spinner.""" + monkeypatch.delenv("TMUX", raising=False) + monkeypatch.setenv("TMUXP_PROGRESS", "0") + session_file = FIXTURE_PATH / "workspace/builder" / "two_pane.yaml" + + session = load_workspace( + session_file, + socket_name=server.socket_name, + detached=True, + ) + + assert isinstance(session, Session) + assert session.name == "sample workspace" + + def test_load_masks_home_in_spinner_message(monkeypatch: pytest.MonkeyPatch) -> None: """Spinner message should mask home directory via PrivatePath.""" monkeypatch.setattr(pathlib.Path, "home", lambda: pathlib.Path("/home/testuser")) diff --git a/tests/cli/test_progress.py b/tests/cli/test_progress.py new file mode 100644 index 0000000000..8ace187b65 --- /dev/null +++ b/tests/cli/test_progress.py @@ -0,0 +1,1425 @@ +"""Tests for tmuxp CLI progress indicator.""" + +from __future__ import annotations + +import atexit +import io +import pathlib +import time +import typing as t + +import libtmux +import pytest + +from tmuxp.cli._colors import ColorMode +from tmuxp.cli._progress import ( + BAR_WIDTH, + ERASE_LINE, + HIDE_CURSOR, + PROGRESS_PRESETS, + SHOW_CURSOR, + SUCCESS_TEMPLATE, + BuildTree, + Spinner, + _truncate_visible, + _visible_len, + render_bar, + resolve_progress_format, +) + + +class SpinnerEnablementFixture(t.NamedTuple): + """Test fixture for spinner TTY/color enablement matrix.""" + + test_id: str + isatty: bool + color_mode: ColorMode + expected_enabled: bool + + +SPINNER_ENABLEMENT_FIXTURES: list[SpinnerEnablementFixture] = [ + SpinnerEnablementFixture("tty_color_always", True, ColorMode.ALWAYS, True), + SpinnerEnablementFixture("tty_color_auto", True, ColorMode.AUTO, True), + SpinnerEnablementFixture("tty_color_never", True, ColorMode.NEVER, True), + SpinnerEnablementFixture("non_tty_color_always", False, ColorMode.ALWAYS, False), + SpinnerEnablementFixture("non_tty_color_never", False, ColorMode.NEVER, False), +] + + +@pytest.mark.parametrize( + list(SpinnerEnablementFixture._fields), + SPINNER_ENABLEMENT_FIXTURES, + ids=[f.test_id for f in SPINNER_ENABLEMENT_FIXTURES], +) +def test_spinner_enablement( + test_id: str, + isatty: bool, + color_mode: ColorMode, + expected_enabled: bool, +) -> None: + """Spinner._enabled depends only on TTY, not on color mode.""" + stream = io.StringIO() + stream.isatty = lambda: isatty # type: ignore[method-assign] + + spinner = Spinner(message="Test", color_mode=color_mode, stream=stream) + assert spinner._enabled is expected_enabled + + +def test_spinner_disabled_output() -> None: + """Disabled spinner produces no output.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + with Spinner(message="Test", stream=stream) as spinner: + spinner.update_message("Updated") + + assert stream.getvalue() == "" + + +def test_spinner_enabled_output() -> None: + """Enabled spinner writes ANSI control sequences.""" + stream = io.StringIO() + stream.isatty = lambda: True # type: ignore[method-assign] + + with Spinner( + message="Test", color_mode=ColorMode.ALWAYS, stream=stream, interval=0.01 + ): + pass # enter and exit — enough for at least one frame + cleanup + + output = stream.getvalue() + assert HIDE_CURSOR in output + assert SHOW_CURSOR in output + assert ERASE_LINE in output + assert "Test" in output + + +def test_spinner_atexit_registered(monkeypatch: pytest.MonkeyPatch) -> None: + """atexit.register called on start, unregistered on stop.""" + registered: list[t.Any] = [] + unregistered: list[t.Any] = [] + monkeypatch.setattr(atexit, "register", lambda fn, *a: registered.append(fn)) + monkeypatch.setattr(atexit, "unregister", lambda fn: unregistered.append(fn)) + + stream = io.StringIO() + stream.isatty = lambda: True # type: ignore[method-assign] + + with Spinner(message="Test", color_mode=ColorMode.ALWAYS, stream=stream) as spinner: + assert len(registered) == 1 + assert spinner._restore_cursor in registered + + assert len(unregistered) == 1 + assert spinner._restore_cursor in unregistered + + +def test_spinner_cleans_up_on_exception() -> None: + """SHOW_CURSOR written even when body raises.""" + stream = io.StringIO() + stream.isatty = lambda: True # type: ignore[method-assign] + + msg = "deliberate" + with ( + pytest.raises(ValueError), + Spinner(message="Test", color_mode=ColorMode.ALWAYS, stream=stream), + ): + raise ValueError(msg) + + assert SHOW_CURSOR in stream.getvalue() + + +def test_spinner_update_message_thread_safe() -> None: + """update_message() can be called from the main thread without error.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner(message="Start", color_mode=ColorMode.NEVER, stream=stream) + spinner.update_message("Updated") + assert spinner.message == "Updated" + + +def test_spinner_add_output_line_accumulates() -> None: + """add_output_line() appends stripped lines to the panel deque on TTY.""" + stream = io.StringIO() + stream.isatty = lambda: True # type: ignore[method-assign] + + spinner = Spinner(message="Test", color_mode=ColorMode.NEVER, stream=stream) + spinner.add_output_line("Session created: test\n") + spinner.add_output_line("Creating window: editor") + spinner.add_output_line("") # blank lines are ignored + + assert list(spinner._output_lines) == [ + "Session created: test", + "Creating window: editor", + ] + + +def test_spinner_panel_respects_maxlen() -> None: + """Panel deque enforces output_lines maxlen, dropping oldest lines.""" + stream = io.StringIO() + stream.isatty = lambda: True # type: ignore[method-assign] + + spinner = Spinner( + message="Test", color_mode=ColorMode.NEVER, stream=stream, output_lines=3 + ) + for i in range(5): + spinner.add_output_line(f"line {i}") + + panel = list(spinner._output_lines) + assert len(panel) == 3 + assert panel == ["line 2", "line 3", "line 4"] + + +def test_spinner_panel_rendered_in_output() -> None: + """Enabled spinner writes panel lines and spinner line to stream.""" + stream = io.StringIO() + stream.isatty = lambda: True # type: ignore[method-assign] + + with Spinner( + message="Building...", color_mode=ColorMode.ALWAYS, stream=stream, interval=0.01 + ) as spinner: + spinner.add_output_line("Session created: my-session") + # Wait long enough for the spinner thread to render at least one frame + # that includes the panel line (interval=0.01s, so 0.05s is sufficient). + time.sleep(0.05) + + output = stream.getvalue() + assert HIDE_CURSOR in output + assert SHOW_CURSOR in output + assert "Session created: my-session" in output + assert "Building..." in output + + +# BuildTree tests + + +def test_build_tree_empty_renders_nothing() -> None: + """BuildTree.render() returns [] before any session_created event.""" + colors = ColorMode.NEVER + tree = BuildTree() + from tmuxp.cli._colors import Colors + + assert tree.render(Colors(colors), 80) == [] + + +def test_build_tree_session_created_shows_header() -> None: + """After session_created, render() returns the 'Session' heading line.""" + from tmuxp.cli._colors import Colors + + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "my-session"}) + lines = tree.render(Colors(ColorMode.NEVER), 80) + assert lines == ["Session"] + + +def test_build_tree_window_started_no_pane_yet() -> None: + """window_started adds a window line with just the name (no pane info).""" + from tmuxp.cli._colors import Colors + + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "my-session"}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 2}) + lines = tree.render(Colors(ColorMode.NEVER), 80) + assert len(lines) == 2 + assert lines[1] == "- editor" + + +def test_build_tree_pane_creating_shows_progress() -> None: + """pane_creating updates the last window to show pane N of M.""" + from tmuxp.cli._colors import Colors + + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "my-session"}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 3}) + tree.on_event({"event": "pane_creating", "pane_num": 2, "pane_total": 3}) + lines = tree.render(Colors(ColorMode.NEVER), 80) + assert lines[1] == "- editor, pane (2 of 3)" + + +def test_build_tree_window_done_shows_checkmark() -> None: + """window_done marks the window as done; render shows checkmark.""" + from tmuxp.cli._colors import Colors + + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "my-session"}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 1}) + tree.on_event({"event": "pane_creating", "pane_num": 1, "pane_total": 1}) + tree.on_event({"event": "window_done"}) + lines = tree.render(Colors(ColorMode.NEVER), 80) + assert lines[1] == "- ✓ editor" + + +def test_build_tree_workspace_built_marks_all_done() -> None: + """workspace_built marks all windows as done.""" + from tmuxp.cli._colors import Colors + + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "my-session"}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 1}) + tree.on_event({"event": "window_started", "name": "logs", "pane_total": 1}) + tree.on_event({"event": "workspace_built"}) + lines = tree.render(Colors(ColorMode.NEVER), 80) + assert lines[1] == "- ✓ editor" + assert lines[2] == "- ✓ logs" + + +def test_build_tree_multiple_windows_accumulate() -> None: + """Multiple window_started events accumulate into separate tree lines.""" + from tmuxp.cli._colors import Colors + + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "my-session"}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 2}) + tree.on_event({"event": "window_done"}) + tree.on_event({"event": "window_started", "name": "logging", "pane_total": 1}) + tree.on_event({"event": "pane_creating", "pane_num": 1, "pane_total": 1}) + lines = tree.render(Colors(ColorMode.NEVER), 80) + assert lines[1] == "- ✓ editor" + assert lines[2] == "- logging, pane (1 of 1)" + + +def test_spinner_on_build_event_delegates_to_tree() -> None: + """Spinner.on_build_event() updates the internal BuildTree state.""" + import io + + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner(message="Building...", color_mode=ColorMode.NEVER, stream=stream) + spinner.on_build_event({"event": "session_created", "name": "test-session"}) + spinner.on_build_event( + {"event": "window_started", "name": "editor", "pane_total": 1} + ) + + assert spinner._build_tree.session_name == "test-session" + assert len(spinner._build_tree.windows) == 1 + assert spinner._build_tree.windows[0].name == "editor" + + +# BuildTree.format_inline tests + + +def test_build_tree_format_inline_empty() -> None: + """format_inline returns base unchanged when no session has been created.""" + tree = BuildTree() + assert tree.format_inline("Building projects...") == "Building projects..." + + +def test_build_tree_format_inline_session_only() -> None: + """format_inline returns 'base session' after session_created with no windows.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + assert tree.format_inline("Building projects...") == "Building projects... cihai" + + +def test_build_tree_format_inline_with_window_total() -> None: + """format_inline shows window index/total bracket after window_started.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + tree.on_event({"event": "window_started", "name": "gp-libs", "pane_total": 2}) + result = tree.format_inline("Building projects...") + assert result == "Building projects... cihai [1 of 3 windows] gp-libs" + + +def test_build_tree_format_inline_with_panes() -> None: + """format_inline includes pane progress once pane_creating fires.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + tree.on_event({"event": "window_started", "name": "gp-libs", "pane_total": 2}) + tree.on_event({"event": "pane_creating", "pane_num": 1, "pane_total": 2}) + result = tree.format_inline("Building projects...") + assert result == "Building projects... cihai [1 of 3 windows, 1 of 2 panes] gp-libs" + + +def test_build_tree_format_inline_no_window_total() -> None: + """format_inline omits window count bracket when window_total is absent.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai"}) + tree.on_event({"event": "window_started", "name": "main", "pane_total": 1}) + tree.on_event({"event": "pane_creating", "pane_num": 1, "pane_total": 1}) + result = tree.format_inline("Building...") + assert result == "Building... cihai [1 of 1 panes] main" + + +def test_spinner_on_build_event_updates_message() -> None: + """on_build_event updates spinner.message via format_inline after each event.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Building...", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format=None, + ) + assert spinner.message == "Building..." + + spinner.on_build_event( + {"event": "session_created", "name": "cihai", "window_total": 2} + ) + assert spinner.message == "Building... cihai" + + spinner.on_build_event( + {"event": "window_started", "name": "editor", "pane_total": 3} + ) + assert spinner.message == "Building... cihai [1 of 2 windows] editor" + + spinner.on_build_event({"event": "pane_creating", "pane_num": 2, "pane_total": 3}) + assert spinner.message == "Building... cihai [1 of 2 windows, 2 of 3 panes] editor" + + +# resolve_progress_format tests + + +def test_resolve_progress_format_preset_name() -> None: + """A known preset name resolves to its format string.""" + assert resolve_progress_format("default") == PROGRESS_PRESETS["default"] + assert resolve_progress_format("minimal") == PROGRESS_PRESETS["minimal"] + assert resolve_progress_format("verbose") == PROGRESS_PRESETS["verbose"] + + +def test_resolve_progress_format_raw_string() -> None: + """A raw template string is returned unchanged.""" + raw = "{session} w{window_progress}" + assert resolve_progress_format(raw) == raw + + +def test_resolve_progress_format_unknown_name() -> None: + """An unknown name not in presets is returned as-is (raw template pass-through).""" + assert resolve_progress_format("not-a-preset") == "not-a-preset" + + +# BuildTree.format_template tests + + +def test_build_tree_format_template_before_session() -> None: + """format_template returns '' before session_created fires.""" + tree = BuildTree() + assert tree.format_template("{session} [{progress}] {window}") == "" + + +def test_build_tree_format_template_session_only() -> None: + """After session_created alone, progress and window are empty.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + assert tree.format_template("{session} [{progress}] {window}") == "cihai [] " + + +def test_build_tree_format_template_with_window() -> None: + """After window_started, window progress appears but pane progress does not.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 4}) + assert ( + tree.format_template("{session} [{progress}] {window}") + == "cihai [1/3 win] editor" + ) + + +def test_build_tree_format_template_with_pane() -> None: + """After pane_creating, both window and pane progress appear.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 4}) + tree.on_event({"event": "pane_creating", "pane_num": 2, "pane_total": 4}) + assert ( + tree.format_template("{session} [{progress}] {window}") + == "cihai [1/3 win · 2/4 pane] editor" + ) + + +def test_build_tree_format_template_minimal() -> None: + """The minimal preset-style template shows only window fraction.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 4}) + assert tree.format_template("{session} [{window_progress}]") == "cihai [1/3]" + + +def test_build_tree_format_template_verbose() -> None: + """Verbose template shows window/pane indices and totals explicitly.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 12}) + tree.on_event({"event": "window_started", "name": "editor", "pane_total": 4}) + tree.on_event({"event": "pane_creating", "pane_num": 2, "pane_total": 4}) + result = tree.format_template(PROGRESS_PRESETS["verbose"]) + assert result == "Loading workspace: cihai [window 1 of 12 · pane 2 of 4] editor" + + +def test_build_tree_format_template_bad_token() -> None: + """Unknown tokens are left as {name}, known tokens still resolve.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + result = tree.format_template("{session} {unknown}") + # _SafeFormatMap: {session} resolves, {unknown} stays as-is + assert result == "cihai {unknown}" + + +# Spinner.progress_format integration tests + + +def test_spinner_progress_format_updates_message() -> None: + """Spinner with explicit progress_format uses format_template for updates.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + # Use an explicit format string rather than "default" preset to avoid + # coupling this test to the preset definition (which now includes {bar}). + spinner = Spinner( + message="Building...", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format="{session} [{progress}] {window}", + ) + assert spinner.message == "Building..." + + spinner.on_build_event( + {"event": "session_created", "name": "cihai", "window_total": 3} + ) + # No windows yet — falls back to base message to avoid showing empty brackets. + assert spinner.message == "Building..." + + spinner.on_build_event( + {"event": "window_started", "name": "editor", "pane_total": 4} + ) + assert spinner.message == "cihai [1/3 win] editor" + + spinner.on_build_event({"event": "pane_creating", "pane_num": 2, "pane_total": 4}) + assert spinner.message == "cihai [1/3 win · 2/4 pane] editor" + + +def test_spinner_progress_format_none_uses_inline() -> None: + """Spinner with progress_format=None preserves the format_inline path.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Building...", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format=None, + ) + + spinner.on_build_event( + {"event": "session_created", "name": "cihai", "window_total": 2} + ) + assert spinner.message == "Building... cihai" + + spinner.on_build_event( + {"event": "window_started", "name": "editor", "pane_total": 3} + ) + assert spinner.message == "Building... cihai [1 of 2 windows] editor" + + +# render_bar tests + + +def test_render_bar_empty() -> None: + """render_bar with done=0 produces an all-empty bar.""" + assert render_bar(0, 10) == "░░░░░░░░░░" + + +def test_render_bar_half() -> None: + """render_bar with done=5, total=10 fills exactly half.""" + assert render_bar(5, 10) == "█████░░░░░" + + +def test_render_bar_full() -> None: + """render_bar with done=total fills the entire bar.""" + assert render_bar(10, 10) == "██████████" + + +def test_render_bar_zero_total() -> None: + """render_bar with total=0 returns empty string.""" + assert render_bar(0, 0) == "" + + +def test_render_bar_custom_width() -> None: + """render_bar with custom width produces bar of that inner width.""" + assert render_bar(3, 10, width=5) == "█░░░░" + + +def test_render_bar_width_constant() -> None: + """BAR_WIDTH is the default inner width used by render_bar.""" + bar = render_bar(0, 10) + assert len(bar) == BAR_WIDTH + + +# BuildTree new token tests + + +def test_build_tree_context_session_pane_total() -> None: + """session_pane_total token reflects count from session_created event.""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 2, + "session_pane_total": 8, + } + ) + ctx = tree._context() + assert ctx["session_pane_total"] == 8 + assert ctx["session_pane_progress"] == "0/8" + assert ctx["overall_percent"] == 0 + + +def test_build_tree_context_window_progress_rel() -> None: + """window_progress_rel is 0/N from session_created, increments on window_done.""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 3, + "session_pane_total": 6, + } + ) + assert tree._context()["window_progress_rel"] == "0/3" + + tree.on_event({"event": "window_started", "name": "w1", "pane_total": 2}) + assert tree._context()["window_progress_rel"] == "0/3" + + tree.on_event({"event": "window_done"}) + assert tree._context()["window_progress_rel"] == "1/3" + + +def test_build_tree_context_pane_progress_rel() -> None: + """pane_progress_rel shows 0/M after window_started, N/M after pane_creating.""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 1, + "session_pane_total": 4, + } + ) + tree.on_event({"event": "window_started", "name": "w1", "pane_total": 4}) + assert tree._context()["pane_progress_rel"] == "0/4" + + tree.on_event({"event": "pane_creating", "pane_num": 2, "pane_total": 4}) + assert tree._context()["pane_progress_rel"] == "2/4" + assert tree._context()["pane_done"] == 2 + assert tree._context()["pane_remaining"] == 2 + + +def test_build_tree_context_overall_percent() -> None: + """overall_percent is pane-based 0-100; updates on window_done.""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 2, + "session_pane_total": 8, + } + ) + assert tree._context()["overall_percent"] == 0 + + tree.on_event({"event": "window_started", "name": "w1", "pane_total": 4}) + tree.on_event({"event": "window_done"}) + assert tree._context()["session_panes_done"] == 4 + assert tree._context()["overall_percent"] == 50 + + +def test_build_tree_before_script_event_toggle() -> None: + """before_script_started sets the Event; before_script_done clears it.""" + tree = BuildTree() + assert not tree._before_script_event.is_set() + + tree.on_event({"event": "before_script_started"}) + assert tree._before_script_event.is_set() + + tree.on_event({"event": "before_script_done"}) + assert not tree._before_script_event.is_set() + + +def test_build_tree_zero_pane_window() -> None: + """Windows with pane_total=0 do not cause division-by-zero or exceptions.""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 1, + "session_pane_total": 0, + } + ) + tree.on_event({"event": "window_started", "name": "w1", "pane_total": 0}) + tree.on_event({"event": "window_done"}) + + assert tree.session_panes_done == 0 + assert tree.windows_done == 1 + ctx = tree._context() + assert ctx["session_pane_progress"] == "" + assert ctx["overall_percent"] == 0 + + +def test_format_template_extra_backward_compat() -> None: + """format_template(fmt) without extra still works as before.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + result = tree.format_template("{session} [{progress}] {window}") + assert result == "cihai [] " + + +def test_format_template_extra_injected() -> None: + """format_template resolves extra tokens from the extra dict.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + result = tree.format_template("{session} {bar}", extra={"bar": "[TEST_BAR]"}) + assert result == "cihai [TEST_BAR]" + + +def test_format_template_unknown_token_preserved() -> None: + """Unknown tokens in the format string render as {name}, not blank or raw fmt.""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "cihai", "window_total": 3}) + result = tree.format_template("{session} {unknown_token}") + assert result == "cihai {unknown_token}" + + +# Spinner bar token tests + + +def test_spinner_bar_token_no_color() -> None: + """With ColorMode.NEVER, {bar} token in message contains bar characters.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Building...", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format="{session} {bar} {progress} {window}", + ) + spinner.on_build_event( + { + "event": "session_created", + "name": "cihai", + "window_total": 3, + "session_pane_total": 6, + } + ) + spinner.on_build_event( + {"event": "window_started", "name": "editor", "pane_total": 2} + ) + spinner.on_build_event({"event": "pane_creating", "pane_num": 1, "pane_total": 2}) + + assert "░" in spinner.message or "█" in spinner.message + + +def test_spinner_pane_bar_preset() -> None: + """The 'pane' preset wires {pane_bar} and {session_pane_progress}.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Building...", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format="pane", + ) + spinner.on_build_event( + { + "event": "session_created", + "name": "s", + "window_total": 2, + "session_pane_total": 4, + } + ) + spinner.on_build_event({"event": "window_started", "name": "w1", "pane_total": 2}) + spinner.on_build_event({"event": "window_done"}) + + assert "2/4" in spinner.message + assert "░" in spinner.message or "█" in spinner.message + + +def test_spinner_before_script_event_via_events() -> None: + """before_script_started / before_script_done toggle the BuildTree Event flag.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Building...", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format="default", + ) + spinner.on_build_event({"event": "before_script_started"}) + assert spinner._build_tree._before_script_event.is_set() + + spinner.on_build_event({"event": "before_script_done"}) + assert not spinner._build_tree._before_script_event.is_set() + + +def test_progress_presets_have_expected_keys() -> None: + """All expected preset names are present in PROGRESS_PRESETS.""" + for name in ("default", "minimal", "window", "pane", "verbose"): + assert name in PROGRESS_PRESETS, f"Missing preset: {name}" + + +def test_progress_presets_default_includes_bar() -> None: + """The 'default' preset includes the {bar} token.""" + assert "{bar}" in PROGRESS_PRESETS["default"] + + +def test_progress_presets_minimal_format() -> None: + """The 'minimal' preset includes the Loading prefix and window_progress token.""" + expected = "Loading workspace: {session} [{window_progress}]" + assert PROGRESS_PRESETS["minimal"] == expected + + +# BuildTree remaining token tests + + +class RemainingTokenFixture(t.NamedTuple): + """Test fixture for windows_remaining and session_panes_remaining tokens.""" + + test_id: str + events: list[dict[str, t.Any]] + token: str + expected: int + + +REMAINING_TOKEN_FIXTURES: list[RemainingTokenFixture] = [ + RemainingTokenFixture( + "windows_remaining_initial", + [ + { + "event": "session_created", + "name": "s", + "window_total": 3, + "session_pane_total": 6, + }, + ], + "windows_remaining", + 3, + ), + RemainingTokenFixture( + "windows_remaining_after_done", + [ + { + "event": "session_created", + "name": "s", + "window_total": 3, + "session_pane_total": 6, + }, + {"event": "window_started", "name": "w1", "pane_total": 2}, + {"event": "window_done"}, + ], + "windows_remaining", + 2, + ), + RemainingTokenFixture( + "session_panes_remaining_initial", + [ + { + "event": "session_created", + "name": "s", + "window_total": 2, + "session_pane_total": 5, + }, + ], + "session_panes_remaining", + 5, + ), + RemainingTokenFixture( + "session_panes_remaining_after_window", + [ + { + "event": "session_created", + "name": "s", + "window_total": 2, + "session_pane_total": 5, + }, + {"event": "window_started", "name": "w1", "pane_total": 3}, + {"event": "window_done"}, + ], + "session_panes_remaining", + 2, + ), +] + + +@pytest.mark.parametrize( + list(RemainingTokenFixture._fields), + REMAINING_TOKEN_FIXTURES, + ids=[f.test_id for f in REMAINING_TOKEN_FIXTURES], +) +def test_build_tree_remaining_tokens( + test_id: str, + events: list[dict[str, t.Any]], + token: str, + expected: int, +) -> None: + """Remaining tokens decrement correctly as windows/panes complete.""" + tree = BuildTree() + for ev in events: + tree.on_event(ev) + assert tree._context()[token] == expected + + +# _visible_len tests + + +class VisibleLenFixture(t.NamedTuple): + """Test fixture for _visible_len ANSI-aware length calculation.""" + + test_id: str + text: str + expected_len: int + + +VISIBLE_LEN_FIXTURES: list[VisibleLenFixture] = [ + VisibleLenFixture("plain_text", "hello", 5), + VisibleLenFixture("ansi_green", "\033[32mgreen\033[0m", 5), + VisibleLenFixture("empty_string", "", 0), + VisibleLenFixture("nested_ansi", "\033[1m\033[31mbold red\033[0m", 8), + VisibleLenFixture("ansi_only", "\033[0m", 0), +] + + +@pytest.mark.parametrize( + list(VisibleLenFixture._fields), + VISIBLE_LEN_FIXTURES, + ids=[f.test_id for f in VISIBLE_LEN_FIXTURES], +) +def test_visible_len( + test_id: str, + text: str, + expected_len: int, +) -> None: + """_visible_len returns the visible character count, ignoring ANSI escapes.""" + assert _visible_len(text) == expected_len + + +# Spinner.add_output_line non-TTY write-through tests + + +class OutputLineFixture(t.NamedTuple): + """Test fixture for add_output_line TTY vs non-TTY behavior.""" + + test_id: str + isatty: bool + lines: list[str] + expected_deque: list[str] + expected_stream_contains: str + + +OUTPUT_LINE_FIXTURES: list[OutputLineFixture] = [ + OutputLineFixture( + "tty_accumulates_in_deque", + isatty=True, + lines=["line1\n", "line2\n"], + expected_deque=["line1", "line2"], + expected_stream_contains="", + ), + OutputLineFixture( + "non_tty_writes_to_stream", + isatty=False, + lines=["hello\n", "world\n"], + expected_deque=[], + expected_stream_contains="hello\nworld\n", + ), + OutputLineFixture( + "blank_lines_ignored", + isatty=True, + lines=["", "\n"], + expected_deque=[], + expected_stream_contains="", + ), +] + + +@pytest.mark.parametrize( + list(OutputLineFixture._fields), + OUTPUT_LINE_FIXTURES, + ids=[f.test_id for f in OUTPUT_LINE_FIXTURES], +) +def test_spinner_output_line_behavior( + test_id: str, + isatty: bool, + lines: list[str], + expected_deque: list[str], + expected_stream_contains: str, +) -> None: + """add_output_line accumulates in deque (TTY) or writes to stream (non-TTY).""" + stream = io.StringIO() + stream.isatty = lambda: isatty # type: ignore[method-assign] + + spinner = Spinner(message="Test", color_mode=ColorMode.NEVER, stream=stream) + for line in lines: + spinner.add_output_line(line) + + assert list(spinner._output_lines) == expected_deque + assert expected_stream_contains in stream.getvalue() + + +# Spinner.success tests + + +# Panel lines special values tests + + +class PanelLinesFixture(t.NamedTuple): + """Test fixture for Spinner panel_lines special values.""" + + test_id: str + output_lines: int + expected_maxlen: int | None # None = unbounded + expected_hidden: bool + add_count: int + expected_retained: int + + +PANEL_LINES_FIXTURES: list[PanelLinesFixture] = [ + PanelLinesFixture("zero_hides_panel", 0, 1, True, 10, 1), + PanelLinesFixture("negative_unlimited", -1, None, False, 100, 100), + PanelLinesFixture("positive_normal", 5, 5, False, 10, 5), + PanelLinesFixture("default_three", 3, 3, False, 5, 3), +] + + +@pytest.mark.parametrize( + list(PanelLinesFixture._fields), + PANEL_LINES_FIXTURES, + ids=[f.test_id for f in PANEL_LINES_FIXTURES], +) +def test_spinner_panel_lines_special_values( + test_id: str, + output_lines: int, + expected_maxlen: int | None, + expected_hidden: bool, + add_count: int, + expected_retained: int, +) -> None: + """Spinner panel_lines=0 hides, -1 is unlimited, positive caps normally.""" + stream = io.StringIO() + stream.isatty = lambda: True # type: ignore[method-assign] + + spinner = Spinner( + message="Test", + color_mode=ColorMode.NEVER, + stream=stream, + output_lines=output_lines, + ) + for i in range(add_count): + spinner.add_output_line(f"line {i}") + + assert len(spinner._output_lines) == expected_retained + assert spinner._output_lines.maxlen == expected_maxlen + assert spinner._panel_hidden is expected_hidden + + +def test_spinner_unlimited_caps_rendered_panel( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Unlimited panel (-1) caps rendered lines to terminal_height - 2.""" + import os as _os + import shutil + + monkeypatch.setattr( + shutil, + "get_terminal_size", + lambda fallback=(80, 24): _os.terminal_size((80, 10)), + ) + + stream = io.StringIO() + stream.isatty = lambda: True # type: ignore[method-assign] + + spinner = Spinner( + message="Test", + color_mode=ColorMode.NEVER, + stream=stream, + output_lines=-1, + interval=0.01, + ) + for i in range(50): + spinner.add_output_line(f"line {i}") + + # All 50 lines should be retained in the unbounded deque + assert len(spinner._output_lines) == 50 + + # Start spinner briefly to render at least one frame + spinner.start() + time.sleep(0.05) + spinner.stop() + + output = stream.getvalue() + # Verify that not all 50 lines appear in any single frame + # The cap should limit to terminal_height - 2 = 8 lines + # Only the last 8 lines should appear in output + assert "line 49" in output + assert "line 0" not in output + + +class SuccessFixture(t.NamedTuple): + """Test fixture for Spinner.success() output behavior.""" + + test_id: str + isatty: bool + color_mode: ColorMode + expected_contains: str + + +SUCCESS_FIXTURES: list[SuccessFixture] = [ + SuccessFixture("tty_with_color", True, ColorMode.ALWAYS, "done"), + SuccessFixture("tty_no_color", True, ColorMode.NEVER, "✓ done"), + SuccessFixture("non_tty", False, ColorMode.NEVER, "✓ done"), +] + + +@pytest.mark.parametrize( + list(SuccessFixture._fields), + SUCCESS_FIXTURES, + ids=[f.test_id for f in SUCCESS_FIXTURES], +) +def test_spinner_success_behavior( + test_id: str, + isatty: bool, + color_mode: ColorMode, + expected_contains: str, +) -> None: + """success() always emits the checkmark message regardless of TTY/color mode.""" + stream = io.StringIO() + stream.isatty = lambda: isatty # type: ignore[method-assign] + + spinner = Spinner(message="Test", color_mode=color_mode, stream=stream) + spinner.success("done") + + output = stream.getvalue() + assert "✓" in output + assert expected_contains in output + + +# _truncate_visible tests + + +def test_truncate_visible_plain_text() -> None: + """Plain text is truncated to max_visible chars with default suffix.""" + assert _truncate_visible("hello world", 5) == "hello\x1b[0m..." + + +def test_truncate_visible_ansi_preserved() -> None: + """ANSI sequences are preserved whole; only visible chars count.""" + result = _truncate_visible("\033[32mgreen\033[0m", 3) + assert result == "\x1b[32mgre\x1b[0m..." + + +def test_truncate_visible_no_truncation() -> None: + """String shorter than max_visible is returned unchanged.""" + assert _truncate_visible("short", 10) == "short" + + +def test_truncate_visible_empty() -> None: + """Empty string returns empty string.""" + assert _truncate_visible("", 5) == "" + + +def test_truncate_visible_custom_suffix() -> None: + """Custom suffix is appended after truncation.""" + assert _truncate_visible("hello world", 5, suffix="~") == "hello\x1b[0m~" + + +def test_truncate_visible_no_suffix() -> None: + """Empty suffix produces only the reset sequence.""" + assert _truncate_visible("hello world", 5, suffix="") == "hello\x1b[0m" + + +# workspace_path token tests + + +def test_build_tree_workspace_path_in_context() -> None: + """workspace_path is available in _context() when set on construction.""" + tree = BuildTree(workspace_path="~/.tmuxp/foo.yaml") + tree.on_event({"event": "session_created", "name": "foo", "window_total": 1}) + ctx = tree._context() + assert ctx["workspace_path"] == "~/.tmuxp/foo.yaml" + + +def test_build_tree_workspace_path_empty_default() -> None: + """workspace_path defaults to empty string in _context().""" + tree = BuildTree() + tree.on_event({"event": "session_created", "name": "s", "window_total": 1}) + assert tree._context()["workspace_path"] == "" + + +def test_spinner_workspace_path_passed_to_tree() -> None: + """Spinner passes workspace_path through to its BuildTree.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Loading...", + color_mode=ColorMode.NEVER, + stream=stream, + workspace_path="~/.tmuxp/proj.yaml", + ) + assert spinner._build_tree.workspace_path == "~/.tmuxp/proj.yaml" + + +def test_build_tree_workspace_path_in_template() -> None: + """workspace_path token resolves in format_template.""" + tree = BuildTree(workspace_path="~/.tmuxp/bar.yaml") + tree.on_event({"event": "session_created", "name": "bar", "window_total": 1}) + result = tree.format_template("{session} ({workspace_path})") + assert result == "bar (~/.tmuxp/bar.yaml)" + + +# {summary} token tests + + +def test_build_tree_summary_empty_state() -> None: + """Summary token is empty string before any windows complete.""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 3, + "session_pane_total": 6, + } + ) + assert tree._context()["summary"] == "" + + +def test_build_tree_summary_after_windows_done() -> None: + """Summary token shows bracketed win/pane counts after windows complete.""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 3, + "session_pane_total": 8, + } + ) + tree.on_event({"event": "window_started", "name": "w1", "pane_total": 3}) + tree.on_event({"event": "window_done"}) + tree.on_event({"event": "window_started", "name": "w2", "pane_total": 2}) + tree.on_event({"event": "window_done"}) + tree.on_event({"event": "window_started", "name": "w3", "pane_total": 3}) + tree.on_event({"event": "window_done"}) + assert tree._context()["summary"] == "[3 win, 8 panes]" + + +def test_build_tree_summary_windows_only_no_panes() -> None: + """Summary token shows only win count when pane_total is 0.""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 2, + "session_pane_total": 0, + } + ) + tree.on_event({"event": "window_started", "name": "w1", "pane_total": 0}) + tree.on_event({"event": "window_done"}) + tree.on_event({"event": "window_started", "name": "w2", "pane_total": 0}) + tree.on_event({"event": "window_done"}) + assert tree._context()["summary"] == "[2 win]" + + +def test_build_tree_summary_panes_only() -> None: + """Summary token shows only pane count when windows_done is 0 (edge case).""" + tree = BuildTree() + tree.on_event( + { + "event": "session_created", + "name": "s", + "window_total": 1, + "session_pane_total": 6, + } + ) + # Manually set session_panes_done without window_done to test edge case + tree.session_panes_done = 6 + assert tree._context()["summary"] == "[6 panes]" + + +# format_success() tests + + +def test_spinner_format_success_full_build() -> None: + """format_success renders SUCCESS_TEMPLATE with session, path, and summary.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Loading...", + color_mode=ColorMode.NEVER, + stream=stream, + workspace_path="~/.tmuxp/myapp.yaml", + ) + spinner._build_tree.on_event( + { + "event": "session_created", + "name": "myapp", + "window_total": 3, + "session_pane_total": 8, + } + ) + spinner._build_tree.on_event( + {"event": "window_started", "name": "w1", "pane_total": 3} + ) + spinner._build_tree.on_event({"event": "window_done"}) + spinner._build_tree.on_event( + {"event": "window_started", "name": "w2", "pane_total": 2} + ) + spinner._build_tree.on_event({"event": "window_done"}) + spinner._build_tree.on_event( + {"event": "window_started", "name": "w3", "pane_total": 3} + ) + spinner._build_tree.on_event({"event": "window_done"}) + + result = spinner.format_success() + assert "Loaded workspace:" in result + assert "myapp" in result + assert "~/.tmuxp/myapp.yaml" in result + assert "[3 win, 8 panes]" in result + + +def test_spinner_format_success_no_windows() -> None: + """format_success with no windows/panes done omits brackets.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Loading...", + color_mode=ColorMode.NEVER, + stream=stream, + workspace_path="~/.tmuxp/empty.yaml", + ) + spinner._build_tree.on_event( + { + "event": "session_created", + "name": "empty", + "window_total": 0, + "session_pane_total": 0, + } + ) + + result = spinner.format_success() + assert "Loaded workspace:" in result + assert "empty" in result + assert "~/.tmuxp/empty.yaml" in result + assert "[" not in result + + +# Spinner.success() with no args tests + + +def test_spinner_success_no_args_template_mode() -> None: + """success() with no args uses format_success when progress_format is set.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Loading...", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format="default", + workspace_path="~/.tmuxp/proj.yaml", + ) + spinner._build_tree.on_event( + { + "event": "session_created", + "name": "proj", + "window_total": 1, + "session_pane_total": 2, + } + ) + spinner._build_tree.on_event( + {"event": "window_started", "name": "main", "pane_total": 2} + ) + spinner._build_tree.on_event({"event": "window_done"}) + + spinner.success() + + output = stream.getvalue() + assert "✓" in output + assert "Loaded workspace:" in output + assert "proj" in output + assert "~/.tmuxp/proj.yaml" in output + assert "[1 win, 2 panes]" in output + + +def test_spinner_success_no_args_no_template() -> None: + """success() with no args and no progress_format falls back to _base_message.""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Loading workspace: myapp", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format=None, + ) + spinner.success() + + output = stream.getvalue() + assert "✓ Loading workspace: myapp" in output + + +def test_spinner_success_explicit_text_backward_compat() -> None: + """success('custom text') still works as before (backward compat).""" + stream = io.StringIO() + stream.isatty = lambda: False # type: ignore[method-assign] + + spinner = Spinner( + message="Loading...", + color_mode=ColorMode.NEVER, + stream=stream, + progress_format="default", + ) + spinner.success("custom done message") + + output = stream.getvalue() + assert "✓ custom done message" in output + + +# SUCCESS_TEMPLATE constant tests + + +def test_success_template_value() -> None: + """SUCCESS_TEMPLATE contains expected tokens.""" + assert "{session}" in SUCCESS_TEMPLATE + assert "{workspace_path}" in SUCCESS_TEMPLATE + assert "{summary}" in SUCCESS_TEMPLATE + assert "Loaded workspace:" in SUCCESS_TEMPLATE + + +def test_no_success_message_on_build_error( + server: libtmux.Server, + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, + capfd: pytest.CaptureFixture[str], +) -> None: + """Success message is not emitted when _dispatch_build returns None.""" + import yaml + + from tmuxp.cli._colors import Colors + from tmuxp.cli.load import load_workspace + + monkeypatch.delenv("TMUX", raising=False) + + config = {"session_name": "test-fail", "windows": [{"window_name": "main"}]} + config_file = tmp_path / "fail.yaml" + config_file.write_text(yaml.dump(config)) + + monkeypatch.setattr( + "tmuxp.cli.load._dispatch_build", + lambda *args, **kwargs: None, + ) + + result = load_workspace( + str(config_file), + socket_name=server.socket_name, + cli_colors=Colors(ColorMode.NEVER), + ) + + assert result is None + captured = capfd.readouterr() + assert "\u2713" not in captured.err + assert "Loaded workspace:" not in captured.err diff --git a/tests/workspace/test_progress.py b/tests/workspace/test_progress.py new file mode 100644 index 0000000000..336fa9dafd --- /dev/null +++ b/tests/workspace/test_progress.py @@ -0,0 +1,258 @@ +"""Tests for tmuxp workspace builder progress callback.""" + +from __future__ import annotations + +import typing as t + +import pytest +from libtmux.server import Server + +from tmuxp import exc +from tmuxp.workspace.builder import WorkspaceBuilder + + +def test_builder_on_progress_callback( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """WorkspaceBuilder calls on_progress at each build milestone.""" + monkeypatch.delenv("TMUX", raising=False) + + session_config = { + "session_name": "progress-test", + "windows": [{"window_name": "editor", "panes": [{"shell_command": []}]}], + } + + calls: list[str] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_progress=calls.append, + ) + builder.build() + + assert any("Session created:" in c for c in calls) + assert any("Creating window:" in c for c in calls) + assert any("Creating pane:" in c for c in calls) + assert "Workspace built" in calls + + +def test_builder_on_before_script_not_called_without_script( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """on_before_script callback is not invoked when config has no before_script key.""" + monkeypatch.delenv("TMUX", raising=False) + + session_config = { + "session_name": "no-script-callback-test", + "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + } + called: list[bool] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_before_script=lambda: called.append(True), + ) + builder.build() + assert called == [] + + +def test_builder_on_script_output_not_called_without_script( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """on_script_output callback is not invoked when config has no before_script key.""" + monkeypatch.delenv("TMUX", raising=False) + + session_config = { + "session_name": "no-script-output-test", + "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + } + lines: list[str] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_script_output=lines.append, + ) + builder.build() + assert lines == [] + + +def test_builder_on_build_event_sequence( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """on_build_event fires the full event sequence during build().""" + monkeypatch.delenv("TMUX", raising=False) + + session_config = { + "session_name": "build-event-test", + "windows": [ + { + "window_name": "editor", + "panes": [{"shell_command": []}, {"shell_command": []}], + }, + {"window_name": "logs", "panes": [{"shell_command": []}]}, + ], + } + events: list[dict[str, t.Any]] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_build_event=events.append, + ) + builder.build() + + event_types = [e["event"] for e in events] + assert event_types[0] == "session_created" + assert event_types[-1] == "workspace_built" + assert event_types.count("window_started") == 2 + assert event_types.count("window_done") == 2 + assert event_types.count("pane_creating") == 3 # 2 panes + 1 pane + + created = next(e for e in events if e["event"] == "session_created") + assert created["window_total"] == 2 + assert created["session_pane_total"] == 3 # 2 panes + 1 pane + + +def test_builder_on_build_event_session_name( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """session_created event carries correct session name.""" + monkeypatch.delenv("TMUX", raising=False) + + session_config = { + "session_name": "name-check", + "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + } + events: list[dict[str, t.Any]] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_build_event=events.append, + ) + builder.build() + + created = next(e for e in events if e["event"] == "session_created") + assert created["name"] == "name-check" + assert created["window_total"] == 1 + + +def test_builder_on_build_event_session_pane_total( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """session_created event includes session_pane_total summing all windows' panes.""" + monkeypatch.delenv("TMUX", raising=False) + + pane: dict[str, list[object]] = {"shell_command": []} + session_config = { + "session_name": "pane-total-test", + "windows": [ + {"window_name": "w1", "panes": [pane, pane]}, + {"window_name": "w2", "panes": [pane]}, + {"window_name": "w3", "panes": [pane, pane, pane]}, + ], + } + events: list[dict[str, t.Any]] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_build_event=events.append, + ) + builder.build() + + created = next(e for e in events if e["event"] == "session_created") + assert created["session_pane_total"] == 6 # 2 + 1 + 3 + + +def test_builder_before_script_events( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """before_script_started fires before run; before_script_done fires in finally.""" + monkeypatch.delenv("TMUX", raising=False) + + session_config = { + "session_name": "before-script-events-test", + "before_script": "echo hello", + "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + } + events: list[dict[str, t.Any]] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_build_event=events.append, + ) + builder.build() + + event_types = [e["event"] for e in events] + assert "before_script_started" in event_types + assert "before_script_done" in event_types + + bs_start_idx = event_types.index("before_script_started") + bs_done_idx = event_types.index("before_script_done") + win_idx = event_types.index("window_started") + assert bs_start_idx < bs_done_idx < win_idx + + +def test_builder_before_script_done_fires_on_failure( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """before_script_done fires in finally even when the script fails.""" + monkeypatch.delenv("TMUX", raising=False) + + session_config = { + "session_name": "before-script-fail-test", + "before_script": "/bin/false", + "windows": [{"window_name": "main", "panes": [{"shell_command": []}]}], + } + events: list[dict[str, t.Any]] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_build_event=events.append, + ) + with pytest.raises(exc.BeforeLoadScriptError): + builder.build() + + event_types = [e["event"] for e in events] + assert "before_script_started" in event_types + assert "before_script_done" in event_types + + +def test_builder_on_build_event_pane_numbers( + server: Server, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """pane_creating events carry 1-based pane_num and correct pane_total.""" + monkeypatch.delenv("TMUX", raising=False) + + session_config = { + "session_name": "pane-num-test", + "windows": [ + { + "window_name": "main", + "panes": [ + {"shell_command": []}, + {"shell_command": []}, + {"shell_command": []}, + ], + }, + ], + } + events: list[dict[str, t.Any]] = [] + builder = WorkspaceBuilder( + session_config=session_config, + server=server, + on_build_event=events.append, + ) + builder.build() + + pane_events = [e for e in events if e["event"] == "pane_creating"] + assert len(pane_events) == 3 + assert [e["pane_num"] for e in pane_events] == [1, 2, 3] + assert all(e["pane_total"] == 3 for e in pane_events) diff --git a/uv.lock b/uv.lock index 03668ca9d4..5606904017 100644 --- a/uv.lock +++ b/uv.lock @@ -1379,7 +1379,7 @@ wheels = [ [[package]] name = "tmuxp" -version = "1.66.0" +version = "1.67.0" source = { editable = "." } dependencies = [ { name = "libtmux" },