8000 Better settings and connection types · postgres-haskell/postgres-wire@2ee5703 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2ee5703

Browse files
Better settings and connection types
1 parent 3284767 commit 2ee5703

File tree

5 files changed

+115
-95
lines changed

5 files changed

+115
-95
lines changed

postgres-wire.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ library
3131
, time
3232
, hashable
3333
, hashtables
34+
, unagi-chan
3435
default-language: Haskell2010
3536
default-extensions: OverloadedStrings
3637

src/Database/PostgreSQL/Protocol/Connection.hs

Lines changed: 40 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import Data.Traversable
1717
import Data.Foldable
1818
import Control.Applicative
1919
import Data.Monoid
20-
import Control.Concurrent
20+
import Control.Concurrent (forkIO, killThread, ThreadId)
2121
import Data.Binary.Get ( runGetIncremental, pushChunk)
2222
import qualified Data.Binary.Get as BG (Decoder(..))
2323
import Data.Maybe (fromJust)
@@ -29,6 +29,7 @@ import System.Socket.Type.Stream
2929
import System.Socket.Protocol.TCP
3030
import System.Socket.Family.Unix
3131
import Data.Time.Clock.POSIX
32+
import Control.Concurrent.Chan.Unagi
3233

3334
import Database.PostgreSQL.Protocol.Settings
3435
import Database.PostgreSQL.Protocol.Encoders
@@ -39,8 +40,12 @@ import Database.PostgreSQL.Protocol.StatementStorage
3940

4041
type UnixSocket = Socket Unix Stream Unix
4142
-- data Connection = Connection (Socket Inet6 Stream TCP)
42-
-- TODO add statement storage
43-
data Connection = Connection UnixSocket ThreadId
43+
data Connection = Connection
44+
{ connSocket :: UnixSocket
45+
, connReceiverThread :: ThreadId
46+
, connOutChan :: OutChan ServerMessage
47+
, connStatementStorage :: StatementStorage
48+
}
4449

4550
address :: SocketAddress Unix
4651
address = fromJust $ socketAddressUnixPath "/var/run/postgresql/.s.PGSQL.5432"
@@ -53,11 +58,12 @@ connect settings = do
5358
r <- receive s 4096 mempty
5459
readAuthMessage r
5560

56-
tid <- forkIO $ receiverThread s
57-
pure $ Connection s tid
61+
(inChan, outChan) <- newChan
62+
tid <- forkIO $ receiverThread s inChan
63+
pure $ Connection s tid outChan
5864

5965
close :: Connection -> IO ()
60-
close (Connection s tid) = do
66+
close (Connection s tid chan) = do
6167
killThread tid
6268
Socket.close s
6369

@@ -79,8 +85,8 @@ readAuthMessage s =
7985
_ -> error "Invalid auth"
8086
f -> error $ show s
8187

82-
receiverThread :: UnixSocket -> IO ()
83-
receiverThread sock = forever $ do
88+
receiverThread :: UnixSocket -> InChan ServerMessage -> IO ()
89+
receiverThread sock chan = forever $ do
8490
r <- receive sock 4096 mempty
8591
print "Receive time"
8692
getPOSIXTime >>= print
@@ -91,21 +97,23 @@ receiverThread sock = forever $ do
9197
go str = case pushChunk decoder str of
9298
BG.Done rest _ v -> do
9399
print v
100+
writeChan chan v
94101
unless (B.null rest) $ go rest
95102
BG.Partial _ -> error "Partial"
96103
BG.Fail _ _ e -> error e
97104

98105
data QQuery a = QQuery
99-
{ qStmt :: B.ByteString
106+
{ qName :: B.ByteString
107+
, qStmt :: B.ByteString
100108
, qOids :: V.Vector Oid
101109
, qValues :: V.Vector B.ByteString
102110
} deriving Show
103111

104-
-- query1 = QQuery "test1" "SELECT $1 + $2" [23, 23] ["1", "3"]
105-
-- query2 = QQuery "test2" "SELECT $1 + $2" [23, 23] ["2", "3"]
106-
-- query3 = QQuery "test3" "SELECT $1 + $2" [23, 23] ["3", "3"]
107-
-- query4 = QQuery "test4" "SELECT $1 + $2" [23, 23] ["4", "3"]
108-
-- query5 = QQuery "test5" "SELECT $1 + $2" [23, 23] ["5", "3"]
112+
query1 = QQuery "test1" "SELECT $1 + $2" [23, 23] ["1", "3"]
113+
query2 = QQuery "test2" "SELECT $1 + $2" [23, 23] ["2", "3"]
114+
query3 = QQuery "test3" "SELECT $1 + $2" [23, 23] ["3", "3"]
115+
query4 = QQuery "test4" "SELECT $1 + $2" [23, 23] ["4", "3"]
116+
query5 = QQuery "test5" "SELECT $1 + $2" [23, 23] ["5", "3"]
109117
-- query1 = QQuery "test1" "select sum(v) from a" [] []
110118
-- query2 = QQuery "test2" "select sum(v) from a" [] []
111119
-- query3 = QQuery "test3" "select sum(v) from a" [] []
@@ -153,82 +161,29 @@ data QQuery a = QQuery
153161
-- sendBatch :: IsQuery a => [a] -> Connection -> IO ()
154162
-- sendBatch = undefined
155163

156-
-- Session Monad
164+
-- readNextData :: Connection -> IO Data?
165+
-- readNextData = undefined
157166
--
167+
-- Simple Queries support or maybe dont support it
168+
-- because single text query may be send through extended protocol
169+
-- may be support for all standalone queries
158170

159171
data Request = forall a . Request (QQuery a)
160172

161173
query :: Decode a => QQuery a -> Session a
162174
query q = Send One [Request q] $ Receive Done
163175

164-
data Count = One | Many
165-
deriving (Eq, Show)
166-
167-
data Session a
168-
= Done a
169-
| forall r . Decode r => Receive (r -> Session a)
170-
| Send Count [Request] (Session a)
171-
172-
instance Functor Session where
173-
f `fmap` (Done a) = Done $ f a
174-
f `fmap` (Receive g) = Receive $ fmap f . g
175-
f `fmap` (Send n br c) = Send n br (f <$> c)
176-
177-
instance Applicative Session where
178-
pure = Done
179-
180-
f <*> x = case (f, x) of
181-
(Done g, Done y) -> Done (g y)
182-
(Done g, Receive next) -> Receive $ fmap g . next
183-
(Done g, Send n br c) -> Send n br (g <$> c)
184-
185-
(Send n br c, Done y) -> Send n br (c <*> pure y)
186-
(Send n br c, Receive next)
187-
-> Send n br $ c <*> Receive next
188-
(Send n1 br1 c1, Send n2 br2 c2)
189-
-> if n1 == One
190-
then Send n2 (br1 <> br2) (c1 <*> c2)
191-
else Send n1 br1 (c1 <*> Send n2 br2 c2)
192-
193-
(Receive next1, Receive next2) ->
194-
Receive $ (\g -> Receive $ (g <*> ) . next2) . next1
195-
(Receive next, Done y) -> Receive $ (<*> Done y) . next
196-
(Receive next, Send n br c)
197-
-> Receive $ (<*> Send n br c) . next
198-
199-
instance Monad Session where
200-
return = pure
201-
202-
m >>= f = case m of
203-
Done a -> f a
204-
Receive g -> Receive $ (>>=f) . g
205-
Send _n br c -> Send Many br (c >>= f)
206-
207-
(>>) = (*>)
208-
209-
runSession :: Show a => Session a -> IO a
210-
runSession (Done x) = do
211-
putStrLn $ "Return " ++ show x
212-
pure x
213-
runSession (Receive f) = do
214-
putStrLn "Receiving"
215-
-- TODO receive here
216-
-- x <- receive
217-
-- runProgram (f $ decode x)
218-
undefined
219-
runSession (Send _ rs c) = do
220-
putStrLn "Sending requests "
221-
-- TODO send requests here in batch
222-
runSession c
223-
224-
225-
-- Type classes
226-
class Decode a where
227-
decode :: String -> a
228-
229-
instance Decode Integer where
230-
decode = read
231-
232-
instance Decode String where
233-
decode = id
176+
177+
178+
sendBatch :: Connection -> [Request] -> IO ()
179+
sendBatch (Connection s _ _) rs = do
180+
traverse sendSingle rs
181+
sendMessage s $ encodeClientMessage Sync
182+
where
183+
sendSingle (Request q) = do
184+
sendMessage s $ encodeClientMessage $
185+
Parse (qName q) (qStmt q) (qOids q)
186+
sendMessage s $ encodeClientMessage $
187+
Bind (qName q) (qName q) Text (qValues q) Text
188+
sendMessage s $ encodeClientMessage $ Execute (qName q)
234189

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
data Count = One | Many
2+
deriving (Eq, Show)
3+
4+
data Session a
5+
= Done a
6+
| forall r . Decode r => Receive (r -> Session a)
7+
| Send Count [Request] (Session a)
8+
9+
instance Functor Session where
10+
f `fmap` (Done a) = Done $ f a
11+
f `fmap` (Receive g) = Receive $ fmap f . g
12+
f `fmap` (Send n br c) = Send n br (f <$> c)
13+
14+
instance Applicative Session where
15+
pure = Done
16+
17+
f <*> x = case (f, x) of
18+
(Done g, Done y) -> Done (g y)
19+
(Done g, Receive next) -> Receive $ fmap g . next
20+
(Done g, Send n br c) -> Send n br (g <$> c)
21+
22+
(Send n br c, Done y) -> Send n br (c <*> pure y)
23+
(Send n br c, Receive next)
24+
-> Send n br $ c <*> Receive next
25+
(Send n1 br1 c1, Send n2 br2 c2)
26+
-> if n1 == One
27+
then Send n2 (br1 <> br2) (c1 <*> c2)
28+
else Send n1 br1 (c1 <*> Send n2 br2 c2)
29+
30+
(Receive next1, Receive next2) ->
31+
Receive $ (\g -> Receive $ (g <*> ) . next2) . next1
32+
(Receive next, Done y) -> Receive $ (<*> Done y) . next
33+
(Receive next, Send n br c)
34+
-> Receive $ (<*> Send n br c) . next
35+
36+
instance Monad Session where
37+
return = pure
38+
39+
m >>= f = case m of
40+
Done a -> f a
41+
Receive g -> Receive $ (>>=f) . g
42+
Send _n br c -> Send Many br (c >>= f)
43+
44+
(>>) = (*>)
45+
46+
runSession :: Show a => Connection -> Session a -> IO a
47+
runSession conn@(Connection sock _ chan) = go
48+
where
49+
go (Done x) = do
50+
putStrLn $ "Return " ++ show x
51+
pure x
52+
go (Receive f) = do
53+
putStrLn "Receiving"
54+
-- TODO receive here
55+
-- x <- receive
56+
x <- getLine
57+
go (f $ decode x)
58+
go (Send _ rs c) = do
59+
putStrLn "Sending requests "
60+
-- TODO send requests here in batch
61+
sendBatch conn rs
62+
go c
63+

src/Database/PostgreSQL/Protocol/Settings.hs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,19 @@ import Data.Word (Word16)
66
import Data.ByteString (ByteString)
77

88
data ConnectionSettings = ConnectionSettings
9-
{ connHost :: ByteString
10-
, connPort :: Word16
11-
, connDatabase :: ByteString
12-
, connUser :: ByteString
13-
, connPassword :: ByteString
9+
{ settingsHost :: ByteString
10+
, settingsPort :: Word16
11+
, settingsDatabase :: ByteString
12+
, settingsUser :: ByteString
13+
, settingsPassword :: ByteString
1414
} deriving (Show)
1515

1616
defaultConnectionSettings :: ConnectionSettings
1717
defaultConnectionSettings = ConnectionSettings
18-
{ connHost = ""
19-
, connPort = 5432
20-
, connDatabase = "testdb"
21-
, connUser = "v"
22-
, connPassword = ""
18+
{ settingsHost = ""
19+
, settingsPort = 5432
20+
, settingsDatabase = "testdb"
21+
, settingsUser = "v"
22+
, settingsPassword = ""
2323
}
2424

stack.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ packages:
88
# (e.g., acme-missiles-0.3)
99
extra-deps:
1010
- socket-unix-0.1.0.0
11+
- unagi-chan-0.4.0.0
1112

1213
# Override default flag values for local packages and extra-deps
1314
flags: {}

0 commit comments

Comments
 (0)
0