8000 Update signal from CPython 3.12.3 by youknowone · Pull Request #5280 · RustPython/RustPython · GitHub
[go: up one dir, main page]

Skip to content

Update signal from CPython 3.12.3 #5280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files. 8000
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Lib/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@


def _int_to_enum(value, enum_klass):
"""Convert a numeric value to an IntEnum member.
If it's not a known member, return the numeric value itself.
"""Convert a possible numeric value to an IntEnum member.
If it's not a known member, return the value itself.
"""
if not isinstance(value, int):
return value
try:
return enum_klass(value)
except ValueError:
Expand Down
140 changes: 122 additions & 18 deletions Lib/test/test_signal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import enum
import errno
import functools
import inspect
import os
import random
Expand All @@ -13,6 +15,7 @@
from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok, spawn_python
from test.support import threading_helper
try:
import _testcapi
except ImportError:
Expand All @@ -21,6 +24,8 @@

class GenericTests(unittest.TestCase):

# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_enums(self):
for name in dir(signal):
sig = getattr(signal, name)
Expand All @@ -34,6 +39,32 @@ def test_enums(self):
self.assertIsInstance(sig, signal.Signals)
self.assertEqual(sys.platform, "win32")

CheckedSignals = enum._old_convert_(
enum.IntEnum, 'Signals', 'signal',
lambda name:
name.isupper()
and (name.startswith('SIG') and not name.startswith('SIG_'))
or name.startswith('CTRL_'),
source=signal,
)
enum._test_simple_enum(CheckedSignals, signal.Signals)

CheckedHandlers = enum._old_convert_(
enum.IntEnum, 'Handlers', 'signal',
lambda name: name in ('SIG_DFL', 'SIG_IGN'),
source=signal,
)
enum._test_simple_enum(CheckedHandlers, signal.Handlers)

Sigmasks = getattr(signal, 'Sigmasks', None)
if Sigmasks is not None:
CheckedSigmasks = enum._old_convert_(
enum.IntEnum, 'Sigmasks', 'signal',
lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
source=signal,
)
enum._test_simple_enum(CheckedSigmasks, Sigmasks)

def test_functions_module_attr(self):
# Issue #27718: If __all__ is not defined all non-builtin functions
# should have correct __module__ to be displayed by pydoc.
Expand All @@ -48,6 +79,9 @@ class PosixTests(unittest.TestCase):
def trivial_signal_handler(self, *args):
pass

def create_handler_with_partial(self, argument):
return functools.partial(self.trivial_signal_handler, argument)

# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_out_of_range_signal_number_raises_error(self):
Expand All @@ -70,6 +104,28 @@ def test_getsignal(self):
signal.signal(signal.SIGHUP, hup)
self.assertEqual(signal.getsignal(signal.SIGHUP), hup)

def test_no_repr_is_called_on_signal_handler(self):
# See https://github.com/python/cpython/issues/112559.

class MyArgument:
def __init__(self):
self.repr_count = 0

def __repr__(self):
self.repr_count += 1
return super().__repr__()

argument = MyArgument()
self.assertEqual(0, argument.repr_count)

handler = self.create_handler_with_partial(argument)
hup = signal.signal(signal.SIGHUP, handler)
self.assertIsInstance(hup, signal.Handlers)
self.assertEqual(signal.getsignal(signal.SIGHUP), handler)
signal.signal(signal.SIGHUP, hup)
self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
self.assertEqual(0, argument.repr_count)

# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_strsignal(self):
Expand All @@ -87,6 +143,10 @@ def test_interprocess_signal(self):

# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipUnless(
hasattr(signal, "valid_signals"),
"requires signal.valid_signals"
)
def test_valid_signals(self):
s = signal.valid_signals()
self.assertIsInstance(s, set)
Expand All @@ -96,7 +156,21 @@ def test_valid_signals(self):
self.assertNotIn(signal.NSIG, s)
self.assertLess(len(s), signal.NSIG)

# gh-91145: Make sure that all SIGxxx constants exposed by the Python
# signal module have a number in the [0; signal.NSIG-1] range.
for name in dir(signal):
if not name.startswith("SIG"):
continue
if name in {"SIG_IGN", "SIG_DFL"}:
# SIG_IGN and SIG_DFL are pointers
continue
with self.subTest(name=name):
signum = getattr(signal, name)
self.assertGreaterEqual(signum, 0)
self.assertLess(signum, signal.NSIG)

@unittest.skipUnless(sys.executable, "sys.executable required.")
@support.requires_subprocess()
def test_keyboard_interrupt_exit_code(self):
"""KeyboardInterrupt triggers exit via SIGINT."""
process = subprocess.run(
Expand Down Expand Up @@ -153,6 +227,7 @@ def test_issue9324(self):
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipUnless(sys.executable, "sys.executable required.")
@support.requires_subprocess()
def test_keyboard_interrupt_exit_code(self):
"""KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
# We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
Expand Down Expand Up @@ -185,6 +260,7 @@ def test_invalid_fd(self):
signal.set_wakeup_fd, fd)

@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.skipUnless(support.has_socket_support, "needs working sockets.")
def test_invalid_socket(self):
sock = socket.socket()
fd = sock.fileno()
Expand All @@ -193,6 +269,10 @@ def test_invalid_socket(self):
signal.set_wakeup_fd, fd)

@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
# Emscripten does not support fstat on pipes yet.
# https://github.com/emscripten-core/emscripten/issues/16414
@unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_set_wakeup_fd_result(self):
r1, w1 = os.pipe()
self.addCleanup(os.close, r1)
Expand All @@ -211,6 +291,8 @@ def test_set_wakeup_fd_result(self):
self.assertEqual(signal.set_wakeup_fd(-1), -1)

@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
@unittest.skipUnless(support.has_socket_support, "needs working sockets.")
def test_set_wakeup_fd_socket_result(self):
sock1 = socket.socket()
self.addCleanup(sock1.close)
Expand All @@ -230,6 +312,8 @@ def test_set_wakeup_fd_socket_result(self):
# On Windows, files are always blocking and Windows does not provide a
# function to test if a socket is in non-blocking mode.
@unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
@unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_set_wakeup_fd_blocking(self):
rfd, wfd = os.pipe()
self.addCleanup(os.close, rfd)
Expand Down Expand Up @@ -290,6 +374,7 @@ def check_signum(signals):
assert_python_ok('-c', code)

@unittest.skipIf(_testcapi is None, 'need _testcapi')
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_wakeup_write_error(self):
# Issue #16105: write() errors in the C signal handler should not
# pass silently.
Expand Down Expand Up @@ -628,6 +713,8 @@ def handler(signum, frame):

@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
@support.requires_subprocess()
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
class SiginterruptTest(unittest.TestCase):

def readpipe_interrupted(self, interrupt):
Expand Down Expand Up @@ -708,6 +795,7 @@ def test_siginterrupt_on(self):
interrupted = self.readpipe_interrupted(True)
self.assertTrue(interrupted)

@support.requires_resource('walltime')
def test_siginterrupt_off(self):
# If a signal handler is installed and siginterrupt is called with
# a false value for the second argument, when that signal arrives, it
Expand Down Expand Up @@ -781,15 +869,12 @@ def test_itimer_virtual(self):
signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
signal.setitimer(self.itimer, 0.3, 0.2)

start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(support.LONG_TIMEOUT):
# use up some virtual time by doing real work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_vtalrm handler stopped this itimer
else: # Issue 8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")
# sig_vtalrm handler stopped this itimer
break

# virtual itimer should be (0.0, 0.0) now
self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
Expand All @@ -803,15 +888,12 @@ def test_itimer_prof(self):
signal.signal(signal.SIGPROF, self.sig_prof)
signal.setitimer(self.itimer, 0.2, 0.2)

start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(support.LONG_TIMEOUT):
# do some work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_prof handler stopped this itimer
else: # Issue 8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")
# sig_prof handler stopped this itimer
break

# profiling itimer should be (0.0, 0.0) now
self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
Expand Down Expand Up @@ -873,6 +955,7 @@ def handler(signum, frame):

@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
'need signal.pthread_kill()')
@threading_helper.requires_working_threading()
def test_pthread_kill(self):
code = """if 1:
import signal
Expand Down Expand Up @@ -1009,6 +1092,7 @@ def test_sigtimedwait_negative_timeout(self):
'need signal.sigwait()')
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
@threading_helper.requires_working_threading()
def test_sigwait_thread(self):
# Check that calling sigwait() from a thread doesn't suspend the whole
# process. A new interpreter is spawned to avoid problems when mixing
Expand Down Expand Up @@ -1064,6 +1148,7 @@ def test_pthread_sigmask_valid_signals(self):

@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
@threading_helper.requires_working_threading()
def test_pthread_sigmask(self):
code = """if 1:
import signal
Expand Down Expand Up @@ -1141,6 +1226,7 @@ def read_sigmask():

@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
'need signal.pthread_kill()')
@threading_helper.requires_working_threading()
def test_pthread_kill_main_thread(self):
# Test that a signal can be sent to the main thread with pthread_kill()
# before any other thread has been created (see issue #12392).
Expand Down Expand Up @@ -1276,8 +1362,6 @@ def handler(signum, frame):
self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL

expected_sigs = 0
deadline = time.monotonic() + support.SHORT_TIMEOUT

while expected_sigs < N:
# Hopefully the SIGALRM will be received somewhere during
# initial processing of SIGUSR1.
Expand All @@ -1286,16 +1370,19 @@ def handler(signum, frame):

expected_sigs += 2
# Wait for handlers to run to avoid signal coalescing
while len(sigs) < expected_sigs and time.monotonic() < deadline:
time.sleep(1e-5)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if len(sigs) >= expected_sigs:
break

# All ITIMER_REAL signals should have been delivered to the
# Python handler
self.assertEqual(len(sigs), N, "Some signals were lost")

@unittest.skip("TODO: RUSTPYTHON; hang")
@unittest.skipIf(sys.platform == "darwin", "crashes due to system bug (FB13453490)")
@unittest.skipUnless(hasattr(signal, "SIGUSR1"),
"test needs SIGUSR1")
@threading_helper.requires_working_threading()
def test_stress_modifying_handlers(self):
# bpo-43406: race condition between trip_signal() and signal.signal
signum = signal.SIGUSR1
Expand All @@ -1314,7 +1401,7 @@ def set_interrupts():
num_sent_signals += 1

def cycle_handlers():
while num_sent_signals < 100:
while num_sent_signals < 100 or num_received_signals < 1:
for i in range(20000):
# Cycle between a Python-defined and a non-Python handler
for handler in [custom_handler, signal.SIG_IGN]:
Expand Down Expand Up @@ -1347,7 +1434,7 @@ def cycle_handlers():
if not ignored:
# Sanity check that some signals were received, but not all
self.assertGreater(num_received_signals, 0)
self.assertLess(num_received_signals, num_sent_signals)
self.assertLessEqual(num_received_signals, num_sent_signals)
finally:
do_stop = True
t.join()
Expand Down Expand Up @@ -1388,6 +1475,23 @@ def handler(a, b):
signal.raise_signal(signal.SIGINT)
self.assertTrue(is_ok)

# TODO: RUSTPYTHON
@unittest.expectedFailure
def test__thread_interrupt_main(self):
# See https://github.com/python/cpython/issues/102397
code = """if 1:
import _thread
class Foo():
def __del__(self):
_thread.interrupt_main()

x = Foo()
"""

rc, out, err = assert_python_ok('-c', code)
self.assertIn(b'OSError: Signal 2 ignored due to race condition', err)



class PidfdSignalTest(unittest.TestCase):

Expand Down
Loading
0