8000 Add `BaseUpdateProcessor.current_concurrent_updates` (#4626) · vavasik800/python-telegram-bot@f57dd52 · GitHub
[go: up one dir, main page]

Skip to content

Commit f57dd52

Browse files
authored
Add BaseUpdateProcessor.current_concurrent_updates (python-telegram-bot#4626)
1 parent 16605c5 commit f57dd52

File tree

3 files changed

+100
-2
lines changed

3 files changed

+100
-2
lines changed

telegram/ext/_baseupdateprocessor.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
# along with this program. If not, see [http://www.gnu.org/licenses/].
1919
"""This module contains the BaseProcessor class."""
2020
from abc import ABC, abstractmethod
21-
from asyncio import BoundedSemaphore
2221
from contextlib import AbstractAsyncContextManager
2322
from types import TracebackType
2423
from typing import TYPE_CHECKING, Any, Optional, TypeVar, final
2524

25+
from telegram.ext._utils.asyncio import TrackedBoundedSemaphore
26+
2627
if TYPE_CHECKING:
2728
from collections.abc import Awaitable
2829

@@ -71,7 +72,7 @@ def __init__(self, max_concurrent_updates: int):
7172
self._max_concurrent_updates = max_concurrent_updates
7273
if self.max_concurrent_updates < 1:
7374
raise Valu 8000 eError("`max_concurrent_updates` must be a positive integer!")
74-
self._semaphore = BoundedSemaphore(self.max_concurrent_updates)
75+
self._semaphore = TrackedBoundedSemaphore(self.max_concurrent_updates)
7576

7677
async def __aenter__(self: _BUPT) -> _BUPT: # noqa: PYI019
7778
"""|async_context_manager| :meth:`initializes <initialize>` the Processor.
@@ -104,6 +105,18 @@ def max_concurrent_updates(self) -> int:
104105
""":obj:`int`: The maximum number of updates that can be processed concurrently."""
105106
return self._max_concurrent_updates
106107

108+
@property
109+
def current_concurrent_updates(self) -> int:
110+
""":obj:`int`: The number of updates currently being processed.
111+
112+
Caution:
113+
This value is a snapshot of the current number of updates being processed. It may
114+
change immediately after being read.
115+
116+
.. versionadded:: NEXT.VERSION
117+
"""
118+
return self.max_concurrent_updates - self._semaphore.current_value
119+
107120
@abstractmethod
108121
async def do_process_update(
109122
self,

telegram/ext/_utils/asyncio.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#!/usr/bin/env python
2+
#
3+
# A library that provides a Python interface to the Telegram Bot API
4+
# Copyright (C) 2015-2025
5+
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
6+
#
7+
# This program is free software: you can redistribute it and/or modify
8+
# it under the terms of the GNU Lesser Public License as published by
9+
# the Free Software Foundation, either version 3 of the License, or
10+
# (at your option) any later version.
11+
#
12+
# This program is distributed in the hope that it will be useful,
13+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
# GNU Lesser Public License for more details.
16+
#
17+
# You should have received a copy of the GNU Lesser Public License
18+
# along with this program. If not, see [http://www.gnu.org/licenses/].
19+
"""This module contains helper functions related to the std-lib asyncio module.
20+
21+
.. versionadded:: NEXT.VERSION
22+
23+
Warning:
24+
Contents of this module are intended to be used internally by the library and *not* by the
25+
user. Changes to this module are not considered breaking changes and may not be documented in
26+
the changelog.
27+
"""
28+
import asyncio
29+
from typing import Literal
30+
31+
32+
class TrackedBoundedSemaphore(asyncio.BoundedSemaphore):
33+
"""Simple subclass of :class:`asyncio.BoundedSemaphore` that tracks the current value of the
34+
semaphore. While there is an attribute ``_value`` in the superclass, it's private and we
35+
don't want to rely on it.
36+
"""
37+
38+
__slots__ = ("_current_value",)
39+
40+
def __init__(self, value: int = 1) -> None:
41+
super().__init__(value)
42+
self._current_value = value
43+
44+
@property
45+
def current_value(self) -> int:
46+
return self._current_value
47+
48+
async def acquire(self) -> Literal[True]:
49+
await super().acquire()
50+
self._current_value -= 1
51+
return True
52+
53+
def release(self) -> None:
54+
super().release()
55+
self._current_value += 1

tests/ext/test_baseupdateprocessor.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,33 @@ async def shutdown(*args, **kwargs):
164164
pass
165165

166166
assert self.test_flag == "shutdown"
167+
168+
async def test_current_concurrent_updates(self, mock_processor):
169+
async def callback(event: asyncio.Event):
170+
await event.wait()
171+
172+
events = {i: asyncio.Event() for i in range(10)}
173+
coroutines = {i: callback(event) for i, event in events.items()}
174+
175+
process_tasks = [
176+
asyncio.create_task(mock_processor.process_update(Update(i), coroutines[i]))
177+
for i in range(10)
178+
]
179+
await asyncio.sleep(0.01)
180+
181+
assert mock_processor.current_concurrent_updates == mock_processor.max_concurrent_updates
182+
for i in range(5):
183+
events[i].set()
184+
185+
await asyncio.sleep(0.01)
186+
assert mock_processor.current_concurrent_updates == mock_processor.max_concurrent_updates
187+
188+
for i in range(5, 10):
189+
events[i].set()
190+
await asyncio.sleep(0.01)
191+
assert (
192+
mock_processor.current_concurrent_updates
193+
== mock_processor.max_concurrent_updates - (i - 4)
194+
)
195+
196+
await asyncio.gather(*process_tasks)

0 commit comments

Comments
 (0)
0