1
+ # Test the Windows-only _winapi module
2
+
3
+ import os
<
8000
div aria-hidden="true" class="position-absolute top-0 d-flex user-select-none DiffLineTableCellParts-module__comment-indicator--eI0hb">
4
+ import pathlib
5
+ import random
6
+ import re
7
+ import threading
8
+ import time
9
+ import unittest
10
+ from test .support import import_helper , os_helper
11
+
12
+ _winapi = import_helper .import_module ('_winapi' , required_on = ['win' ])
13
+
14
+ MAXIMUM_WAIT_OBJECTS = 64
15
+ MAXIMUM_BATCHED_WAIT_OBJECTS = (MAXIMUM_WAIT_OBJECTS - 1 ) ** 2
16
+
17
+ class WinAPIBatchedWaitForMultipleObjectsTests (unittest .TestCase ):
18
+ def _events_waitall_test (self , n ):
19
+ evts = [_winapi .CreateEventW (0 , False , False , None ) for _ in range (n )]
20
+
21
+ with self .assertRaises (TimeoutError ):
22
+ _winapi .BatchedWaitForMultipleObjects (evts , True , 100 )
23
+
24
+ # Ensure no errors raised when all are triggered
25
+ for e in evts :
26
+ _winapi .SetEvent (e )
27
+ try :
28
+ _winapi .BatchedWaitForMultipleObjects (evts , True , 100 )
29
+ except TimeoutError :
30
+ self .fail ("expected wait to complete immediately" )
31
+
32
+ # Choose 8 events to set, distributed throughout the list, to make sure
33
+ # we don't always have them in the first chunk
34
+ chosen = [i * (len (evts ) // 8 ) for i in range (8 )]
35
+
36
+ # Replace events with invalid handles to make sure we fail
37
+ for i in chosen :
38
+ old_evt = evts [i ]
39
+ evts [i ] = - 1
40
+ with self .assertRaises (OSError ):
41
+ _winapi .BatchedWaitForMultipleObjects (evts , True , 100 )
42
+ evts [i ] = old_evt
43
+
44
+
45
+ def _events_waitany_test (self , n ):
46
+ evts = [_winapi .CreateEventW (0 , False , False , None ) for _ in range (n )]
47
+
48
+ with self .assertRaises (TimeoutError ):
49
+ _winapi .BatchedWaitForMultipleObjects (evts , False , 100 )
50
+
51
+ # Choose 8 events to set, distributed throughout the list, to make sure
52
+ # we don't always have them in the first chunk
53
+ chosen = [i * (len (evts ) // 8 ) for i in range (8 )]
54
+
55
+ # Trigger one by one. They are auto-reset events, so will only trigger once
56
+ for i in chosen :
57
+ with self .subTest (f"trigger event { i } of { len (evts )} " ):
58
+ _winapi .SetEvent (evts [i ])
59
+ triggered = _winapi .BatchedWaitForMultipleObjects (evts , False , 10000 )
60
+ self .assertSetEqual (set (triggered ), {i })
61
+
62
+ # Trigger all at once. This may require multiple calls
63
+ for i in chosen :
64
+ _winapi .SetEvent (evts [i ])
65
+ triggered = set ()
66
+ while len (triggered ) < len (chosen ):
67
+ triggered .update (_winapi .BatchedWaitForMultipleObjects (evts , False , 10000 ))
68
+ self .assertSetEqual (triggered , set (chosen ))
69
+
70
+ # Replace events with invalid handles to make sure we fail
71
+ for i in chosen :
72
+ with self .subTest (f"corrupt event { i } of { len (evts )} " ):
73
+ old_evt = evts [i ]
74
+ evts [i ] = - 1
75
+ with self .assertRaises (OSError ):
76
+ _winapi .BatchedWaitForMultipleObjects (evts , False , 100 )
77
+ evts [i ] = old_evt
78
+
79
+
80
+ # TODO: RUSTPYTHON
81
+ @unittest .expectedFailure
82
+ def test_few_events_waitall (self ):
83
+ self ._events_waitall_test (16 )
84
+
85
+ # TODO: RUSTPYTHON
86
+ @unittest .expectedFailure
87
+ def test_many_events_waitall (self ):
88
+ self ._events_waitall_test (256 )
89
+
90
+ # TODO: RUSTPYTHON
91
+ @unittest .expectedFailure
92
+ def test_max_events_waitall (self ):
93
+ self ._events_waitall_test (MAXIMUM_BATCHED_WAIT_OBJECTS )
94
+
95
+
96
+ # TODO: RUSTPYTHON
97
+ @unittest .expectedFailure
98
+ def test_few_events_waitany (self ):
99
+ self ._events_waitany_test (16 )
100
+
101
+ # TODO: RUSTPYTHON
102
+ @unittest .expectedFailure
103
+ def test_many_events_waitany (self ):
104
+ self ._events_waitany_test (256 )
105
+
106
+ # TODO: RUSTPYTHON
107
+ @unittest .expectedFailure
108
+ def test_max_events_waitany (self ):
109
+ self ._events_waitany_test (MAXIMUM_BATCHED_WAIT_OBJECTS )
110
+
111
+
112
+ class WinAPITests (unittest .TestCase ):
113
+ # TODO: RUSTPYTHON
114
+ @unittest .expectedFailure
115
+ def test_getlongpathname (self ):
116
+ testfn = pathlib .Path (os .getenv ("ProgramFiles" )).parents [- 1 ] / "PROGRA~1"
117
+ if not os .path .isdir (testfn ):
118
+ raise unittest .SkipTest ("require x:\\ PROGRA~1 to test" )
119
+
120
+ # pathlib.Path will be rejected - only str is accepted
121
+ with self .assertRaises (TypeError ):
122
+ _winapi .GetLongPathName (testfn )
123
+
124
+ actual = _winapi .GetLongPathName (os .fsdecode (testfn ))
125
+
126
+ # Can't assume that PROGRA~1 expands to any particular variation, so
127
+ # ensure it matches any one of them.
128
+ candidates = set (testfn .parent .glob ("Progra*" ))
129
+ self .assertIn (pathlib .Path (actual ), candidates )
130
+
131
+ # TODO: RUSTPYTHON
132
+ @unittest .expectedFailure
133
+ def test_getshortpathname (self ):
134
+ testfn = pathlib .Path (os .getenv ("ProgramFiles" ))
135
+ if not os .path .isdir (testfn ):
136
+ raise unittest .SkipTest ("require '%ProgramFiles%' to test" )
137
+
138
+ # pathlib.Path will be rejected - only str is accepted
139
+ with self .assertRaises (TypeError ):
140
+ _winapi .GetShortPathName (testfn )
141
+
142
+ actual = _winapi .GetShortPathName (os .fsdecode (testfn ))
143
+
144
+ # Should contain "PROGRA~" but we can't predict the number
145
+ self .assertIsNotNone (re .match (r".\:\\PROGRA~\d" , actual .upper ()), actual )
146
+
147
+ # TODO: RUSTPYTHON
148
+ @unittest .expectedFailure
149
+ def test_namedpipe (self ):
150
+ pipe_name = rf"\\.\pipe\LOCAL\{ os_helper .TESTFN } "
151
+
152
+ # Pipe does not exist, so this raises
153
+ with self .assertRaises (FileNotFoundError ):
154
+ _winapi .WaitNamedPipe (pipe_name , 0 )
155
+
156
+ pipe = _winapi .CreateNamedPipe (
157
+ pipe_name ,
158
+ _winapi .PIPE_ACCESS_DUPLEX ,
159
+ 8 , # 8=PIPE_REJECT_REMOTE_CLIENTS
160
+ 2 , # two instances available
161
+ 32 , 32 , 0 , 0 )
162
+ self .addCleanup (_winapi .CloseHandle , pipe )
163
+
164
+ # Pipe instance is available, so this passes
165
+ _winapi .WaitNamedPipe (pipe_name , 0 )
166
+
167
+ with open (pipe_name , 'w+b' ) as pipe2 :
168
+ # No instances available, so this times out
169
+ # (WinError 121 does not get mapped to TimeoutError)
170
+ with self .assertRaises (OSError ):
171
+ _winapi .WaitNamedPipe (pipe_name , 0 )
172
+
173
+ _winapi .WriteFile (pipe , b'testdata' )
174
+ self .assertEqual (b'testdata' , pipe2 .read (8 ))
175
+
176
+ self .assertEqual ((b'' , 0 ), _winapi .PeekNamedPipe (pipe , 8 )[:2 ])
177
+ pipe2 .write (b'testdata' )
178
+ pipe2 .flush ()
179
+ self .assertEqual ((b'testdata' , 8 ), _winapi .PeekNamedPipe (pipe , 8 )[:2 ])
0 commit comments