8000 gh-121723: Relax constraints on queue objects for `logging.handlers.QueueHandler`. by picnixz · Pull Request #122154 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

gh-121723: Relax constraints on queue objects for logging.handlers.QueueHandler. #122154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 2, 2024
9 changes: 6 additions & 3 deletions Doc/library/logging.config.rst
8000
Original file line number Diff line number Diff line change
Expand Up @@ -753,9 +753,12 @@ The ``queue`` and ``listener`` keys are optional.

If the ``queue`` key is present, the corresponding value can be one of the following:

* An actual instance of :class:`queue.Queue` or a subclass thereof. This is of course
only possible if you are constructing or modifying the configuration dictionary in
code.
* An object implementing the :class:`queue.Queue` public API. For instance,
this may be an actual instance of :class:`queue.Queue` or a subclass thereof,
or a proxy obtained by :meth:`multiprocessing.managers.SyncManager.Queue`.

This is of course only possible if you are constructing or modifying
the configuration dictionary in code.

* A string that resolves to a callable which, when called with no arguments, returns
the :class:`queue.Queue` instance to use. That callable could be a
Expand Down
55 changes: 29 additions & 26 deletions Lib/logging/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,33 @@ def as_tuple(self, value):
value = tuple(value)
return value

def _is_queue_like_object(obj):
"""Check that *obj* implements the Queue API."""
if isinstance(obj, queue.Queue):
return True
# defer importing multiprocessing as much as possible
from multiprocessing.queues import Queue as MPQueue
if isinstance(obj, MPQueue):
return True
# Depending on the multiprocessing start context, we cannot create
# a multiprocessing.managers.BaseManager instance 'mm' to get the
# runtime type of mm.Queue() or mm.JoinableQueue() (see gh-119819).
#
# Since we only need an object implementing the Queue API, we only
# do a protocol check, but we do not use typing.runtime_checkable()
# and typing.Protocol to reduce import time (see gh-121723).
#
# Ideally, we would have wanted to simply use strict type checking
# instead of a protocol-based type checking since the latter does
# not check the method signatures.
queue_interface = [
'empty', 'full', 'get', 'get_nowait',
'put', 'put_nowait', 'join', 'qsize',
'task_done',
]
return all(callable(getattr(obj, method, None))
for method in queue_interface)

class DictConfigurator(BaseConfigurator):
"""
Configure logging using a dictionary-like object to describe the
Expand Down Expand Up @@ -791,32 +818,8 @@ def configure_handler(self, config):
if '()' not in qspec:
raise TypeError('Invalid queue specifier %r' % qspec)
config['queue'] = self.configure_custom(dict(qspec))
else:
from multiprocessing.queues import Queue as MPQueue

if not isinstance(qspec, (queue.Queue, MPQueue)):
# Safely check if 'qspec' is an instance of Manager.Queue
# / Manager.JoinableQueue

from multiprocessing import Manager as MM
from multiprocessing.managers import BaseProxy

# if it's not an instance of BaseProxy, it also can't be
# an instance of Manager.Queue / Manager.JoinableQueue
if isinstance(qspec, BaseProxy):
# Sometimes manager or queue creation might fail
# (e.g. see issue gh-120868). In that case, any
# exception during the creation of these queues will
# propagate up to the caller and be wrapped in a
# `Valu 8000 eError`, whose cause will indicate the details of
# the failure.
mm = MM()
proxy_queue = mm.Queue()
proxy_joinable_queue = mm.JoinableQueue()
if not isinstance(qspec, (type(proxy_queue), type(proxy_joinable_queue))):
raise TypeError('Invalid queue specifier %r' % qspec)
else:
raise TypeError('Invalid queue specifier %r' % qspec)
elif not _is_queue_like_object(qspec):
raise TypeError('Invalid queue specifier %r' % qspec)

if 'listener' in config:
lspec = config['listener']
Expand Down
107 changes: 86 additions & 21 deletions Lib/test/test_logging.py
6D40
Original file line number Diff line number Diff line change
Expand Up @@ -2368,6 +2368,26 @@ class CustomListener(logging.handlers.QueueListener):
class CustomQueue(queue.Queue):
pass

class CustomQueueProtocol:
def __init__(self, maxsize=0):
self.queue = queue.Queue(maxsize)

def __getattr__(self, attribute):
queue = object.__getattribute__(self, 'queue')
return getattr(queue, attribute)

class CustomQueueFakeProtocol(CustomQueueProtocol):
# An object implementing the Queue API (incorrect signatures).
# The object will be considered a valid queue class since we
# do not check the signatures (only callability of methods)
# but will NOT be usable in production since a TypeError will
# be raised due to a missing argument.
def empty(self, x):
pass

class CustomQueueWrongProtocol(CustomQueueProtocol):
empty = None

def queueMaker():
return queue.Queue()

Expand Down Expand Up @@ -3901,18 +3921,16 @@ def do_queuehandler_configuration(self, qspec, lspec):
@threading_helper.requires_working_threading()
@support.requires_subprocess()
def test_config_queue_handler(self):
q = CustomQueue()
dq = {
'()': __name__ + '.CustomQueue',
'maxsize': 10
}
qs = [CustomQueue(), CustomQueueProtocol()]
dqs = [{'()': f'{__name__}.{cls}', 'maxsize': 10}
for cls in ['CustomQueue', 'CustomQueueProtocol']]
dl = {
'()': __name__ + '.listenerMaker',
'arg1': None,
'arg2': None,
'respect_handler_level': True
}
qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', dq, q)
qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', *dqs, *qs)
lvalues = (None, __name__ + '.CustomListener', dl, CustomListener)
for qspec, lspec in itertools.product(qvalues, lvalues):
self.do_queuehandler_configuration(qspec, lspec)
Expand All @@ -3932,15 +3950,21 @@ def test_config_queue_handler(self):
@support.requires_subprocess()
@patch("multiprocessing.Manager")
def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
# gh-120868
# gh-120868, gh-121723

from multiprocessing import Queue as MQ

q1 = {"()": "queue.Queue", "maxsize": -1}
q2 = MQ()
q3 = queue.Queue()

for qspec in (q1, q2, q3):
# CustomQueueFakeProtocol passes the checks but will not be usable
# since the signatures are incompatible. Checking the Queue API
# without testing the type of the actual queue is a trade-off
# between usability and the work we need to do in order to safely
# check that the queue object correctly implements the API.
q4 = CustomQueueFakeProtocol()

for qspec in (q1, q2, q3, q4):
self.apply_config(
{
"version": 1,
Expand All @@ -3956,21 +3980,62 @@ def test_config_queue_handler_does_not_create_multiprocessing_manager(self, mana

@patch("multiprocessing.Manager")
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
# gh-120868
# gh-120868, gh-121723

with self.assertRaises(ValueError):
self.apply_config(
{
"version": 1,
"handlers": {
"queue_listener": {
"class": "logging.handlers.QueueHandler",
"queue": object(),
for qspec in [object(), CustomQueueWrongProtocol()]:
with self.assertRaises(ValueError):
self.apply_config(
{
"version": 1,
"handlers": {
"queue_listener": {
"class": "logging.handlers.QueueHandler",
"queue": qspec,
},
},
},
}
)
manager.assert_not_called()

@skip_if_tsan_fork
@support.requires_subprocess()
@unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
"assertions in multiprocessing")
def test_config_queue_handler_multiprocessing_context(self):
# regression test for gh-121723
if support.MS_WINDOWS:
start_methods = ['spawn']
else:
start_methods = ['spawn', 'fork', 'forkserver']
for start_method in start_methods:
with self.subTest(start_method=start_method):
ctx = multiprocessing.get_context(start_method)
with ctx.Manager() as manager:
q = manager.Queue()
records = []
# use 1 process and 1 task per child to put 1 record
with ctx.Pool(1, initializer=self._mpinit_issue121723,
initargs=(q, "text"), maxtasksperchild=1):
records.append(q.get(timeout=60))
self.assertTrue(q.empty())
self.assertEqual(len(records), 1)

@staticmethod
def _mpinit_issue121723(qspec, message_to_log):
# static method for pickling support
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': True,
'handlers': {
'log_to_parent': {
'class': 'logging.handlers.QueueHandler',
'queue': qspec
}
)
manager.assert_not_called()
},
'root': {'handlers': ['log_to_parent'], 'level': 'DEBUG'}
})
# log a message (this creates a record put in the queue)
logging.getLogger().info(message_to_log)

@skip_if_tsan_fork
@support.requires_subprocess()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Make :func:`logging.config.dictConfig` accept any object implementing the
Queue public API. See the :ref:`queue configuration <configure-queue>`
section for details. Patch by Bénédikt Tran.
Loading
0