8000 Add `Application.stop_running` and Improve Marking Updates as Read on `Updater.stop` by Bibo-Joshi · Pull Request #3804 · python-telegram-bot/python-telegram-bot · GitHub
[go: up one dir, main page]

Skip to content

Add Application.stop_running and Improve Marking Updates as Read on Updater.stop #3804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions docs/source/inclusions/application_run_tip.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
.. tip::
When combining ``python-telegram-bot`` with other :mod:`asyncio` based frameworks, using this
method is likely not the best choice, as it blocks the event loop until it receives a stop
signal as described above.
Instead, you can manually call the methods listed below to start and shut down the application
and the :attr:`~telegram.ext.Application.updater`.
Keeping the event loop running and listening for a stop signal is then up to you.
* When combining ``python-telegram-bot`` with other :mod:`asyncio` based frameworks, using this
method is likely not the best choice, as it blocks the event loop until it receives a stop
signal as described above.
Instead, you can manually call the methods listed below to start and shut down the application
and the :attr:`~telegram.ext.Application.updater`.
Keeping the event loop running and listening for a stop signal is then up to you.
* To gracefully stop the execution of this method from within a handler, job or error callback,
use :meth:`~telegram.ext.Application.stop_running`.
20 changes: 19 additions & 1 deletion telegram/ext/_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,24 @@ async def stop(self) -> None:

_LOGGER.info("Application.stop() complete")

def stop_running(self) -> None:
"""This method can be used to stop the execution of :meth:`run_polling` or
:meth:`run_webhook` from within a handler, job or error callback. This allows a graceful
shutdown of the application, i.e. the methods listed in :attr:`run_polling` and
:attr:`run_webhook` will still be executed.

Note:
If the application is not running, this method does nothing.

.. versionadded:: NEXT.VERSION
"""
if self.running:
# This works because `__run` is using `loop.run_forever()`. If that changes, this
# method needs to be adapted.
asyncio.get_running_loop().stop()
else:
_LOGGER.debug("Application is not running, stop_running() does nothing.")

def run_polling(
self,
poll_interval: float = 0.0,
Expand Down Expand Up @@ -939,7 +957,7 @@ def __run(
loop.run_until_complete(self.start())
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
pass
_LOGGER.debug("Application received stop signal. Shutting down.")
except Exception as exc:
# In case the coroutine wasn't awaited, we don't need to bother the user with a warning
updater_coroutine.close()
Expand Down
33 changes: 33 additions & 0 deletions telegram/ext/_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
Callable,
Coroutine,
Expand Down Expand Up @@ -121,6 +122,7 @@ def __init__(
self._httpd: Optional[WebhookServer] = None
self.__lock = asyncio.Lock()
self.__polling_task: Optional[asyncio.Task] = None
self.__polling_cleanup_cb: Optional[Callable[[], Coroutine[Any, Any, None]]] = None

@property
def running(self) -> bool:
Expand Down Expand Up @@ -367,6 +369,28 @@ def default_error_callback(exc: TelegramError) -> None:
name="Updater:start_polling:polling_task",
)

# Prepare a cleanup callback to await on _stop_polling
# Calling get_updates one more time with the latest `offset` parameter ensures that
# all updates that where put into the update queue are also marked as "read" to TG,
# so we do not receive them again on the next startup
# We define this here so that we can use the same parameters as in the polling task
async def _get_updates_cleanup() -> None:
_LOGGER.debug(
"Calling `get_updates` one more time to mark all fetched updates as read."
)
await self.bot.get_updates(
offset=self._last_update_id,
# We don't want to do long polling here!
timeout=0,
read_timeout=read_timeout,
connect_timeout=connect_timeout,
write_timeout=write_timeout,
pool_timeout=pool_timeout,
allowed_updates=allowed_updates,
)

self.__polling_cleanup_cb = _get_updates_cleanup

if ready is not None:
ready.set()

Expand Down Expand Up @@ -748,3 +772,12 @@ async def _stop_polling(self) -> None:
# after start_polling(), but lets better be safe than sorry ...

self.__polling_task = None

if self.__polling_cleanup_cb:
await self.__polling_cleanup_cb()
self.__polling_cleanup_cb = None
else:
_LOGGER.warning(
"No polling cleanup callback defined. The last fetched updates may be "
"fetched again on the next polling start."
)
158 changes: 146 additions & 12 deletions tests/ext/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,7 @@ async def callback(update, context):
platform.system() == "Windows",
reason="Can't send signals without stopping whole process on windows",
)
def test_run_polling_basic(self, app, monkeypatch):
def test_run_polling_basic(self, app, monkeypatch, caplog):
exception_event = threading.Event()
update_event = threading.Event()
exception = TelegramError("This is a test error")
Expand Down Expand Up @@ -1464,6 +1464,9 @@ def thread_target():
time.sleep(0.05)
assertions["exception_handling"] = self.received == exception.message

# So that the get_updates call on shutdown doesn't fail
exception_event.clear()

os.kill(os.getpid(), signal.SIGINT)
time.sleep(0.1)

Expand All @@ -1478,13 +1481,20 @@ def thread_target():

thread = Thread(target=thread_target)
thread.start()
app.run_polling(drop_pending_updates=True, close_loop=False)
thread.join()
with caplog.at_level(logging.DEBUG):
app.run_polling(drop_pending_updates=True, close_loop=False)
thread.join()

assert len(assertions) == 8
for key, value in assertions.items():
assert value, f"assertion '{key}' failed!"

found_log = False
for record in caplog.records:
if "received stop signal" in record.getMessage() and record.levelno == logging.DEBUG:
found_log = True
assert found_log

@pytest.mark.skipif(
platform.system() == "Windows",
reason="Can't send signals without stopping whole process on windows",
Expand Down Expand Up @@ -1692,7 +1702,7 @@ def thread_target():
platform.system() == "Windows",
reason="Can't send signals without stopping whole process on windows",
)
def test_run_webhook_basic(self, app, monkeypatch):
def test_run_webhook_basic(self, app, monkeypatch, caplog):
assertions = {}

async def delete_webhook(*args, **kwargs):
Expand Down Expand Up @@ -1741,19 +1751,26 @@ def thread_target():
ip = "127.0.0.1"
port = randrange(1024, 49152)

app.run_webhook(
ip_address=ip,
port=port,
url_path="TOKEN",
drop_pending_updates=True,
close_loop=False,
)
thread.join()
with caplog.at_level(logging.DEBUG):
app.run_webhook(
ip_address=ip,
port=port,
url_path="TOKEN",
drop_pending_updates=True,
close_loop=False,
)
thread.join()

assert len(assertions) == 7
for key, value in assertions.items():
assert value, f"assertion '{key}' failed!"

found_log = False
for record in caplog.records:
if "received stop signal" in record.getMessage() and record.levelno == logging.DEBUG:
found_log = True
assert found_log

@pytest.mark.skipif(
platform.system() == "Windows",
reason="Can't send signals without stopping whole process on windows",
Expand Down Expand Up @@ -2226,3 +2243,120 @@ def abort_app():
assert received_signals == []
else:
assert received_signals == [signal.SIGINT, signal.SIGTERM, signal.SIGABRT]

def test_stop_running_not_running(self, app, caplog):
with caplog.at_level(logging.DEBUG):
app.stop_running()

assert len(caplog.records) == 1
assert caplog.records[-1].name == "telegram.ext.Application"
assert caplog.records[-1].getMessage().endswith("stop_running() does nothing.")

@pytest.mark.parametrize("method", ["polling", "webhook"])
def test_stop_running(self, one_time_bot, monkeypatch, method):
# asyncio.Event() seems to be hard to use across different threads (awaiting in main
# thread, setting in another thread), so we use threading.Event() instead.
# This requires the use of run_in_executor, but that's fine.
put_update_event = threading.Event()
callback_done_event = threading.Event()
called_stop_running = threading.Event()
assertions = {}

async def get_updates(*args, **kwargs):
await asyncio.sleep(0)
return []

async def delete_webhook(*args, **kwargs):
return True

async def set_webhook(*args, **kwargs):
return True

async def post_init(app):
# Simply calling app.update_queue.put_nowait(method) in the thread_target doesn't work
# for some reason (probably threading magic), so we use an event from the thread_target
# to put the update into the queue in the main thread.
async def task(app):
await asyncio.get_running_loop().run_in_executor(None, put_update_event.wait)
await app.update_queue.put(method)

app.create_task(task(app))

app = ApplicationBuilder().bot(one_time_bot).post_init(post_init).build()
monkeypatch.setattr(app.bot, "get_updates", get_updates)
monkeypatch.setattr(app.bot, "set_webhook", set_webhook)
monkeypatch.setattr(app.bot, "delete_webhook", delete_webhook)

events = []
monkeypatch.setattr(
app.updater,
"stop",
call_after(app.updater.stop, lambda _: events.append("updater.stop")),
)
monkeypatch.setattr(
app,
"stop",
call_after(app.stop, lambda _: events.append("app.stop")),
)
monkeypatch.setattr(
app,
"shutdown",
call_after(app.shutdown, lambda _: events.append("app.shutdown")),
)

def thread_target():
waited = 0
while not app.running:
time.sleep(0.05)
waited += 0.05
if waited > 5:
pytest.fail("App apparently won't start")

time.sleep(0.1)
assertions["called_stop_running_not_set"] = not called_stop_running.is_set()

put_update_event.set()
time.sleep(0.1)

assertions["called_stop_running_set"] = called_stop_running.is_set()

# App should have entered `stop` now but not finished it yet because the callback
# is still running
assertions["updater.stop_event"] = events == ["updater.stop"]
assertions["app.running_False"] = not app.running

callback_done_event.set()
time.sleep(0.1)

# Now that the update is fully handled, we expect the full shutdown
assertions["events"] = events == ["updater.stop", "app.stop", "app.shutdown"]

async def callback(update, context):
context.application.stop_running()
called_stop_running.set()
await asyncio.get_running_loop().run_in_executor(None, callback_done_event.wait)

app.add_handler(TypeHandler(object, callback))

thread = Thread(target=thread_target)
thread.start()

if method == "polling":
app.run_polling(close_loop=False, drop_pending_updates=True)
else:
ip = "127.0.0.1"
port = randrange(1024, 49152)

app.run_webhook(
ip_address=ip,
port=port,
url_path="TOKEN",
drop_pending_updates=False,
close_loop=False,
)

thread.join()

assert len(assertions) == 5
for key, value in assertions.items():
assert value, f"assertion '{key}' failed!"
Loading
0