From ce447b81ede89eb6732c89974eaa1d6c473d4a3f Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Mon, 19 Feb 2018 16:57:35 +0200 Subject: [PATCH 1/7] updater: Refactor bootstrap phase to be resilient for network errors --- telegram/ext/updater.py | 146 +++++++++++++++++++++++++--------------- 1 file changed, 91 insertions(+), 55 deletions(-) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 83c421cc60b..6e19d0bfc7f 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -150,7 +150,7 @@ def _thread_wrapper(self, target, *args, **kwargs): target(*args, **kwargs) except Exception: self.__exception_event.set() - self.logger.exception('unhandled exception') + self.logger.exception('unhandled exception in %s', thr_name) raise self.logger.debug('{0} - ended'.format(thr_name)) @@ -159,7 +159,7 @@ def start_polling(self, timeout=10, network_delay=None, clean=False, - bootstrap_retries=0, + bootstrap_retries=-1, read_latency=2., allowed_updates=None): """Starts polling updates from Telegram. @@ -271,46 +271,72 @@ def _start_polling(self, poll_interval, timeout, read_latency, bootstrap_retries # updates from Telegram and inserts them in the update queue of the # Dispatcher. - cur_interval = poll_interval - self.logger.debug('Updater thread started') + self.logger.debug('Updater thread started (polling)') self._bootstrap(bootstrap_retries, clean=clean, webhook_url='', allowed_updates=None) + self.logger.debug('Bootstrap done') + + def polling_action_cb(): + updates = self.bot.get_updates( + self.last_update_id, timeout=timeout, read_latency=read_latency, + allowed_updates=allowed_updates) + + if updates: + if not self.running: + self.logger.debug('Updates ignored and will be pulled again on restart') + else: + for update in updates: + self.update_queue.put(update) + self.last_update_id = updates[-1].update_id + 1 + + return True + + def polling_onerr_cb(exc): + # Put the error into the update queue and let the Dispatcher + # broadcast it + self.update_queue.put(exc) + + self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates', + poll_interval) + + def _network_loop_retry(self, action_cb, onerr_cb, description, interval): + """Perform a loop calling `action_cb`, retrying after network errors. + + Stop condition for loop: `self.running` evaluates False or return value of `action_cb` + evaluates False. + + Args: + action_cb (:obj:`callable`): Network oriented callback function to call. + onerr_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives the + exception object as a parameter. + description (:obj:`str`): Description text to use for logs and exception raised. + interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to + `action_cb`. + + """ + self.logger.debug('Start network loop retry %s', description) + cur_interval = interval while self.running: try: - updates = self.bot.get_updates( - self.last_update_id, - timeout=timeout, - read_latency=read_latency, - allowed_updates=allowed_updates) + if not action_cb(): + break except RetryAfter as e: - self.logger.info(str(e)) + self.logger.info('%s', e) cur_interval = 0.5 + e.retry_after except TimedOut as toe: - self.logger.debug('Timed out getting Updates: %s', toe) - # If get_updates() failed due to timeout, we should retry asap. + self.logger.debug('Timed out %s: %s', description, toe) + # If failure is due to timeout, we should retry asap. cur_interval = 0 + except InvalidToken as pex: + self.logger.error('Invalid token; aborting') + raise pex except TelegramError as te: - self.logger.error('Error while getting Updates: %s', te) - - # Put the error into the update queue and let the Dispatcher - # broadcast it - self.update_queue.put(te) - + self.logger.error('Error while %s: %s', description, te) + onerr_cb(te) cur_interval = self._increase_poll_interval(cur_interval) else: - if not self.running: - if len(updates) > 0: - self.logger.debug('Updates ignored and will be pulled ' - 'again on restart.') - break - - if updates: - for update in updates: - self.update_queue.put(update) - self.last_update_id = updates[-1].update_id + 1 - - cur_interval = poll_interval + cur_interval = interval if cur_interval: sleep(cur_interval) @@ -328,7 +354,7 @@ def _increase_poll_interval(current_interval): def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean, webhook_url, allowed_updates): - self.logger.debug('Updater thread started') + self.logger.debug('Updater thread started (webhook)') use_ssl = cert is not None and key is not None if not url_path.startswith('/'): url_path = '/{0}'.format(url_path) @@ -379,34 +405,44 @@ def _check_ssl_cert(self, cert, key): def _gen_webhook_url(listen, port, url_path): return 'https://{listen}:{port}{path}'.format(listen=listen, port=port, path=url_path) - def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None): - retries = 0 - while 1: + def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None, + bootstrap_interval=5): + retries = [0] - try: - if clean: - # Disable webhook for cleaning - self.bot.delete_webhook() - self._clean_updates() - sleep(1) - - self.bot.set_webhook( - url=webhook_url, certificate=cert, allowed_updates=allowed_updates) - except (Unauthorized, InvalidToken): - raise - except TelegramError: - msg = 'error in bootstrap phase; try={0} max_retries={1}'.format(retries, - max_retries) - if max_retries < 0 or retries < max_retries: - self.logger.warning(msg) - retries += 1 - else: - self.logger.exception(msg) - raise + def bootstrap_del_webhook(): + self.bot.delete_webhook() + return False + + def bootstrap_clean_updates(): + self._clean_updates() + return False + + def bootstrap_set_webhook(): + self.bot.set_webhook( + url=webhook_url, certificate=cert, allowed_updates=allowed_updates) + return False + + def bootstrap_onerr_cb(exc): + if not isinstance(exc, Unauthorized) and (max_retries < 0 or retries[0] < max_retries): + retries[0] += 1 + self.logger.warning('Failed bootstrap phase; try=%s max_retries=%s', + retries[0], max_retries) else: - break + self.logger.error('Failed bootstrap phase after %s retries (%s)', retries[0], exc) + raise exc + + if clean: + self._network_loop_retry(bootstrap_del_webhook, bootstrap_onerr_cb, + 'bootstrap del webhook', bootstrap_interval) + retries[0] = 0 + self._network_loop_retry(bootstrap_clean_updates, bootstrap_onerr_cb, + 'bootstrap clean updates', bootstrap_interval) + retries[0] = 0 sleep(1) + self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb, + 'bootstrap set webhook', bootstrap_interval) + def _clean_updates(self): self.logger.debug('Cleaning updates from Telegram server') updates = self.bot.get_updates() From fc98577e9e89e401a81dd664fc1ef64c24424693 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Thu, 22 Feb 2018 13:46:33 +0200 Subject: [PATCH 2/7] Improved unitests for polling updater --- tests/test_updater.py | 99 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 88 insertions(+), 11 deletions(-) diff --git a/tests/test_updater.py b/tests/test_updater.py index 7cec00cce49..815d51785f9 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -23,7 +23,7 @@ from functools import partial from queue import Queue from random import randrange -from threading import Thread +from threading import Thread, Event from time import sleep try: @@ -38,13 +38,18 @@ from future.builtins import bytes from telegram import TelegramError, Message, User, Chat, Update, Bot -from telegram.error import Unauthorized, InvalidToken +from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter from telegram.ext import Updater signalskip = pytest.mark.skipif(sys.platform == 'win32', reason='Can\'t send signals without stopping ' 'whole process on windows') +logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.DEBUG) + +logger = logging.getLogger(__name__) + @pytest.fixture(scope='function') def updater(bot): @@ -58,31 +63,98 @@ class TestUpdater(object): message_count = 0 received = None attempts = 0 + err_handler_called = Event() + cb_handler_called = Event() @pytest.fixture(autouse=True) def reset(self): self.message_count = 0 self.received = None self.attempts = 0 + self.err_handler_called.clear() + self.cb_handler_called.clear() def error_handler(self, bot, update, error): self.received = error.message + self.err_handler_called.set() def callback(self, bot, update): self.received = update.message.text + self.cb_handler_called.set() + + # TODO: test clean= argument of Updater._bootstrap + + @pytest.mark.parametrize(('error',), + argvalues=[(TelegramError('Test Error 2'),), + (Unauthorized('Test Unauthorized'),)], + ids=('TelegramError', 'Unauthorized')) + def test_get_updates_normal_err(self, monkeypatch, updater, error): + def test(*args, **kwargs): + raise error + + monkeypatch.setattr('telegram.Bot.get_updates', test) + monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True) + updater.dispatcher.add_error_handler(self.error_handler) + updater.start_polling(0.01) + + # Make sure that the error handler was called + self.err_handler_called.wait() + assert self.received == error.message + + # Make sure that Updater polling thread keeps running + self.err_handler_called.clear() + self.err_handler_called.wait() + + def test_get_updates_bailout_err(self, monkeypatch, updater, caplog): + error = InvalidToken() + + def test(*args, **kwargs): + raise error - # TODO: test clean= argument + with caplog.at_level(logging.DEBUG): + monkeypatch.setattr('telegram.Bot.get_updates', test) + monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True) + updater.dispatcher.add_error_handler(self.error_handler) + updater.start_polling(0.01) + assert self.err_handler_called.wait(0.5) is not True + + # NOTE: This test might hit a race condition and fail (though the 0.5 seconds delay above + # should work around it). + # NOTE: Checking Updater.running is problematic because it is not set to False when there's + # an unhandled exception. + # TODO: We should have a way to poll Updater status and decide if it's running or not. + assert any('unhandled exception in updater' in rec.getMessage() for rec in + caplog.get_records('call')) + + @pytest.mark.parametrize(('error',), + argvalues=[(RetryAfter(0.01),), + (TimedOut(),)], + ids=('RetryAfter', 'TimedOut')) + def test_get_updates_retries(self, monkeypatch, updater, error): + event = Event() - def test_error_on_get_updates(self, monkeypatch, updater): def test(*args, **kwargs): - raise TelegramError('Test Error 2') + event.set() + raise error monkeypatch.setattr('telegram.Bot.get_updates', test) monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True) updater.dispatcher.add_error_handler(self.error_handler) updater.start_polling(0.01) - sleep(.1) - assert self.received == 'Test Error 2' + + # Make sure that get_updates was called, but not the error handler + logger.info('waiting for event') + event.wait() + logger.info('making sure err handler not called') + assert self.err_handler_called.wait(0.5) is not True + assert self.received != error.message + + # Make sure that Updater polling thread keeps running + event.clear() + logger.info('waiting for second event') + event.wait() + logger.info('making sure err handler not called 2') + assert self.err_handler_called.wait(0.5) is not True def test_webhook(self, monkeypatch, updater): q = Queue() @@ -145,17 +217,21 @@ def test_webhook_no_ssl(self, monkeypatch, updater): sleep(.2) assert q.get(False) == update - def test_bootstrap_retries_success(self, monkeypatch, updater): + @pytest.mark.parametrize(('error',), + argvalues=[(TelegramError(''),)], + ids=('TelegramError',)) + def test_bootstrap_retries_success(self, monkeypatch, updater, error): retries = 2 def attempt(_, *args, **kwargs): if self.attempts < retries: self.attempts += 1 - raise TelegramError('') + raise error monkeypatch.setattr('telegram.Bot.set_webhook', attempt) - updater._bootstrap(retries, False, 'path', None) + updater.running = True + updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0) assert self.attempts == retries @pytest.mark.parametrize(('error', 'attempts'), @@ -172,8 +248,9 @@ def attempt(_, *args, **kwargs): monkeypatch.setattr('telegram.Bot.set_webhook', attempt) + updater.running = True with pytest.raises(type(error)): - updater._bootstrap(retries, False, 'path', None) + updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0) assert self.attempts == attempts def test_webhook_invalid_posts(self, updater): From c5f0ed8bb4774e30165c5be4dbc9dfc74ec6e9b8 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Thu, 22 Feb 2018 14:36:57 +0200 Subject: [PATCH 3/7] Remove logs which shouldn't have been commited --- tests/test_updater.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/test_updater.py b/tests/test_updater.py index 815d51785f9..85c4a6c9971 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -45,11 +45,6 @@ reason='Can\'t send signals without stopping ' 'whole process on windows') -logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - level=logging.DEBUG) - -logger = logging.getLogger(__name__) - @pytest.fixture(scope='function') def updater(bot): @@ -143,17 +138,13 @@ def test(*args, **kwargs): updater.start_polling(0.01) # Make sure that get_updates was called, but not the error handler - logger.info('waiting for event') event.wait() - logger.info('making sure err handler not called') assert self.err_handler_called.wait(0.5) is not True assert self.received != error.message # Make sure that Updater polling thread keeps running event.clear() - logger.info('waiting for second event') event.wait() - logger.info('making sure err handler not called 2') assert self.err_handler_called.wait(0.5) is not True def test_webhook(self, monkeypatch, updater): From 66a03eef073278e0bc9e8043e543e0e3f55a9764 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Thu, 1 Mar 2018 12:37:29 +0200 Subject: [PATCH 4/7] FIx documentation of start_polling() with new bootstrap_retries value --- telegram/ext/updater.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 93a394c51c4..b2b453b8180 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -169,7 +169,7 @@ def start_polling(self, clean (:obj:`bool`, optional): Whether to clean any pending updates on Telegram servers before actually starting to poll. Default is False. bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the - `Updater` will retry on failures on the Telegram server. + `Updater` will retry on failures on the Telegram server. Default is -1. * < 0 - retry indefinitely * 0 - no retries (default) From 597c9f5d9ac7fd30ccf84a7f6e7e2a9e4b785f35 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Thu, 1 Mar 2018 12:38:25 +0200 Subject: [PATCH 5/7] Move _clean_updates() into inner procedure bootstrap_clean_updates() --- telegram/ext/updater.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index b2b453b8180..67d27752d1e 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -405,7 +405,10 @@ def bootstrap_del_webhook(): return False def bootstrap_clean_updates(): - self._clean_updates() + self.logger.debug('Cleaning updates from Telegram server') + updates = self.bot.get_updates() + while updates: + updates = self.bot.get_updates(updates[-1].update_id + 1) return False def bootstrap_set_webhook(): @@ -434,12 +437,6 @@ def bootstrap_onerr_cb(exc): self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb, 'bootstrap set webhook', bootstrap_interval) - def _clean_updates(self): - self.logger.debug('Cleaning updates from Telegram server') - updates = self.bot.get_updates() - while updates: - updates = self.bot.get_updates(updates[-1].update_id + 1) - def stop(self): """Stops the polling/webhook thread, the dispatcher and the job queue.""" From 152d034d9d903e65cc3db189d80886027432a535 Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Thu, 1 Mar 2018 12:44:46 +0200 Subject: [PATCH 6/7] squash! FIx documentation of start_{polling,webhook}() with new bootstrap_retries value [ci skip] --- telegram/ext/updater.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 67d27752d1e..1e95f95e4e0 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -169,10 +169,10 @@ def start_polling(self, clean (:obj:`bool`, optional): Whether to clean any pending updates on Telegram servers before actually starting to poll. Default is False. bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the - `Updater` will retry on failures on the Telegram server. Default is -1. + `Updater` will retry on failures on the Telegram server. - * < 0 - retry indefinitely - * 0 - no retries (default) + * < 0 - retry indefinitely (default) + * 0 - no retries * > 0 - retry up to X times allowed_updates (List[:obj:`str`], optional): Passed to @@ -229,8 +229,8 @@ def start_webhook(self, bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the `Updater` will retry on failures on the Telegram server. - * < 0 - retry indefinitely - * 0 - no retries (default) + * < 0 - retry indefinitely (default) + * 0 - no retries * > 0 - retry up to X times webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind From aa35961b52e52ff983d74dcab62e71dbe658c52c Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Fri, 2 Mar 2018 17:52:52 +0200 Subject: [PATCH 7/7] Don't call set_webhook with empty url - redundant to del_webhook --- telegram/ext/updater.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 1e95f95e4e0..77752ba17d0 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -242,7 +242,6 @@ def start_webhook(self, :obj:`Queue`: The update queue that can be filled from the main thread. """ - with self.__lock: if not self.running: self.running = True @@ -425,17 +424,27 @@ def bootstrap_onerr_cb(exc): self.logger.error('Failed bootstrap phase after %s retries (%s)', retries[0], exc) raise exc - if clean: + # Cleaning pending messages is done by polling for them - so we need to delete webhook if + # one is configured. + # We also take this chance to delete pre-configured webhook if this is a polling Updater. + # NOTE: We don't know ahead if a webhook is configured, so we just delete. + if clean or not webhook_url: self._network_loop_retry(bootstrap_del_webhook, bootstrap_onerr_cb, 'bootstrap del webhook', bootstrap_interval) retries[0] = 0 + + # Clean pending messages, if requested. + if clean: self._network_loop_retry(bootstrap_clean_updates, bootstrap_onerr_cb, 'bootstrap clean updates', bootstrap_interval) retries[0] = 0 sleep(1) - self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb, - 'bootstrap set webhook', bootstrap_interval) + # Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set, + # so we set it anyhow. + if webhook_url: + self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb, + 'bootstrap set webhook', bootstrap_interval) def stop(self): """Stops the polling/webhook thread, the dispatcher and the job queue."""