1+ # Test the Windows-only _winapi module
2+
3+ import os
4+ import pathlib
5+ import random
6+ import re
7+ import threading
8+ import time
9+ import unittest
10+ from test .support import import_helper , os_helper
11+
12+ _winapi = import_helper .import_module ('_winapi' , required_on = ['win' ])
13+
14+ MAXIMUM_WAIT_OBJECTS = 64
15+ MAXIMUM_BATCHED_WAIT_OBJECTS = (MAXIMUM_WAIT_OBJECTS - 1 ) ** 2
16+
17+ class WinAPIBatchedWaitForMultipleObjectsTests (unittest .TestCase ):
18+ def _events_waitall_test (self , n ):
19+ evts = [_winapi .CreateEventW (0 , False , False , None ) for _ in range (n )]
20+
21+ with self .assertRaises (TimeoutError ):
22+ _winapi .BatchedWaitForMultipleObjects (evts , True , 100 )
23+
24+ # Ensure no errors raised when all are triggered
25+ for e in evts :
26+ _winapi .SetEvent (e )
27+ try :
28+ _winapi .BatchedWaitForMultipleObjects (evts , True , 100 )
29+ except TimeoutError :
30+ self .fail ("expected wait to complete immediately" )
31+
32+ # Choose 8 events to set, distributed throughout the list, to make sure
33+ # we don't always have them in the first chunk
34+ chosen = [i * (len (evts ) // 8 ) for i in range (8 )]
35+
36+ # Replace events with invalid handles to make sure we fail
37+ for i in chosen :
38+ old_evt = evts [i ]
39+ evts [i ] = - 1
40+ with self .assertRaises (OSError ):
41+ _winapi .BatchedWaitForMultipleObjects (evts , True , 100 )
42+ evts [i ] = old_evt
43+
44+
45+ def _events_waitany_test (self , n ):
46+ evts = [_winapi .CreateEventW (0 , False , False , None ) for _ in range (n )]
47+
48+ with self .assertRaises (TimeoutError ):
49+ _winapi .BatchedWaitForMultipleObjects (evts , False , 100 )
50+
51+ # Choose 8 events to set, distributed throughout the list, to make sure
52+ # we don't always have them in the first chunk
53+ chosen = [i * (len (evts ) // 8 ) for i in range (8 )]
54+
55+ # Trigger one by one. They are auto-reset events, so will only trigger once
56+ for i in chosen :
57+ with self .subTest (f"trigger event { i } of { len (evts )} " ):
58+ _winapi .SetEvent (evts [i ])
59+ triggered = _winapi .BatchedWaitForMultipleObjects (evts , False , 10000 )
60+ self .assertSetEqual (set (triggered ), {i })
61+
62+ # Trigger all at once. This may require multiple calls
63+ for i in chosen :
64+ _winapi .SetEvent (evts [i ])
65+ triggered = set ()
66+ while len (triggered ) < len (chosen ):
67+ triggered .update (_winapi .BatchedWaitForMultipleObjects (evts , False , 10000 ))
68+ self .assertSetEqual (triggered , set (chosen ))
69+
70+ # Replace events with invalid handles to make sure we fail
71+ for i in chosen :
72+ with self .subTest (f"corrupt event { i } of { len (evts )} " ):
73+ old_evt = evts [i ]
74+ evts [i ] = - 1
75+ with self .assertRaises (OSError ):
8000
div>
76+ _winapi .BatchedWaitForMultipleObjects (evts , False , 100 )
77+ evts [i ] = old_evt
78+
79+
80+ # TODO: RUSTPYTHON
81+ @unittest .expectedFailure
82+ def test_few_events_waitall (self ):
83+ self ._events_waitall_test (16 )
84+
85+ # TODO: RUSTPYTHON
86+ @unittest .expectedFailure
87+ def test_many_events_waitall (self ):
88+ self ._events_waitall_test (256 )
89+
90+ # TODO: RUSTPYTHON
91+ @unittest .expectedFailure
92+ def test_max_events_waitall (self ):
93+ self ._events_waitall_test (MAXIMUM_BATCHED_WAIT_OBJECTS )
94+
95+
96+ # TODO: RUSTPYTHON
97+ @unittest .expectedFailure
98+ def test_few_events_waitany (self ):
99+ self ._events_waitany_test (16 )
100+
101+ # TODO: RUSTPYTHON
102+ @unittest .expectedFailure
103+ def test_many_events_waitany (self ):
104+ self ._events_waitany_test (256 )
105+
106+ # TODO: RUSTPYTHON
107+ @unittest .expectedFailure
108+ def test_max_events_waitany (self ):
109+ self ._events_waitany_test (MAXIMUM_BATCHED_WAIT_OBJECTS )
110+
111+
112+ class WinAPITests (unittest .TestCase ):
113+ # TODO: RUSTPYTHON
114+ @unittest .expectedFailure
115+ def test_getlongpathname (self ):
116+ testfn = pathlib .Path (os .getenv ("ProgramFiles" )).parents [- 1 ] / "PROGRA~1"
117+ if not os .path .isdir (testfn ):
118+ raise unittest .SkipTest ("require x:\\ PROGRA~1 to test" )
119+
120+ # pathlib.Path will be rejected - only str is accepted
121+ with self .assertRaises (TypeError ):
122+ _winapi .GetLongPathName (testfn )
123+
124+ actual = _winapi .GetLongPathName (os .fsdecode (testfn ))
125+
126+ # Can't assume that PROGRA~1 expands to any particular variation, so
127+ # ensure it matches any one of them.
128+ candidates = set (testfn .parent .glob ("Progra*" ))
129+ self .assertIn (pathlib .Path (actual ), candidates )
130+
131+ # TODO: RUSTPYTHON
132+ @unittest .expectedFailure
133+ def test_getshortpathname (self ):
134+ testfn = pathlib .Path (os .getenv ("ProgramFiles" ))
135+ if not os .path .isdir (testfn ):
136+ raise unittest .SkipTest ("require '%ProgramFiles%' to test" )
137+
138+ # pathlib.Path will be rejected - only str is accepted
139+ with self .assertRaises (TypeError ):
140+ _winapi .GetShortPathName (testfn )
141+
142+ actual = _winapi .GetShortPathName (os .fsdecode (testfn ))
143+
144+ # Should contain "PROGRA~" but we can't predict the number
145+ self .assertIsNotNone (re .match (r".\:\\PROGRA~\d" , actual .upper ()), actual )
146+
147+ # TODO: RUSTPYTHON
148+ @unittest .expectedFailure
149+ def test_namedpipe (self ):
150+ pipe_name = rf"\\.\pipe\LOCAL\{ os_helper .TESTFN } "
151+
152+ # Pipe does not exist, so this raises
153+ with self .assertRaises (FileNotFoundError ):
154+ _winapi .WaitNamedPipe (pipe_name , 0 )
155+
156+ pipe = _winapi .CreateNamedPipe (
157+ pipe_name ,
158+ _winapi .PIPE_ACCESS_DUPLEX ,
159+ 8 , # 8=PIPE_REJECT_REMOTE_CLIENTS
160+ 2 , # two instances available
161+ 32 , 32 , 0 , 0 )
162+ self .addCleanup (_winapi .CloseHandle , pipe )
163+
164+ # Pipe instance is available, so this passes
165+ _winapi .WaitNamedPipe (pipe_name , 0 )
166+
167+ with open (pipe_name , 'w+b' ) as pipe2 :
168+ # No instances available, so this times out
169+ # (WinError 121 does not get mapped to TimeoutError)
170+ with self .assertRaises (OSError ):
171+ _winapi .WaitNamedPipe (pipe_name , 0 )
172+
173+ _winapi .WriteFile (pipe , b'testdata' )
174+ self .assertEqual (b'testdata' , pipe2 .read (8 ))
175+
176+ self .assertEqual ((b'' , 0 ), _winapi .PeekNamedPipe (pipe , 8 )[:2 ])
177+ pipe2 .write (b'testdata' )
178+ pipe2 .flush ()
179+ self .assertEqual ((b'testdata' , 8 ), _winapi .PeekNamedPipe (pipe , 8 )[:2 ])
0 commit comments