From 46f53552d3c16ac3c4c63cb5cd905e5dc8197f54 Mon Sep 17 00:00:00 2001 From: Aditya Date: Sun, 9 Apr 2023 22:57:13 +0530 Subject: [PATCH 01/25] add customized concurrent handling --- docs/source/telegram.ext.BaseProcessor.rst | 6 ++ telegram/ext/__init__.py | 3 +- telegram/ext/_application.py | 97 +++++++++++++++++++--- telegram/ext/_applicationbuilder.py | 16 ++-- 4 files changed, 105 insertions(+), 17 deletions(-) create mode 100644 docs/source/telegram.ext.BaseProcessor.rst diff --git a/docs/source/telegram.ext.BaseProcessor.rst b/docs/source/telegram.ext.BaseProcessor.rst new file mode 100644 index 00000000000..3112bf6793b --- /dev/null +++ b/docs/source/telegram.ext.BaseProcessor.rst @@ -0,0 +1,6 @@ +BaseProcessor +====================== + +.. autoclass:: telegram.ext.BaseProcessor + :members: + :show-inheritance: \ No newline at end of file diff --git a/telegram/ext/__init__.py b/telegram/ext/__init__.py index b37d0fc70ca..3aac746a597 100644 --- a/telegram/ext/__init__.py +++ b/telegram/ext/__init__.py @@ -25,6 +25,7 @@ "ApplicationHandlerStop", "BaseHandler", "BasePersistence", + "BaseProcessor", "BaseRateLimiter", "CallbackContext", "CallbackDataCache", @@ -59,7 +60,7 @@ from . import filters from ._aioratelimiter import AIORateLimiter -from ._application import Application, ApplicationHandlerStop +from ._application import Application, ApplicationHandlerStop, BaseProcessor from ._applicationbuilder import ApplicationBuilder from ._basepersistence import BasePersistence, PersistenceInput from ._baseratelimiter import BaseRateLimiter diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index ddb9c0d4ef7..3ed74534f21 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -23,6 +23,8 @@ import logging import platform import signal +from abc import abstractmethod +from asyncio import BoundedSemaphore from collections import defaultdict from copy import deepcopy from pathlib import Path @@ -257,7 +259,7 @@ def __init__( update_queue: "asyncio.Queue[object]", updater: Optional[Updater], job_queue: JQ, - concurrent_updates: Union[bool, int], + concurrent_updates: Union[bool, int, "BaseProcessor"], persistence: Optional[BasePersistence[UD, CD, BD]], context_types: ContextTypes[CCT, UD, CD, BD], post_init: Optional[ @@ -295,14 +297,16 @@ def __init__( self.post_stop: Optional[ Callable[["Application[BT, CCT, UD, CD, BD, JQ]"], Coroutine[Any, Any, None]] ] = post_stop - - if isinstance(concurrent_updates, int) and concurrent_updates < 0: - raise ValueError("`concurrent_updates` must be a non-negative integer!") if concurrent_updates is True: concurrent_updates = 256 - self._concurrent_updates_sem = asyncio.BoundedSemaphore(concurrent_updates or 1) - self._concurrent_updates: int = concurrent_updates or 0 - + if isinstance(concurrent_updates, int): + if concurrent_updates < 0: + raise ValueError("`concurrent_updates` must be a non-negative integer!") + concurrent_updates = SimpleProcessor(concurrent_updates) + self._concurrent_updates_sem = asyncio.BoundedSemaphore( + concurrent_updates.max_concurrent_updates or 1 + ) + self._concurrent_updates: BaseProcessor = concurrent_updates self.bot_data: BD = self.context_types.bot_data() self._user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data) self._chat_data: DefaultDict[int, CD] = defaultdict(self.context_types.chat_data) @@ -359,7 +363,7 @@ def concurrent_updates(self) -> int: .. seealso:: :wiki:`Concurrency` """ - return self._concurrent_updates + return self._concurrent_updates.max_concurrent_updates @property def job_queue(self) -> Optional["JobQueue[CCT]"]: @@ -377,6 +381,14 @@ def job_queue(self) -> Optional["JobQueue[CCT]"]: ) return self._job_queue + @property + def processor(self) -> Optional["BaseProcessor"]: + """:class:`telegram.ext.BaseProcessor`: The processor used by this application. + + .. seealso:: :wiki:`Concurrency` + """ + return self._concurrent_updates + async def initialize(self) -> None: """Initializes the Application by initializing: @@ -1047,7 +1059,6 @@ async def _update_fetcher(self) -> None: # Continuously fetch updates from the queue. Exit only once the signal object is found. while True: update = await self.update_queue.get() - if update is _STOP_SIGNAL: _logger.debug("Dropping pending updates") while not self.update_queue.empty(): @@ -1061,7 +1072,12 @@ async def _update_fetcher(self) -> None: if self._concurrent_updates: # We don't await the below because it has to be run concurrently - self.create_task(self.__process_update_wrapper(update), update=update) + self.create_task( + self._concurrent_updates.do_process_update( + update, self.process_update(update) + ), + update=update, + ) else: await self.__process_update_wrapper(update) @@ -1640,3 +1656,64 @@ async def process_error( _logger.exception("No error handlers are registered, logging exception.", exc_info=error) return False + + +class BaseProcessor: + def __init__(self, max_concurrent_updates: int): + self.max_concurrent_updates = max_concurrent_updates + self.semaphore = BoundedSemaphore(self.max_concurrent_updates or 1) + + @abstractmethod + async def process_update( + self, + update: object, + coroutine: "Awaitable[Any]", + ) -> None: + pass + + @abstractmethod + async def initialize(self) -> None: + pass + + @abstractmethod + async def shutdown(self) -> None: + pass + + async def do_process_update( + self, + update: object, + coroutine: "Awaitable[Any]", + ) -> None: + async with self.semaphore: + await self.process_update(update, coroutine) + + async def __aenter__(self) -> "BaseProcessor": + try: + await self.initialize() + return self + except Exception as exc: + await self.shutdown() + raise exc + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + await self.shutdown() + + +class SimpleProcessor(BaseProcessor): + async def process_update( + self, + update: object, + coroutine: "Awaitable[Any]", + ) -> None: + await coroutine + + async def initialize(self) -> None: + pass + + async def shutdown(self) -> None: + pass diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index 7c8876d60d0..c004b5fa280 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -35,7 +35,7 @@ from telegram._bot import Bot from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue from telegram._utils.types import DVInput, DVType, FilePathInput, ODVInput -from telegram.ext._application import Application +from telegram.ext._application import Application, BaseProcessor from telegram.ext._contexttypes import ContextTypes from telegram.ext._extbot import ExtBot from telegram.ext._jobqueue import JobQueue @@ -198,7 +198,7 @@ def __init__(self: "InitApplicationBuilder"): self._context_types: DVType[ContextTypes] = DefaultValue(ContextTypes()) self._application_class: DVType[Type[Application]] = DefaultValue(Application) self._application_kwargs: Dict[str, object] = {} - self._concurrent_updates: Union[int, DefaultValue[bool]] = DEFAULT_FALSE + self._concurrent_updates: Union[int, DefaultValue[bool], "BaseProcessor"] = DEFAULT_FALSE self._updater: ODVInput[Updater] = DEFAULT_NONE self._post_init: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None self._post_shutdown: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None @@ -901,7 +901,9 @@ def update_queue(self: BuilderType, update_queue: "Queue[object]") -> BuilderTyp self._update_queue = update_queue return self - def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int]) -> BuilderType: + def concurrent_updates( + self: BuilderType, concurrent_updates: Union[bool, int, "BaseProcessor"] + ) -> BuilderType: """Specifies if and how many updates may be processed concurrently instead of one by one. If not called, updates will be processed one by one. @@ -916,9 +918,11 @@ def concurrent_updates(self: BuilderType, concurrent_updates: Union[bool, int]) .. seealso:: :attr:`telegram.ext.Application.concurrent_updates` Args: - concurrent_updates (:obj:`bool` | :obj:`int`): Passing :obj:`True` will allow for - ``256`` updates to be processed concurrently. Pass an integer to specify a - different number of updates that may be processed concurrently. + concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseProcessor`): Passing + :obj:`True` will allow for `256`` updates to be processed concurrently. Pass an + integer to specify a different number of updates that may be processed + concurrently. Pass an instance of `BaseProcessor` to use that instance + for handling updates concurrently. Returns: :class:`ApplicationBuilder`: The same builder with the updated argument. From a8bf4271dd3979ccb7b363f07c5884d01f40011e Mon Sep 17 00:00:00 2001 From: Aditya Date: Wed, 12 Apr 2023 00:55:19 +0530 Subject: [PATCH 02/25] add some requested changes --- docs/source/telegram.ext.BaseProcessor.rst | 6 -- .../telegram.ext.baseupdateprocessor.rst | 6 ++ .../telegram.ext.simpleupdateprocessor.rst | 6 ++ telegram/ext/__init__.py | 5 +- telegram/ext/_application.py | 93 ++++--------------- telegram/ext/_applicationbuilder.py | 48 ++++++---- telegram/ext/_baseupdateprocessor.py | 90 ++++++++++++++++++ 7 files changed, 152 insertions(+), 102 deletions(-) delete mode 100644 docs/source/telegram.ext.BaseProcessor.rst create mode 100644 docs/source/telegram.ext.baseupdateprocessor.rst create mode 100644 docs/source/telegram.ext.simpleupdateprocessor.rst create mode 100644 telegram/ext/_baseupdateprocessor.py diff --git a/docs/source/telegram.ext.BaseProcessor.rst b/docs/source/telegram.ext.BaseProcessor.rst deleted file mode 100644 index 3112bf6793b..00000000000 --- a/docs/source/telegram.ext.BaseProcessor.rst +++ /dev/null @@ -1,6 +0,0 @@ -BaseProcessor -====================== - -.. autoclass:: telegram.ext.BaseProcessor - :members: - :show-inheritance: \ No newline at end of file diff --git a/docs/source/telegram.ext.baseupdateprocessor.rst b/docs/source/telegram.ext.baseupdateprocessor.rst new file mode 100644 index 00000000000..7155adf7191 --- /dev/null +++ b/docs/source/telegram.ext.baseupdateprocessor.rst @@ -0,0 +1,6 @@ +BaseUpdateProcessor +=================== + +.. autoclass:: telegram.ext.BaseUpdateProcessor + :members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/telegram.ext.simpleupdateprocessor.rst b/docs/source/telegram.ext.simpleupdateprocessor.rst new file mode 100644 index 00000000000..1e30c27566c --- /dev/null +++ b/docs/source/telegram.ext.simpleupdateprocessor.rst @@ -0,0 +1,6 @@ +SimpleUpdateProcessor +===================== + +.. autoclass:: telegram.ext.SimpleUpdateProcessor + :members: + :show-inheritance: \ No newline at end of file diff --git a/telegram/ext/__init__.py b/telegram/ext/__init__.py index 3aac746a597..b0e13d7a281 100644 --- a/telegram/ext/__init__.py +++ b/telegram/ext/__init__.py @@ -25,7 +25,7 @@ "ApplicationHandlerStop", "BaseHandler", "BasePersistence", - "BaseProcessor", + "BaseUpdateProcessor", "BaseRateLimiter", "CallbackContext", "CallbackDataCache", @@ -60,10 +60,11 @@ from . import filters from ._aioratelimiter import AIORateLimiter -from ._application import Application, ApplicationHandlerStop, BaseProcessor +from ._application import Application, ApplicationHandlerStop from ._applicationbuilder import ApplicationBuilder from ._basepersistence import BasePersistence, PersistenceInput from ._baseratelimiter import BaseRateLimiter +from ._baseupdateprocessor import BaseUpdateProcessor from ._callbackcontext import CallbackContext from ._callbackdatacache import CallbackDataCache, InvalidCallbackData from ._callbackqueryhandler import CallbackQueryHandler diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 30fc3afda8d..11561dbcf8a 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -23,8 +23,6 @@ import itertools import platform import signal -from abc import abstractmethod -from asyncio import BoundedSemaphore from collections import defaultdict from copy import deepcopy from pathlib import Path @@ -59,6 +57,7 @@ from telegram._utils.warnings import warn from telegram.error import TelegramError from telegram.ext._basepersistence import BasePersistence +from telegram.ext._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor from telegram.ext._contexttypes import ContextTypes from telegram.ext._extbot import ExtBot from telegram.ext._handler import BaseHandler @@ -261,7 +260,7 @@ def __init__( update_queue: "asyncio.Queue[object]", updater: Optional[Updater], job_queue: JQ, - concurrent_updates: Union[bool, int, "BaseProcessor"], + concurrent_updates: Union[bool, int, "BaseUpdateProcessor"], persistence: Optional[BasePersistence[UD, CD, BD]], context_types: ContextTypes[CCT, UD, CD, BD], post_init: Optional[ @@ -275,7 +274,8 @@ def __init__( ], ): if not was_called_by( - inspect.currentframe(), Path(__file__).parent.resolve() / "_applicationbuilder.py" + inspect.currentframe(), + Path(__file__).parent.resolve() / "_applicationbuilder.py", ): warn( "`Application` instances should be built via the `ApplicationBuilder`.", @@ -304,11 +304,11 @@ def __init__( if isinstance(concurrent_updates, int): if concurrent_updates < 0: raise ValueError("`concurrent_updates` must be a non-negative integer!") - concurrent_updates = SimpleProcessor(concurrent_updates) + concurrent_updates = SimpleUpdateProcessor(concurrent_updates) self._concurrent_updates_sem = asyncio.BoundedSemaphore( concurrent_updates.max_concurrent_updates or 1 ) - self._concurrent_updates: BaseProcessor = concurrent_updates + self._update_processor: BaseUpdateProcessor = concurrent_updates self.bot_data: BD = self.context_types.bot_data() self._user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data) self._chat_data: DefaultDict[int, CD] = defaultdict(self.context_types.chat_data) @@ -363,9 +363,12 @@ def concurrent_updates(self) -> int: """:obj:`int`: The number of concurrent updates that will be processed in parallel. A value of ``0`` indicates updates are *not* being processed concurrently. + Note: + This is now just a shortcut to `processor.max_concurrent_updates`. + .. seealso:: :wiki:`Concurrency` """ - return self._concurrent_updates.max_concurrent_updates + return self._update_processor.max_concurrent_updates @property def job_queue(self) -> Optional["JobQueue[CCT]"]: @@ -384,12 +387,12 @@ def job_queue(self) -> Optional["JobQueue[CCT]"]: return self._job_queue @property - def processor(self) -> Optional["BaseProcessor"]: + def processor(self) -> Optional["BaseUpdateProcessor"]: """:class:`telegram.ext.BaseProcessor`: The processor used by this application. .. seealso:: :wiki:`Concurrency` """ - return self._concurrent_updates + return self._update_processor async def initialize(self) -> None: """Initializes the Application by initializing: @@ -1070,12 +1073,10 @@ async def _update_fetcher(self) -> None: _LOGGER.debug("Processing update %s", update) - if self._concurrent_updates: + if self._update_processor: # We don't await the below because it has to be run concurrently self.create_task( - self._concurrent_updates.do_process_update( - update, self.process_update(update) - ), + self._update_processor.do_process_update(update, self.process_update(update)), update=update, ) else: @@ -1313,7 +1314,10 @@ def drop_user_data(self, user_id: int) -> None: self._user_ids_to_be_deleted_in_persistence.add(user_id) def migrate_chat_data( - self, message: "Message" = None, old_chat_id: int = None, new_chat_id: int = None + self, + message: "Message" = None, + old_chat_id: int = None, + new_chat_id: int = None, ) -> None: """Moves the contents of :attr:`chat_data` at key :paramref:`old_chat_id` to the key :paramref:`new_chat_id`. Also marks the entries to be updated accordingly in the next run @@ -1688,64 +1692,3 @@ async def process_error( _LOGGER.exception("No error handlers are registered, logging exception.", exc_info=error) return False - - -class BaseProcessor: - def __init__(self, max_concurrent_updates: int): - self.max_concurrent_updates = max_concurrent_updates - self.semaphore = BoundedSemaphore(self.max_concurrent_updates or 1) - - @abstractmethod - async def process_update( - self, - update: object, - coroutine: "Awaitable[Any]", - ) -> None: - pass - - @abstractmethod - async def initialize(self) -> None: - pass - - @abstractmethod - async def shutdown(self) -> None: - pass - - async def do_process_update( - self, - update: object, - coroutine: "Awaitable[Any]", - ) -> None: - async with self.semaphore: - await self.process_update(update, coroutine) - - async def __aenter__(self) -> "BaseProcessor": - try: - await self.initialize() - return self - except Exception as exc: - await self.shutdown() - raise exc - - async def __aexit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - await self.shutdown() - - -class SimpleProcessor(BaseProcessor): - async def process_update( - self, - update: object, - coroutine: "Awaitable[Any]", - ) -> None: - await coroutine - - async def initialize(self) -> None: - pass - - async def shutdown(self) -> None: - pass diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index 56349001a54..78c3a9fc433 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -35,7 +35,8 @@ from telegram._bot import Bot from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue from telegram._utils.types import DVInput, DVType, FilePathInput, ODVInput -from telegram.ext._application import Application, BaseProcessor +from telegram.ext._application import Application +from telegram.ext._baseupdateprocessor import BaseUpdateProcessor from telegram.ext._contexttypes import ContextTypes from telegram.ext._extbot import ExtBot from telegram.ext._jobqueue import JobQueue @@ -198,7 +199,9 @@ def __init__(self: "InitApplicationBuilder"): self._context_types: DVType[ContextTypes] = DefaultValue(ContextTypes()) self._application_class: DVType[Type[Application]] = DefaultValue(Application) self._application_kwargs: Dict[str, object] = {} - self._concurrent_updates: Union[int, DefaultValue[bool], "BaseProcessor"] = DEFAULT_FALSE + self._concurrent_updates: Union[ + int, DefaultValue[bool], "BaseUpdateProcessor" + ] = DEFAULT_FALSE self._updater: ODVInput[Updater] = DEFAULT_NONE self._post_init: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None self._post_shutdown: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None @@ -409,7 +412,12 @@ def _request_check(self, get_updates: bool) -> None: # Code below tests if it's okay to set a Request object. Only okay if no other request args # or instances containing a Request were set previously - for attr in ("connect_timeout", "read_timeout", "write_timeout", "pool_timeout"): + for attr in ( + "connect_timeout", + "read_timeout", + "write_timeout", + "pool_timeout", + ): if not isinstance(getattr(self, f"_{prefix}{attr}"), DefaultValue): raise RuntimeError(_TWO_ARGS_REQ.format(name, attr)) @@ -442,7 +450,10 @@ def _request_param_check(self, name: str, get_updates: bool) -> None: ) ) - if self._updater not in (DEFAULT_NONE, None): # disallow request args for updater(has bot) + if self._updater not in ( + DEFAULT_NONE, + None, + ): # disallow request args for updater(has bot) raise RuntimeError( _TWO_ARGS_REQ.format(f"get_updates_{name}" if get_updates else name, "updater") ) @@ -902,7 +913,7 @@ def update_queue(self: BuilderType, update_queue: "Queue[object]") -> BuilderTyp return self def concurrent_updates( - self: BuilderType, concurrent_updates: Union[bool, int, "BaseProcessor"] + self: BuilderType, concurrent_updates: Union[bool, int, "BaseUpdateProcessor"] ) -> BuilderType: """Specifies if and how many updates may be processed concurrently instead of one by one. If not called, updates will be processed one by one. @@ -918,11 +929,11 @@ def concurrent_updates( .. seealso:: :attr:`telegram.ext.Application.concurrent_updates` Args: - concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseProcessor`): Passing + concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseUpdateProcessor`): Passing :obj:`True` will allow for `256`` updates to be processed concurrently. Pass an integer to specify a different number of updates that may be processed - concurrently. Pass an instance of `BaseProcessor` to use that instance - for handling updates concurrently. + concurrently. Pass an instance of :class:`telegram.ext.BaseUpdateProcessor` to use + that instance for handling updates concurrently. Returns: :class:`ApplicationBuilder`: The same builder with the updated argument. @@ -1084,7 +1095,8 @@ async def post_init(application: Application) -> None: return self def post_shutdown( - self: BuilderType, post_shutdown: Callable[[Application], Coroutine[Any, Any, None]] + self: BuilderType, + post_shutdown: Callable[[Application], Coroutine[Any, Any, None]], ) -> BuilderType: """ Sets a callback to be executed by :meth:`Application.run_polling` and @@ -1181,13 +1193,11 @@ def rate_limiter( return self # type: ignore[return-value] -InitApplicationBuilder = ( # This is defined all the way down here so that its type is inferred - ApplicationBuilder[ # by Pylance correctly. - ExtBot[None], - ContextTypes.DEFAULT_TYPE, - Dict[Any, Any], - Dict[Any, Any], - Dict[Any, Any], - JobQueue[ContextTypes.DEFAULT_TYPE], - ] -) +InitApplicationBuilder = ApplicationBuilder[ + ExtBot[None], + ContextTypes.DEFAULT_TYPE, + Dict[Any, Any], + Dict[Any, Any], + Dict[Any, Any], + JobQueue[ContextTypes.DEFAULT_TYPE], +] # This is defined all the way down here so that its type is inferred # by Pylance correctly. diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py new file mode 100644 index 00000000000..be864f256b1 --- /dev/null +++ b/telegram/ext/_baseupdateprocessor.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# +# A library that provides a Python interface to the Telegram Bot API +# Copyright (C) 2015-2023 +# Leandro Toledo de Souza +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser Public License for more details. +# +# You should have received a copy of the GNU Lesser Public License +# along with this program. If not, see [http://www.gnu.org/licenses/]. +"""This module contains the BaseProcessor class.""" +from abc import abstractmethod +from asyncio import BoundedSemaphore +from types import TracebackType +from typing import Any, Awaitable, Optional, Type + + +class BaseUpdateProcessor: + def __init__(self, max_concurrent_updates: int): + self._max_concurrent_updates = max_concurrent_updates + if self.max_concurrent_updates < 0: + raise ValueError("max_concurrent_updates must be a non-negative integer!") + self.semaphore = BoundedSemaphore(self.max_concurrent_updates or 1) + + @property + def max_concurrent_updates(self) -> int: + return self._max_concurrent_updates + + @abstractmethod + async def process_update( + self, + update: object, + coroutine: "Awaitable[Any]", + ) -> None: + pass + + @abstractmethod + async def initialize(self) -> None: + pass + + @abstractmethod + async def shutdown(self) -> None: + pass + + async def do_process_update( + self, + update: object, + coroutine: "Awaitable[Any]", + ) -> None: + async with self.semaphore: + await self.process_update(update, coroutine) + + async def __aenter__(self) -> "BaseUpdateProcessor": + try: + await self.initialize() + return self + except Exception as exc: + await self.shutdown() + raise exc + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + await self.shutdown() + + +class SimpleUpdateProcessor(BaseUpdateProcessor): + async def process_update( + self, + update: object, + coroutine: "Awaitable[Any]", + ) -> None: + await coroutine + + async def initialize(self) -> None: + pass + + async def shutdown(self) -> None: + pass From ecf27212e17655e647a8e44d5d43ae4de4020a6e Mon Sep 17 00:00:00 2001 From: Aditya Date: Thu, 13 Apr 2023 17:47:59 +0530 Subject: [PATCH 03/25] add docstrings --- telegram/ext/_applicationbuilder.py | 18 +++++---- telegram/ext/_baseupdateprocessor.py | 57 +++++++++++++++++++++++++--- 2 files changed, 61 insertions(+), 14 deletions(-) diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index 78c3a9fc433..4a46cd7a87f 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -1193,11 +1193,13 @@ def rate_limiter( return self # type: ignore[return-value] -InitApplicationBuilder = ApplicationBuilder[ - ExtBot[None], - ContextTypes.DEFAULT_TYPE, - Dict[Any, Any], - Dict[Any, Any], - Dict[Any, Any], - JobQueue[ContextTypes.DEFAULT_TYPE], -] # This is defined all the way down here so that its type is inferred # by Pylance correctly. +InitApplicationBuilder = ( # This is defined all the way down here so that its type is inferred + ApplicationBuilder[ # by Pylance correctly. + ExtBot[None], + ContextTypes.DEFAULT_TYPE, + Dict[Any, Any], + Dict[Any, Any], + Dict[Any, Any], + JobQueue[ContextTypes.DEFAULT_TYPE], + ] +) diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index be864f256b1..110b6980f82 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -24,14 +24,31 @@ class BaseUpdateProcessor: + """An abstract base class for update processors. You can use this class to implement + your own update processor. + + .. seealso:: :wiki:`Concurrency` + + .. versionadded:: NEXT.VERSION + + Args: + max_concurrent_updates (:obj:`int`): The maximum number of concurrent updates to be + processed. If this number is exceeded, new updates will be queued until the number of + currently processed updates decreases. + + Raises: + :exc:`ValueError`: If `max_concurrent_updates` is a negative integer. + """ + def __init__(self, max_concurrent_updates: int): self._max_concurrent_updates = max_concurrent_updates if self.max_concurrent_updates < 0: - raise ValueError("max_concurrent_updates must be a non-negative integer!") - self.semaphore = BoundedSemaphore(self.max_concurrent_updates or 1) + raise ValueError("`max_concurrent_updates` must be a non-negative integer!") + self._semaphore = BoundedSemaphore(self.max_concurrent_updates or 1) @property def max_concurrent_updates(self) -> int: + """:obj:`int`: The maximum number of updates that can be processed concurrently.""" return self._max_concurrent_updates @abstractmethod @@ -40,25 +57,46 @@ async def process_update( update: object, coroutine: "Awaitable[Any]", ) -> None: - pass + """Custom implementation of how to process an update. + + Args: + update (:obj:`object`): The update to be processed. + coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. + """ @abstractmethod async def initialize(self) -> None: - pass + """Initializes the Processor so resources can be allocated. + + .. seealso:: + :meth:`shutdown` + """ @abstractmethod async def shutdown(self) -> None: - pass + """Shutdown the Processor so resources can be freed. + + .. seealso:: + :meth:`initialize` + """ async def do_process_update( self, update: object, coroutine: "Awaitable[Any]", ) -> None: - async with self.semaphore: + """Calls :meth:`process_update` with a semaphore to limit the number of concurrent + updates. + + Args: + update (:obj:`object`): The update to be processed. + coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. + """ + async with self._semaphore: await self.process_update(update, coroutine) async def __aenter__(self) -> "BaseUpdateProcessor": + """Simple context manager which initializes the Processor.""" try: await self.initialize() return self @@ -72,6 +110,7 @@ async def __aexit__( exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: + """Shutdown the Processor from the context manager.""" await self.shutdown() @@ -81,6 +120,12 @@ async def process_update( update: object, coroutine: "Awaitable[Any]", ) -> None: + """Immediately awaits the coroutine, i.e. does not apply any additional processing. + + Args: + update (:obj:`object`): The update to be processed. + coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. + """ await coroutine async def initialize(self) -> None: From f9d8f3077d5aafa5697fa14d01590ffd68797c83 Mon Sep 17 00:00:00 2001 From: Aditya Date: Fri, 14 Apr 2023 19:40:54 +0530 Subject: [PATCH 04/25] some more improvements --- telegram/ext/__init__.py | 3 ++- telegram/ext/_application.py | 18 +++++++---------- telegram/ext/_applicationbuilder.py | 30 +++++++++++++++++++++------- telegram/ext/_baseupdateprocessor.py | 9 +++++++-- 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/telegram/ext/__init__.py b/telegram/ext/__init__.py index b0e13d7a281..4af93ad513b 100644 --- a/telegram/ext/__init__.py +++ b/telegram/ext/__init__.py @@ -52,6 +52,7 @@ "PreCheckoutQueryHandler", "PrefixHandler", "ShippingQueryHandler", + "SimpleUpdateProcessor", "StringCommandHandler", "StringRegexHandler", "TypeHandler", @@ -64,7 +65,7 @@ from ._applicationbuilder import ApplicationBuilder from ._basepersistence import BasePersistence, PersistenceInput from ._baseratelimiter import BaseRateLimiter -from ._baseupdateprocessor import BaseUpdateProcessor +from ._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor from ._callbackcontext import CallbackContext from ._callbackdatacache import CallbackDataCache, InvalidCallbackData from ._callbackqueryhandler import CallbackQueryHandler diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 11561dbcf8a..50c4e43170e 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -57,7 +57,7 @@ from telegram._utils.warnings import warn from telegram.error import TelegramError from telegram.ext._basepersistence import BasePersistence -from telegram.ext._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor +from telegram.ext._baseupdateprocessor import BaseUpdateProcessor from telegram.ext._contexttypes import ContextTypes from telegram.ext._extbot import ExtBot from telegram.ext._handler import BaseHandler @@ -260,7 +260,7 @@ def __init__( update_queue: "asyncio.Queue[object]", updater: Optional[Updater], job_queue: JQ, - concurrent_updates: Union[bool, int, "BaseUpdateProcessor"], + update_processor: "BaseUpdateProcessor", persistence: Optional[BasePersistence[UD, CD, BD]], context_types: ContextTypes[CCT, UD, CD, BD], post_init: Optional[ @@ -299,16 +299,10 @@ def __init__( self.post_stop: Optional[ Callable[["Application[BT, CCT, UD, CD, BD, JQ]"], Coroutine[Any, Any, None]] ] = post_stop - if concurrent_updates is True: - concurrent_updates = 256 - if isinstance(concurrent_updates, int): - if concurrent_updates < 0: - raise ValueError("`concurrent_updates` must be a non-negative integer!") - concurrent_updates = SimpleUpdateProcessor(concurrent_updates) self._concurrent_updates_sem = asyncio.BoundedSemaphore( - concurrent_updates.max_concurrent_updates or 1 + update_processor.max_concurrent_updates or 1 ) - self._update_processor: BaseUpdateProcessor = concurrent_updates + self._update_processor = update_processor self.bot_data: BD = self.context_types.bot_data() self._user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data) self._chat_data: DefaultDict[int, CD] = defaultdict(self.context_types.chat_data) @@ -1076,7 +1070,9 @@ async def _update_fetcher(self) -> None: if self._update_processor: # We don't await the below because it has to be run concurrently self.create_task( - self._update_processor.do_process_update(update, self.process_update(update)), + self._update_processor.do_process_update( + update, self.process_update(update), self + ), update=update, ) else: diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index 4a46cd7a87f..dee66486eff 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -36,7 +36,7 @@ from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue from telegram._utils.types import DVInput, DVType, FilePathInput, ODVInput from telegram.ext._application import Application -from telegram.ext._baseupdateprocessor import BaseUpdateProcessor +from telegram.ext._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor from telegram.ext._contexttypes import ContextTypes from telegram.ext._extbot import ExtBot from telegram.ext._jobqueue import JobQueue @@ -309,7 +309,7 @@ def build( bot=bot, update_queue=update_queue, updater=updater, - concurrent_updates=DefaultValue.get_value(self._concurrent_updates), + update_processor=DefaultValue.get_value(self._concurrent_updates), job_queue=job_queue, persistence=persistence, context_types=DefaultValue.get_value(self._context_types), @@ -930,15 +930,31 @@ def concurrent_updates( Args: concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseUpdateProcessor`): Passing - :obj:`True` will allow for `256`` updates to be processed concurrently. Pass an - integer to specify a different number of updates that may be processed - concurrently. Pass an instance of :class:`telegram.ext.BaseUpdateProcessor` to use - that instance for handling updates concurrently. + :obj:`True` will allow for ``256`` updates to be processed concurrently. Pass an + integer to specify a different number of updates that may be processed concurrently, + in that case :class:`telegram.ext.SimpleUpdateProcessor` is used to handle updates. + Pass an instance of :class:`telegram.ext.BaseUpdateProcessor` to use that instance + for handling updates concurrently. Returns: :class:`ApplicationBuilder`: The same builder with the updated argument. """ - self._concurrent_updates = concurrent_updates + # Check if concurrent updates is bool and convert to integer + if concurrent_updates is True: + concurrent_updates = 256 + elif concurrent_updates is False: + concurrent_updates = 0 + + # If `concurrent_updates` is an integer, create a `SimpleUpdateProcessor` + # instance with that integer value; otherwise, raise an error if the value + # is negative or not an integer. + if isinstance(concurrent_updates, int): + if concurrent_updates < 0: + raise ValueError("`concurrent_updates` must be a non-negative integer!") + concurrent_updates = SimpleUpdateProcessor(concurrent_updates) + + # Assign the value of `concurrent_updates` to `_concurrent_updates` + self._concurrent_updates: BaseUpdateProcessor = concurrent_updates return self def job_queue( diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 110b6980f82..ef3c7ee2cf6 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -22,6 +22,8 @@ from types import TracebackType from typing import Any, Awaitable, Optional, Type +from telegram.ext import Application + class BaseUpdateProcessor: """An abstract base class for update processors. You can use this class to implement @@ -84,6 +86,7 @@ async def do_process_update( self, update: object, coroutine: "Awaitable[Any]", + application: Application, ) -> None: """Calls :meth:`process_update` with a semaphore to limit the number of concurrent updates. @@ -91,9 +94,11 @@ async def do_process_update( Args: update (:obj:`object`): The update to be processed. coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. + application (:class:`telegram.ext.Application`): The application instance. """ async with self._semaphore: await self.process_update(update, coroutine) + application.update_queue.task_done() async def __aenter__(self) -> "BaseUpdateProcessor": """Simple context manager which initializes the Processor.""" @@ -129,7 +134,7 @@ async def process_update( await coroutine async def initialize(self) -> None: - pass + """Does nothing.""" async def shutdown(self) -> None: - pass + """Does nothing.""" From ab95f79dab5f70d9293b3824f2de119836e05882 Mon Sep 17 00:00:00 2001 From: Aditya Date: Mon, 17 Apr 2023 19:38:32 +0530 Subject: [PATCH 05/25] exclude making new tasks when concurrent_updates is 0 --- telegram/ext/_application.py | 11 ++++------- telegram/ext/_applicationbuilder.py | 20 ++++++-------------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 50c4e43170e..27edf5eb6b0 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -274,8 +274,7 @@ def __init__( ], ): if not was_called_by( - inspect.currentframe(), - Path(__file__).parent.resolve() / "_applicationbuilder.py", + inspect.currentframe(), Path(__file__).parent.resolve() / "_applicationbuilder.py" ): warn( "`Application` instances should be built via the `ApplicationBuilder`.", @@ -1056,6 +1055,7 @@ async def _update_fetcher(self) -> None: # Continuously fetch updates from the queue. Exit only once the signal object is found. while True: update = await self.update_queue.get() + if update is _STOP_SIGNAL: _LOGGER.debug("Dropping pending updates") while not self.update_queue.empty(): @@ -1067,7 +1067,7 @@ async def _update_fetcher(self) -> None: _LOGGER.debug("Processing update %s", update) - if self._update_processor: + if self._update_processor and self._update_processor.max_concurrent_updates > 0: # We don't await the below because it has to be run concurrently self.create_task( self._update_processor.do_process_update( @@ -1310,10 +1310,7 @@ def drop_user_data(self, user_id: int) -> None: self._user_ids_to_be_deleted_in_persistence.add(user_id) def migrate_chat_data( - self, - message: "Message" = None, - old_chat_id: int = None, - new_chat_id: int = None, + self, message: "Message" = None, old_chat_id: int = None, new_chat_id: int = None ) -> None: """Moves the contents of :attr:`chat_data` at key :paramref:`old_chat_id` to the key :paramref:`new_chat_id`. Also marks the entries to be updated accordingly in the next run diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index dee66486eff..d52ad229967 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -412,12 +412,7 @@ def _request_check(self, get_updates: bool) -> None: # Code below tests if it's okay to set a Request object. Only okay if no other request args # or instances containing a Request were set previously - for attr in ( - "connect_timeout", - "read_timeout", - "write_timeout", - "pool_timeout", - ): + for attr in ("connect_timeout", "read_timeout", "write_timeout", "pool_timeout"): if not isinstance(getattr(self, f"_{prefix}{attr}"), DefaultValue): raise RuntimeError(_TWO_ARGS_REQ.format(name, attr)) @@ -450,10 +445,7 @@ def _request_param_check(self, name: str, get_updates: bool) -> None: ) ) - if self._updater not in ( - DEFAULT_NONE, - None, - ): # disallow request args for updater(has bot) + if self._updater not in (DEFAULT_NONE, None): # disallow request args for updater(has bot) raise RuntimeError( _TWO_ARGS_REQ.format(f"get_updates_{name}" if get_updates else name, "updater") ) @@ -947,13 +939,14 @@ def concurrent_updates( # If `concurrent_updates` is an integer, create a `SimpleUpdateProcessor` # instance with that integer value; otherwise, raise an error if the value - # is negative or not an integer. + # is negative if isinstance(concurrent_updates, int): if concurrent_updates < 0: raise ValueError("`concurrent_updates` must be a non-negative integer!") concurrent_updates = SimpleUpdateProcessor(concurrent_updates) - # Assign the value of `concurrent_updates` to `_concurrent_updates` + # Assign default value of concurrent_updates if it is instance of + # `BaseUpdateProcessor` self._concurrent_updates: BaseUpdateProcessor = concurrent_updates return self @@ -1111,8 +1104,7 @@ async def post_init(application: Application) -> None: return self def post_shutdown( - self: BuilderType, - post_shutdown: Callable[[Application], Coroutine[Any, Any, None]], + self: BuilderType, post_shutdown: Callable[[Application], Coroutine[Any, Any, None]] ) -> BuilderType: """ Sets a callback to be executed by :meth:`Application.run_polling` and From 2be64f29381c311c207067c0a08d1d40ad1649b5 Mon Sep 17 00:00:00 2001 From: Aditya Date: Wed, 19 Apr 2023 20:14:57 +0530 Subject: [PATCH 06/25] requested changes --- telegram/ext/_application.py | 15 ++++++-------- telegram/ext/_applicationbuilder.py | 28 +++++++++++++-------------- telegram/ext/_baseupdateprocessor.py | 29 +++++++++++++++++++--------- 3 files changed, 40 insertions(+), 32 deletions(-) diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 27edf5eb6b0..20cc724fcda 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -230,7 +230,6 @@ class Application(Generic[BT, CCT, UD, CD, BD, JQ], AsyncContextManager["Applica "_chat_ids_to_be_deleted_in_persistence", "_chat_ids_to_be_updated_in_persistence", "_concurrent_updates", - "_concurrent_updates_sem", "_conversation_handler_conversations", "_initialized", "_job_queue", @@ -298,9 +297,6 @@ def __init__( self.post_stop: Optional[ Callable[["Application[BT, CCT, UD, CD, BD, JQ]"], Coroutine[Any, Any, None]] ] = post_stop - self._concurrent_updates_sem = asyncio.BoundedSemaphore( - update_processor.max_concurrent_updates or 1 - ) self._update_processor = update_processor self.bot_data: BD = self.context_types.bot_data() self._user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data) @@ -356,8 +352,8 @@ def concurrent_updates(self) -> int: """:obj:`int`: The number of concurrent updates that will be processed in parallel. A value of ``0`` indicates updates are *not* being processed concurrently. - Note: - This is now just a shortcut to `processor.max_concurrent_updates`. + .. verisonchanged:: NEXT.VERSION + This is now just a shortcut to ``update_processor.max_concurrent_updates``. .. seealso:: :wiki:`Concurrency` """ @@ -380,8 +376,9 @@ def job_queue(self) -> Optional["JobQueue[CCT]"]: return self._job_queue @property - def processor(self) -> Optional["BaseUpdateProcessor"]: - """:class:`telegram.ext.BaseProcessor`: The processor used by this application. + def update_processor(self) -> Optional["BaseUpdateProcessor"]: + """:class:`telegram.ext.BaseUpdateProcessor`: The update processor used by this + application. .. seealso:: :wiki:`Concurrency` """ @@ -1079,7 +1076,7 @@ async def _update_fetcher(self) -> None: await self.__process_update_wrapper(update) async def __process_update_wrapper(self, update: object) -> None: - async with self._concurrent_updates_sem: + async with self._update_processor.semaphore: await self.process_update(update) self.update_queue.task_done() diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index d52ad229967..d8ff9ca82c5 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -128,7 +128,7 @@ class ApplicationBuilder(Generic[BT, CCT, UD, CD, BD, JQ]): "_base_file_url", "_base_url", "_bot", - "_concurrent_updates", + "_update_processor", "_connect_timeout", "_connection_pool_size", "_context_types", @@ -199,9 +199,9 @@ def __init__(self: "InitApplicationBuilder"): self._context_types: DVType[ContextTypes] = DefaultValue(ContextTypes()) self._application_class: DVType[Type[Application]] = DefaultValue(Application) self._application_kwargs: Dict[str, object] = {} - self._concurrent_updates: Union[ - int, DefaultValue[bool], "BaseUpdateProcessor" - ] = DEFAULT_FALSE + self._update_processor: "BaseUpdateProcessor" = SimpleUpdateProcessor( + max_concurrent_updates=1 + ) self._updater: ODVInput[Updater] = DEFAULT_NONE self._post_init: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None self._post_shutdown: Optional[Callable[[Application], Coroutine[Any, Any, None]]] = None @@ -309,7 +309,7 @@ def build( bot=bot, update_queue=update_queue, updater=updater, - update_processor=DefaultValue.get_value(self._concurrent_updates), + update_processor=self._update_processor, job_queue=job_queue, persistence=persistence, context_types=DefaultValue.get_value(self._context_types), @@ -922,11 +922,11 @@ def concurrent_updates( Args: concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseUpdateProcessor`): Passing - :obj:`True` will allow for ``256`` updates to be processed concurrently. Pass an - integer to specify a different number of updates that may be processed concurrently, - in that case :class:`telegram.ext.SimpleUpdateProcessor` is used to handle updates. - Pass an instance of :class:`telegram.ext.BaseUpdateProcessor` to use that instance - for handling updates concurrently. + :obj:`True` will allow for ``256`` updates to be processed concurrently using + :class:`telegram.ext.SimpleUpdateProcessor`. Pass an integer to specify a different + number of updates that may be processed concurrently. Pass an instance of + :class:`telegram.ext.BaseUpdateProcessor` to use that instance for handling updates + concurrently. Returns: :class:`ApplicationBuilder`: The same builder with the updated argument. @@ -935,19 +935,19 @@ def concurrent_updates( if concurrent_updates is True: concurrent_updates = 256 elif concurrent_updates is False: - concurrent_updates = 0 + concurrent_updates = 1 # If `concurrent_updates` is an integer, create a `SimpleUpdateProcessor` # instance with that integer value; otherwise, raise an error if the value # is negative if isinstance(concurrent_updates, int): - if concurrent_updates < 0: - raise ValueError("`concurrent_updates` must be a non-negative integer!") + if concurrent_updates < 1: + raise ValueError("`concurrent_updates` must be a positive integer!") concurrent_updates = SimpleUpdateProcessor(concurrent_updates) # Assign default value of concurrent_updates if it is instance of # `BaseUpdateProcessor` - self._concurrent_updates: BaseUpdateProcessor = concurrent_updates + self._update_processor: BaseUpdateProcessor = concurrent_updates # type: ignore[no-redef] return self def job_queue( diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index ef3c7ee2cf6..13006815eda 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -34,18 +34,18 @@ class BaseUpdateProcessor: .. versionadded:: NEXT.VERSION Args: - max_concurrent_updates (:obj:`int`): The maximum number of concurrent updates to be - processed. If this number is exceeded, new updates will be queued until the number of - currently processed updates decreases. + max_concurrent_updates (:obj:`int`): The maximum number of updates to be processed + concurrently. If this number is exceeded, new updates will be queued until the number + of currently processed updates decreases. Raises: - :exc:`ValueError`: If `max_concurrent_updates` is a negative integer. + :exc:`ValueError`: If :paramref:`max_concurrent_updates` is a negative integer. """ def __init__(self, max_concurrent_updates: int): self._max_concurrent_updates = max_concurrent_updates if self.max_concurrent_updates < 0: - raise ValueError("`max_concurrent_updates` must be a non-negative integer!") + raise ValueError("`max_concurrent_updates` must be a positive integer!") self._semaphore = BoundedSemaphore(self.max_concurrent_updates or 1) @property @@ -53,13 +53,23 @@ def max_concurrent_updates(self) -> int: """:obj:`int`: The maximum number of updates that can be processed concurrently.""" return self._max_concurrent_updates + @property + def semaphore(self) -> BoundedSemaphore: + """:class:`asyncio.BoundedSemaphore`: The semaphore used to limit the number of + concurrent updates.""" + return self._semaphore + @abstractmethod async def process_update( self, update: object, coroutine: "Awaitable[Any]", ) -> None: - """Custom implementation of how to process an update. + """Custom implementation of how to process an update. Must be implemented by a subclass. + + Warning: + This method will be called by :meth:`do_process_update`. It should *not* be called + manually. Args: update (:obj:`object`): The update to be processed. @@ -68,7 +78,8 @@ async def process_update( @abstractmethod async def initialize(self) -> None: - """Initializes the Processor so resources can be allocated. + """Initializes the processor so resources can be allocated. Must be implemented by a + subclass. .. seealso:: :meth:`shutdown` @@ -76,7 +87,7 @@ async def initialize(self) -> None: @abstractmethod async def shutdown(self) -> None: - """Shutdown the Processor so resources can be freed. + """Shutdown the processor so resources can be freed. Must be implemented by a subclass. .. seealso:: :meth:`initialize` @@ -96,7 +107,7 @@ async def do_process_update( coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. application (:class:`telegram.ext.Application`): The application instance. """ - async with self._semaphore: + async with self.semaphore: await self.process_update(update, coroutine) application.update_queue.task_done() From 88d11baa7b0f94194f2f9dff645730546904c3b8 Mon Sep 17 00:00:00 2001 From: Aditya Date: Tue, 25 Apr 2023 16:44:20 +0530 Subject: [PATCH 07/25] add tests and requested changes --- telegram/ext/__init__.py | 2 +- telegram/ext/_application.py | 20 ++++++--- telegram/ext/_baseupdateprocessor.py | 33 ++++++++------ tests/ext/test_application.py | 56 ++++++++++-------------- tests/ext/test_applicationbuilder.py | 33 ++++++++++++-- tests/ext/test_baseupdateprocessor.py | 62 +++++++++++++++++++++++++++ 6 files changed, 149 insertions(+), 57 deletions(-) create mode 100644 tests/ext/test_baseupdateprocessor.py diff --git a/telegram/ext/__init__.py b/telegram/ext/__init__.py index 4af93ad513b..a6abdb974e9 100644 --- a/telegram/ext/__init__.py +++ b/telegram/ext/__init__.py @@ -25,8 +25,8 @@ "ApplicationHandlerStop", "BaseHandler", "BasePersistence", - "BaseUpdateProcessor", "BaseRateLimiter", + "BaseUpdateProcessor", "CallbackContext", "CallbackDataCache", "CallbackQueryHandler", diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 20cc724fcda..2a26e7ec5a3 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -229,11 +229,11 @@ class Application(Generic[BT, CCT, UD, CD, BD, JQ], AsyncContextManager["Applica "_chat_data", "_chat_ids_to_be_deleted_in_persistence", "_chat_ids_to_be_updated_in_persistence", - "_concurrent_updates", "_conversation_handler_conversations", "_initialized", "_job_queue", "_running", + "_update_processor", "_user_data", "_user_ids_to_be_deleted_in_persistence", "_user_ids_to_be_updated_in_persistence", @@ -352,13 +352,21 @@ def concurrent_updates(self) -> int: """:obj:`int`: The number of concurrent updates that will be processed in parallel. A value of ``0`` indicates updates are *not* being processed concurrently. - .. verisonchanged:: NEXT.VERSION - This is now just a shortcut to ``update_processor.max_concurrent_updates``. + .. versionchanged:: NEXT.VERSION + This is now just a shortcut to :attr:`update_processor.max_concurrent_updates + `. .. seealso:: :wiki:`Concurrency` """ return self._update_processor.max_concurrent_updates + @property + def active_updates(self) -> int: + """:obj:`int`: The number of active updates that are currently being processed by the + update processor. + """ + return self.update_processor._semaphore._value # pylint: disable=protected-access + @property def job_queue(self) -> Optional["JobQueue[CCT]"]: """ @@ -376,7 +384,7 @@ def job_queue(self) -> Optional["JobQueue[CCT]"]: return self._job_queue @property - def update_processor(self) -> Optional["BaseUpdateProcessor"]: + def update_processor(self) -> "BaseUpdateProcessor": """:class:`telegram.ext.BaseUpdateProcessor`: The update processor used by this application. @@ -1064,7 +1072,7 @@ async def _update_fetcher(self) -> None: _LOGGER.debug("Processing update %s", update) - if self._update_processor and self._update_processor.max_concurrent_updates > 0: + if self._update_processor and self._update_processor.max_concurrent_updates > 1: # We don't await the below because it has to be run concurrently self.create_task( self._update_processor.do_process_update( @@ -1076,7 +1084,7 @@ async def _update_fetcher(self) -> None: await self.__process_update_wrapper(update) async def __process_update_wrapper(self, update: object) -> None: - async with self._update_processor.semaphore: + async with self._update_processor._semaphore: # pylint: disable=protected-access await self.process_update(update) self.update_queue.task_done() diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 13006815eda..4dc35565a13 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -20,9 +20,10 @@ from abc import abstractmethod from asyncio import BoundedSemaphore from types import TracebackType -from typing import Any, Awaitable, Optional, Type +from typing import TYPE_CHECKING, Any, Awaitable, Optional, Type -from telegram.ext import Application +if TYPE_CHECKING: + from telegram.ext import Application class BaseUpdateProcessor: @@ -44,21 +45,15 @@ class BaseUpdateProcessor: def __init__(self, max_concurrent_updates: int): self._max_concurrent_updates = max_concurrent_updates - if self.max_concurrent_updates < 0: + if self.max_concurrent_updates < 1: raise ValueError("`max_concurrent_updates` must be a positive integer!") - self._semaphore = BoundedSemaphore(self.max_concurrent_updates or 1) + self._semaphore = BoundedSemaphore(self.max_concurrent_updates) @property def max_concurrent_updates(self) -> int: """:obj:`int`: The maximum number of updates that can be processed concurrently.""" return self._max_concurrent_updates - @property - def semaphore(self) -> BoundedSemaphore: - """:class:`asyncio.BoundedSemaphore`: The semaphore used to limit the number of - concurrent updates.""" - return self._semaphore - @abstractmethod async def process_update( self, @@ -97,7 +92,7 @@ async def do_process_update( self, update: object, coroutine: "Awaitable[Any]", - application: Application, + application: "Application", ) -> None: """Calls :meth:`process_update` with a semaphore to limit the number of concurrent updates. @@ -107,7 +102,7 @@ async def do_process_update( coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. application (:class:`telegram.ext.Application`): The application instance. """ - async with self.semaphore: + async with self._semaphore: await self.process_update(update, coroutine) application.update_queue.task_done() @@ -131,6 +126,14 @@ async def __aexit__( class SimpleUpdateProcessor(BaseUpdateProcessor): + """Instance of :class:`telegram.ext.BaseUpdateProcessor` that immediately awaits the + coroutine, i.e. does not apply any additional processing. This is used by default when + :attr:`telegram.ext.ApplicationBuilder.concurrent_updates` is :obj:`int`. + + Objects of this class are comparable in terms of equality. Two objects of this class are + considered equal, if their :paramref:`max_concurrent_updates` is equal. + """ + async def process_update( self, update: object, @@ -144,6 +147,12 @@ async def process_update( """ await coroutine + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, self.__class__) + and other.max_concurrent_updates == self.max_concurrent_updates + ) + async def initialize(self) -> None: """Does nothing.""" diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 8ee4d09ec0f..17ffc118182 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -48,6 +48,7 @@ JobQueue, MessageHandler, PicklePersistence, + SimpleUpdateProcessor, TypeHandler, Updater, filters, @@ -133,7 +134,7 @@ def test_manual_init_warning(self, recwarn, updater): persistence=None, context_types=ContextTypes(), updater=updater, - concurrent_updates=False, + update_processor=False, post_init=None, post_shutdown=None, post_stop=None, @@ -145,15 +146,13 @@ def test_manual_init_warning(self, recwarn, updater): ) assert recwarn[0].filename == __file__, "stacklevel is incorrect!" - @pytest.mark.parametrize( - ("concurrent_updates", "expected"), [(0, 0), (4, 4), (False, 0), (True, 256)] - ) @pytest.mark.filterwarnings("ignore: `Application` instances should") - def test_init(self, one_time_bot, concurrent_updates, expected): + def test_init(self, one_time_bot): update_queue = asyncio.Queue() job_queue = JobQueue() persistence = PicklePersistence("file_path") context_types = ContextTypes() + update_processor = SimpleUpdateProcessor(1) updater = Updater(bot=one_time_bot, update_queue=update_queue) async def post_init(application: Application) -> None: @@ -172,7 +171,7 @@ async def post_stop(application: Application) -> None: persistence=persistence, context_types=context_types, updater=updater, - concurrent_updates=concurrent_updates, + update_processor=update_processor, post_init=post_init, post_shutdown=post_shutdown, post_stop=post_stop, @@ -185,7 +184,7 @@ async def post_stop(application: Application) -> None: assert app.updater is updater assert app.update_queue is updater.update_queue assert app.bot is updater.bot - assert app.concurrent_updates == expected + assert app.update_processor is update_processor assert app.post_init is post_init assert app.post_shutdown is post_shutdown assert app.post_stop is post_stop @@ -199,20 +198,6 @@ async def post_stop(application: Application) -> None: assert isinstance(app.chat_data[1], dict) assert isinstance(app.user_data[1], dict) - with pytest.raises(ValueError, match="must be a non-negative"): - Application( - bot=one_time_bot, - update_queue=update_queue, - job_queue=job_queue, - persistence=persistence, - context_types=context_types, - updater=updater, - concurrent_updates=-1, - post_init=None, - post_shutdown=None, - post_stop=None, - ) - def test_job_queue(self, one_time_bot, app, recwarn): expected_warning = ( "No `JobQueue` set up. To use `JobQueue`, you must install PTB via " @@ -1305,7 +1290,7 @@ def gen(): await app.create_task(gen()) assert event.is_set() - async def test_no_concurrent_updates(self, app): + async def test_no_update_processor(self, app): queue = asyncio.Queue() event_1 = asyncio.Event() event_2 = asyncio.Event() @@ -1333,14 +1318,14 @@ async def callback(u, c): await app.stop() - @pytest.mark.parametrize("concurrent_updates", [15, 50, 100]) - async def test_concurrent_updates(self, one_time_bot, concurrent_updates): + @pytest.mark.parametrize("update_processor", [15, 50, 100]) + async def test_update_processor(self, one_time_bot, update_processor): # We don't test with `True` since the large number of parallel coroutines quickly leads # to test instabilities - app = ( - Application.builder().bot(one_time_bot).concurrent_updates(concurrent_updates).build() - ) - events = {i: asyncio.Event() for i in range(app.concurrent_updates + 10)} + app = Application.builder().bot(one_time_bot).concurrent_updates(update_processor).build() + events = { + i: asyncio.Event() for i in range(app.update_processor.max_concurrent_updates + 10) + } queue = asyncio.Queue() for event in events.values(): await queue.put(event) @@ -1352,25 +1337,28 @@ async def callback(u, c): app.add_handler(TypeHandler(object, callback)) async with app: await app.start() - for i in range(app.concurrent_updates + 10): + for i in range(app.update_processor.max_concurrent_updates + 10): await app.update_queue.put(i) - for i in range(app.concurrent_updates + 10): + for i in range(app.update_processor.max_concurrent_updates + 10): assert not events[i].is_set() await asyncio.sleep(0.9) - for i in range(app.concurrent_updates): + for i in range(app.update_processor.max_concurrent_updates): assert events[i].is_set() - for i in range(app.concurrent_updates, app.concurrent_updates + 10): + for i in range( + app.update_processor.max_concurrent_updates, + app.update_processor.max_concurrent_updates + 10, + ): assert not events[i].is_set() await asyncio.sleep(0.5) - for i in range(app.concurrent_updates + 10): + for i in range(app.update_processor.max_concurrent_updates + 10): assert events[i].is_set() await app.stop() - async def test_concurrent_updates_done_on_shutdown(self, one_time_bot): + async def test_update_processor_done_on_shutdown(self, one_time_bot): app = Application.builder().bot(one_time_bot).concurrent_updates(True).build() event = asyncio.Event() diff --git a/tests/ext/test_applicationbuilder.py b/tests/ext/test_applicationbuilder.py index b17d30451fb..b4902e9287f 100644 --- a/tests/ext/test_applicationbuilder.py +++ b/tests/ext/test_applicationbuilder.py @@ -35,6 +35,7 @@ Updater, ) from telegram.ext._applicationbuilder import _BOT_CHECKS +from telegram.ext._baseupdateprocessor import SimpleUpdateProcessor from telegram.request import HTTPXRequest from tests.auxil.constants import PRIVATE_KEY from tests.auxil.envvars import TEST_WITH_OPT_DEPS @@ -96,7 +97,7 @@ class Client: app = builder.token(bot.token).build() assert isinstance(app, Application) - assert app.concurrent_updates == 0 + assert app.update_processor == SimpleUpdateProcessor(1) assert isinstance(app.bot, ExtBot) assert isinstance(app.bot.request, HTTPXRequest) @@ -367,12 +368,21 @@ def __init__(self, arg, **kwargs): assert isinstance(app, CustomApplication) assert app.arg == 2 - def test_all_application_args_custom(self, builder, bot, monkeypatch): + @pytest.mark.parametrize( + ("concurrent_updates", "expected"), + [ + (4, SimpleUpdateProcessor(4)), + (False, SimpleUpdateProcessor(1)), + (True, SimpleUpdateProcessor(256)), + ], + ) + def test_all_application_args_custom( + self, builder, bot, monkeypatch, concurrent_updates, expected + ): job_queue = JobQueue() persistence = PicklePersistence("file_path") update_queue = asyncio.Queue() context_types = ContextTypes() - concurrent_updates = 123 async def post_init(app: Application) -> None: pass @@ -395,6 +405,7 @@ async def post_stop(app: Application) -> None: .post_stop(post_stop) .arbitrary_callback_data(True) ).build() + assert app.job_queue is job_queue assert app.job_queue.application is app assert app.persistence is persistence @@ -403,7 +414,7 @@ async def post_stop(app: Application) -> None: assert app.updater.update_queue is update_queue assert app.updater.bot is app.bot assert app.context_types is context_types - assert app.concurrent_updates == concurrent_updates + assert app.update_processor == expected assert app.post_init is post_init assert app.post_shutdown is post_shutdown assert app.post_stop is post_stop @@ -415,6 +426,20 @@ async def post_stop(app: Application) -> None: assert app.bot is updater.bot assert app.update_queue is updater.update_queue + with pytest.raises(ValueError, match="must be a positive"): + ( + builder.token(bot.token) + .job_queue(job_queue) + .persistence(persistence) + .update_queue(update_queue) + .context_types(context_types) + .concurrent_updates(-1) + .post_init(post_init) + .post_shutdown(post_shutdown) + .post_stop(post_stop) + .arbitrary_callback_data(True) + ).build() + @pytest.mark.parametrize("input_type", ["bytes", "str", "Path"]) def test_all_private_key_input_types(self, builder, bot, input_type): private_key = data_file("private.key") diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py new file mode 100644 index 00000000000..2ca136f4ed7 --- /dev/null +++ b/tests/ext/test_baseupdateprocessor.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +# +# A library that provides a Python interface to the Telegram Bot API +# Copyright (C) 2015-2023 +# Leandro Toledo de Souza +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser Public License for more details. +# +# You should have received a copy of the GNU Lesser Public License +# along with this program. If not, see [http://www.gnu.org/licenses/]. +import pytest + +from telegram import Update +from telegram.ext import ApplicationBuilder, BaseUpdateProcessor, SimpleUpdateProcessor + + +class TestBaseUpdateProcessor: + def test_init_with_negative_max_concurrent_updates(self): + with pytest.raises(ValueError, match="must be a positive integer"): + BaseUpdateProcessor(-1) + + def test_max_concurrent_updates_property(self): + processor = BaseUpdateProcessor(3) + assert processor.max_concurrent_updates == 3 + + async def test_do_process_update(self, one_time_bot): + class MockProcessor(BaseUpdateProcessor): + async def process_update(self, update, coroutine): + pass + + processor = MockProcessor(5) + application = ApplicationBuilder().concurrent_updates(processor).bot(one_time_bot).build() + update = Update(1, None) + + async def coroutine(): + pass + + await processor.do_process_update(update, coroutine(), application) + + +class TestSimpleUpdateProcessor: + async def test_process_update(self): + processor = SimpleUpdateProcessor(1) + update = Update(1, None) + + async def coroutine(): + pass + + await processor.process_update(update, coroutine()) + + async def test_equality(self): + processor1 = SimpleUpdateProcessor(1) + processor2 = SimpleUpdateProcessor(1) + assert processor1 == processor2 From 99e556e90469a87449170ab9a92ab77990003a0a Mon Sep 17 00:00:00 2001 From: Aditya Date: Tue, 25 Apr 2023 19:48:00 +0530 Subject: [PATCH 08/25] fix tests and docstrings --- docs/source/telegram.ext.rst | 2 ++ telegram/ext/_application.py | 2 +- telegram/ext/_applicationbuilder.py | 10 ++++----- telegram/ext/_baseupdateprocessor.py | 29 +++++++++++++++------------ tests/ext/test_baseupdateprocessor.py | 12 ++++++----- 5 files changed, 31 insertions(+), 24 deletions(-) diff --git a/docs/source/telegram.ext.rst b/docs/source/telegram.ext.rst index 1116fa31df2..153b94e5395 100644 --- a/docs/source/telegram.ext.rst +++ b/docs/source/telegram.ext.rst @@ -7,12 +7,14 @@ telegram.ext package telegram.ext.application telegram.ext.applicationbuilder telegram.ext.applicationhandlerstop + telegram.ext.baseupdateprocessor telegram.ext.callbackcontext telegram.ext.contexttypes telegram.ext.defaults telegram.ext.extbot telegram.ext.job telegram.ext.jobqueue + telegram.ext.simpleupdateprocessor telegram.ext.updater telegram.ext.handlers-tree.rst telegram.ext.persistence-tree.rst diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 2a26e7ec5a3..a9ed6e60964 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -1075,7 +1075,7 @@ async def _update_fetcher(self) -> None: if self._update_processor and self._update_processor.max_concurrent_updates > 1: # We don't await the below because it has to be run concurrently self.create_task( - self._update_processor.do_process_update( + self._update_processor.process_update( update, self.process_update(update), self ), update=update, diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index d8ff9ca82c5..4da458b7760 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -922,11 +922,11 @@ def concurrent_updates( Args: concurrent_updates (:obj:`bool` | :obj:`int` | :class:`BaseUpdateProcessor`): Passing - :obj:`True` will allow for ``256`` updates to be processed concurrently using - :class:`telegram.ext.SimpleUpdateProcessor`. Pass an integer to specify a different - number of updates that may be processed concurrently. Pass an instance of - :class:`telegram.ext.BaseUpdateProcessor` to use that instance for handling updates - concurrently. + :obj:`True` will allow for ``256`` updates to be processed concurrently using + :class:`telegram.ext.SimpleUpdateProcessor`. Pass an integer to specify a different + number of updates that may be processed concurrently. Pass an instance of + :class:`telegram.ext.BaseUpdateProcessor` to use that instance for handling updates + concurrently. Returns: :class:`ApplicationBuilder`: The same builder with the updated argument. diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 4dc35565a13..0bef973fe11 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -36,8 +36,8 @@ class BaseUpdateProcessor: Args: max_concurrent_updates (:obj:`int`): The maximum number of updates to be processed - concurrently. If this number is exceeded, new updates will be queued until the number - of currently processed updates decreases. + concurrently. If this number is exceeded, new updates will be queued until the number + of currently processed updates decreases. Raises: :exc:`ValueError`: If :paramref:`max_concurrent_updates` is a negative integer. @@ -55,7 +55,7 @@ def max_concurrent_updates(self) -> int: return self._max_concurrent_updates @abstractmethod - async def process_update( + async def do_process_update( self, update: object, coroutine: "Awaitable[Any]", @@ -63,7 +63,7 @@ async def process_update( """Custom implementation of how to process an update. Must be implemented by a subclass. Warning: - This method will be called by :meth:`do_process_update`. It should *not* be called + This method will be called by :meth:`process_update`. It should *not* be called manually. Args: @@ -88,13 +88,13 @@ async def shutdown(self) -> None: :meth:`initialize` """ - async def do_process_update( + async def process_update( self, update: object, coroutine: "Awaitable[Any]", application: "Application", ) -> None: - """Calls :meth:`process_update` with a semaphore to limit the number of concurrent + """Calls :meth:`do_process_update` with a semaphore to limit the number of concurrent updates. Args: @@ -103,7 +103,7 @@ async def do_process_update( application (:class:`telegram.ext.Application`): The application instance. """ async with self._semaphore: - await self.process_update(update, coroutine) + await self.do_process_update(update, coroutine) application.update_queue.task_done() async def __aenter__(self) -> "BaseUpdateProcessor": @@ -134,7 +134,7 @@ class SimpleUpdateProcessor(BaseUpdateProcessor): considered equal, if their :paramref:`max_concurrent_updates` is equal. """ - async def process_update( + async def do_process_update( self, update: object, coroutine: "Awaitable[Any]", @@ -147,14 +147,17 @@ async def process_update( """ await coroutine + async def initialize(self) -> None: + """Does nothing.""" + + async def shutdown(self) -> None: + """Does nothing.""" + def __eq__(self, other: object) -> bool: return ( isinstance(other, self.__class__) and other.max_concurrent_updates == self.max_concurrent_updates ) - async def initialize(self) -> None: - """Does nothing.""" - - async def shutdown(self) -> None: - """Does nothing.""" + def __hash__(self) -> int: + return hash(self.max_concurrent_updates) diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py index 2ca136f4ed7..d1f17154fa7 100644 --- a/tests/ext/test_baseupdateprocessor.py +++ b/tests/ext/test_baseupdateprocessor.py @@ -31,9 +31,9 @@ def test_max_concurrent_updates_property(self): processor = BaseUpdateProcessor(3) assert processor.max_concurrent_updates == 3 - async def test_do_process_update(self, one_time_bot): + async def test_process_update(self, one_time_bot): class MockProcessor(BaseUpdateProcessor): - async def process_update(self, update, coroutine): + async def do_process_update(self, update, coroutine): pass processor = MockProcessor(5) @@ -43,18 +43,20 @@ async def process_update(self, update, coroutine): async def coroutine(): pass - await processor.do_process_update(update, coroutine(), application) + await application.update_queue.put(1) + await processor.process_update(update, coroutine, application) + assert not application.update_queue.empty() class TestSimpleUpdateProcessor: - async def test_process_update(self): + async def test_do_process_update(self): processor = SimpleUpdateProcessor(1) update = Update(1, None) async def coroutine(): pass - await processor.process_update(update, coroutine()) + await processor.do_process_update(update, coroutine()) async def test_equality(self): processor1 = SimpleUpdateProcessor(1) From 19d47fe07d7615c757dd9929df3341bc735b3c3a Mon Sep 17 00:00:00 2001 From: Aditya Date: Tue, 25 Apr 2023 20:19:37 +0530 Subject: [PATCH 09/25] add slots --- telegram/ext/_baseupdateprocessor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 0bef973fe11..1b784f2e226 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -43,6 +43,8 @@ class BaseUpdateProcessor: :exc:`ValueError`: If :paramref:`max_concurrent_updates` is a negative integer. """ + __slots__ = ("_max_concurrent_updates", "_semaphore") + def __init__(self, max_concurrent_updates: int): self._max_concurrent_updates = max_concurrent_updates if self.max_concurrent_updates < 1: @@ -134,6 +136,8 @@ class SimpleUpdateProcessor(BaseUpdateProcessor): considered equal, if their :paramref:`max_concurrent_updates` is equal. """ + __slots__ = () + async def do_process_update( self, update: object, From c922b9cabea83d8f67db5735379bd7231127c0cd Mon Sep 17 00:00:00 2001 From: Aditya Date: Thu, 27 Apr 2023 19:43:59 +0530 Subject: [PATCH 10/25] extend test coverage --- telegram/ext/_application.py | 2 +- tests/ext/test_application.py | 15 +++++++++++++++ tests/ext/test_baseupdateprocessor.py | 1 + 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index a9ed6e60964..5e29bbd80f6 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -365,7 +365,7 @@ def active_updates(self) -> int: """:obj:`int`: The number of active updates that are currently being processed by the update processor. """ - return self.update_processor._semaphore._value # pylint: disable=protected-access + return self._update_processor._semaphore._value # pylint: disable=protected-access @property def job_queue(self) -> Optional["JobQueue[CCT]"]: diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 17ffc118182..97f0264a688 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -210,6 +210,21 @@ def test_job_queue(self, one_time_bot, app, recwarn): assert str(recwarn[0].message) == expected_warning assert recwarn[0].filename == __file__, "wrong stacklevel" + def test_concurrent_updates(self, one_time_bot): + application = ApplicationBuilder().bot(one_time_bot).concurrent_updates(2).build() + assert application.concurrent_updates == ( + application.update_processor.max_concurrent_updates + ) + + def test_active_updates(self, one_time_bot): + application = ApplicationBuilder().bot(one_time_bot).concurrent_updates(2).build() + assert application.active_updates == application.update_processor._semaphore._value + + def test_update_processor_property(self, one_time_bot): + processor = SimpleUpdateProcessor(2) + application = ApplicationBuilder().bot(one_time_bot).concurrent_updates(processor).build() + assert application.update_processor is processor + def test_custom_context_init(self, one_time_bot): cc = ContextTypes( context=CustomContext, diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py index d1f17154fa7..4dc25440542 100644 --- a/tests/ext/test_baseupdateprocessor.py +++ b/tests/ext/test_baseupdateprocessor.py @@ -62,3 +62,4 @@ async def test_equality(self): processor1 = SimpleUpdateProcessor(1) processor2 = SimpleUpdateProcessor(1) assert processor1 == processor2 + assert hash(processor1) == hash(processor1) From 5f4ab1d409640d4f84911a52a0e5bd9b5b94094e Mon Sep 17 00:00:00 2001 From: Aditya Date: Tue, 2 May 2023 20:11:25 +0530 Subject: [PATCH 11/25] fix tests --- tests/ext/test_application.py | 21 +++++++++++++--- tests/ext/test_baseupdateprocessor.py | 35 +++++++++++++++++++++++++-- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 97f0264a688..2a4b029f441 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -34,13 +34,14 @@ import pytest -from telegram import Bot, Chat, Message, MessageEntity, User +from telegram import Bot, Chat, Message, MessageEntity, Update, User from telegram.error import TelegramError from telegram.ext import ( Application, ApplicationBuilder, ApplicationHandlerStop, BaseHandler, + BaseUpdateProcessor, CallbackContext, CommandHandler, ContextTypes, @@ -216,9 +217,21 @@ def test_concurrent_updates(self, one_time_bot): application.update_processor.max_concurrent_updates ) - def test_active_updates(self, one_time_bot): - application = ApplicationBuilder().bot(one_time_bot).concurrent_updates(2).build() - assert application.active_updates == application.update_processor._semaphore._value + async def test_active_updates(self, one_time_bot): + class MockProcessor(BaseUpdateProcessor): + async def do_process_update(self, update, coroutine) -> None: + pass + + processor = MockProcessor(2) + + async def coroutine(): + pass + + application = ApplicationBuilder().bot(one_time_bot).concurrent_updates(processor).build() + assert application.active_updates == processor._semaphore._value + await application.update_queue.put(1) + await processor.process_update(Update(1), coroutine, application) + assert application.active_updates == processor._semaphore._value def test_update_processor_property(self, one_time_bot): processor = SimpleUpdateProcessor(2) diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py index 4dc25440542..afe652391fb 100644 --- a/tests/ext/test_baseupdateprocessor.py +++ b/tests/ext/test_baseupdateprocessor.py @@ -20,6 +20,7 @@ from telegram import Update from telegram.ext import ApplicationBuilder, BaseUpdateProcessor, SimpleUpdateProcessor +from tests.auxil.asyncio_helpers import call_after class TestBaseUpdateProcessor: @@ -38,7 +39,7 @@ async def do_process_update(self, update, coroutine): processor = MockProcessor(5) application = ApplicationBuilder().concurrent_updates(processor).bot(one_time_bot).build() - update = Update(1, None) + update = Update(1) async def coroutine(): pass @@ -47,11 +48,41 @@ async def coroutine(): await processor.process_update(update, coroutine, application) assert not application.update_queue.empty() + async def test_context_manager(self, monkeypatch): + class MockProcessor(BaseUpdateProcessor): + async def do_process_update(self, update, coroutine): + pass + + processor = MockProcessor(5) + self.test_flag = set() + + async def after_initialize(*args, **kwargs): + self.test_flag.add("initialize") + + async def after_shutdown(*args, **kwargs): + self.test_flag.add("stop") + + monkeypatch.setattr( + BaseUpdateProcessor, + "initialize", + call_after(BaseUpdateProcessor.initialize, after_initialize), + ) + monkeypatch.setattr( + BaseUpdateProcessor, + "shutdown", + call_after(BaseUpdateProcessor.shutdown, after_shutdown), + ) + + async with processor: + pass + + assert self.test_flag == {"initialize", "stop"} + class TestSimpleUpdateProcessor: async def test_do_process_update(self): processor = SimpleUpdateProcessor(1) - update = Update(1, None) + update = Update(1) async def coroutine(): pass From 3c8f4e57a34b2c24c7c905ce8b4bed0a48f28a9e Mon Sep 17 00:00:00 2001 From: Aditya Date: Sun, 7 May 2023 18:31:33 +0530 Subject: [PATCH 12/25] add tests and some requested changes --- telegram/ext/_application.py | 28 ++++++++++----- telegram/ext/_baseupdateprocessor.py | 20 +---------- tests/ext/test_application.py | 11 ------ tests/ext/test_applicationbuilder.py | 6 ++-- tests/ext/test_baseupdateprocessor.py | 51 ++++++++++++++++----------- 5 files changed, 54 insertions(+), 62 deletions(-) diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index ec5bf509529..88624e417ee 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -57,7 +57,7 @@ from telegram._utils.warnings import warn from telegram.error import TelegramError from telegram.ext._basepersistence import BasePersistence -from telegram.ext._baseupdateprocessor import BaseUpdateProcessor +from telegram.ext._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor from telegram.ext._contexttypes import ContextTypes from telegram.ext._extbot import ExtBot from telegram.ext._handler import BaseHandler @@ -398,6 +398,8 @@ async def initialize(self) -> None: * The :attr:`bot`, by calling :meth:`telegram.Bot.initialize`. * The :attr:`updater`, by calling :meth:`telegram.ext.Updater.initialize`. * The :attr:`persistence`, by loading persistent conversations and data. + * The :attr:`update_processor` by calling :meth:`telegram.ext.BaseUpdateProcessor.\ + initialize`. Does *not* call :attr:`post_init` - that is only done by :meth:`run_polling` and :meth:`run_webhook`. @@ -410,6 +412,11 @@ async def initialize(self) -> None: return await self.bot.initialize() + if isinstance(self._update_processor, BaseUpdateProcessor) and not isinstance( + self._update_processor, SimpleUpdateProcessor + ): + await self._update_processor.initialize() + if self.updater: await self.updater.initialize() @@ -442,6 +449,7 @@ async def shutdown(self) -> None: * :attr:`updater` by calling :meth:`telegram.ext.Updater.shutdown` * :attr:`persistence` by calling :meth:`update_persistence` and :meth:`BasePersistence.flush` + * :attr:`update_processor` by calling :meth:`telegram.ext.BaseUpdateProcessor.shutdown` Does *not* call :attr:`post_shutdown` - that is only done by :meth:`run_polling` and :meth:`run_webhook`. @@ -460,6 +468,11 @@ async def shutdown(self) -> None: return await self.bot.shutdown() + if isinstance(self._update_processor, BaseUpdateProcessor) and not isinstance( + self._update_processor, SimpleUpdateProcessor + ): + await self._update_processor.shutdown() + if self.updater: await self.updater.shutdown() @@ -1073,17 +1086,15 @@ async def _update_fetcher(self) -> None: _LOGGER.debug("Processing update %s", update) - if self._update_processor and self._update_processor.max_concurrent_updates > 1: + if self._update_processor.max_concurrent_updates > 1: # We don't await the below because it has to be run concurrently self.create_task( - self._update_processor.process_update( - update, self.process_update(update), self - ), + self.__process_update_wrapper(update), update=update, ) else: await self.__process_update_wrapper(update) - + except asyncio.CancelledError: # This may happen if the application is manually run via application.start() and # then a KeyboardInterrupt is sent. We must prevent this loop to die since @@ -1094,9 +1105,8 @@ async def _update_fetcher(self) -> None: ) async def __process_update_wrapper(self, update: object) -> None: - async with self._update_processor._semaphore: # pylint: disable=protected-access - await self.process_update(update) - self.update_queue.task_done() + await self._update_processor.process_update(update, self.process_update(update)) + self.update_queue.task_done() async def process_update(self, update: object) -> None: """Processes a single update and marks the update to be updated by the persistence later. diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 1b784f2e226..1923ebf22d4 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -20,10 +20,7 @@ from abc import abstractmethod from asyncio import BoundedSemaphore from types import TracebackType -from typing import TYPE_CHECKING, Any, Awaitable, Optional, Type - -if TYPE_CHECKING: - from telegram.ext import Application +from typing import Any, Awaitable, Optional, Type class BaseUpdateProcessor: @@ -94,7 +91,6 @@ async def process_update( self, update: object, coroutine: "Awaitable[Any]", - application: "Application", ) -> None: """Calls :meth:`do_process_update` with a semaphore to limit the number of concurrent updates. @@ -102,11 +98,9 @@ async def process_update( Args: update (:obj:`object`): The update to be processed. coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. - application (:class:`telegram.ext.Application`): The application instance. """ async with self._semaphore: await self.do_process_update(update, coroutine) - application.update_queue.task_done() async def __aenter__(self) -> "BaseUpdateProcessor": """Simple context manager which initializes the Processor.""" @@ -131,9 +125,6 @@ class SimpleUpdateProcessor(BaseUpdateProcessor): """Instance of :class:`telegram.ext.BaseUpdateProcessor` that immediately awaits the coroutine, i.e. does not apply any additional processing. This is used by default when :attr:`telegram.ext.ApplicationBuilder.concurrent_updates` is :obj:`int`. - - Objects of this class are comparable in terms of equality. Two objects of this class are - considered equal, if their :paramref:`max_concurrent_updates` is equal. """ __slots__ = () @@ -156,12 +147,3 @@ async def initialize(self) -> None: async def shutdown(self) -> None: """Does nothing.""" - - def __eq__(self, other: object) -> bool: - return ( - isinstance(other, self.__class__) - and other.max_concurrent_updates == self.max_concurrent_updates - ) - - def __hash__(self) -> int: - return hash(self.max_concurrent_updates) diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 03d41d83c1b..3cf3d8561ae 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -213,12 +213,6 @@ def test_job_queue(self, one_time_bot, app, recwarn): assert recwarn[0].category is PTBUserWarning assert recwarn[0].filename == __file__, "wrong stacklevel" - def test_concurrent_updates(self, one_time_bot): - application = ApplicationBuilder().bot(one_time_bot).concurrent_updates(2).build() - assert application.concurrent_updates == ( - application.update_processor.max_concurrent_updates - ) - async def test_active_updates(self, one_time_bot): class MockProcessor(BaseUpdateProcessor): async def do_process_update(self, update, coroutine) -> None: @@ -235,11 +229,6 @@ async def coroutine(): await processor.process_update(Update(1), coroutine, application) assert application.active_updates == processor._semaphore._value - def test_update_processor_property(self, one_time_bot): - processor = SimpleUpdateProcessor(2) - application = ApplicationBuilder().bot(one_time_bot).concurrent_updates(processor).build() - assert application.update_processor is processor - def test_custom_context_init(self, one_time_bot): cc = ContextTypes( context=CustomContext, diff --git a/tests/ext/test_applicationbuilder.py b/tests/ext/test_applicationbuilder.py index b4902e9287f..01b338d0c8f 100644 --- a/tests/ext/test_applicationbuilder.py +++ b/tests/ext/test_applicationbuilder.py @@ -376,8 +376,9 @@ def __init__(self, arg, **kwargs): (True, SimpleUpdateProcessor(256)), ], ) + @pytest.mark.parametrize("conc_updates", [0, -1]) def test_all_application_args_custom( - self, builder, bot, monkeypatch, concurrent_updates, expected + self, builder, bot, monkeypatch, concurrent_updates, expected, conc_updates ): job_queue = JobQueue() persistence = PicklePersistence("file_path") @@ -415,6 +416,7 @@ async def post_stop(app: Application) -> None: assert app.updater.bot is app.bot assert app.context_types is context_types assert app.update_processor == expected + assert app.concurrent_updates == (app.update_processor.max_concurrent_updates) assert app.post_init is post_init assert app.post_shutdown is post_shutdown assert app.post_stop is post_stop @@ -433,7 +435,7 @@ async def post_stop(app: Application) -> None: .persistence(persistence) .update_queue(update_queue) .context_types(context_types) - .concurrent_updates(-1) + .concurrent_updates(conc_updates) .post_init(post_init) .post_shutdown(post_shutdown) .post_stop(post_stop) diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py index afe652391fb..369f0ed2c6c 100644 --- a/tests/ext/test_baseupdateprocessor.py +++ b/tests/ext/test_baseupdateprocessor.py @@ -23,21 +23,24 @@ from tests.auxil.asyncio_helpers import call_after -class TestBaseUpdateProcessor: - def test_init_with_negative_max_concurrent_updates(self): - with pytest.raises(ValueError, match="must be a positive integer"): - BaseUpdateProcessor(-1) +def mock_processor(): + class MockProcessor(BaseUpdateProcessor): + async def do_process_update(self, update, coroutine): + pass + + return MockProcessor(5) + - def test_max_concurrent_updates_property(self): +class TestBaseUpdateProcessor: + @pytest.mark.parametrize("concurrent_updates", [0, -1]) + def test_init(self, concurrent_updates): processor = BaseUpdateProcessor(3) assert processor.max_concurrent_updates == 3 + with pytest.raises(ValueError, match="must be a positive integer"): + BaseUpdateProcessor(concurrent_updates) async def test_process_update(self, one_time_bot): - class MockProcessor(BaseUpdateProcessor): - async def do_process_update(self, update, coroutine): - pass - - processor = MockProcessor(5) + processor = mock_processor() application = ApplicationBuilder().concurrent_updates(processor).bot(one_time_bot).build() update = Update(1) @@ -49,11 +52,7 @@ async def coroutine(): assert not application.update_queue.empty() async def test_context_manager(self, monkeypatch): - class MockProcessor(BaseUpdateProcessor): - async def do_process_update(self, update, coroutine): - pass - - processor = MockProcessor(5) + processor = mock_processor() self.test_flag = set() async def after_initialize(*args, **kwargs): @@ -78,6 +77,22 @@ async def after_shutdown(*args, **kwargs): assert self.test_flag == {"initialize", "stop"} + async def test_context_manager_exception_on_init(self, monkeypatch): + async def initialize(*args, **kwargs): + raise RuntimeError("initialize") + + async def shutdown(*args, **kwargs): + self.test_flag = "shutdown" + + monkeypatch.setattr(BaseUpdateProcessor, "initialize", initialize) + monkeypatch.setattr(BaseUpdateProcessor, "shutdown", shutdown) + + with pytest.raises(RuntimeError, match="initialize"): + async with mock_processor(): + pass + + assert self.test_flag == "shutdown" + class TestSimpleUpdateProcessor: async def test_do_process_update(self): @@ -88,9 +103,3 @@ async def coroutine(): pass await processor.do_process_update(update, coroutine()) - - async def test_equality(self): - processor1 = SimpleUpdateProcessor(1) - processor2 = SimpleUpdateProcessor(1) - assert processor1 == processor2 - assert hash(processor1) == hash(processor1) From 9c0fa9ac3733478619052cd0ebf8fb5b0f246ff0 Mon Sep 17 00:00:00 2001 From: Aditya Date: Sat, 13 May 2023 22:43:21 +0530 Subject: [PATCH 13/25] fix tests --- telegram/ext/_application.py | 7 --- tests/ext/test_application.py | 89 +++++++++++++++++++++------- tests/ext/test_applicationbuilder.py | 8 ++- 3 files changed, 72 insertions(+), 32 deletions(-) diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 88624e417ee..2383007d6ee 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -360,13 +360,6 @@ def concurrent_updates(self) -> int: """ return self._update_processor.max_concurrent_updates - @property - def active_updates(self) -> int: - """:obj:`int`: The number of active updates that are currently being processed by the - update processor. - """ - return self._update_processor._semaphore._value # pylint: disable=protected-access - @property def job_queue(self) -> Optional["JobQueue[CCT]"]: """ diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 3cf3d8561ae..3725a2200cb 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -34,7 +34,7 @@ import pytest -from telegram import Bot, Chat, Message, MessageEntity, Update, User +from telegram import Bot, Chat, Message, MessageEntity, User from telegram.error import TelegramError from telegram.ext import ( Application, @@ -213,22 +213,6 @@ def test_job_queue(self, one_time_bot, app, recwarn): assert recwarn[0].category is PTBUserWarning assert recwarn[0].filename == __file__, "wrong stacklevel" - async def test_active_updates(self, one_time_bot): - class MockProcessor(BaseUpdateProcessor): - async def do_process_update(self, update, coroutine) -> None: - pass - - processor = MockProcessor(2) - - async def coroutine(): - pass - - application = ApplicationBuilder().bot(one_time_bot).concurrent_updates(processor).build() - assert application.active_updates == processor._semaphore._value - await application.update_queue.put(1) - await processor.process_update(Update(1), coroutine, application) - assert application.active_updates == processor._semaphore._value - def test_custom_context_init(self, one_time_bot): cc = ContextTypes( context=CustomContext, @@ -243,53 +227,114 @@ def test_custom_context_init(self, one_time_bot): assert isinstance(application.chat_data[1], float) assert isinstance(application.bot_data, complex) + @pytest.mark.parametrize( + "update_processor", [BaseUpdateProcessor(1), SimpleUpdateProcessor(1)] + ) @pytest.mark.parametrize("updater", [True, False]) - async def test_initialize(self, one_time_bot, monkeypatch, updater): + async def test_initialize(self, one_time_bot, monkeypatch, updater, update_processor): """Initialization of persistence is tested test_basepersistence""" + is_update_processor = False self.test_flag = set() async def after_initialize_bot(*args, **kwargs): self.test_flag.add("bot") + async def after_initialize_update_processor(*args, **kwargs): + self.test_flag.add("update_processor") + async def after_initialize_updater(*args, **kwargs): self.test_flag.add("updater") monkeypatch.setattr(Bot, "initialize", call_after(Bot.initialize, after_initialize_bot)) + if isinstance(update_processor, BaseUpdateProcessor) and not isinstance( + update_processor, SimpleUpdateProcessor + ): + is_update_processor = True + monkeypatch.setattr( + BaseUpdateProcessor, + "initialize", + call_after(BaseUpdateProcessor.initialize, after_initialize_update_processor), + ) monkeypatch.setattr( Updater, "initialize", call_after(Updater.initialize, after_initialize_updater) ) - - if updater: + if updater and is_update_processor: + app = ( + ApplicationBuilder().bot(one_time_bot).concurrent_updates(update_processor).build() + ) + await app.initialize() + assert self.test_flag == {"bot", "update_processor", "updater"} + await app.shutdown() + elif updater and not is_update_processor: app = ApplicationBuilder().bot(one_time_bot).build() await app.initialize() assert self.test_flag == {"bot", "updater"} await app.shutdown() + elif is_update_processor and not updater: + app = ( + ApplicationBuilder() + .bot(one_time_bot) + .updater(None) + .concurrent_updates(update_processor) + .build() + ) + await app.initialize() + assert self.test_flag == {"bot", "update_processor"} + await app.shutdown() else: app = ApplicationBuilder().bot(one_time_bot).updater(None).build() await app.initialize() assert self.test_flag == {"bot"} await app.shutdown() + @pytest.mark.parametrize( + "update_processor", [BaseUpdateProcessor(1), SimpleUpdateProcessor(1)] + ) @pytest.mark.parametrize("updater", [True, False]) - async def test_shutdown(self, one_time_bot, monkeypatch, updater): + async def test_shutdown(self, one_time_bot, monkeypatch, updater, update_processor): """Shutdown of persistence is tested in test_basepersistence""" + is_update_processor = False self.test_flag = set() def after_bot_shutdown(*args, **kwargs): self.test_flag.add("bot") + def after_shutdown_update_processor(*args, **kwargs): + self.test_flag.add("update_processor") + def after_updater_shutdown(*args, **kwargs): self.test_flag.add("updater") monkeypatch.setattr(Bot, "shutdown", call_after(Bot.shutdown, after_bot_shutdown)) + if isinstance(update_processor, BaseUpdateProcessor) and not isinstance( + update_processor, SimpleUpdateProcessor + ): + is_update_processor = True + monkeypatch.setattr( + BaseUpdateProcessor, + "initialize", + call_after(BaseUpdateProcessor.shutdown, after_shutdown_update_processor), + ) monkeypatch.setattr( Updater, "shutdown", call_after(Updater.shutdown, after_updater_shutdown) ) - if updater: + if updater and is_update_processor: + async with ApplicationBuilder().bot(one_time_bot).concurrent_updates( + update_processor + ).build(): + pass + assert self.test_flag == {"bot", "update_processor", "updater"} + elif updater and not is_update_processor: async with ApplicationBuilder().bot(one_time_bot).build(): pass assert self.test_flag == {"bot", "updater"} + elif is_update_processor and not updater: + async with ApplicationBuilder().bot(one_time_bot).updater(None).concurrent_updates( + update_processor + ).build(): + pass + assert self.test_flag == {"bot", "update_processor"} else: async with ApplicationBuilder().bot(one_time_bot).updater(None).build(): pass diff --git a/tests/ext/test_applicationbuilder.py b/tests/ext/test_applicationbuilder.py index 01b338d0c8f..b32806423e4 100644 --- a/tests/ext/test_applicationbuilder.py +++ b/tests/ext/test_applicationbuilder.py @@ -97,7 +97,8 @@ class Client: app = builder.token(bot.token).build() assert isinstance(app, Application) - assert app.update_processor == SimpleUpdateProcessor(1) + assert isinstance(app.update_processor, SimpleUpdateProcessor) + assert app.update_processor.max_concurrent_updates == 1 assert isinstance(app.bot, ExtBot) assert isinstance(app.bot.request, HTTPXRequest) @@ -415,8 +416,9 @@ async def post_stop(app: Application) -> None: assert app.updater.update_queue is update_queue assert app.updater.bot is app.bot assert app.context_types is context_types - assert app.update_processor == expected - assert app.concurrent_updates == (app.update_processor.max_concurrent_updates) + assert isinstance(app.update_processor, SimpleUpdateProcessor) + assert app.update_processor.max_concurrent_updates == expected.max_concurrent_updates + assert app.concurrent_updates == app.update_processor.max_concurrent_updates assert app.post_init is post_init assert app.post_shutdown is post_shutdown assert app.post_stop is post_stop From 5b7c4a062390f4f255459937759ee7840ad3340a Mon Sep 17 00:00:00 2001 From: Aditya Date: Sat, 13 May 2023 22:55:37 +0530 Subject: [PATCH 14/25] really fix them this time --- tests/ext/test_baseupdateprocessor.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py index 369f0ed2c6c..f7379221cb9 100644 --- a/tests/ext/test_baseupdateprocessor.py +++ b/tests/ext/test_baseupdateprocessor.py @@ -19,7 +19,7 @@ import pytest from telegram import Update -from telegram.ext import ApplicationBuilder, BaseUpdateProcessor, SimpleUpdateProcessor +from telegram.ext import BaseUpdateProcessor, SimpleUpdateProcessor from tests.auxil.asyncio_helpers import call_after @@ -41,15 +41,12 @@ def test_init(self, concurrent_updates): async def test_process_update(self, one_time_bot): processor = mock_processor() - application = ApplicationBuilder().concurrent_updates(processor).bot(one_time_bot).build() update = Update(1) async def coroutine(): pass - await application.update_queue.put(1) - await processor.process_update(update, coroutine, application) - assert not application.update_queue.empty() + await processor.process_update(update, coroutine) async def test_context_manager(self, monkeypatch): processor = mock_processor() From f3a65852d4c31dffdd88aae5122217ed591f73ab Mon Sep 17 00:00:00 2001 From: Hinrich Mahler <22366557+Bibo-Joshi@users.noreply.github.com> Date: Wed, 17 May 2023 22:29:34 +0200 Subject: [PATCH 15/25] Pimp my tests :) --- tests/ext/test_baseupdateprocessor.py | 68 +++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py index f7379221cb9..a1b2491dac8 100644 --- a/tests/ext/test_baseupdateprocessor.py +++ b/tests/ext/test_baseupdateprocessor.py @@ -16,6 +16,8 @@ # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. +import asyncio + import pytest from telegram import Update @@ -23,10 +25,14 @@ from tests.auxil.asyncio_helpers import call_after +@pytest.fixture() def mock_processor(): class MockProcessor(BaseUpdateProcessor): + test_flag = False + async def do_process_update(self, update, coroutine): - pass + await coroutine + self.test_flag = True return MockProcessor(5) @@ -39,14 +45,62 @@ def test_init(self, concurrent_updates): with pytest.raises(ValueError, match="must be a positive integer"): BaseUpdateProcessor(concurrent_updates) - async def test_process_update(self, one_time_bot): - processor = mock_processor() + async def test_process_update(self, mock_processor): + """Test that process_update calls do_process_update.""" update = Update(1) async def coroutine(): pass - await processor.process_update(update, coroutine) + await mock_processor.process_update(update, coroutine()) + # This flag is set in the mock processor in do_process_update, telling us that + # do_process_update was called. + assert mock_processor.test_flag + + async def test_max_concurrent_updates_enforcement(self, mock_processor): + """Test that max_concurrent_updates is enforced, i.e. that the processor will run + at most max_concurrent_updates coroutines at the same time.""" + count = 2 * mock_processor.max_concurrent_updates + events = {i: asyncio.Event() for i in range(count)} + queue = asyncio.Queue() + for event in events.values(): + await queue.put(event) + + async def callback(): + await asyncio.sleep(0.5) + (await queue.get()).set() + + # We start several calls to `process_update` at the same time, each of them taking + # 0.5 seconds to complete. We know that they are completed when the corresponding + # event is set. + tasks = [ + asyncio.create_task(mock_processor.process_update(update=_, coroutine=callback())) + for _ in range(count) + ] + + # Right now we expect no event to be set + for i in range(count): + assert not events[i].is_set() + + # After 0.5 seconds (+ some buffer), we expect that exactly max_concurrent_updates + # events are set. + await asyncio.sleep(0.75) + for i in range(mock_processor.max_concurrent_updates): + assert events[i].is_set() + for i in range( + mock_processor.max_concurrent_updates, + count, + ): + assert not events[i].is_set() + + # After wating another 0.5 seconds, we expect that the next max_concurrent_updates + # events are set. + await asyncio.sleep(0.5) + for i in range(count): + assert events[i].is_set() + + # Sanity check: we expect that all tasks are completed. + await asyncio.gather(*tasks) async def test_context_manager(self, monkeypatch): processor = mock_processor() @@ -93,10 +147,14 @@ async def shutdown(*args, **kwargs): class TestSimpleUpdateProcessor: async def test_do_process_update(self): + """Test that do_process_update calls the coroutine.""" processor = SimpleUpdateProcessor(1) update = Update(1) + test_flag = False async def coroutine(): - pass + nonlocal test_flag + test_flag = True await processor.do_process_update(update, coroutine()) + assert test_flag From d535c469cb81cab43fd57ca1a6b4a5161482bf64 Mon Sep 17 00:00:00 2001 From: Hinrich Mahler <22366557+Bibo-Joshi@users.noreply.github.com> Date: Wed, 17 May 2023 22:42:19 +0200 Subject: [PATCH 16/25] Fix failing tests --- tests/ext/test_baseupdateprocessor.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py index a1b2491dac8..6ff94c3fbc9 100644 --- a/tests/ext/test_baseupdateprocessor.py +++ b/tests/ext/test_baseupdateprocessor.py @@ -102,8 +102,7 @@ async def callback(): # Sanity check: we expect that all tasks are completed. await asyncio.gather(*tasks) - async def test_context_manager(self, monkeypatch): - processor = mock_processor() + async def test_context_manager(self, monkeypatch, mock_processor): self.test_flag = set() async def after_initialize(*args, **kwargs): @@ -123,12 +122,12 @@ async def after_shutdown(*args, **kwargs): call_after(BaseUpdateProcessor.shutdown, after_shutdown), ) - async with processor: + async with mock_processor: pass assert self.test_flag == {"initialize", "stop"} - async def test_context_manager_exception_on_init(self, monkeypatch): + async def test_context_manager_exception_on_init(self, monkeypatch, mock_processor): async def initialize(*args, **kwargs): raise RuntimeError("initialize") @@ -139,7 +138,7 @@ async def shutdown(*args, **kwargs): monkeypatch.setattr(BaseUpdateProcessor, "shutdown", shutdown) with pytest.raises(RuntimeError, match="initialize"): - async with mock_processor(): + async with mock_processor: pass assert self.test_flag == "shutdown" From bb7caf82561da15d4a275808b2227266e4dd5cfe Mon Sep 17 00:00:00 2001 From: Aditya Date: Thu, 18 May 2023 14:56:08 +0530 Subject: [PATCH 17/25] fix docs --- telegram/ext/_application.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index dd7da979e5d..2a3569b6100 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -391,8 +391,8 @@ async def initialize(self) -> None: * The :attr:`bot`, by calling :meth:`telegram.Bot.initialize`. * The :attr:`updater`, by calling :meth:`telegram.ext.Updater.initialize`. * The :attr:`persistence`, by loading persistent conversations and data. - * The :attr:`update_processor` by calling :meth:`telegram.ext.BaseUpdateProcessor.\ - initialize`. + * The :attr:`update_processor` by calling + :meth:`telegram.ext.BaseUpdateProcessor.initialize`. Does *not* call :attr:`post_init` - that is only done by :meth:`run_polling` and :meth:`run_webhook`. From b60c119ddb9101fba9dccf825ec4dddca38a386e Mon Sep 17 00:00:00 2001 From: Aditya Date: Fri, 19 May 2023 18:29:29 +0530 Subject: [PATCH 18/25] add test for SUP with 'is' --- tests/ext/test_applicationbuilder.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/ext/test_applicationbuilder.py b/tests/ext/test_applicationbuilder.py index b32806423e4..648cd58470e 100644 --- a/tests/ext/test_applicationbuilder.py +++ b/tests/ext/test_applicationbuilder.py @@ -429,6 +429,19 @@ async def post_stop(app: Application) -> None: assert app.updater is updater assert app.bot is updater.bot assert app.update_queue is updater.update_queue + app = ( + builder.token(bot.token) + .job_queue(job_queue) + .persistence(persistence) + .update_queue(update_queue) + .context_types(context_types) + .concurrent_updates(expected) + .post_init(post_init) + .post_shutdown(post_shutdown) + .post_stop(post_stop) + .arbitrary_callback_data(True) + ).build() + assert app.update_processor is expected with pytest.raises(ValueError, match="must be a positive"): ( From 39b1adb9f75b8017821717e7cf4ecf3cc529355f Mon Sep 17 00:00:00 2001 From: Aditya Date: Sat, 20 May 2023 08:12:46 +0530 Subject: [PATCH 19/25] add final requested changes --- telegram/ext/_application.py | 12 ++------ telegram/ext/_baseupdateprocessor.py | 4 +-- tests/ext/test_application.py | 44 +++++++++++----------------- 3 files changed, 22 insertions(+), 38 deletions(-) diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 2a3569b6100..87e11eaa0cb 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -57,7 +57,7 @@ from telegram._utils.warnings import warn from telegram.error import TelegramError from telegram.ext._basepersistence import BasePersistence -from telegram.ext._baseupdateprocessor import BaseUpdateProcessor, SimpleUpdateProcessor +from telegram.ext._baseupdateprocessor import BaseUpdateProcessor from telegram.ext._contexttypes import ContextTypes from telegram.ext._extbot import ExtBot from telegram.ext._handler import BaseHandler @@ -405,10 +405,7 @@ async def initialize(self) -> None: return await self.bot.initialize() - if isinstance(self._update_processor, BaseUpdateProcessor) and not isinstance( - self._update_processor, SimpleUpdateProcessor - ): - await self._update_processor.initialize() + await self._update_processor.initialize() if self.updater: await self.updater.initialize() @@ -461,10 +458,7 @@ async def shutdown(self) -> None: return await self.bot.shutdown() - if isinstance(self._update_processor, BaseUpdateProcessor) and not isinstance( - self._update_processor, SimpleUpdateProcessor - ): - await self._update_processor.shutdown() + await self._update_processor.shutdown() if self.updater: await self.updater.shutdown() diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 1923ebf22d4..93bf54224d0 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -17,13 +17,13 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. """This module contains the BaseProcessor class.""" -from abc import abstractmethod +from abc import ABC, abstractmethod from asyncio import BoundedSemaphore from types import TracebackType from typing import Any, Awaitable, Optional, Type -class BaseUpdateProcessor: +class BaseUpdateProcessor(ABC): """An abstract base class for update processors. You can use this class to implement your own update processor. diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 3725a2200cb..802a035d095 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -233,7 +233,6 @@ def test_custom_context_init(self, one_time_bot): @pytest.mark.parametrize("updater", [True, False]) async def test_initialize(self, one_time_bot, monkeypatch, updater, update_processor): """Initialization of persistence is tested test_basepersistence""" - is_update_processor = False self.test_flag = set() async def after_initialize_bot(*args, **kwargs): @@ -249,28 +248,28 @@ async def after_initialize_updater(*args, **kwargs): if isinstance(update_processor, BaseUpdateProcessor) and not isinstance( update_processor, SimpleUpdateProcessor ): - is_update_processor = True monkeypatch.setattr( BaseUpdateProcessor, "initialize", call_after(BaseUpdateProcessor.initialize, after_initialize_update_processor), ) + else: + monkeypatch.setattr( + SimpleUpdateProcessor, + "initialize", + call_after(SimpleUpdateProcessor.initialize, after_initialize_update_processor), + ) monkeypatch.setattr( Updater, "initialize", call_after(Updater.initialize, after_initialize_updater) ) - if updater and is_update_processor: + if updater: app = ( ApplicationBuilder().bot(one_time_bot).concurrent_updates(update_processor).build() ) await app.initialize() assert self.test_flag == {"bot", "update_processor", "updater"} await app.shutdown() - elif updater and not is_update_processor: - app = ApplicationBuilder().bot(one_time_bot).build() - await app.initialize() - assert self.test_flag == {"bot", "updater"} - await app.shutdown() - elif is_update_processor and not updater: + else: app = ( ApplicationBuilder() .bot(one_time_bot) @@ -281,11 +280,6 @@ async def after_initialize_updater(*args, **kwargs): await app.initialize() assert self.test_flag == {"bot", "update_processor"} await app.shutdown() - else: - app = ApplicationBuilder().bot(one_time_bot).updater(None).build() - await app.initialize() - assert self.test_flag == {"bot"} - await app.shutdown() @pytest.mark.parametrize( "update_processor", [BaseUpdateProcessor(1), SimpleUpdateProcessor(1)] @@ -293,7 +287,6 @@ async def after_initialize_updater(*args, **kwargs): @pytest.mark.parametrize("updater", [True, False]) async def test_shutdown(self, one_time_bot, monkeypatch, updater, update_processor): """Shutdown of persistence is tested in test_basepersistence""" - is_update_processor = False self.test_flag = set() def after_bot_shutdown(*args, **kwargs): @@ -309,36 +302,33 @@ def after_updater_shutdown(*args, **kwargs): if isinstance(update_processor, BaseUpdateProcessor) and not isinstance( update_processor, SimpleUpdateProcessor ): - is_update_processor = True monkeypatch.setattr( BaseUpdateProcessor, "initialize", - call_after(BaseUpdateProcessor.shutdown, after_shutdown_update_processor), + call_after(BaseUpdateProcessor.initialize, after_shutdown_update_processor), + ) + else: + monkeypatch.setattr( + SimpleUpdateProcessor, + "initialize", + call_after(SimpleUpdateProcessor.initialize, after_shutdown_update_processor), ) monkeypatch.setattr( Updater, "shutdown", call_after(Updater.shutdown, after_updater_shutdown) ) - if updater and is_update_processor: + if updater: async with ApplicationBuilder().bot(one_time_bot).concurrent_updates( update_processor ).build(): pass assert self.test_flag == {"bot", "update_processor", "updater"} - elif updater and not is_update_processor: - async with ApplicationBuilder().bot(one_time_bot).build(): - pass - assert self.test_flag == {"bot", "updater"} - elif is_update_processor and not updater: + else: async with ApplicationBuilder().bot(one_time_bot).updater(None).concurrent_updates( update_processor ).build(): pass assert self.test_flag == {"bot", "update_processor"} - else: - async with ApplicationBuilder().bot(one_time_bot).updater(None).build(): - pass - assert self.test_flag == {"bot"} async def test_multiple_inits_and_shutdowns(self, app, monkeypatch): self.received = defaultdict(int) From 6cc21021b72c965aa112da6816cb9f898578e3c2 Mon Sep 17 00:00:00 2001 From: Hinrich Mahler <22366557+Bibo-Joshi@users.noreply.github.com> Date: Sat, 20 May 2023 22:57:42 +0200 Subject: [PATCH 20/25] Fine tune tests --- tests/ext/test_application.py | 51 +++++++---------------- tests/ext/test_baseupdateprocessor.py | 59 +++++++++++++++------------ 2 files changed, 47 insertions(+), 63 deletions(-) diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 802a035d095..ca427fe6043 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -41,7 +41,6 @@ ApplicationBuilder, ApplicationHandlerStop, BaseHandler, - BaseUpdateProcessor, CallbackContext, CommandHandler, ContextTypes, @@ -227,11 +226,8 @@ def test_custom_context_init(self, one_time_bot): assert isinstance(application.chat_data[1], float) assert isinstance(application.bot_data, complex) - @pytest.mark.parametrize( - "update_processor", [BaseUpdateProcessor(1), SimpleUpdateProcessor(1)] - ) @pytest.mark.parametrize("updater", [True, False]) - async def test_initialize(self, one_time_bot, monkeypatch, updater, update_processor): + async def test_initialize(self, one_time_bot, monkeypatch, updater): """Initialization of persistence is tested test_basepersistence""" self.test_flag = set() @@ -244,21 +240,13 @@ async def after_initialize_update_processor(*args, **kwargs): async def after_initialize_updater(*args, **kwargs): self.test_flag.add("updater") + update_processor = SimpleUpdateProcessor(1) monkeypatch.setattr(Bot, "initialize", call_after(Bot.initialize, after_initialize_bot)) - if isinstance(update_processor, BaseUpdateProcessor) and not isinstance( - update_processor, SimpleUpdateProcessor - ): - monkeypatch.setattr( - BaseUpdateProcessor, - "initialize", - call_after(BaseUpdateProcessor.initialize, after_initialize_update_processor), - ) - else: - monkeypatch.setattr( - SimpleUpdateProcessor, - "initialize", - call_after(SimpleUpdateProcessor.initialize, after_initialize_update_processor), - ) + monkeypatch.setattr( + SimpleUpdateProcessor, + "initialize", + call_after(SimpleUpdateProcessor.initialize, after_initialize_update_processor), + ) monkeypatch.setattr( Updater, "initialize", call_after(Updater.initialize, after_initialize_updater) ) @@ -281,11 +269,8 @@ async def after_initialize_updater(*args, **kwargs): assert self.test_flag == {"bot", "update_processor"} await app.shutdown() - @pytest.mark.parametrize( - "update_processor", [BaseUpdateProcessor(1), SimpleUpdateProcessor(1)] - ) @pytest.mark.parametrize("updater", [True, False]) - async def test_shutdown(self, one_time_bot, monkeypatch, updater, update_processor): + async def test_shutdown(self, one_time_bot, monkeypatch, updater): """Shutdown of persistence is tested in test_basepersistence""" self.test_flag = set() @@ -298,21 +283,13 @@ def after_shutdown_update_processor(*args, **kwargs): def after_updater_shutdown(*args, **kwargs): self.test_flag.add("updater") + update_processor = SimpleUpdateProcessor(1) monkeypatch.setattr(Bot, "shutdown", call_after(Bot.shutdown, after_bot_shutdown)) - if isinstance(update_processor, BaseUpdateProcessor) and not isinstance( - update_processor, SimpleUpdateProcessor - ): - monkeypatch.setattr( - BaseUpdateProcessor, - "initialize", - call_after(BaseUpdateProcessor.initialize, after_shutdown_update_processor), - ) - else: - monkeypatch.setattr( - SimpleUpdateProcessor, - "initialize", - call_after(SimpleUpdateProcessor.initialize, after_shutdown_update_processor), - ) + monkeypatch.setattr( + SimpleUpdateProcessor, + "initialize", + call_after(SimpleUpdateProcessor.initialize, after_shutdown_update_processor), + ) monkeypatch.setattr( Updater, "shutdown", call_after(Updater.shutdown, after_updater_shutdown) ) diff --git a/tests/ext/test_baseupdateprocessor.py b/tests/ext/test_baseupdateprocessor.py index 6ff94c3fbc9..3ae10d2dd16 100644 --- a/tests/ext/test_baseupdateprocessor.py +++ b/tests/ext/test_baseupdateprocessor.py @@ -16,18 +16,21 @@ # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. +"""Here we run tests directly with SimpleUpdateProcessor because that's easier than providing dummy +implementations for SimpleUpdateProcessor and we want to test SimpleUpdateProcessor anyway.""" import asyncio import pytest from telegram import Update -from telegram.ext import BaseUpdateProcessor, SimpleUpdateProcessor +from telegram.ext import SimpleUpdateProcessor from tests.auxil.asyncio_helpers import call_after +from tests.auxil.slots import mro_slots @pytest.fixture() def mock_processor(): - class MockProcessor(BaseUpdateProcessor): + class MockProcessor(SimpleUpdateProcessor): test_flag = False async def do_process_update(self, update, coroutine): @@ -37,13 +40,19 @@ async def do_process_update(self, update, coroutine): return MockProcessor(5) -class TestBaseUpdateProcessor: +class TestSimpleUpdateProcessor: + def test_slot_behaviour(self): + inst = SimpleUpdateProcessor(1) + for attr in inst.__slots__: + assert getattr(inst, attr, "err") != "err", f"got extra slot '{attr}'" + assert len(mro_slots(inst)) == len(set(mro_slots(inst))), "duplicate slot" + @pytest.mark.parametrize("concurrent_updates", [0, -1]) def test_init(self, concurrent_updates): - processor = BaseUpdateProcessor(3) + processor = SimpleUpdateProcessor(3) assert processor.max_concurrent_updates == 3 with pytest.raises(ValueError, match="must be a positive integer"): - BaseUpdateProcessor(concurrent_updates) + SimpleUpdateProcessor(concurrent_updates) async def test_process_update(self, mock_processor): """Test that process_update calls do_process_update.""" @@ -57,6 +66,19 @@ async def coroutine(): # do_process_update was called. assert mock_processor.test_flag + async def test_do_process_update(self): + """Test that do_process_update calls the coroutine.""" + processor = SimpleUpdateProcessor(1) + update = Update(1) + test_flag = False + + async def coroutine(): + nonlocal test_flag + test_flag = True + + await processor.do_process_update(update, coroutine()) + assert test_flag + async def test_max_concurrent_updates_enforcement(self, mock_processor): """Test that max_concurrent_updates is enforced, i.e. that the processor will run at most max_concurrent_updates coroutines at the same time.""" @@ -112,14 +134,14 @@ async def after_shutdown(*args, **kwargs): self.test_flag.add("stop") monkeypatch.setattr( - BaseUpdateProcessor, + SimpleUpdateProcessor, "initialize", - call_after(BaseUpdateProcessor.initialize, after_initialize), + call_after(SimpleUpdateProcessor.initialize, after_initialize), ) monkeypatch.setattr( - BaseUpdateProcessor, + SimpleUpdateProcessor, "shutdown", - call_after(BaseUpdateProcessor.shutdown, after_shutdown), + call_after(SimpleUpdateProcessor.shutdown, after_shutdown), ) async with mock_processor: @@ -134,26 +156,11 @@ async def initialize(*args, **kwargs): async def shutdown(*args, **kwargs): self.test_flag = "shutdown" - monkeypatch.setattr(BaseUpdateProcessor, "initialize", initialize) - monkeypatch.setattr(BaseUpdateProcessor, "shutdown", shutdown) + monkeypatch.setattr(SimpleUpdateProcessor, "initialize", initialize) + monkeypatch.setattr(SimpleUpdateProcessor, "shutdown", shutdown) with pytest.raises(RuntimeError, match="initialize"): async with mock_processor: pass assert self.test_flag == "shutdown" - - -class TestSimpleUpdateProcessor: - async def test_do_process_update(self): - """Test that do_process_update calls the coroutine.""" - processor = SimpleUpdateProcessor(1) - update = Update(1) - test_flag = False - - async def coroutine(): - nonlocal test_flag - test_flag = True - - await processor.do_process_update(update, coroutine()) - assert test_flag From 764a5d07e6350be1380a73a6a73e68ff837a5874 Mon Sep 17 00:00:00 2001 From: Hinrich Mahler <22366557+Bibo-Joshi@users.noreply.github.com> Date: Sat, 20 May 2023 23:08:34 +0200 Subject: [PATCH 21/25] Add some versioning directives --- telegram/ext/_application.py | 2 ++ telegram/ext/_applicationbuilder.py | 3 +++ telegram/ext/_baseupdateprocessor.py | 2 ++ 3 files changed, 7 insertions(+) diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 87e11eaa0cb..df8738b43f6 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -382,6 +382,8 @@ def update_processor(self) -> "BaseUpdateProcessor": application. .. seealso:: :wiki:`Concurrency` + + .. versionadded:: NEXT.VERSION """ return self._update_processor diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index 7a627643472..a488d9cdf2c 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -929,6 +929,9 @@ def concurrent_updates( :class:`telegram.ext.BaseUpdateProcessor` to use that instance for handling updates concurrently. + .. verionchanged:: NEXT.VERSION + Now accepts :class:`BaseUpdateProcessor` instances. + Returns: :class:`ApplicationBuilder`: The same builder with the updated argument. """ diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 93bf54224d0..21c22964db6 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -125,6 +125,8 @@ class SimpleUpdateProcessor(BaseUpdateProcessor): """Instance of :class:`telegram.ext.BaseUpdateProcessor` that immediately awaits the coroutine, i.e. does not apply any additional processing. This is used by default when :attr:`telegram.ext.ApplicationBuilder.concurrent_updates` is :obj:`int`. + + .. versionadded:: NEXT.VERSION """ __slots__ = () From 9aeeb08c086f7fdc6c5532fa3ac36743d1e61aa8 Mon Sep 17 00:00:00 2001 From: Hinrich Mahler <22366557+Bibo-Joshi@users.noreply.github.com> Date: Sat, 20 May 2023 23:14:54 +0200 Subject: [PATCH 22/25] typo --- telegram/ext/_applicationbuilder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index 90da3cc0fbe..c1f4012b511 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -929,7 +929,7 @@ def concurrent_updates( :class:`telegram.ext.BaseUpdateProcessor` to use that instance for handling updates concurrently. - .. verionchanged:: NEXT.VERSION + .. versionchanged:: NEXT.VERSION Now accepts :class:`BaseUpdateProcessor` instances. Returns: From 168eac4489d458eb6b740e8525c27179632b8e1b Mon Sep 17 00:00:00 2001 From: Aditya Date: Tue, 30 May 2023 19:00:50 +0000 Subject: [PATCH 23/25] address final review --- .../telegram.ext.simpleupdateprocessor.rst | 1 + telegram/ext/_application.py | 2 +- telegram/ext/_applicationbuilder.py | 2 -- telegram/ext/_baseupdateprocessor.py | 8 ++++---- tests/ext/test_application.py | 4 ++-- tests/ext/test_applicationbuilder.py | 17 +---------------- 6 files changed, 9 insertions(+), 25 deletions(-) diff --git a/docs/source/telegram.ext.simpleupdateprocessor.rst b/docs/source/telegram.ext.simpleupdateprocessor.rst index 1e30c27566c..ae67a996651 100644 --- a/docs/source/telegram.ext.simpleupdateprocessor.rst +++ b/docs/source/telegram.ext.simpleupdateprocessor.rst @@ -2,5 +2,6 @@ SimpleUpdateProcessor ===================== .. autoclass:: telegram.ext.SimpleUpdateProcessor + :inherited-members: telegram.ext.BaseUpdateProcessor :members: :show-inheritance: \ No newline at end of file diff --git a/telegram/ext/_application.py b/telegram/ext/_application.py index 472a1b0bb29..15178d9176f 100644 --- a/telegram/ext/_application.py +++ b/telegram/ext/_application.py @@ -394,7 +394,7 @@ async def initialize(self) -> None: * The :attr:`updater`, by calling :meth:`telegram.ext.Updater.initialize`. * The :attr:`persistence`, by loading persistent conversations and data. * The :attr:`update_processor` by calling - :meth:`telegram.ext.BaseUpdateProcessor.initialize`. + :meth:`telegram.ext.BaseUpdateProcessor.initialize`. Does *not* call :attr:`post_init` - that is only done by :meth:`run_polling` and :meth:`run_webhook`. diff --git a/telegram/ext/_applicationbuilder.py b/telegram/ext/_applicationbuilder.py index c1f4012b511..cd3b7ad35c0 100644 --- a/telegram/ext/_applicationbuilder.py +++ b/telegram/ext/_applicationbuilder.py @@ -945,8 +945,6 @@ def concurrent_updates( # instance with that integer value; otherwise, raise an error if the value # is negative if isinstance(concurrent_updates, int): - if concurrent_updates < 1: - raise ValueError("`concurrent_updates` must be a positive integer!") concurrent_updates = SimpleUpdateProcessor(concurrent_updates) # Assign default value of concurrent_updates if it is instance of diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 21c22964db6..95c4b0ea73b 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -37,7 +37,7 @@ class BaseUpdateProcessor(ABC): of currently processed updates decreases. Raises: - :exc:`ValueError`: If :paramref:`max_concurrent_updates` is a negative integer. + :exc:`ValueError`: If :paramref:`max_concurrent_updates` is a non-positive integer. """ __slots__ = ("_max_concurrent_updates", "_semaphore") @@ -67,7 +67,7 @@ async def do_process_update( Args: update (:obj:`object`): The update to be processed. - coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. + coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the update. """ @abstractmethod @@ -97,7 +97,7 @@ async def process_update( Args: update (:obj:`object`): The update to be processed. - coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. + coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the update. """ async with self._semaphore: await self.do_process_update(update, coroutine) @@ -140,7 +140,7 @@ async def do_process_update( Args: update (:obj:`object`): The update to be processed. - coroutine (:obj:`Awaitable`): The coroutine that will be awaited to process the update. + coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the update. """ await coroutine diff --git a/tests/ext/test_application.py b/tests/ext/test_application.py index 3f4bc38ee68..00001524853 100644 --- a/tests/ext/test_application.py +++ b/tests/ext/test_application.py @@ -288,8 +288,8 @@ def after_updater_shutdown(*args, **kwargs): monkeypatch.setattr(Bot, "shutdown", call_after(Bot.shutdown, after_bot_shutdown)) monkeypatch.setattr( SimpleUpdateProcessor, - "initialize", - call_after(SimpleUpdateProcessor.initialize, after_shutdown_update_processor), + "shutdown", + call_after(SimpleUpdateProcessor.shutdown, after_shutdown_update_processor), ) monkeypatch.setattr( Updater, "shutdown", call_after(Updater.shutdown, after_updater_shutdown) diff --git a/tests/ext/test_applicationbuilder.py b/tests/ext/test_applicationbuilder.py index 648cd58470e..0f9eb29ad7f 100644 --- a/tests/ext/test_applicationbuilder.py +++ b/tests/ext/test_applicationbuilder.py @@ -377,9 +377,8 @@ def __init__(self, arg, **kwargs): (True, SimpleUpdateProcessor(256)), ], ) - @pytest.mark.parametrize("conc_updates", [0, -1]) def test_all_application_args_custom( - self, builder, bot, monkeypatch, concurrent_updates, expected, conc_updates + self, builder, bot, monkeypatch, concurrent_updates, expected ): job_queue = JobQueue() persistence = PicklePersistence("file_path") @@ -443,20 +442,6 @@ async def post_stop(app: Application) -> None: ).build() assert app.update_processor is expected - with pytest.raises(ValueError, match="must be a positive"): - ( - builder.token(bot.token) - .job_queue(job_queue) - .persistence(persistence) - .update_queue(update_queue) - .context_types(context_types) - .concurrent_updates(conc_updates) - .post_init(post_init) - .post_shutdown(post_shutdown) - .post_stop(post_stop) - .arbitrary_callback_data(True) - ).build() - @pytest.mark.parametrize("input_type", ["bytes", "str", "Path"]) def test_all_private_key_input_types(self, builder, bot, input_type): private_key = data_file("private.key") From 550431c608b022c35fbbe6e4e20adb0d1610200c Mon Sep 17 00:00:00 2001 From: Aditya Date: Tue, 30 May 2023 19:20:47 +0000 Subject: [PATCH 24/25] fix pre-commit --- telegram/ext/_baseupdateprocessor.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/telegram/ext/_baseupdateprocessor.py b/telegram/ext/_baseupdateprocessor.py index 95c4b0ea73b..a3b59d4fb92 100644 --- a/telegram/ext/_baseupdateprocessor.py +++ b/telegram/ext/_baseupdateprocessor.py @@ -67,7 +67,8 @@ async def do_process_update( Args: update (:obj:`object`): The update to be processed. - coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the update. + coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the + update. """ @abstractmethod @@ -97,7 +98,8 @@ async def process_update( Args: update (:obj:`object`): The update to be processed. - coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the update. + coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the + update. """ async with self._semaphore: await self.do_process_update(update, coroutine) @@ -140,7 +142,8 @@ async def do_process_update( Args: update (:obj:`object`): The update to be processed. - coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the update. + coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the + update. """ await coroutine From d03f81e253022b3e1b74d6a029a4ab31b3c60a6b Mon Sep 17 00:00:00 2001 From: Aditya Yadav <69784758+clot27@users.noreply.github.com> Date: Fri, 2 Jun 2023 16:23:25 +0530 Subject: [PATCH 25/25] Update docs/source/telegram.ext.simpleupdateprocessor.rst Co-authored-by: Harshil <37377066+harshil21@users.noreply.github.com> --- docs/source/telegram.ext.simpleupdateprocessor.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/source/telegram.ext.simpleupdateprocessor.rst b/docs/source/telegram.ext.simpleupdateprocessor.rst index ae67a996651..1e30c27566c 100644 --- a/docs/source/telegram.ext.simpleupdateprocessor.rst +++ b/docs/source/telegram.ext.simpleupdateprocessor.rst @@ -2,6 +2,5 @@ SimpleUpdateProcessor ===================== .. autoclass:: telegram.ext.SimpleUpdateProcessor - :inherited-members: telegram.ext.BaseUpdateProcessor :members: :show-inheritance: \ No newline at end of file