diff --git a/src/libtmux/_internal/__init__.py b/src/libtmux/_internal/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/libtmux/_internal/dataclasses.py b/src/libtmux/_internal/dataclasses.py new file mode 100644 index 000000000..5cabfb19d --- /dev/null +++ b/src/libtmux/_internal/dataclasses.py @@ -0,0 +1,84 @@ +""":mod:`dataclasses` utilities. + +Note +---- +This is an internal API not covered by versioning policy. +""" +import dataclasses +from operator import attrgetter + + +class SkipDefaultFieldsReprMixin: + r"""Skip default fields in :func:`~dataclasses.dataclass` + :func:`object representation `. + + Notes + ----- + + Credit: Pietro Oldrati, 2022-05-08, Unilicense + + https://stackoverflow.com/a/72161437/1396928 + + Examples + -------- + + >>> @dataclasses.dataclass() + ... class Item: + ... name: str + ... unit_price: float = 1.00 + ... quantity_on_hand: int = 0 + ... + + >>> @dataclasses.dataclass(repr=False) + ... class ItemWithMixin(SkipDefaultFieldsReprMixin): + ... name: str + ... unit_price: float = 1.00 + ... quantity_on_hand: int = 0 + ... + + >>> Item('Test') + Item(name='Test', unit_price=1.0, quantity_on_hand=0) + + >>> ItemWithMixin('Test') + ItemWithMixin(name=Test) + + >>> Item('Test', quantity_on_hand=2) + Item(name='Test', unit_price=1.0, quantity_on_hand=2) + + >>> ItemWithMixin('Test', quantity_on_hand=2) + ItemWithMixin(name=Test, quantity_on_hand=2) + + If you want to copy/paste the :meth:`~.__repr__()` + directly, you can omit the ``repr=False``: + + >>> @dataclasses.dataclass + ... class ItemWithMixin(SkipDefaultFieldsReprMixin): + ... name: str + ... unit_price: float = 1.00 + ... quantity_on_hand: int = 0 + ... __repr__ = SkipDefaultFieldsReprMixin.__repr__ + ... + + >>> ItemWithMixin('Test') + ItemWithMixin(name=Test) + + >>> ItemWithMixin('Test', unit_price=2.00) + ItemWithMixin(name=Test, unit_price=2.0) + + >>> item = ItemWithMixin('Test') + >>> item.unit_price = 2.05 + + >>> item + ItemWithMixin(name=Test, unit_price=2.05) + """ + + def __repr__(self) -> str: + """Omit default fields in object representation.""" + nodef_f_vals = ( + (f.name, attrgetter(f.name)(self)) + for f in dataclasses.fields(self) + if attrgetter(f.name)(self) != f.default + ) + + nodef_f_repr = ", ".join(f"{name}={value}" for name, value in nodef_f_vals) + return f"{self.__class__.__name__}({nodef_f_repr})" diff --git a/src/libtmux/_internal/query_list.py b/src/libtmux/_internal/query_list.py new file mode 100644 index 000000000..70af21f0b --- /dev/null +++ b/src/libtmux/_internal/query_list.py @@ -0,0 +1,349 @@ +"""Utilities for filtering or searching :class:`list` of objects / list data. + +Note +---- +This is an internal API not covered by versioning policy. +""" +import re +import traceback +from collections.abc import Mapping, Sequence +from re import Pattern +from typing import TYPE_CHECKING, Any, Callable, List, Optional, TypeVar, Union + +if TYPE_CHECKING: + from typing_extensions import Protocol + + class LookupProtocol(Protocol): + """Protocol for :class:`QueryList` filtering operators.""" + + def __call__( + self, + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], + ) -> bool: + """Callback for :class:`QueryList` filtering operators.""" + ... + + +T = TypeVar("T", Any, Any) + + +def keygetter( + obj: "Mapping[str, Any]", + path: str, +) -> Union[None, Any, str, List[str], "Mapping[str, str]"]: + """obj, "foods__breakfast", obj['foods']['breakfast'] + + >>> keygetter({ "foods": { "breakfast": "cereal" } }, "foods__breakfast") + 'cereal' + >>> keygetter({ "foods": { "breakfast": "cereal" } }, "foods") + {'breakfast': 'cereal'} + + """ + try: + sub_fields = path.split("__") + dct = obj + for sub_field in sub_fields: + if isinstance(dct, dict): + dct = dct[sub_field] + elif hasattr(dct, sub_field): + dct = getattr(dct, sub_field) + + return dct + except Exception as e: + traceback.print_stack() + print(f"Above error was {e}") + return None + + +def parse_lookup(obj: "Mapping[str, Any]", path: str, lookup: str) -> Optional[Any]: + """Check if field lookup key, e.g. "my__path__contains" has comparator, return val. + + If comparator not used or value not found, return None. + + mykey__endswith("mykey") -> "mykey" else None + + >>> parse_lookup({ "food": "red apple" }, "food__istartswith", "__istartswith") + 'red apple' + """ + try: + if isinstance(path, str) and isinstance(lookup, str) and path.endswith(lookup): + field_name = path.rsplit(lookup)[0] + if field_name is not None: + return keygetter(obj, field_name) + except Exception: + traceback.print_stack() + return None + + +def lookup_exact( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + return rhs == data + + +def lookup_iexact( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if not isinstance(rhs, str) or not isinstance(data, str): + return False + + return rhs.lower() == data.lower() + + +def lookup_contains( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if not isinstance(rhs, str) or not isinstance(data, (str, Mapping, list)): + return False + + return rhs in data + + +def lookup_icontains( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if not isinstance(rhs, str) or not isinstance(data, (str, Mapping, list)): + return False + + if isinstance(data, str): + return rhs.lower() in data.lower() + if isinstance(data, Mapping): + return rhs.lower() in [k.lower() for k in data.keys()] + + return False + + +def lookup_startswith( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if not isinstance(rhs, str) or not isinstance(data, str): + return False + + return data.startswith(rhs) + + +def lookup_istartswith( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if not isinstance(rhs, str) or not isinstance(data, str): + return False + + return data.lower().startswith(rhs.lower()) + + +def lookup_endswith( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if not isinstance(rhs, str) or not isinstance(data, str): + return False + + return data.endswith(rhs) + + +def lookup_iendswith( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if not isinstance(rhs, str) or not isinstance(data, str): + return False + return data.lower().endswith(rhs.lower()) + + +def lookup_in( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if isinstance(rhs, list): + return data in rhs + + try: + if isinstance(rhs, str) and isinstance(data, Mapping): + return rhs in data + if isinstance(rhs, str) and isinstance(data, (str, list)): + return rhs in data + if isinstance(rhs, str) and isinstance(data, Mapping): + return rhs in data + # TODO: Add a deep Mappingionary matcher + # if isinstance(rhs, Mapping) and isinstance(data, Mapping): + # return rhs.items() not in data.items() + except Exception: + return False + return False + + +def lookup_nin( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if isinstance(rhs, list): + return data not in rhs + + try: + if isinstance(rhs, str) and isinstance(data, Mapping): + return rhs not in data + if isinstance(rhs, str) and isinstance(data, (str, list)): + return rhs not in data + if isinstance(rhs, str) and isinstance(data, Mapping): + return rhs not in data + # TODO: Add a deep Mappingionary matcher + # if isinstance(rhs, Mapping) and isinstance(data, Mapping): + # return rhs.items() not in data.items() + except Exception: + return False + return False + + +def lookup_regex( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if isinstance(data, (str, bytes, re.Pattern)) and isinstance(rhs, (str, bytes)): + return bool(re.search(rhs, data)) + return False + + +def lookup_iregex( + data: Union[str, List[str], "Mapping[str, str]"], + rhs: Union[str, List[str], "Mapping[str, str]", "Pattern[str]"], +) -> bool: + if isinstance(data, (str, bytes, re.Pattern)) and isinstance(rhs, (str, bytes)): + return bool(re.search(rhs, data, re.IGNORECASE)) + return False + + +LOOKUP_NAME_MAP: 'Mapping[str, "LookupProtocol"]' = { + "eq": lookup_exact, + "exact": lookup_exact, + "iexact": lookup_iexact, + "contains": lookup_contains, + "icontains": lookup_icontains, + "startswith": lookup_startswith, + "istartswith": lookup_istartswith, + "endswith": lookup_endswith, + "iendswith": lookup_iendswith, + "in": lookup_in, + "nin": lookup_nin, + "regex": lookup_regex, + "iregex": lookup_iregex, +} + + +class QueryList(List[T]): + """Filter list of object/dictionaries. For small, local datasets. + + *Experimental, unstable*. + + >>> query = QueryList( + ... [ + ... { + ... "place": "Largo", + ... "city": "Tampa", + ... "state": "Florida", + ... "foods": {"fruit": ["banana", "orange"], "breakfast": "cereal"}, + ... }, + ... { + ... "place": "Chicago suburbs", + ... "city": "Elmhurst", + ... "state": "Illinois", + ... "foods": {"fruit": ["apple", "cantelope"], "breakfast": "waffles"}, + ... }, + ... ] + ... ) + >>> query.filter(place="Chicago suburbs")[0]['city'] + 'Elmhurst' + >>> query.filter(place__icontains="chicago")[0]['city'] + 'Elmhurst' + >>> query.filter(foods__breakfast="waffles")[0]['city'] + 'Elmhurst' + >>> query.filter(foods__fruit__in="cantelope")[0]['city'] + 'Elmhurst' + >>> query.filter(foods__fruit__in="orange")[0]['city'] + 'Tampa' + """ + + data: "Sequence[T]" + pk_key: Optional[str] + + def items(self) -> List[T]: + data: "Sequence[T]" + + if self.pk_key is None: + raise Exception("items() require a pk_key exists") + return [(getattr(item, self.pk_key), item) for item in self] + + def __eq__( + self, + other: object, + # other: Union[ + # "QueryList[T]", + # List[Mapping[str, str]], + # List[Mapping[str, int]], + # List[Mapping[str, Union[str, Mapping[str, Union[List[str], str]]]]], + # ], + ) -> bool: + data = other + + if not isinstance(self, list) or not isinstance(data, list): + return False + + if len(self) == len(data): + for (a, b) in zip(self, data): + if isinstance(a, Mapping): + a_keys = a.keys() + if a.keys == b.keys(): + for key in a_keys: + if abs(a[key] - b[key]) > 1: + return False + else: + if a != b: + return False + + return True + return False + + def filter( + self, matcher: Optional[Union[Callable[[T], bool], T]] = None, **kwargs: Any + ) -> "QueryList[T]": + def filter_lookup(obj: Any) -> bool: + for path, v in kwargs.items(): + try: + lhs, op = path.rsplit("__", 1) + + if op not in LOOKUP_NAME_MAP: + raise ValueError(f"{op} not in LOOKUP_NAME_MAP") + except ValueError: + lhs = path + op = "exact" + + assert op in LOOKUP_NAME_MAP + path = lhs + data = keygetter(obj, path) + + if data is None or not LOOKUP_NAME_MAP[op](data, v): + return False + + return True + + if callable(matcher): + _filter = matcher + elif matcher is not None: + + def val_match(obj: Union[str, List[Any]]) -> bool: + if isinstance(matcher, list): + return obj in matcher + else: + return obj == matcher + + _filter = val_match + else: + _filter = filter_lookup + + return self.__class__(k for k in self if _filter(k)) diff --git a/src/libtmux/_internal/subprocess.py b/src/libtmux/_internal/subprocess.py new file mode 100644 index 000000000..1059a478f --- /dev/null +++ b/src/libtmux/_internal/subprocess.py @@ -0,0 +1,587 @@ +"""Invokable :mod:`subprocess` wrapper. + +Defer running a subprocess, such as by handing to an executor. + +Note +---- +This is an internal API not covered by versioning policy. + +Examples +-------- + +- :class:`~SubprocessCommand`: Wraps :class:`subprocess.Popen` and + :func:`subprocess.run` in a :func:`~dataclasses.dataclass`. + + Before: + + >>> import subprocess + >>> subprocess.run( + ... ['echo', 'hi'], + ... capture_output=True, universal_newlines=True + ... ).stdout + 'hi\\n' + + With this: + + >>> cmd = SubprocessCommand(['echo', 'hi']) + >>> cmd.args + ['echo', 'hi'] + >>> cmd.run(capture_output=True, universal_newlines=True).stdout + 'hi\\n' + + Tweak params before invocation: + + >>> cmd = SubprocessCommand(['echo', 'hi']) + >>> cmd.args[1] = 'hello' + >>> cmd.args + ['echo', 'hello'] + >>> cmd.run(capture_output=True, universal_newlines=True).stdout + 'hello\\n' +""" +import dataclasses +import subprocess +import sys +from collections.abc import Mapping, Sequence +from typing import ( + IO, + TYPE_CHECKING, + Any, + Callable, + List, + Optional, + TypeVar, + Union, + overload, +) + +from typing_extensions import Literal, TypeAlias + +from .dataclasses import SkipDefaultFieldsReprMixin +from .types import StrOrBytesPath + +F = TypeVar("F", bound=Callable[..., Any]) + + +if sys.platform == "win32": + _ENV: TypeAlias = "Mapping[str, str]" +else: + _ENV: TypeAlias = Union[ + "Mapping[bytes, StrOrBytesPath]", "Mapping[str, StrOrBytesPath]" + ] +_FILE: TypeAlias = Union[None, int, IO[Any]] +_TXT: TypeAlias = Union[bytes, str] + +if TYPE_CHECKING: + #: Command + _CMD: TypeAlias = Union[StrOrBytesPath, "Sequence[StrOrBytesPath]"] + + +@dataclasses.dataclass(repr=False) +class SubprocessCommand(SkipDefaultFieldsReprMixin): + """Wraps a :mod:`subprocess` request. Inspect, mutate, control before invocation. + + Attributes + ---------- + args : _CMD + A string, or a sequence of program arguments. + + bufsize : int + supplied as the buffering argument to the open() function when creating the + stdin/stdout/stderr pipe file objects + + executable : Optional[StrOrBytesPath] + A replacement program to execute. + + stdin : _FILE + standard output for executed program + + stdout : + standard output for executed program + + stderr : + standard output for executed program + + close_fds : Controls closing or inheriting of file descriptors. + + shell : If true, the command will be executed through the shell. + + cwd : Sets the current directory before the child is executed. + + env : Defines the environment variables for the new process. + + text : + If ``True``, decode stdin, stdout and stderr using the given encoding (if set) + or the system default otherwise. + + universal_newlines : + Alias of text, provided for backwards compatibility. + + startupinfo : + Windows only + + creationflags : + Windows only + + preexec_fn : + (POSIX only) An object to be called in the child process just before the child + is executed. + + restore_signals : + POSIX only + + start_new_session : + POSIX only + + group : + POSIX only + + extra_groups : + POSIX only + + user : + POSIX only + + umask : + POSIX only + + pass_fds : + POSIX only + + encoding : + Text mode encoding to use for file objects stdin, stdout and stderr. + + errors : + Text mode error handling to use for file objects stdin, stdout and stderr. + + Examples + -------- + >>> cmd = SubprocessCommand("ls") + >>> cmd.args + 'ls' + + With ``shell=True``: + + >>> cmd = SubprocessCommand("ls -l", shell=True) + >>> cmd.shell + True + >>> cmd.args + 'ls -l' + >>> cmd.check_call() + 0 + """ + + args: "_CMD" + bufsize: int = -1 + executable: Optional[StrOrBytesPath] = None + stdin: _FILE = None + stdout: _FILE = None + stderr: _FILE = None + preexec_fn: Optional[Callable[[], Any]] = None + close_fds: bool = True + shell: bool = False + cwd: Optional[StrOrBytesPath] = None + env: Optional[_ENV] = None + + # Windows + creationflags: int = 0 + startupinfo: Optional[Any] = None + + # POSIX-only + restore_signals: bool = True + start_new_session: bool = False + pass_fds: Any = () + if sys.version_info >= (3, 9): + umask: int = -1 + if sys.version_info >= (3, 10): + pipesize: int = -1 + + if sys.version_info >= (3, 9): + user: Optional[str] = None + group: Optional[str] = None + extra_groups: Optional[List[str]] = None + + # Alias of text, for backwards compatibility + universal_newlines: Optional[bool] = None + text: Optional[Literal[True]] = None + + # Text mode encoding and error handling to use for file objects + # stdin, stdout, stderr + encoding: Optional[str] = None + errors: Optional[str] = None + + # user, group, extra_groups, umask were added in 3.9 + @overload + def Popen( + self, + args: Optional["_CMD"] = ..., + universal_newlines: bool = ..., + *, + text: Optional[bool] = ..., + encoding: str, + errors: Optional[str] = ..., + ) -> "subprocess.Popen[str]": + ... + + @overload + def Popen( + self, + args: Optional["_CMD"] = ..., + universal_newlines: bool = ..., + *, + text: Optional[bool] = ..., + encoding: Optional[str] = ..., + errors: str, + ) -> "subprocess.Popen[str]": + ... + + @overload + def Popen( + self, + args: Optional["_CMD"] = ..., + *, + universal_newlines: Literal[True], + # where the *real* keyword only args start + text: Optional[bool] = ..., + encoding: Optional[str] = ..., + errors: Optional[str] = ..., + ) -> "subprocess.Popen[str]": + ... + + @overload + def Popen( + self, + args: Optional["_CMD"] = ..., + universal_newlines: bool = ..., + *, + text: Literal[True], + encoding: Optional[str] = ..., + errors: Optional[str] = ..., + ) -> "subprocess.Popen[str]": + ... + + @overload + def Popen( + self, + args: Optional["_CMD"] = ..., + universal_newlines: Literal[False] = ..., + *, + text: Literal[None, False] = ..., + encoding: None = ..., + errors: None = ..., + ) -> "subprocess.Popen[bytes]": + ... + + def Popen( + self, + args: Optional["_CMD"] = None, + universal_newlines: Optional[bool] = None, + *, + text: Optional[bool] = None, + encoding: Optional[str] = None, + errors: Optional[str] = None, + **kwargs: Any, + ) -> "subprocess.Popen[Any]": + """Run commands :class:`subprocess.Popen`, optionally overrides via kwargs. + + Parameters + ---------- + **kwargs : dict, optional + Overrides existing attributes for :class:`subprocess.Popen` + + Examples + -------- + >>> cmd = SubprocessCommand(args=['echo', 'hello']) + >>> proc = cmd.Popen(stdout=subprocess.PIPE) + >>> proc.communicate() # doctest: +SKIP + """ + return subprocess.Popen( + **dataclasses.replace( + self, + args=args or self.args, + encoding=encoding, + errors=errors, + text=text, + universal_newlines=universal_newlines, + **kwargs, + ).__dict__, + ) + + def check_call(self, **kwargs: Any) -> int: + """Run command :func:`subprocess.check_call`, optionally overrides via kwargs. + + Parameters + ---------- + **kwargs : dict, optional + Overrides existing attributes for :func:`subprocess.check_call` + + Examples + -------- + >>> cmd = SubprocessCommand(args=['echo', 'hello']) + >>> cmd.check_call(stdout=subprocess.PIPE) + 0 + """ + return subprocess.check_call(**dataclasses.replace(self, **kwargs).__dict__) + + @overload + def check_output( + self, + universal_newlines: bool = ..., + *, + input: Optional[Union[str, bytes]] = ..., + encoding: Optional[str] = ..., + errors: Optional[str] = ..., + text: Literal[True], + **kwargs: Any, + ) -> str: + ... + + @overload + def check_output( + self, + universal_newlines: Optional[bool] = ..., + *, + input: Optional[Union[str, bytes]] = ..., + encoding: str, + errors: Optional[str] = ..., + text: Optional[bool] = ..., + **kwargs: Any, + ) -> str: + ... + + @overload + def check_output( + self, + universal_newlines: bool = ..., + *, + input: Optional[Union[str, bytes]] = ..., + encoding: Optional[str] = ..., + errors: str, + text: Optional[bool] = ..., + **kwargs: Any, + ) -> str: + ... + + @overload + def check_output( + self, + universal_newlines: Literal[True] = ..., + *, + input: Optional[Union[str, bytes]] = ..., + encoding: Optional[str] = ..., + errors: Optional[str] = ..., + text: Optional[bool] = ..., + **kwargs: Any, + ) -> str: + ... + + @overload + def check_output( + self, + universal_newlines: Literal[False], + *, + input: Optional[Union[str, bytes]] = ..., + encoding: None = ..., + errors: None = ..., + text: Literal[None, False] = ..., + **kwargs: Any, + ) -> bytes: + ... + + def check_output( + self, + universal_newlines: Optional[bool] = None, + *, + input: Optional[Union[str, bytes]] = None, + encoding: Optional[str] = None, + errors: Optional[str] = None, + text: Optional[bool] = None, + **kwargs: Any, + ) -> Union[bytes, str]: + r"""Run command :func:`subprocess.check_output`, optionally override via kwargs. + + Parameters + ---------- + input : Union[bytes, str], optional + pass string to subprocess's stdin. Bytes by default, str in text mode. + + Text mode is triggered by setting any of text, encoding, errors or + universal_newlines. + **kwargs : dict, optional + Overrides existing attributes for :func:`subprocess.check_output` + + Examples + -------- + >>> cmd = SubprocessCommand(args=['echo', 'hello']) + >>> proc = cmd.check_output(shell=True) + + Examples from :mod:`subprocess`: + + >>> import subprocess + >>> cmd = SubprocessCommand( + ... ["/bin/sh", "-c", "ls -l non_existent_file ; exit 0"]) + >>> cmd.check_output(stderr=subprocess.STDOUT) + b"ls: ...non_existent_file...: No such file or directory\n" + + >>> cmd = SubprocessCommand(["sed", "-e", "s/foo/bar/"]) + >>> cmd.check_output(input=b"when in the course of fooman events\n") + b'when in the course of barman events\n' + """ + params = dataclasses.replace(self, **kwargs).__dict__ + params.pop("stdout") + output = subprocess.check_output(input=input, **params) + if isinstance(output, (bytes, str)): + return output + raise Exception(f"output is not str or bytes: {output}") + + @overload + def run( + self, + universal_newlines: bool = ..., + *, + capture_output: bool = ..., + check: bool = ..., + encoding: Optional[str] = ..., + errors: Optional[str] = ..., + input: Optional[str] = ..., + text: Literal[True], + ) -> "subprocess.CompletedProcess[str]": + ... + + @overload + def run( + self, + universal_newlines: bool = ..., + *, + capture_output: bool = ..., + check: bool = ..., + encoding: str, + errors: Optional[str] = ..., + input: Optional[str] = ..., + text: Optional[bool] = ..., + ) -> "subprocess.CompletedProcess[str]": + ... + + @overload + def run( + self, + universal_newlines: bool = ..., + *, + capture_output: bool = ..., + check: bool = ..., + encoding: Optional[str] = ..., + errors: str, + input: Optional[str] = ..., + text: Optional[bool] = ..., + ) -> "subprocess.CompletedProcess[str]": + ... + + @overload + def run( + self, + *, + universal_newlines: Literal[True], + # where the *real* keyword only args start + capture_output: bool = ..., + check: bool = ..., + encoding: Optional[str] = ..., + errors: Optional[str] = ..., + input: Optional[str] = ..., + text: Optional[bool] = ..., + ) -> "subprocess.CompletedProcess[str]": + ... + + @overload + def run( + self, + universal_newlines: Literal[False] = ..., + *, + capture_output: bool = ..., + check: bool = ..., + encoding: None = ..., + errors: None = ..., + input: Optional[bytes] = ..., + text: Literal[None, False] = ..., + ) -> "subprocess.CompletedProcess[bytes]": + ... + + def run( + self, + universal_newlines: Optional[bool] = None, + *, + capture_output: bool = False, + check: bool = False, + encoding: Optional[str] = None, + errors: Optional[str] = None, + input: Optional[Union[str, bytes]] = None, + text: Optional[bool] = None, + timeout: Optional[float] = None, + **kwargs: Any, + ) -> "subprocess.CompletedProcess[Any]": + r"""Run command in :func:`subprocess.run`, optionally overrides via kwargs. + + Parameters + ---------- + input : Union[bytes, str], optional + pass string to subprocess's stdin. Bytes by default, str in text mode. + + Text mode is triggered by setting any of text, encoding, errors or + universal_newlines. + + check : bool + If True and the exit code was non-zero, it raises a + :exc:`subprocess.CalledProcessError`. The CalledProcessError object will + have the return code in the returncode attribute, and output & stderr + attributes if those streams were captured. + + timeout : int + If given, and the process takes too long, a :exc:`subprocess.TimeoutExpired` + + **kwargs : dict, optional + Overrides existing attributes for :func:`subprocess.run` + + Examples + -------- + >>> import subprocess + >>> cmd = SubprocessCommand( + ... ["/bin/sh", "-c", "ls -l non_existent_file ; exit 0"]) + >>> cmd.run() + CompletedProcess(args=['/bin/sh', '-c', 'ls -l non_existent_file ; exit 0'], + returncode=0) + + >>> import subprocess + >>> cmd = SubprocessCommand( + ... ["/bin/sh", "-c", "ls -l non_existent_file ; exit 0"]) + >>> cmd.run(check=True) + CompletedProcess(args=['/bin/sh', '-c', 'ls -l non_existent_file ; exit 0'], + returncode=0) + + >>> cmd = SubprocessCommand(["sed", "-e", "s/foo/bar/"]) + >>> completed = cmd.run(input=b"when in the course of fooman events\n") + >>> completed + CompletedProcess(args=['sed', '-e', 's/foo/bar/'], returncode=0) + >>> completed.stderr + + >>> cmd = SubprocessCommand(["sed", "-e", "s/foo/bar/"]) + >>> completed = cmd.run(input=b"when in the course of fooman events\n", + ... capture_output=True) + >>> completed + CompletedProcess(args=['sed', '-e', 's/foo/bar/'], returncode=0, + stdout=b'when in the course of barman events\n', stderr=b'') + >>> completed.stdout + b'when in the course of barman events\n' + >>> completed.stderr + b'' + """ + return subprocess.run( + **dataclasses.replace( + self, + universal_newlines=universal_newlines, + errors=errors, + text=text, + **kwargs, + ).__dict__, + check=check, + capture_output=capture_output, + input=input, + timeout=timeout, + ) diff --git a/src/libtmux/_internal/types.py b/src/libtmux/_internal/types.py new file mode 100644 index 000000000..b092268a7 --- /dev/null +++ b/src/libtmux/_internal/types.py @@ -0,0 +1,22 @@ +"""Internal :term:`type annotations ` + +Notes +----- + +:class:`StrPath` and :class:`StrOrBytesPath` is based on `typeshed's`_. + +.. _typeshed's: https://github.com/python/typeshed/blob/5df8de7/stdlib/_typeshed/__init__.pyi#L115-L118 +""" # NOQA E501 +from os import PathLike +from typing import TYPE_CHECKING, Union + +if TYPE_CHECKING: + from typing_extensions import TypeAlias + +StrPath: "TypeAlias" = Union[str, "PathLike[str]"] # stable +""":class:`os.PathLike` or :class:`str`""" + +StrOrBytesPath: "TypeAlias" = Union[ + str, bytes, "PathLike[str]", "PathLike[bytes]" +] # stable +""":class:`os.PathLike`, :class:`str` or :term:`bytes-like object`"""