8000 Simple dispatcher · postgres-haskell/postgres-wire@2860c7f · GitHub
[go: up one dir, main page]

Skip to content

Commit 2860c7f

Browse files
Simple dispatcher
1 parent 6380fbb commit 2860c7f

File tree

1 file changed

+106
-53
lines changed

1 file changed

+106
-53
lines changed

src/Database/PostgreSQL/Connection.hs

Lines changed: 106 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import Data.Binary.Get ( runGetIncremental, pushChunk)
2121
import qualified Data.Binary.Get as BG (Decoder(..))
2222
import Data.Maybe (fromJust)
2323
import qualified Data.Vector as V
24-
import System.Socket hiding (connect, close)
24+
import System.Socket hiding (connect, close, Error(..))
2525
import qualified System.Socket as Socket (connect, close)
2626
import System.Socket.Family.Inet6
2727
import System.Socket.Type.Stream
@@ -43,24 +43,27 @@ type UnixSocket = Socket Unix Stream Unix
4343
data Connection = Connection
4444
{ connSocket :: UnixSocket
4545
, connReceiverThread :: ThreadId
46-
-- Chan for only data messages
47-
, connDataOutChan :: OutChan (Either Error DataMessage)
48-
-- Chan for all messages that filter
49-
, connAllOutChan :: OutChan ServerMessage
46+
-- channel only for Data messages
47+
, connOutDataChan :: OutChan (Either Error DataMessage)
48+
-- channel for all the others messages
49+
, connOutAllChan :: OutChan ServerMessage
5050
, connStatementStorage :: StatementStorage
5151
, connParameters :: ConnectionParameters
5252
}
5353

54-
newtype ServerMessageFilter = ServerMessageFilter (ServerMessage -> Bool)
54+
type ServerMessageFilter = ServerMessage -> Bool
5555

5656
type NotificationHandler = Notification -> IO ()
5757

5858
-- All possible errors
5959
data Error
6060
= PostgresError ErrorDesc
6161
| ImpossibleError
62+
deriving (Show)
63+
64+
data DataMessage = DataMessage [V.Vector B.ByteString]
65+
deriving (Show)
6266

63-
data DataMessage = DataMessage B.ByteString
6467

6568
address :: SocketAddress Unix
6669
address = fromJust $ socketAddressUnixPath "/var/run/postgresql/.s.PGSQL.5432"
@@ -73,13 +76,15 @@ connect settings = do
7376
r <- receive s 4096 mempty
7477
readAuthMessage r
7578

76-
(inChan, outChan) <- newChan
77-
tid <- forkIO $ receiverThread s inChan
79+
(inDataChan, outDataChan) <- newChan
80+
(inAllChan, outAllChan) <- newChan
81+
tid <- forkIO $ receiverThread s inDataChan inAllChan
7882
storage <- newStatementStorage
7983
pure Connection
8084
{ connSocket = s
8185
, connReceiverThread = tid
82-
, connOutChan = outChan
86+
, connOutDataChan = outDataChan
87+
, connOutAllChan = outAllChan
8388
, connStatementStorage = storage
8489
, connParameters = ConnectionParameters
8590
{ paramServerVersion = ServerVersion 1 1 1
@@ -115,48 +120,99 @@ readAuthMessage s =
115120
_ -> error "Invalid auth"
116121
f -> error $ show s
117122

118-
receiverThread :: UnixSocket -> InChan ServerMessage -> IO ()
119-
receiverThread sock chan = forever $ do
120-
r <- receive sock 4096 mempty
121-
print r
122-
go r
123+
receiverThread
124+
:: UnixSocket
125+
-> InChan (Either Error DataMessage)
126+
-> InChan ServerMessage
127+
-> IO ()
128+
receiverThread sock dataChan allChan = receiveLoop []
123129
where
130+
receiveLoop :: [V.Vector B.ByteString] -> IO()
131+
receiveLoop acc = do
132+
r <- receive sock 4096 mempty
133+
-- print r
134+
go r acc >>= receiveLoop
135+
124136
decoder = runGetIncremental decodeServerMessage
125-
go str = case pushChunk decoder str of
137+
go :: B.ByteString -> [V.Vector B.ByteString] -> IO [V.Vector B.ByteString]
138+
go str acc = case pushChunk decoder str of
126139
BG.Done rest _ v -> do
127140
putStrLn $ "Received: " ++ show v
128-
unless (B.null rest) $ go rest
141+
-- TODO select filter
142+
when (defaultFilter v) $ writeChan allChan v
143+
newAcc <- dispatch v acc
144+
if B.null rest
145+
then pure newAcc
146+
else go rest newAcc
129147
BG.Partial _ -> error "Partial"
130148
BG.Fail _ _ e -> error e
131-
dispatch :: ServerMessage -> IO ()
132-
-- dont receiving at this phase
133-
dispatch (BackendKeyData _ _) = pure ()
134-
dispatch (BindComplete) = pure ()
135-
dispatch CloseComplete = pure ()
136-
-- maybe return command result too
137-
dispatch (CommandComplete _) = pure ()
138-
dispatch r@(DataRow _) = writeChan chan r
139-
-- TODO throw error here
140-
dispatch EmptyQueryResponse = pure ()
141-
-- TODO throw error here
142-
dispatch (ErrorResponse desc) = pure ()
143-
-- TODO
144-
dispatch NoData = pure ()
145-
dispatch (NoticeResponse _) = pure ()
149+
150+
dispatch
151+
:: ServerMessage
152+
-> [V.Vector B.ByteString]
153+
-> IO [V.Vector B.ByteString]
154+
-- Command is completed, return the result
155+
dispatch (CommandComplete _) acc = do
156+
writeChan dataChan . Right . DataMessage $ reverse acc
157+
pure []
158+
-- note that data rows go in reversed order
159+
dispatch (DataRow row) acc = pure (row:acc)
160+
-- PostgreSQL sends this if query string was empty and datarows should be
161+
-- empty, but anyway we return data collected in `acc`.
162+
dispatch EmptyQueryResponse acc = do
163+
writeChan dataChan . Right . DataMessage $ reverse acc
164+
pure []
165+
-- On ErrorResponse we should discard all the collected datarows
166+
dispatch (ErrorResponse desc) acc = do
167+
writeChan dataChan $ Left $ PostgresError desc
168+
pure []
146169
-- TODO handle notifications
147-
dispatch (NotificationResponse n) = pure ()
148-
-- Ignore here ?
149-
dispatch (ParameterDescription _) = pure ()
150-
dispatch (ParameterStatus _ _) = pure ()
151-
dispatch (ParseComplete) = pure ()
152-
dispatch (PortalSuspended) = pure ()
153-
dispatch (ReadForQuery _) = pure ()
154-
dispatch (RowDescription _) = pure ()
170+
dispatch (NotificationResponse n) acc = pure acc
171+
-- We does not handled this case because we always send `execute`
172+
-- with no limit.
173+
dispatch PortalSuspended acc = pure acc
174+
-- do nothing on other messages
175+
dispatch _ acc = pure acc
176+
177+
defaultFilter :: ServerMessageFilter
178+
defaultFilter msg = case msg of
179+
-- PostgreSQL sends it only in startup phase
180+
BackendKeyData{} -> False
181+
-- just ignore
182+
BindComplete -> False
183+
-- just ignore
184+
CloseComplete -> False
185+
-- messages affecting data handled in dispatcher
186+
CommandComplete{} -> False
187+
-- messages affecting data handled in dispatcher
188+
DataRow{} -> False
189+
-- messages affecting data handled in dispatcher
190+
EmptyQueryResponse -> False
191+
-- We need collect all errors to know whether the whole command is successful
192+
ErrorResponse{} -> True
193+
-- We need to know if the server send NoData on `describe` message
194+
NoData -> True
195+
-- All notices are not showing
196+
NoticeResponse{} -> False
197+
-- notifications will be handled by callbacks or in a separate channel
198+
NotificationResponse{} -> False
199+
-- As result for `describe` message
200+
ParameterDescription{} -> True
201+
-- we dont store any run-time parameter that is not a constant
202+
ParameterStatus{} -> False
203+
-- just ignore
204+
ParseComplete -> False
205+
-- messages affecting data handled in dispatcher
206+
PortalSuspended -> False
207+
-- to know when command processing is finished
208+
ReadForQuery{} -> True
209+
-- as result for `describe` message
210+
RowDescription{} -> True
155211

156212
data Query = Query
157-
{ qStatement :: B.ByteString
158-
, qOids :: V.Vector Oid
159-
, qValues :: V.Vector B.ByteString
213+
{ qStatement :: B.ByteString
214+
, qOids :: V.Vector Oid
215+
, qValues :: V.Vector B.ByteString
160216
, qParamsFormat :: Format
161217
, qResultFormat :: Format
162218
} deriving (Show)
@@ -165,7 +221,6 @@ query1 = Query "SELECT $1 + $2" [Oid 23, Oid 23] ["1", "3"] Text Text
165221
query2 = Query "SELECT $1 + $2" [Oid 23, Oid 23] ["2", "3"] Text Text
166222
query3 = Query "SELECT $1 + $2" [Oid 23, Oid 23] ["3", "3"] Text Text
167223
query4 = Query "SELECT $1 + $2" [Oid 23, Oid 23] ["4", "3"] Text Text
168-
-- query5 = Query "SELECT * FROM a whereee v > $1 + $2 LIMIT 100" [Oid 23, Oid 23] ["5", "3"]
169224

170225
sendBatch :: Connection -> [Query] -> IO ()
171226
sendBatch conn qs = do
@@ -185,8 +240,11 @@ sendBatch conn qs = do
185240
test :: IO ()
186241
test = do
187242
c <- connect defaultConnectionSettings
188-
sendBatch c [query1, query2, query3, query4, query5]
189-
threadDelay $ 1 * 1000 * 1000
243+
sendBatch c [query1, query2, query3, query4 ]
244+
readNextData c >>= print
245+
readNextData c >>= print
246+
readNextData c >>= print
A836 247+
readNextData c >>= print
190248
close c
191249

192250

@@ -200,15 +258,10 @@ test = do
200258
-- sendBatch :: IsQuery a => [a] -> Connection -> IO ()
201259
-- sendBatch = undefined
202260

203-
-- readNextData :: Connection -> IO Data?
204-
-- readNextData = undefined
261+
readNextData :: Connection -> IO (Either Error DataMessage)
262+
readNextData conn = readChan $ connOutDataChan conn
205263
--
206264
-- readNextServerMessage ?
207265
--
208266
--
209-
-- Simple Queries support or maybe dont support it
210-
-- because single text query may be send through extended protocol
211-
-- may be support for all standalone queries
212-
213-
-- data Request = forall a . Request (QQuery a)
214267

0 commit comments

Comments
 (0)
0