@@ -67,16 +67,15 @@ data ConnectionParameters = ConnectionParameters
67
67
68
68
-- | Public
69
69
data Connection = Connection
70
- { connRawConnection :: RawConnection
71
- , connReceiverThread :: ThreadId
70
+ { connRawConnection :: RawConnection
71
+ , connReceiverThread :: ThreadId
72
72
-- channel only for Data messages
73
- , connOutDataChan :: OutChan (Either Error DataMessage )
73
+ , connOutDataChan :: OutChan (Either Error DataMessage )
74
74
-- channel for all the others messages
75
- , connOutAllChan :: OutChan ServerMessage
76
- , connStatementStorage :: StatementStorage
77
- , connParameters :: ConnectionParameters
78
- , connMode :: IORef ConnectionMode
79
- , connNotificationHandler :: NotificationHandler
75
+ , connOutAllChan :: OutChan ServerMessage
76
+ , connStatementStorage :: StatementStorage
77
+ , connParameters :: ConnectionParameters
78
+ , connMode :: IORef ConnectionMode
80
79
}
81
80
82
81
-- | Public
@@ -107,6 +106,7 @@ buildConnection rawConn connParams msgFilter = do
107
106
108
107
tid <- forkIO $
109
108
receiverThread msgFilter rawConn inDataChan inAllChan modeRef
109
+ defaultNotificationHandler
110
110
pure Connection
111
111
{ connRawConnection = rawConn
112
112
, connReceiverThread = tid
@@ -115,7 +115,6 @@ buildConnection rawConn connParams msgFilter = do
115
115
, connStatementStorage = storage
116
116
, connParameters = connParams
117
117
, connMode = modeRef
118
- , connNotificationHandler = defaultNotificationHandler
119
118
}
120
119
121
120
-- | Authorizes on the server and reads connection parameters.
@@ -203,8 +202,10 @@ receiverThread
203
202
-> InChan (Either Error DataMessage )
204
203
-> InChan ServerMessage
205
204
-> IORef ConnectionMode
205
+ -> NotificationHandler
206
206
-> IO ()
207
- receiverThread msgFilter rawConn dataChan allChan modeRef = receiveLoop []
207
+ receiverThread msgFilter rawConn dataChan allChan modeRef ntfHandler =
208
+ receiveLoop []
208
209
where
209
210
receiveLoop :: [V. Vector (Maybe B. ByteString )] -> IO ()
210
211
receiveLoop acc = do
@@ -215,23 +216,25 @@ receiverThread msgFilter rawConn dataChan allChan modeRef = receiveLoop []
215
216
go :: B. ByteString -> [V. Vector (Maybe B. ByteString )] -> IO [V. Vector (Maybe B. ByteString )]
216
217
go str acc = case runDecode decodeServerMessage str of
217
218
Right (rest, v) -> do
219
+ dispatchIfNotification v
218
220
when (msgFilter v) $ writeChan allChan v
219
221
mode <- readIORef modeRef
220
222
newAcc <- dispatch mode dataChan v acc
221
223
if B. null rest
222
224
then pure newAcc
223
225
else go rest newAcc
224
226
Left reason -> error reason
227
+ dispatchIfNotification (NotificationResponse n) = ntfHandler n
228
+ dispatchIfNotification _ = pure ()
229
+
225
230
226
231
dispatch :: ConnectionMode -> Dispatcher
227
232
dispatch SimpleQueryMode = dispatchSimple
228
233
dispatch ExtendedQueryMode = dispatchExtended
229
234
230
235
-- | Dispatcher for the SimpleQuery mode.
231
236
dispatchSimple :: Dispatcher
232
- dispatchSimple dataChan message acc = case message of
233
- NotificationResponse n -> pure acc
234
- _ -> pure acc
237
+ dispatchSimple dataChan message = pure
235
238
236
239
-- | Dispatcher for the ExtendedQuery mode.
237
240
dispatchExtended :: Dispatcher
@@ -247,15 +250,14 @@ dispatchExtended dataChan message acc = case message of
247
250
EmptyQueryResponse -> do
248
251
writeChan dataChan . Right . DataMessage $ reverse acc
249
252
pure []
250
- -- On ErrorResponse we should discard all the collected datarows
253
+ -- On ErrorResponse we should discard all the collected datarows.
251
254
ErrorResponse desc -> do
252
255
writeChan dataChan $ Left $ PostgresError desc
253
256
pure []
254
- -- TODO handle notifications
255
- NotificationResponse n -> pure acc
256
- -- We does not handled this case because we always send `execute`
257
+ -- We does not handled `PortalSuspended` because we always send `execute`
257
258
-- with no limit.
258
- PortalSuspended -> pure acc
259
+ -- PortalSuspended -> pure acc
260
+
259
261
-- do nothing on other messages
260
262
_ -> pure acc
261
263
0 commit comments