|
| 1 | +import _overlapped |
| 2 | +import _thread |
1 | 3 | import _winapi
|
2 | 4 | import math
|
3 |
| -import msvcrt |
4 |
| -import os |
5 |
| -import subprocess |
6 |
| -import uuid |
| 5 | +import struct |
7 | 6 | import winreg
|
8 |
| -from test.support import os_helper |
9 |
| -from test.libregrtest.utils import print_warning |
10 | 7 |
|
11 | 8 |
|
12 |
| -# Max size of asynchronous reads |
13 |
| -BUFSIZE = 8192 |
14 | 9 | # Seconds per measurement
|
15 | 10 | SAMPLING_INTERVAL = 1
|
16 | 11 | # Exponential damping factor to compute exponentially weighted moving average
|
|
19 | 14 | # Initialize the load using the arithmetic mean of the first NVALUE values
|
20 | 15 | # of the Processor Queue Length
|
21 | 16 | NVALUE = 5
|
22 |
| -# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names |
23 |
| -# of typeperf are registered |
24 |
| -COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" |
25 |
| - r"\Perflib\CurrentLanguage") |
26 | 17 |
|
27 | 18 |
|
28 | 19 | class WindowsLoadTracker():
|
29 | 20 | """
|
30 |
| - This class asynchronously interacts with the `typeperf` command to read |
31 |
| - the system load on Windows. Multiprocessing and threads can't be used |
32 |
| - here because they interfere with the test suite's cases for those |
33 |
| - modules. |
| 21 | + This class asynchronously reads the performance counters to calculate |
| 22 | + the system load on Windows. A "raw" thread is used here to prevent |
| 23 | + interference with the test suite's cases for the threading module. |
34 | 24 | """
|
35 | 25 |
|
36 | 26 | def __init__(self):
|
| 27 | + # Pre-flight test for access to the performance data; |
| 28 | + # `PermissionError` will be raised if not allowed |
| 29 | + winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA) |
| 30 | + |
37 | 31 | self._values = []
|
38 | 32 | self._load = None
|
39 |
| - self._buffer = '' |
40 |
| - self._popen = None |
41 |
| - self.start() |
42 |
| - |
43 |
| - def start(self): |
44 |
| - # Create a named pipe which allows for asynchronous IO in Windows |
45 |
| - pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4()) |
46 |
| - |
47 |
| - open_mode = _winapi.PIPE_ACCESS_INBOUND |
48 |
| - open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE |
49 |
| - open_mode |= _winapi.FILE_FLAG_OVERLAPPED |
50 |
| - |
51 |
| - # This is the read end of the pipe, where we will be grabbing output |
52 |
| - self.pipe = _winapi.CreateNamedPipe( |
53 |
| - pipe_name, open_mode, _winapi.PIPE_WAIT, |
54 |
| - 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL |
55 |
| - ) |
56 |
| - # The write end of the pipe which is passed to the created process |
57 |
| - pipe_write_end = _winapi.CreateFile( |
58 |
| - pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL, |
59 |
| - _winapi.OPEN_EXISTING, 0, _winapi.NULL |
60 |
| - ) |
61 |
| - # Open up the handle as a python file object so we can pass it to |
62 |
| - # subprocess |
63 |
| - command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0) |
64 |
| - |
65 |
| - # Connect to the read end of the pipe in overlap/async mode |
66 |
| - overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True) |
67 |
| - overlap.GetOverlappedResult(True) |
68 |
| - |
69 |
| - # Spawn off the load monitor |
70 |
| - counter_name = self._get_counter_name() |
71 |
| - command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] |
72 |
| - self._popen = subprocess.Popen(' '.join(command), |
73 |
| - stdout=command_stdout, |
74 |
| - cwd=os_helper.SAVEDCWD) |
75 |
| - |
76 |
| - # Close our copy of the write end of the pipe |
77 |
| - os.close(command_stdout) |
78 |
| - |
79 |
| - def _get_counter_name(self): |
80 |
| - # accessing the registry to get the counter localization name |
81 |
| - with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey: |
82 |
| - counters = winreg.QueryValueEx(perfkey, 'Counter')[0] |
83 |
| - |
84 |
| - # Convert [key1, value1, key2, value2, ...] list |
85 |
| - # to {key1: value1, key2: value2, ...} dict |
86 |
| - counters = iter(counters) |
87 |
| - counters_dict = dict(zip(counters, counters)) |
88 |
| - |
89 |
| - # System counter has key '2' and Processor Queue Length has key '44' |
90 |
| - system = counters_dict['2'] |
91 |
| - process_queue_length = counters_dict['44'] |
92 |
| - return f'"\\{system}\\{process_queue_length}"' |
93 |
| - |
94 |
| - def close(self, kill=True): |
95 |
| - if self._popen is None: |
| 33 | + self._running = _overlapped.CreateEvent(None, True, False, None) |
| 34 | + self._stopped = _overlapped.CreateEvent(None, True, False, None) |
| 35 | + |
| 36 | + _thread.start_new_thread(self._update_load, (), {}) |
| 37 | + |
| 38 | + def _update_load(self, |
| 39 | + # localize module access to prevent shutdown errors |
| 40 | + _wait=_winapi.WaitForSingleObject, |
| 41 | + _signal=_overlapped.SetEvent): |
| 42 | + # run until signaled to stop |
| 43 | + while _wait(self._running, 1000): |
| 44 | + self._calculate_load() |
| 45 | + # notify stopped |
| 46 | + _signal(self._stopped) |
| 47 | + |
| 48 | + def _calculate_load(self, |
| 49 | + # localize module access to prevent shutdown errors |
| 50 | + _query=winreg.QueryValueEx, |
| 51 | + _hkey=winreg.HKEY_PERFORMANCE_DATA, |
| 52 | + _unpack=struct.unpack_from): |
| 53 | + # get the 'System' object |
| 54 | + data, _ = _query(_hkey, '2') |
| 55 | + # PERF_DATA_BLOCK { |
| 56 | + # WCHAR Signature[4] 8 + |
| 57 | + # DWOWD LittleEndian 4 + |
| 58 | + # DWORD Version 4 + |
| 59 | + # DWORD Revision 4 + |
| 60 | + # DWORD TotalByteLength 4 + |
| 61 | + # DWORD HeaderLength = 24 byte offset |
| 62 | + # ... |
| 63 | + # } |
| 64 | + obj_start, = _unpack('L', data, 24) |
| 65 | + # PERF_OBJECT_TYPE { |
| 66 | + # DWORD TotalByteLength |
| 67 | + # DWORD DefinitionLength |
| 68 | + # DWORD HeaderLength |
| 69 | + # ... |
| 70 | + # } |
| 71 | + data_start, defn_start = _unpack('4xLL', data, obj_start) |
| 72 | + data_base = obj_start + data_start |
| 73 | + defn_base = obj_start + defn_start |
| 74 | + # find the 'Processor Queue Length' counter (index=44) |
| 75 | + while defn_base < data_base: |
| 76 | + # PERF_COUNTER_DEFINITION { |
| 77 | + # DWORD ByteLength |
| 78 | + # DWORD CounterNameTitleIndex |
| 79 | + # ... [7 DWORDs/28 bytes] |
| 80 | + # DWORD CounterOffset |
| 81 | + # } |
| 82 | + size, idx, offset = _unpack('LL28xL', data, defn_base) |
| 83 | + defn_base += size |
| 84 | + if idx == 44: |
| 85 | + counter_offset = data_base + offset |
| 86 | + # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD) |
| 87 | + processor_queue_length, = _unpack('L', data, counter_offset) |
| 88 | + break |
| 89 | + else: |
96 | 90 | return
|
97 | 91 |
|
98 |
| - self._load = None |
99 |
| - |
100 |
| - if kill: |
101 |
| - self._popen.kill() |
102 |
| - self._popen.wait() |
103 |
| - self._popen = None |
104 |
| - |
105 |
| - def __del__(self): |
106 |
| - self.close() |
107 |
| - |
108 |
| - def _parse_line(self, line): |
109 |
| - # typeperf outputs in a CSV format like this: |
110 |
| - # "07/19/2018 01:32:26.605","3.000000" |
111 |
| - # (date, process queue length) |
112 |
| - tokens = line.split(',') |
113 |
| - if len(tokens) != 2: |
114 |
| - raise ValueError |
115 |
| - |
116 |
| - value = tokens[1] |
117 |
| - if not value.startswith('"') or not value.endswith('"'): |
118 |
| - raise ValueError |
119 |
| - value = value[1:-1] |
120 |
| - return float(value) |
121 |
| - |
122 |
| - def _read_lines(self): |
123 |
| - overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) |
124 |
| - bytes_read, res = overlapped.GetOverlappedResult(False) |
125 |
| - if res != 0: |
126 |
| - return () |
127 |
| - |
128 |
| - output = overlapped.getbuffer() |
129 |
| - output = output.decode('oem', 'replace') |
130 |
| - output = self._buffer + output |
131 |
| - lines = output.splitlines(True) |
132 |
| - |
133 |
| - # bpo-36670: typeperf only writes a newline *before* writing a value, |
134 |
| - # not after. Sometimes, the written line in incomplete (ex: only |
135 |
| - # timestamp, without the process queue length). Only pass the last line |
136 |
| - # to the parser if it's a valid value, otherwise store it in |
137 |
| - # self._buffer. |
138 |
| - try: |
139 |
| - self._parse_line(lines[-1]) |
140 |
| - except ValueError: |
141 |
| - self._buffer = lines.pop(-1) |
| 92 | + # We use an exponentially weighted moving average, imitating the |
| 93 | + # load calculation on Unix systems. |
| 94 | + # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation |
| 95 | + # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average |
| 96 | + if self._load is not None: |
| 97 | + self._load = (self._load * LOAD_FACTOR_1 |
| 98 | + + processor_queue_length * (1.0 - LOAD_FACTOR_1)) |
| 99 | + elif len(self._values) < NVALUE: |
| 100 | + self._values.append(processor_queue_length) |
142 | 101 | else:
|
143 |
| - self._buffer = '' |
| 102 | + self._load = sum(self._values) / len(self._values) |
144 | 103 |
|
145 |
| - return lines |
| 104 | + def close(self, kill=True): |
| 105 | + self.__del__() |
| 106 | + return |
| 107 | + |
| 108 | + def __del__(self, |
| 109 | + # localize module access to prevent shutdown errors |
| 110 | + _wait=_winapi.WaitForSingleObject, |
| 111 | + _close=_winapi.CloseHandle, |
| 112 | + _signal=_overlapped.SetEvent): |
| 113 | + if self._running is not None: |
| 114 | + # tell the update thread to quit |
| 115 | + _signal(self._running) |
| 116 | + # wait for the update thread to signal done |
| 117 | + _wait(self._stopped, -1) |
| 118 | + # cleanup events |
| 119 | + _close(self._running) |
| 120 | + _close(self._stopped) |
| 121 | + self._running = self._stopped = None |
146 | 122 |
|
147 | 123 | def getloadavg(self):
|
148 |
| - if self._popen is None: |
149 |
| - return None |
150 |
| - |
151 |
| - returncode = self._popen.poll() |
152 |
| - if returncode is not None: |
153 |
| - self.close(kill=False) |
154 |
| - return None |
155 |
| - |
156 |
| - try: |
157 |
| - lines = self._read_lines() |
158 |
| - except BrokenPipeError: |
159 |
| - self.close() |
160 |
| - return None |
161 |
| - |
162 |
| - for line in lines: |
163 |
| - line = line.rstrip() |
164 |
| - |
165 |
| - # Ignore the initial header: |
166 |
| - # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length" |
167 |
| - if 'PDH-CSV' in line: |
168 |
| - continue |
169 |
| - |
170 |
| - # Ignore blank lines |
171 |
| - if not line: |
172 |
| - continue |
173 |
| - |
174 |
| - try: |
175 |
| - processor_queue_length = self._parse_line(line) |
176 |
| - except ValueError: |
177 |
| - print_warning("Failed to parse typeperf output: %a" % line) |
178 |
| - continue |
179 |
| - |
180 |
| - # We use an exponentially weighted moving average, imitating the |
181 |
| - # load calculation on Unix systems. |
182 |
| - # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation |
183 |
| - # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average |
184 |
| - if self._load is not None: |
185 |
| - self._load = (self._load * LOAD_FACTOR_1 |
186 |
| - + processor_queue_length * (1.0 - LOAD_FACTOR_1)) |
187 |
| - elif len(self._values) < NVALUE: |
188 |
| - self._values.append(processor_queue_length) |
189 |
| - else: |
190 |
| - self._load = sum(self._values) / len(self._values) |
191 |
| - |
192 | 124 | return self._load
|
0 commit comments