@@ -21,7 +21,7 @@ import Data.Binary.Get ( runGetIncremental, pushChunk)
21
21
import qualified Data.Binary.Get as BG (Decoder (.. ))
22
22
import Data.Maybe (fromJust )
23
23
import qualified Data.Vector as V
24
- import System.Socket hiding (connect , close )
24
+ import System.Socket hiding (connect , close , Error ( .. ) )
25
25
import qualified System.Socket as Socket (connect , close )
26
26
import System.Socket.Family.Inet6
27
27
import System.Socket.Type.Stream
@@ -43,24 +43,27 @@ type UnixSocket = Socket Unix Stream Unix
43
43
data Connection = Connection
44
44
{ connSocket :: UnixSocket
45
45
, connReceiverThread :: ThreadId
46
- -- Chan for only data messages
47
- , connDataOutChan :: OutChan (Either Error DataMessage )
48
- -- Chan for all messages that filter
49
- , connAllOutChan :: OutChan ServerMessage
46
+ -- channel only for Data messages
47
+ , connOutDataChan :: OutChan (Either Error DataMessage )
48
+ -- channel for all the others messages
49
+ , connOutAllChan :: OutChan ServerMessage
50
50
, connStatementStorage :: StatementStorage
51
51
, connParameters :: ConnectionParameters
52
52
}
53
53
54
- newtype ServerMessageFilter = ServerMessageFilter ( ServerMessage -> Bool )
54
+ type ServerMessageFilter = ServerMessage -> Bool
55
55
56
56
type NotificationHandler = Notification -> IO ()
57
57
58
58
-- All possible errors
59
59
data Error
60
60
= PostgresError ErrorDesc
61
61
| ImpossibleError
62
+ deriving (Show )
63
+
64
+ data DataMessage = DataMessage [V. Vector B. ByteString ]
65
+ deriving (Show )
62
66
63
- data DataMessage = DataMessage B. ByteString
64
67
65
68
address :: SocketAddress Unix
66
69
address = fromJust $ socketAddressUnixPath " /var/run/postgresql/.s.PGSQL.5432"
@@ -73,13 +76,15 @@ connect settings = do
73
76
r <- receive s 4096 mempty
74
77
readAuthMessage r
75
78
76
- (inChan, outChan) <- newChan
77
- tid <- forkIO $ receiverThread s inChan
79
+ (inDataChan, outDataChan) <- newChan
80
+ (inAllChan, outAllChan) <- newChan
81
+ tid <- forkIO $ receiverThread s inDataChan inAllChan
78
82
storage <- newStatementStorage
79
83
pure Connection
80
84
{ connSocket = s
81
85
, connReceiverThread = tid
82
- , connOutChan = outChan
86
+ , connOutDataChan = outDataChan
87
+ , connOutAllChan = outAllChan
83
88
, connStatementStorage = storage
84
89
, connParameters = ConnectionParameters
85
90
{ paramServerVersion = ServerVersion 1 1 1
@@ -115,48 +120,99 @@ readAuthMessage s =
115
120
_ -> error " Invalid auth"
116
121
f -> error $ show s
117
122
118
- receiverThread :: UnixSocket -> InChan ServerMessage -> IO ()
119
- receiverThread sock chan = forever $ do
120
- r <- receive sock 4096 mempty
121
- print r
122
- go r
123
+ receiverThread
124
+ :: UnixSocket
125
+ -> InChan (Either Error DataMessage )
126
+ -> InChan ServerMessage
127
+ -> IO ()
128
+ receiverThread sock dataChan allChan = receiveLoop []
123
129
where
130
+ receiveLoop :: [V. Vector B. ByteString ] -> IO ()
131
+ receiveLoop acc = do
132
+ r <- receive sock 4096 mempty
133
+ -- print r
134
+ go r acc >>= receiveLoop
135
+
124
136
decoder = runGetIncremental decodeServerMessage
125
- go str = case pushChunk decoder str of
137
+ go :: B. ByteString -> [V. Vector B. ByteString ] -> IO [V. Vector B. ByteString ]
138
+ go str acc = case pushChunk decoder str of
126
139
BG. Done rest _ v -> do
127
140
putStrLn $ " Received: " ++ show v
128
- unless (B. null rest) $ go rest
141
+ -- TODO select filter
142
+ when (defaultFilter v) $ writeChan allChan v
143
+ newAcc <- dispatch v acc
144
+ if B. null rest
145
+ then pure newAcc
146
+ else go rest newAcc
129
147
BG. Partial _ -> error " Partial"
130
148
BG. Fail _ _ e -> error e
131
- dispatch :: ServerMessage -> IO ()
132
- -- dont receiving at this phase
133
- dispatch (BackendKeyData _ _) = pure ()
134
- dispatch (BindComplete ) = pure ()
135
- dispatch CloseComplete = pure ()
136
- -- maybe return command result too
137
- dispatch (CommandComplete _) = pure ()
138
- dispatch r@ (DataRow _) = writeChan chan r
139
- -- TODO throw error here
140
- dispatch EmptyQueryResponse = pure ()
141
- -- TODO throw error here
142
- dispatch (ErrorResponse desc) = pure ()
143
- -- TODO
144
- dispatch NoData = pure ()
145
- dispatch (NoticeResponse _) = pure ()
149
+
150
+ dispatch
151
+ :: ServerMessage
152
+ -> [V. Vector B. ByteString ]
153
+ -> IO [V. Vector B. ByteString ]
154
+ -- Command is completed, return the result
155
+ dispatch (CommandComplete _) acc = do
156
+ writeChan dataChan . Right . DataMessage $ reverse acc
157
+ pure []
158
+ -- note that data rows go in reversed order
159
+ dispatch (DataRow row) acc = pure (row: acc)
160
+ -- PostgreSQL sends this if query string was empty and datarows should be
161
+ -- empty, but anyway we return data collected in `acc`.
162
+ dispatch EmptyQueryResponse acc = do
163
+ writeChan dataChan . Right . DataMessage $ reverse acc
164
+ pure []
165
+ -- On ErrorResponse we should discard all the collected datarows
166
+ dispatch (ErrorResponse desc) acc = do
167
+ writeChan dataChan $ Left $ PostgresError desc
168
+ pure []
146
169
-- TODO handle notifications
147
- dispatch (NotificationResponse n) = pure ()
148
- -- Ignore here ?
149
- dispatch (ParameterDescription _) = pure ()
150
- dispatch (ParameterStatus _ _) = pure ()
151
- dispatch (ParseComplete ) = pure ()
152
- dispatch (PortalSuspended ) = pure ()
153
- dispatch (ReadForQuery _) = pure ()
154
- dispatch (RowDescription _) = pure ()
170
+ dispatch (NotificationResponse n) acc = pure acc
171
+ -- We does not handled this case because we always send `execute`
172
+ -- with no limit.
173
+ dispatch PortalSuspended acc = pure acc
174
+ -- do nothing on other messages
175
+ dispatch _ acc = pure acc
176
+
177
+ defaultFilter :: ServerMessageFilter
178
+ defaultFilter msg = case msg of
179
+ -- PostgreSQL sends it only in startup phase
180
+ BackendKeyData {} -> False
181
+ -- just ignore
182
+ BindComplete -> False
183
+ -- just ignore
184
+ CloseComplete -> False
185
+ -- messages affecting data handled in dispatcher
186
+ CommandComplete {} -> False
187
+ -- messages affecting data handled in dispatcher
188
+ DataRow {} -> False
189
+ -- messages affecting data handled in dispatcher
190
+ EmptyQueryResponse -> False
191
+ -- We need collect all errors to know whether the whole command is successful
192
+ ErrorResponse {} -> True
193
+ -- We need to know if the server send NoData on `describe` message
194
+ NoData -> True
195
+ -- All notices are not showing
196
+ NoticeResponse {} -> False
197
+ -- notifications will be handled by callbacks or in a separate channel
198
+ NotificationResponse {} -> False
199
+ -- As result for `describe` message
200
+ ParameterDescription {} -> True
201
+ -- we dont store any run-time parameter that is not a constant
202
+ ParameterStatus {} -> False
203
+ -- just ignore
204
+ ParseComplete -> False
205
+ -- messages affecting data handled in dispatcher
206
+ PortalSuspended -> False
207
+ -- to know when command processing is finished
208
+ ReadForQuery {} -> True
209
+ -- as result for `describe` message
210
+ RowDescription {} -> True
155
211
156
212
data Query = Query
157
- { qStatement :: B. ByteString
158
- , qOids :: V. Vector Oid
159
- , qValues :: V. Vector B. ByteString
213
+ { qStatement :: B. ByteString
214
+ , qOids :: V. Vector Oid
215
+ , qValues :: V. Vector B. ByteString
160
216
, qParamsFormat :: Format
161
217
, qResultFormat :: Format
162
218
} deriving (Show )
@@ -165,7 +221,6 @@ query1 = Query "SELECT $1 + $2" [Oid 23, Oid 23] ["1", "3"] Text Text
165
221
query2 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 2" , " 3" ] Text Text
166
222
query3 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 3" , " 3" ] Text Text
167
223
query4 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 4" , " 3" ] Text Text
168
- -- query5 = Query "SELECT * FROM a whereee v > $1 + $2 LIMIT 100" [Oid 23, Oid 23] ["5", "3"]
169
224
170
225
sendBatch :: Connection -> [Query ] -> IO ()
171
226
sendBatch conn qs = do
@@ -185,8 +240,11 @@ sendBatch conn qs = do
185
240
test :: IO ()
186
241
test = do
187
242
c <- connect defaultConnectionSettings
188
- sendBatch c [query1, query2, query3, query4, query5]
189
- threadDelay $ 1 * 1000 * 1000
243
+ sendBatch c [query1, query2, query3, query4 ]
244
+ readNextData c >>= print
245
+ readNextData c >>= print
246
+ readNextData c >>= print
A836
247
+ readNextData c >>= print
190
248
close c
191
249
192
250
@@ -200,15 +258,10 @@ test = do
200
258
-- sendBatch :: IsQuery a => [a] -> Connection -> IO ()
201
259
-- sendBatch = undefined
202
260
203
- -- readNextData :: Connection -> IO Data?
204
- -- readNextData = undefined
261
+ readNextData :: Connection -> IO ( Either Error DataMessage )
262
+ readNextData conn = readChan $ connOutDataChan conn
205
263
--
206
264
-- readNextServerMessage ?
207
265
--
208
266
--
209
- -- Simple Queries support or maybe dont support it
210
- -- because single text query may be send through extended protocol
211
- -- may be support for all standalone queries
212
-
213
- -- data Request = forall a . Request (QQuery a)
214
267
0 commit comments