8000 bpo-44336: Prevent tests hanging on child process handles on Windows … · python/cpython@19058b9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 19058b9

Browse files
authored
bpo-44336: Prevent tests hanging on child process handles on Windows (GH-26578)
Replace the child process `typeperf.exe` with a daemon thread that reads the performance counters directly. This prevents the issues that arise from inherited handles in grandchild processes (see issue37531 for discussion). We only use the load tracker when running tests in multiprocess mode. This prevents in 8000 advertent interactions with tests expecting a single threaded environment. Displaying load is really only helpful for buildbots running in multiprocess mode anyway.
1 parent 9ac2de9 commit 19058b9

File tree

2 files changed

+117
-185
lines changed

2 files changed

+117
-185
lines changed

Lib/test/libregrtest/main.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,24 @@ def run_tests(self):
533533

534534
if self.ns.use_mp:
535535
from test.libregrtest.runtest_mp import run_tests_multiprocess
536-
run_tests_multiprocess(self)
536+
# If we're on windows and this is the parent runner (not a worker),
537+
# track the load average.
538+
if sys.platform == 'win32' and self.worker_test_name is None:
539+
from test.libregrtest.win_utils import WindowsLoadTracker
540+
541+
try:
542+
self.win_load_tracker = WindowsLoadTracker()
543+
except PermissionError as error:
544+
# Standard accounts may not have access to the performance
545+
# counters.
546+
print(f'Failed to create WindowsLoadTracker: {error}')
547+
548+
try:
549+
run_tests_multiprocess(self)
550+
finally:
551+
if self.win_load_tracker is not None:
552+
self.win_load_tracker.close()
553+
self.win_load_tracker = None
537554
else:
538555
self.run_tests_sequential()
539556

@@ -695,28 +712,11 @@ def _main(self, tests, kwargs):
695712
self.list_cases()
696713
sys.exit(0)
697714

698-
# If we're on windows and this is the parent runner (not a worker),
699-
# track the load average.
700-
if sys.platform == 'win32' and self.worker_test_name is None:
701-
from test.libregrtest.win_utils import WindowsLoadTracker
702-
703-
try:
704-
self.win_load_tracker = WindowsLoadTracker()
705-
except FileNotFoundError as error:
706-
# Windows IoT Core and Windows Nano Server do not provide
707-
# typeperf.exe for x64, x86 or ARM
708-
print(f'Failed to create WindowsLoadTracker: {error}')
715+
self.run_tests()
716+
self.display_result()
709717

710-
try:
711-
self.run_tests()
712-
self.display_result()
713-
714-
if self.ns.verbose2 and self.bad:
715-
self.rerun_failed_tests()
716-
finally:
717-
if self.win_load_tracker is not None:
718-
self.win_load_tracker.close()
719-
self.win_load_tracker = None
718+
if self.ns.verbose2 and self.bad:
719+
self.rerun_failed_tests()
720720

721721
self.finalize()
722722

Lib/test/libregrtest/win_utils.py

Lines changed: 95 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
1+
import _overlapped
2+
import _thread
13
import _winapi
24
import math
3-
import msvcrt
4-
import os
5-
import subprocess
6-
import uuid
5+
import struct
76
import winreg
8-
from test.support import os_helper
9-
from test.libregrtest.utils import print_warning
107

118

12-
# Max size of asynchronous reads
13-
BUFSIZE = 8192
149
# Seconds per measurement
1510
SAMPLING_INTERVAL = 1
1611
# Exponential damping factor to compute exponentially weighted moving average
@@ -19,174 +14,111 @@
1914
# Initialize the load using the arithmetic mean of the first NVALUE values
2015
# of the Processor Queue Length
2116
NVALUE = 5
22-
# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
23-
# of typeperf are registered
24-
COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
25-
r"\Perflib\CurrentLanguage")
2617

2718

2819
class WindowsLoadTracker():
2920
"""
30-
This class asynchronously interacts with the `typeperf` command to read
31-
the system load on Windows. Multiprocessing and threads can't be used
32-
here because they interfere with the test suite's cases for those
33-
modules.
21+
This class asynchronously reads the performance counters to calculate
22+
the system load on Windows. A "raw" thread is used here to prevent
23+
interference with the test suite's cases for the threading module.
3424
"""
3525

3626
def __init__(self):
27+
# Pre-flight test for access to the performance data;
28+
# `PermissionError` will be raised if not allowed
29+
winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)
30+
3731
self._values = []
3832
self._load = None
39-
self._buffer = ''
40-
self._popen = None
41-
self.start()
42-
43-
def start(self):
44-
# Create a named pipe which allows for asynchronous IO in Windows
45-
pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())
46-
47-
open_mode = _winapi.PIPE_ACCESS_INBOUND
48-
open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
49-
open_mode |= _winapi.FILE_FLAG_OVERLAPPED
50-
51-
# This is the read end of the pipe, where we will be grabbing output
52-
self.pipe = _winapi.CreateNamedPipe(
53-
pipe_name, open_mode, _winapi.PIPE_WAIT,
54-
1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
55-
)
56-
# The write end of the pipe which is passed to the created process
57-
pipe_write_end = _winapi.CreateFile(
58-
pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
59-
_winapi.OPEN_EXISTING, 0, _winapi.NULL
60-
)
61-
# Open up the handle as a python file object so we can pass it to
62-
# subprocess
63-
command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
64-
65-
# Connect to the read end of the pipe in overlap/async mode
66-
overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
67-
overlap.GetOverlappedResult(True)
68-
69-
# Spawn off the load monitor
70-
counter_name = self._get_counter_name()
71-
command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
72-
self._popen = subprocess.Popen(' '.join(command),
73-
stdout=command_stdout,
74-
cwd=os_helper.SAVEDCWD)
75-
76-
# Close our copy of the write end of the pipe
77-
os.close(command_stdout)
78-
79-
def _get_counter_name(self):
80-
# accessing the registry to get the counter localization name
81-
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
82-
counters = winreg.QueryValueEx(perfkey, 'Counter')[0]
83-
84-
# Convert [key1, value1, key2, value2, ...] list
85-
# to {key1: value1, key2: value2, ...} dict
86-
counters = iter(counters)
87-
counters_dict = dict(zip(counters, counters))
88-
89-
# System counter has key '2' and Processor Queue Length has key '44'
90-
system = counters_dict['2']
91-
process_queue_length = counters_dict['44']
92-
return f'"\\{system}\\{process_queue_length}"'
93-
94-
def close(self, kill=True):
95-
if self._popen is None:
33+
self._running = _overlapped.CreateEvent(None, True, False, None)
34+
self._stopped = _overlapped.CreateEvent(None, True, False, None)
35+
36+
_thread.start_new_thread(self._update_load, (), {})
37+
38+
def _update_load(self,
39+
# localize module access to prevent shutdown errors
40+
_wait=_winapi.WaitForSingleObject,
41+
_signal=_overlapped.SetEvent):
42+
# run until signaled to stop
43+
while _wait(self._running, 1000):
44+
self._calculate_load()
45+
# notify stopped
46+
_signal(self._stopped)
47+
48+
def _calculate_load(self,
49+
# localize module access to prevent shutdown errors
50+
_query=winreg.QueryValueEx,
51+
_hkey=winreg.HKEY_PERFORMANCE_DATA,
52+
_unpack=struct.unpack_from):
53+
# get the 'System' object
54+
data, _ = _query(_hkey, '2')
55+
# PERF_DATA_BLOCK {
56+
# WCHAR Signature[4] 8 +
57+
# DWOWD LittleEndian 4 +
58+
# DWORD Version 4 +
59+
# DWORD Revision 4 +
60+
# DWORD TotalByteLength 4 +
61+
# DWORD HeaderLength = 24 byte offset
62+
# ...
63+
# }
64+
obj_start, = _unpack('L', data, 24)
65+
# PERF_OBJECT_TYPE {
66+
# DWORD TotalByteLength
67+
# DWORD DefinitionLength
68+
# DWORD HeaderLength
69+
# ...
70+
# }
71+
data_start, defn_start = _unpack('4xLL', data, obj_start)
72+
data_base = obj_start + data_start
73+
defn_base = obj_start + defn_start
74+
# find the 'Processor Queue Length' counter (index=44)
75+
while defn_base < data_base:
76+
# PERF_COUNTER_DEFINITION {
77+
# DWORD ByteLength
78+
# DWORD CounterNameTitleIndex
79+
# ... [7 DWORDs/28 bytes]
80+
# DWORD CounterOffset
81+
# }
82+
size, idx, offset = _unpack('LL28xL', data, defn_base)
83+
defn_base += size
84+
if idx == 44:
85+
counter_offset = data_base + offset
86+
# the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
87+
processor_queue_length, = _unpack('L', data, counter_offset)
88+
break
89+
else:
9690
return
9791

98-
self._load = None
99-
100-
if kill:
101-
self._popen.kill()
102-
self._popen.wait()
103-
self._popen = None
104-
105-
def __del__(self):
106-
self.close()
107-
108-
def _parse_line(self, line):
109-
# typeperf outputs in a CSV format like this:
110-
# "07/19/2018 01:32:26.605","3.000000"
111-
# (date, process queue length)
112-
tokens = line.split(',')
113-
if len(tokens) != 2:
114-
raise ValueError
115-
116-
value = tokens[1]
117-
if not value.startswith('"') or not value.endswith('"'):
118-
raise ValueError
119-
value = value[1:-1]
120-
return float(value)
121-
122-
def _read_lines(self):
123-
overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
124-
bytes_read, res = overlapped.GetOverlappedResult(False)
125-
if res != 0:
126-
return ()
127-
128-
output = overlapped.getbuffer()
129-
output = output.decode('oem', 'replace')
130-
output = self._buffer + output
131-
lines = output.splitlines(True)
132-
133-
# bpo-36670: typeperf only writes a newline *before* writing a value,
134-
# not after. Sometimes, the written line in incomplete (ex: only
135-
# timestamp, without the process queue length). Only pass the last line
136-
# to the parser if it's a valid value, otherwise store it in
137-
# self._buffer.
138-
try:
139-
self._parse_line(lines[-1])
140-
except ValueError:
141-
self._buffer = lines.pop(-1)
92+
# We use an exponentially weighted moving average, imitating the
93+
# load calculation on Unix systems.
94+
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
95+
# https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
96+
if self._load is not None:
97+
self._load = (self._load * LOAD_FACTOR_1
98+
+ processor_queue_length * (1.0 - LOAD_FACTOR_1))
99+
elif len(self._values) < NVALUE:
100+
self._values.append(processor_queue_length)
142101
else:
143-
self._buffer = ''
102+
self._load = sum(self._values) / len(self._values)
144103

145-
return lines
104+
def close(self, kill=True):
105+
self.__del__()
106+
return
107+
108+
def __del__(self,
109+
# localize module access to prevent shutdown errors
110+
_wait=_winapi.WaitForSingleObject,
111+
_close=_winapi.CloseHandle,
112+
_signal=_overlapped.SetEvent):
113+
if self._running is not None:
114+
# tell the update thread to quit
115+
_signal(self._running)
116+
# wait for the update thread to signal done
117+
_wait(self._stopped, -1)
118+
# cleanup events
119+
_close(self._running)
120+
_close(self._stopped)
121+
self._running = self._stopped = None
146122

147123
def getloadavg(self):
148-
if self._popen is None:
149-
return None
150-
151-
returncode = self._popen.poll()
152-
if returncode is not None:
153-
self.close(kill=False)
154-
return None
155-
156-
try:
157-
lines = self._read_lines()
158-
except BrokenPipeError:
159-
self.close()
160-
return None
161-
162-
for line in lines:
163-
line = line.rstrip()
164-
165-
# Ignore the initial header:
166-
# "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
167-
if 'PDH-CSV' in line:
168-
continue
169-
170-
# Ignore blank lines
171-
if not line:
172-
continue
173-
174-
try:
175-
processor_queue_length = self._parse_line(line)
176-
except ValueError:
177-
print_warning("Failed to parse typeperf output: %a" % line)
178-
continue
179-
180-
# We use an exponentially weighted moving average, imitating the
181-
# load calculation on Unix systems.
182-
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
183-
# https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
184-
if self._load is not None:
185-
self._load = (self._load * LOAD_FACTOR_1
186-
+ processor_queue_length * (1.0 - LOAD_FACTOR_1))
187-
elif len(self._values) < NVALUE:
188-
self._values.append(processor_queue_length)
189-
else:
190-
self._load = sum(self._values) / len(self._values)
191-
192124
return self._load

0 commit comments

Comments
 (0)
0