-
Notifications
You must be signed in to change notification settings - Fork 5.7k
Updater improvements #1018
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Updater improvements #1018
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
ce447b8
updater: Refactor bootstrap phase to be resilient for network errors
tsnoam fc98577
Improved unitests for polling updater
tsnoam c5f0ed8
Remove logs which shouldn't have been commited
tsnoam b7c9ace
Merge remote-tracking branch 'origin/master' into resilient_bootstrap
tsnoam 66a03ee
FIx documentation of start_polling() with new bootstrap_retries value
tsnoam 597c9f5
Move _clean_updates() into inner procedure bootstrap_clean_updates()
tsnoam 152d034
squash! FIx documentation of start_{polling,webhook}() with new boots…
tsnoam aa35961
Don't call set_webhook with empty url - redundant to del_webhook
tsnoam File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,15 +149,15 @@ def _thread_wrapper(self, target, *args, **kwargs): | |
target(*args, **kwargs) | ||
except Exception: | ||
self.__exception_event.set() | ||
self.logger.exception('unhandled exception') | ||
self.logger.exception('unhandled exception in %s', thr_name) | ||
raise | ||
self.logger.debug('{0} - ended'.format(thr_name)) | ||
|
||
def start_polling(self, | ||
poll_interval=0.0, | ||
timeout=10, | ||
clean=False, | ||
bootstrap_retries=0, | ||
bootstrap_retries=-1, | ||
read_latency=2., | ||
allowed_updates=None): | ||
"""Starts polling updates from Telegram. | ||
|
@@ -171,8 +171,8 @@ def start_polling(self, | |
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the | ||
`Updater` will retry on failures on the Telegram server. | ||
|
||
* < 0 - retry indefinitely | ||
* 0 - no retries (default) | ||
* < 0 - retry indefinitely (default) | ||
* 0 - no retries | ||
* > 0 - retry up to X times | ||
|
||
allowed_updates (List[:obj:`str`], optional): Passed to | ||
|
@@ -229,8 +229,8 @@ def start_webhook(self, | |
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the | ||
`Updater` will retry on failures on the Telegram server. | ||
|
||
* < 0 - retry indefinitely | ||
* 0 - no retries (default) | ||
* < 0 - retry indefinitely (default) | ||
* 0 - no retries | ||
Comment on lines
+232
to
+233
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. documentation was updated to state indefinite retries, but the default value in the signature was not updated |
||
* > 0 - retry up to X times | ||
|
||
webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind | ||
|
@@ -242,7 +242,6 @@ def start_webhook(self, | |
:obj:`Queue`: The update queue that can be filled from the main thread. | ||
|
||
""" | ||
|
||
with self.__lock: | ||
if not self.running: | ||
self.running = True | ||
|
@@ -262,46 +261,72 @@ def _start_polling(self, poll_interval, timeout, read_latency, bootstrap_retries | |
# updates from Telegram and inserts them in the update queue of the | ||
# Dispatcher. | ||
|
||
cur_interval = poll_interval | ||
self.logger.debug('Updater thread started') | ||
self.logger.debug('Updater thread started (polling)') | ||
|
||
self._bootstrap(bootstrap_retries, clean=clean, webhook_url='', allowed_updates=None) | ||
|
||
self.logger.debug('Bootstrap done') | ||
|
||
def polling_action_cb(): | ||
updates = self.bot.get_updates( | ||
self.last_update_id, timeout=timeout, read_latency=read_latency, | ||
allowed_updates=allowed_updates) | ||
|
||
if updates: | ||
if not self.running: | ||
self.logger.debug('Updates ignored and will be pulled again on restart') | ||
else: | ||
for update in updates: | ||
self.update_queue.put(update) | ||
self.last_update_id = updates[-1].update_id + 1 | ||
|
||
return True | ||
|
||
def polling_onerr_cb(exc): | ||
# Put the error into the update queue and let the Dispatcher | ||
# broadcast it | ||
self.update_queue.put(exc) | ||
|
||
self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates', | ||
poll_interval) | ||
|
||
def _network_loop_retry(self, action_cb, onerr_cb, description, interval): | ||
"""Perform a loop calling `action_cb`, retrying after network errors. | ||
|
||
Stop condition for loop: `self.running` evaluates False or return value of `action_cb` | ||
evaluates False. | ||
|
||
Args: | ||
action_cb (:obj:`callable`): Network oriented callback function to call. | ||
onerr_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives the | ||
exception object as a parameter. | ||
description (:obj:`str`): Description text to use for logs and exception raised. | ||
interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to | ||
`action_cb`. | ||
|
||
""" | ||
self.logger.debug('Start network loop retry %s', description) | ||
cur_interval = interval | ||
while self.running: | ||
try: | ||
updates = self.bot.get_updates( | ||
self.last_update_id, | ||
timeout=timeout, | ||
read_latency=read_latency, | ||
allowed_updates=allowed_updates) | ||
if not action_cb(): | ||
break | ||
except RetryAfter as e: | ||
self.logger.info(str(e)) | ||
self.logger.info('%s', e) | ||
cur_interval = 0.5 + e.retry_after | ||
except TimedOut as toe: | ||
self.logger.debug('Timed out getting Updates: %s', toe) | ||
# If get_updates() failed due to timeout, we should retry asap. | ||
self.logger.debug('Timed out %s: %s', description, toe) | ||
# If failure is due to timeout, we should retry asap. | ||
cur_interval = 0 | ||
except InvalidToken as pex: | ||
self.logger.error('Invalid token; aborting') | ||
raise pex | ||
except TelegramError as te: | ||
self.logger.error('Error while getting Updates: %s', te) | ||
|
||
# Put the error into the update queue and let the Dispatcher | ||
# broadcast it | ||
self.update_queue.put(te) | ||
|
||
self.logger.error('Error while %s: %s', description, te) | ||
onerr_cb(te) | ||
cur_interval = self._increase_poll_interval(cur_interval) | ||
else: | ||
if not self.running: | ||
if len(updates) > 0: | ||
self.logger.debug('Updates ignored and will be pulled ' | ||
'again on restart.') | ||
break | ||
|
||
if updates: | ||
for update in updates: | ||
self.update_queue.put(update) | ||
self.last_update_id = updates[-1].update_id + 1 | ||
|
||
cur_interval = poll_interval | ||
cur_interval = interval | ||
|
||
if cur_interval: | ||
sleep(cur_interval) | ||
|
@@ -319,7 +344,7 @@ def _increase_poll_interval(current_interval): | |
|
||
def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean, | ||
webhook_url, allowed_updates): | ||
self.logger.debug('Updater thread started') | ||
self.logger.debug('Updater thread started (webhook)') | ||
use_ssl = cert is not None and key is not None | ||
if not url_path.startswith('/'): | ||
url_path = '/{0}'.format(url_path) | ||
|
@@ -370,39 +395,56 @@ def _check_ssl_cert(self, cert, key): | |
def _gen_webhook_url(listen, port, url_path): | ||
return 'https://{listen}:{port}{path}'.format(listen=listen, port=port, path=url_path) | ||
|
||
def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None): | ||
retries = 0 | ||
while 1: | ||
|
||
try: | ||
if clean: | ||
# Disable webhook for cleaning | ||
self.bot.delete_webhook() | ||
self._clean_updates() | ||
sleep(1) | ||
|
||
self.bot.set_webhook( | ||
url=webhook_url, certificate=cert, allowed_updates=allowed_updates) | ||
except (Unauthorized, InvalidToken): | ||
raise | ||
except TelegramError: | ||
msg = 'error in bootstrap phase; try={0} max_retries={1}'.format(retries, | ||
max_retries) | ||
if max_retries < 0 or retries < max_retries: | ||
self.logger.warning(msg) | ||
retries += 1 | ||
else: | ||
self.logger.exception(msg) | ||
raise | ||
def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None, | ||
bootstrap_interval=5): | ||
retries = [0] | ||
|
||
def bootstrap_del_webhook(): | ||
self.bot.delete_webhook() | ||
return False | ||
|
||
def bootstrap_clean_updates(): | ||
self.logger.debug('Cleaning updates from Telegram server') | ||
updates = self.bot.get_updates() | ||
while updates: | ||
updates = self.bot.get_updates(updates[-1].update_id + 1) | ||
return False | ||
|
||
def bootstrap_set_webhook(): | ||
self.bot.set_webhook( | ||
url=webhook_url, certificate=cert, allowed_updates=allowed_updates) | ||
return False | ||
|
||
def bootstrap_onerr_cb(exc): | ||
if not isinstance(exc, Unauthorized) and (max_retries < 0 or retries[0] < max_retries): | ||
retries[0] += 1 | ||
self.logger.warning('Failed bootstrap phase; try=%s max_retries=%s', | ||
retries[0], max_retries) | ||
else: | ||
break | ||
self.logger.error('Failed bootstrap phase after %s retries (%s)', retries[0], exc) | ||
raise exc | ||
|
||
# Cleaning pending messages is done by polling for them - so we need to delete webhook if | ||
# one is configured. | ||
# We also take this chance to delete pre-configured webhook if this is a polling Updater. | ||
# NOTE: We don't know ahead if a webhook is configured, so we just delete. | ||
if clean or not webhook_url: | ||
self._network_loop_retry(bootstrap_del_webhook, bootstrap_onerr_cb, | ||
'bootstrap del webhook', bootstrap_interval) | ||
retries[0] = 0 | ||
|
||
# Clean pending messages, if requested. | ||
if clean: | ||
self._network_loop_retry(bootstrap_clean_updates, bootstrap_onerr_cb, | ||
'bootstrap clean updates', bootstrap_interval) | ||
retries[0] = 0 | ||
sleep(1) | ||
|
||
def _clean_updates(self): | ||
self.logger.debug('Cleaning updates from Telegram server') | ||
updates = self.bot.get_updates() | ||
while updates: | ||
updates = self.bot.get_updates(updates[-1].update_id + 1) | ||
# Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set, | ||
# so we set it anyhow. | ||
if webhook_url: | ||
self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb, | ||
'bootstrap set webhook', bootstrap_interval) | ||
|
||
def stop(self): | ||
"""Stops the polling/webhook thread, the dispatcher and the job queue.""" | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring doesn;t reflect this change to the default value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix