@@ -47,36 +47,66 @@ class IPCBase:
47
47
48
48
connection = None # type: _IPCHandle
49
49
50
- def __init__ (self , name : str ) -> None :
50
+ def __init__ (self , name : str , timeout : Optional [ float ] ) -> None :
51
51
self .READ_SIZE = 100000
52
52
self .name = name
53
+ self .timeout = timeout
53
54
54
55
def read (self ) -> bytes :
55
56
"""Read bytes from an IPC connection until its empty."""
56
57
bdata = bytearray ()
57
- while True :
58
- if sys .platform == 'win32' :
59
- more , _ = _winapi .ReadFile (self .connection , self .READ_SIZE )
60
- else :
58
+ if sys .platform == 'win32' :
59
+ while True :
60
+ ov , err = _winapi .ReadFile (self .connection , self .READ_SIZE , overlapped = True )
61
+ # TODO: remove once typeshed supports Literal types
62
+ assert isinstance (ov , _winapi .Overlapped )
63
+ assert isinstance (err , int )
64
+ try :
65
+ if err != 0 :
66
+ assert err == _winapi .ERROR_IO_PENDING
67
+ timeout = int (self .timeout * 1000 ) if self .timeout else _winapi .INFINITE
68
+ res = _winapi .WaitForSingleObject (ov .event , timeout )
69
+ assert res == _winapi .WAIT_OBJECT_0
70
+ except BaseException :
71
+ ov .cancel ()
72
+ raise
73
+ _ , err = ov .GetOverlappedResult (True )
74
+ more = ov .getbuffer ()
75
+ if more :
76
+ bdata .extend (more )
77
+ if err == 0 :
78
+ # we are done!
79
+ break
80
+ elif err == _winapi .ERROR_OPERATION_ABORTED :
81
+ raise IPCException ("ReadFile operation aborted." )
82
+ else :
83
+ while True :
61
84
more = self .connection .recv (self .READ_SIZE )
62
- if not more :
63
- break
64
- bdata .extend (more )
85
+ if not more :
86
+ break
87
+ bdata .extend (more )
65
88
return bytes (bdata )
66
89
67
90
def write (self , data : bytes ) -> None :
68
91
"""Write bytes to an IPC connection."""
69
92
if sys .platform == 'win32' :
70
93
try :
71
- # Only send data if there is data to send, to avoid it
72
- # being confused with the empty message sent to terminate
73
- # the connection. (We will still send the end-of-message
74
- # empty message below, which will cause read to return.)
75
- if data :
67E6
76
- _winapi .WriteFile (self .connection , data )
77
- # this empty write is to copy the behavior of socket.sendall,
78
- # which also sends an empty message to signify it is done writing
79
- _winapi .WriteFile (self .connection , b'' )
94
+ ov , err = _winapi .WriteFile (self .connection , data , overlapped = True )
95
+ # TODO: remove once typeshed supports Literal types
96
+ assert isinstance (ov , _winapi .Overlapped )
97
+ assert isinstance (err , int )
98
+ try :
99
+ if err != 0 :
100
+ assert err == _winapi .ERROR_IO_PENDING
101
+ timeout = int (self .timeout * 1000 ) if self .timeout else _winapi .INFINITE
102
+ res = _winapi .WaitForSingleObject (ov .event , timeout )
103
+ assert res == _winapi .WAIT_OBJECT_0
104
+ except BaseException :
105
+ ov .cancel ()
106
+ raise
107
+ bytes_written , err = ov .GetOverlappedResult (True )
108
+ assert err == 0
109
+ assert bytes_written == len (data )
80
110
except WindowsError as e :
81
111
raise IPCException ("Failed to write with error: {}" .format (e .winerror ))
82
112
else :
@@ -95,9 +125,9 @@ class IPCClient(IPCBase):
95
125
"""The client side of an IPC connection."""
96
126
97
127
def __init__ (self , name : str , timeout : Optional [float ]) -> None :
98
- super ().__init__ (name )
128
+ super ().__init__ (name , timeout )
99
129
if sys .platform == 'win32' :
100
- timeout = int (timeout * 1000 ) if timeout else 0xFFFFFFFF # NMPWAIT_WAIT_FOREVER
130
+ timeout = int (self . timeout * 1000 ) if self . timeout else _winapi . NMPWAIT_WAIT_FOREVER
101
131
try :
102
132
_winapi .WaitNamedPipe (self .name , timeout )
103
133
except FileNotFoundError :
@@ -114,7 +144,7 @@ def __init__(self, name: str, timeout: Optional[float]) -> None:
114
144
0 ,
115
145
_winapi .NULL ,
116
146
_winapi .OPEN_EXISTING ,
117
- 0 ,
147
+ _winapi . FILE_FLAG_OVERLAPPED ,
118
148
_winapi .NULL ,
119
149
)
120
150
except WindowsError as e :
@@ -147,25 +177,26 @@ class IPCServer(IPCBase):
147
177
148
178
BUFFER_SIZE = 2 ** 16
149
179
150
- def __init__ (self , name : str , timeout : Optional [int ] = None ) -> None :
180
+ def __init__ (self , name : str , timeout : Optional [float ] = None ) -> None :
151
181
if sys .platform == 'win32' :
152
182
name = r'\\.\pipe\{}-{}.pipe' .format (
153
183
name , base64 .urlsafe_b64encode (os .urandom (6 )).decode ())
154
184
else :
155
185
name = '{}.sock' .format (name )
156
- super ().__init__ (name )
186
+ super ().__init__ (name , timeout )
157
187
if sys .platform == 'win32' :
158
188
self .connection = _winapi .CreateNamedPipe (self .name ,
159
189
_winapi .PIPE_ACCESS_DUPLEX
160
- | _winapi .FILE_FLAG_FIRST_PIPE_INSTANCE ,
190
+ | _winapi .FILE_FLAG_FIRST_PIPE_INSTANCE
191
+ | _winapi .FILE_FLAG_OVERLAPPED ,
161
192
_winapi .PIPE_READMODE_MESSAGE
162
193
| _winapi .PIPE_TYPE_MESSAGE
163
194
| _winapi .PIPE_WAIT
164
195
| 0x8 , # PIPE_REJECT_REMOTE_CLIENTS
165
196
1 , # one instance
166
197
self .BUFFER_SIZE ,
167
198
self .BUFFER_SIZE ,
168
- 1000 , # Default timeout in milis
199
+ _winapi . NMPWAIT_WAIT_FOREVER ,
169
200
0 , # Use default security descriptor
170
201
)
171
202
if self .connection == - 1 : # INVALID_HANDLE_VALUE
@@ -185,12 +216,24 @@ def __enter__(self) -> 'IPCServer':
185
216
# NOTE: It is theoretically possible that this will hang forever if the
186
217
# client never connects, though this can be "solved" by killing the server
187
218
try :
188
- _winapi .ConnectNamedPipe (self .connection , _winapi .NULL )
219
+ ov = _winapi .ConnectNamedPipe (self .connection , overlapped = True )
220
+ # TODO: remove once typeshed supports Literal types
221
+ assert isinstance (ov , _winapi .Overlapped )
189
222
except WindowsError as e :
190
- if e .winerror == _winapi .ERROR_PIPE_CONNECTED :
191
- pass # The client already exists, which is fine.
192
- else :
223
+ # Don't raise if the client already exists, or the client already connected
224
+ if e .winerror not in (_winapi .ERROR_PIPE_CONNECTED , _winapi .ERROR_NO_DATA ):
225
+ raise
226
+ else :
227
+ try :
228
+ timeout = int (self .timeout * 1000 ) if self .timeout else _winapi .INFINITE
229
+ res = _winapi .WaitForSingleObject (ov .event , timeout )
230
+ assert res == _winapi .WAIT_OBJECT_0
231
+ except BaseException :
232
+ ov .cancel ()
233
+ _winapi .CloseHandle (self .connection )
193
234
raise
235
+ _ , err = ov .GetOverlappedResult (True )
236
+ assert err == 0
194
237
else :
195
238
try :
196
239
self .connection , _ = self .sock .accept ()
0 commit comments