8000 Exception handling in receiver thread · postgres-haskell/postgres-wire@f0e8753 · GitHub
[go: up one dir, main page]

Skip to content
8000

Commit f0e8753

Browse files
Exception handling in receiver thread
1 parent 384900d commit f0e8753

File tree

2 files changed

+40
-16
lines changed

2 files changed

+40
-16
lines changed

src/Database/PostgreSQL/Driver/Connection.hs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import Data.Traversable
99
import Data.Foldable
1010
import Control.Applicative
1111
import Control.Exception
12+
import GHC.Conc (labelThread)
1213
import Data.IORef
1314
import Data.Monoid
14-
import Control.Concurrent (forkIO, killThread, ThreadId, threadDelay)
15+
import Control.Concurrent (forkIOWithUnmask, killThread, ThreadId, threadDelay)
1516
import Data.Binary.Get ( runGetIncremental, pushChunk)
1617
import qualified Data.Binary.Get as BG (Decoder(..))
1718
import qualified Data.Vector as V
@@ -43,6 +44,9 @@ data Connection = Connection
4344
, connMode :: IORef ConnectionMode
4445
}
4546

47+
type InDataChan = InChan (Either Error DataMessage)
48+
type InAllChan = InChan (Either Error ServerMessage)
49+
4650
-- | Parameters of the current connection.
4751
-- We store only the parameters that cannot change after startup.
4852
-- For more information about additional parameters see
@@ -71,7 +75,7 @@ defaultNotificationHandler :: NotificationHandler
7175
defaultNotificationHandler = const $ pure ()
7276

7377
type DataDispatcher
74-
= InChan (Either Error DataMessage)
78+
= InDataChan
7579
-> ServerMessage
7680
-> [V.Vector (Maybe B.ByteString)]
7781
-> IO [V.Vector (Maybe B.ByteString)]
@@ -153,9 +157,12 @@ buildConnection rawConn connParams msgFilter = do
153157
storage <- newStatementStorage
154158
modeRef <- newIORef defaultConnectionMode
155159

156-
tid <- forkIO $
157-
receiverThread msgFilter rawConn inDataChan inAllChan modeRef
158-
defaultNotificationHandler
160+
tid <- mask_ $ forkIOWithUnmask $ \unmask ->
161+
unmask (receiverThread msgFilter rawConn
162+
inDataChan inAllChan modeRef defaultNotificationHandler)
163+
`catch` receiverOnException inDataChan inAllChan
164+
labelThread tid "postgres-wire receiver"
165+
159166
pure Connection
160167
{ connRawConnection = rawConn
161168
, connReceiverThread = tid
@@ -203,11 +210,14 @@ close conn = do
203210
killThread $ connReceiverThread conn
204211
rClose $ connRawConnection conn
205212

213+
-- | When thread receives unexpected exception or fihishes by
214+
-- any reason, than it writes Error to BOTH chans to prevent other threads
215+
-- blocking on reading from these chans.
206216
receiverThread
207217
:: ServerMessageFilter
208218
-> RawConnection
209-
-> InChan (Either Error DataMessage)
210-
-> InChan (Either Error ServerMessage)
219+
-> InDataChan
220+
-> InAllChan
211221
-> IORef ConnectionMode
212222
-> NotificationHandler
213223
-> IO ()
@@ -224,28 +234,40 @@ receiverThread msgFilter rawConn dataChan allChan modeRef ntfHandler =
224234
r <- rReceive rawConn 4096
225235
receiveLoop Nothing (bs <> r) acc
226236
| otherwise = case runDecode decodeHeader bs of
227-
Left reason -> pushDecodeError $ DecodeError $ BS.pack reason
237+
Left reason -> reportReceiverError dataChan allChan
238+
$ DecodeError $ BS.pack reason
228239
Right (rest, h) -> receiveLoop (Just h) rest acc
229240
-- Parsing body
230241
receiveLoop (Just h@(Header _ len)) bs acc
231242
| B.length bs < len = do
232243
r <- rReceive rawConn 4096
233244
receiveLoop (Just h) (bs <> r) acc
234245
| otherwise = case runDecode (decodeServerMessage h) bs of
235-
Left reason -> pushDecodeError $ DecodeError $ BS.pack reason
246+
Left reason -> reportReceiverError dataChan allChan
247+
$ DecodeError $ BS.pack reason
236248
Right (rest, v) -> do
237-
dispatchIfNotification v
249+
dispatchIfNotification v ntfHandler
238250
when (msgFilter v) $ writeChan allChan $ Right v
239251
mode <- readIORef modeRef
240252
newAcc <- dispatch mode dataChan v acc
241253
receiveLoop Nothing rest newAcc
242254

243-
dispatchIfNotification (NotificationResponse n) = ntfHandler n
244-
dispatchIfNotification _ = pure ()
245-
246-
pushDecodeError err = do
247-
writeChan dataChan (Left err)
248-
writeChan allChan (Left err)
255+
dispatchIfNotification :: ServerMessage -> NotificationHandler -> IO ()
256+
dispatchIfNotification msg handler = case msg of
257+
NotificationResponse n -> handler n
258+
_ -> pure ()
259+
260+
-- | Exception handler for receiver thread.
261+
-- Called only in masked state.
262+
receiverOnException :: InDataChan -> InAllChan -> SomeException -> IO ()
263+
receiverOnException dataChan allChan exc =
264+
reportReceiverError dataChan allChan $ UnexpectedError exc
265+
266+
-- | Reporting about any unexpected error to the thread than reads from chans.
267+
reportReceiverError :: InDataChan -> InAllChan -> Error -> IO ()
268+
reportReceiverError dataChan allChan err = do
269+
writeChan dataChan (Left err)
270+
writeChan allChan (Left err)
249271

250272
dispatch :: ConnectionMode -> DataDispatcher
251273
dispatch SimpleQueryMode = dispatchSimple

src/Database/PostgreSQL/Driver/Error.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
module Database.PostgreSQL.Driver.Error where
22

3+
import Control.Exception
34
import Data.ByteString (ByteString)
45
import System.Socket (AddressInfoException)
56

@@ -11,6 +12,7 @@ data Error
1112
| DecodeError ByteString
1213
| AuthError AuthError
1314
| ImpossibleError ByteString
15+
| UnexpectedError SomeException
1416
deriving (Show)
1517

1618
-- Errors that might occur at authorization phase.

0 commit comments

Comments
 (0)
0