Fix thread shutdown on PY39+ (fix #1121) by haakonvt · Pull Request #1122 · cognitedata/cognite-sdk-python · GitHub
[go: up one dir, main page]

Skip to content

Fix thread shutdown on PY39+ (fix #1121) #1122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [5.3.4] - 21-01-24
## [5.3.5] - 25-01-23
### Fixed
- Fixed an atexit-exception (`TypeError: '<' not supported between instances of 'tuple' and 'NoneType'`) that could be raised on PY39+ after fetching datapoints (which uses a custom thread pool implementation).

## [5.3.4] - 25-01-23
### Fixed
- Displaying Cognite resources like an `Asset` or a `TimeSeriesList` in a Jupyter notebook or similar environments depending on `._repr_html_`, no longer raises `CogniteImportError` stating that `pandas` is required. Instead, a warning is issued and `.dump()` is used as fallback.

## [5.3.3] - 20-01-24
## [5.3.3] - 24-01-23
### Added
- New parameter `token_cache_path` now accepted by `OAuthInteractive` and `OAuthDeviceCode` to allow overriding location of token cache.

### Fixed
- Platform dependent temp directory for the caching of the token in `OAuthInteractive` and `OAuthDeviceCode` (no longer crashes at exit on Windows).

## [5.3.2] - 20-01-24
## [5.3.2] - 24-01-23
### Changed
- Update pytest and other dependencies to get rid of dependency on the `py` package.

Expand Down
2 changes: 2 additions & 0 deletions cognite/client/_api/datapoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ def _get_result_with_exception_handling(
except CancelledError:
return None
except CogniteAPIError as e:
# Break ref cycle with the exception:
future._exception = None # type: ignore [attr-defined]
if not (e.code == 400 and e.missing and ts_task.query.ignore_unknown_ids):
# TODO: We only notify the user on the first occurrence of a missing time series, and we
# should probably change that (add note to exception or await all ts have been checked)
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "5.3.4"
__version__ = "5.3.5"
__api_subversion__ = "V20220125"
87 changes: 55 additions & 32 deletions cognite/client/utils/_priority_tpe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
This code has been modified from the original created by Oleg Lupats, 2019, under an MIT license:
This code has been heavily modified from the original created by Oleg Lupats, 2019, under an MIT license:
project = 'PriorityThreadPoolExecutor'
url = 'https://github.com/oleglpts/PriorityThreadPoolExecutor'
copyright = '2019, Oleg Lupats'
Expand Down Expand Up @@ -30,39 +30,50 @@
"""
from __future__ import annotations

import atexit
import inspect
import itertools
import sys
import threading
import weakref
from concurrent.futures.thread import ThreadPoolExecutor, _base, _python_exit, _threads_queues, _WorkItem
from queue import PriorityQueue
from threading import Lock
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor, _base, _WorkItem
from queue import Empty, PriorityQueue
from threading import Lock, Thread

NULL_ENTRY = (sys.maxsize, None, _WorkItem(None, None, (), {}))
NULL_ENTRY = (-1, None, None)
_THREADS_QUEUES = weakref.WeakKeyDictionary()
_SHUTDOWN = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _THREADS_QUEUES and _SHUTDOWN.
_GLOBAL_SHUTDOWN_LOCK = Lock()


def python_exit():
global _SHUTDOWN
_SHUTDOWN = True
items = list(_threads_queues.items())
with _GLOBAL_SHUTDOWN_LOCK:
_SHUTDOWN = True
items = list(_THREADS_QUEUES.items())
for thread, queue in items:
queue.put(NULL_ENTRY)
for thread, queue in items:
thread.join()


atexit.unregister(_python_exit)
atexit.register(python_exit)
# Starting with 3.9, ThreadPoolExecutor no longer uses daemon threads, and so instead, an internal
# function hook very similar to atexit.register() gets called at 'threading' shutdown instead of
# interpreter shutdown. Check out https://github.com/python/cpython/issues/83993
if sys.version_info[:2] < (3, 9):
from atexit import register as _register_atexit
else:
from threading import _register_atexit

_register_atexit(python_exit)


def _worker(executor_reference, work_queue):
try:
while True:
priority, _, work_item = work_queue.get(block=True)
if priority != sys.maxsize:
work_item = work_queue.get(block=True)[-1]
if work_item is not None:
work_item.run()
del work_item
continue
Expand All @@ -76,34 +87,38 @@ def _worker(executor_reference, work_queue):


class PriorityThreadPoolExecutor(ThreadPoolExecutor):
"""Thread pool executor with queue.PriorityQueue()"""
"""Thread pool executor with queue.PriorityQueue() as its work queue. Accepts a 'priority' parameter
that controls the prioritisation of tasks: lower numbers being run before higher numbers, and
0 (zero) being the highest possible priority.

All tasks not given a priority will be given `priority=0` to work seamlessly as a stand-in for the
regular ThreadPoolExecutor.
"""

def __init__(self, max_workers=None):
super().__init__(max_workers)
self._work_queue = PriorityQueue()
self._lock = Lock()
self._counter = itertools.count()

def counter(self):
with self._lock:
return next(self._counter)
self._task_counter = itertools.count().__next__

def submit(self, fn, *args, **kwargs):
if "priority" in inspect.signature(fn).parameters:
raise TypeError(f"Given function {fn} cannot accept reserved parameter name `priority`")

with self._shutdown_lock:
with self._shutdown_lock, _GLOBAL_SHUTDOWN_LOCK:
if self._shutdown:
raise RuntimeError("Cannot schedule new futures after shutdown")

priority = kwargs.pop("priority", None)
assert isinstance(priority, int), "`priority` has to be an integer"
if _SHUTDOWN:
raise RuntimeError("Cannot schedule new futures after interpreter shutdown")

future = _base.Future()
if (priority := kwargs.pop("priority", 0)) < 0:
raise ValueError("'priority' has to be a nonnegative number, 0 being the highest possible priority")

future = Future()
work_item = _WorkItem(future, fn, args, kwargs)

# `counter` to break ties, but keep order:
self._work_queue.put((priority, self.counter(), work_item))
# We use a counter to break ties, but keep order:
self._work_queue.put((priority, self._task_counter(), work_item))
self._adjust_thread_count()
return future

Expand All @@ -112,20 +127,28 @@ def weak_ref_cb(_, queue=self._work_queue):
queue.put(NULL_ENTRY)

if len(self._threads) < self._max_workers:
thread = threading.Thread(target=_worker, args=(weakref.ref(self, weak_ref_cb), self._work_queue))
thread = Thread(target=_worker, args=(weakref.ref(self, weak_ref_cb), self._work_queue))
thread.daemon = True
thread.start()
self._threads.add(thread)
_threads_queues[thread] = self._work_queue
_THREADS_QUEUES[thread] = self._work_queue

def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True

# Empty the entire work queue: (mod. from py39's added option 'cancel_futures')
while True:
try:
work_item = self._work_queue.get_nowait()[-1]
except Empty:
break
if work_item is not None:
work_item.future.cancel()

# Send a wake-up to prevent threads calling _work_queue.get(block=True) from permanently blocking
self._work_queue.put(NULL_ENTRY)

if wait:
for thread in self._threads:
thread.join()
else:
# See: https://gist.github.com/clchiou/f2608cbe54403edb0b13
self._threads.clear()
_threads_queues.clear()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "5.3.4"
version = "5.3.5"

description = "Cognite Python SDK"
readme = "README.md"
Expand Down
0