@@ -9,9 +9,10 @@ import Data.Traversable
9
9
import Data.Foldable
10
10
import Control.Applicative
11
11
import Control.Exception
12
+ import GHC.Conc (labelThread )
12
13
import Data.IORef
13
14
import Data.Monoid
14
- import Control.Concurrent (forkIO , killThread , ThreadId , threadDelay )
15
+ import Control.Concurrent (forkIOWithUnmask , killThread , ThreadId , threadDelay )
15
16
import Data.Binary.Get ( runGetIncremental , pushChunk )
16
17
import qualified Data.Binary.Get as BG (Decoder (.. ))
17
18
import qualified Data.Vector as V
@@ -43,6 +44,9 @@ data Connection = Connection
43
44
, connMode :: IORef ConnectionMode
44
45
}
45
46
47
+ type InDataChan = InChan (Either Error DataMessage )
48
+ type InAllChan = InChan (Either Error ServerMessage )
49
+
46
50
-- | Parameters of the current connection.
47
51
-- We store only the parameters that cannot change after startup.
48
52
-- For more information about additional parameters see
@@ -71,7 +75,7 @@ defaultNotificationHandler :: NotificationHandler
71
75
defaultNotificationHandler = const $ pure ()
72
76
73
77
type DataDispatcher
74
- = InChan ( Either Error DataMessage )
78
+ = InDataChan
75
79
-> ServerMessage
76
80
-> [V. Vector (Maybe B. ByteString )]
77
81
-> IO [V. Vector (Maybe B. ByteString )]
@@ -153,9 +157,12 @@ buildConnection rawConn connParams msgFilter = do
153
157
storage <- newStatementStorage
154
158
modeRef <- newIORef defaultConnectionMode
155
159
156
- tid <- forkIO $
157
- receiverThread msgFilter rawConn inDataChan inAllChan modeRef
158
- defaultNotificationHandler
160
+ tid <- mask_ $ forkIOWithUnmask $ \ unmask ->
161
+ unmask (receiverThread msgFilter rawConn
162
+ inDataChan inAllChan modeRef defaultNotificationHandler)
163
+ `catch` receiverOnException inDataChan inAllChan
164
+ labelThread tid " postgres-wire receiver"
165
+
159
166
pure Connection
160
167
{ connRawConnection = rawConn
161
168
, connReceiverThread = tid
@@ -203,11 +210,14 @@ close conn = do
203
210
killThread $ connReceiverThread conn
204
211
rClose $ connRawConnection conn
205
212
213
+ -- | When thread receives unexpected exception or fihishes by
214
+ -- any reason, than it writes Error to BOTH chans to prevent other threads
215
+ -- blocking on reading from these chans.
206
216
receiverThread
207
217
:: ServerMessageFilter
208
218
-> RawConnection
209
- -> InChan ( Either Error DataMessage )
210
- -> InChan ( Either Error ServerMessage )
219
+ -> InDataChan
220
+ -> InAllChan
211
221
-> IORef ConnectionMode
212
222
-> NotificationHandler
213
223
-> IO ()
@@ -224,28 +234,40 @@ receiverThread msgFilter rawConn dataChan allChan modeRef ntfHandler =
224
234
r <- rReceive rawConn 4096
225
235
receiveLoop Nothing (bs <> r) acc
226
236
| otherwise = case runDecode decodeHeader bs of
227
- Left reason -> pushDecodeError $ DecodeError $ BS. pack reason
237
+ Left reason -> reportReceiverError dataChan allChan
238
+ $ DecodeError $ BS. pack reason
228
239
Right (rest, h) -> receiveLoop (Just h) rest acc
229
240
-- Parsing body
230
241
receiveLoop (Just h@ (Header _ len)) bs acc
231
242
| B. length bs < len = do
232
243
r <- rReceive rawConn 4096
233
244
receiveLoop (Just h) (bs <> r) acc
234
245
| otherwise = case runDecode (decodeServerMessage h) bs of
235
- Left reason -> pushDecodeError $ DecodeError $ BS. pack reason
246
+ Left reason -> reportReceiverError dataChan allChan
247
+ $ DecodeError $ BS. pack reason
236
248
Right (rest, v) -> do
237
- dispatchIfNotification v
249
+ dispatchIfNotification v ntfHandler
238
250
when (msgFilter v) $ writeChan allChan $ Right v
239
251
mode <- readIORef modeRef
240
252
newAcc <- dispatch mode dataChan v acc
241
253
receiveLoop Nothing rest newAcc
242
254
243
- dispatchIfNotification (NotificationResponse n) = ntfHandler n
244
- dispatchIfNotification _ = pure ()
245
-
246
- pushDecodeError err = do
247
- writeChan dataChan (Left err)
248
- writeChan allChan (Left err)
255
+ dispatchIfNotification :: ServerMessage -> NotificationHandler -> IO ()
256
+ dispatchIfNotification msg handler = case msg of
257
+ NotificationResponse n -> handler n
258
+ _ -> pure ()
259
+
260
+ -- | Exception handler for receiver thread.
261
+ -- Called only in masked state.
262
+ receiverOnException :: InDataChan -> InAllChan -> SomeException -> IO ()
263
+ receiverOnException dataChan allChan exc =
264
+ reportReceiverError dataChan allChan $ UnexpectedError exc
265
+
266
+ -- | Reporting about any unexpected error to the thread than reads from chans.
267
+ reportReceiverError :: InDataChan -> InAllChan -> Error -> IO ()
268
+ reportReceiverError dataChan allChan err = do
269
+ writeChan dataChan (Left err)
270
+ writeChan allChan (Left err)
249
271
250
272
dispatch :: ConnectionMode -> DataDispatcher
251
273
dispatch SimpleQueryMode = dispatchSimple
0 commit comments