@@ -17,7 +17,7 @@ import Data.Traversable
17
17
import Data.Foldable
18
18
import Control.Applicative
19
19
import Data.Monoid
20
- import Control.Concurrent
20
+ import Control.Concurrent ( forkIO , killThread , ThreadId )
21
21
import Data.Binary.Get ( runGetIncremental , pushChunk )
22
22
import qualified Data.Binary.Get as BG (Decoder (.. ))
23
23
import Data.Maybe (fromJust )
@@ -29,6 +29,7 @@ import System.Socket.Type.Stream
29
29
import System.Socket.Protocol.TCP
30
30
import System.Socket.Family.Unix
31
31
import Data.Time.Clock.POSIX
32
+ import Control.Concurrent.Chan.Unagi
32
33
33
34
import Database.PostgreSQL.Protocol.Settings
34
35
import Database.PostgreSQL.Protocol.Encoders
@@ -39,8 +40,12 @@ import Database.PostgreSQL.Protocol.StatementStorage
39
40
40
41
type UnixSocket = Socket Unix Stream Unix
41
42
-- data Connection = Connection (Socket Inet6 Stream TCP)
42
- -- TODO add statement storage
43
- data Connection = Connection UnixSocket ThreadId
43
+ data Connection = Connection
44
+ { connSocket :: UnixSocket
45
+ , connReceiverThread :: ThreadId
46
+ , connOutChan :: OutChan ServerMessage
47
+ , connStatementStorage :: StatementStorage
48
+ }
44
49
45
50
address :: SocketAddress Unix
46
51
address = fromJust $ socketAddressUnixPath " /var/run/postgresql/.s.PGSQL.5432"
@@ -53,11 +58,12 @@ connect settings = do
53
58
r <- receive s 4096 mempty
54
59
readAuthMessage r
55
60
56
- tid <- forkIO $ receiverThread s
57
- pure $ Connection s tid
61
+ (inChan, outChan) <- newChan
62
+ tid <- forkIO $ receiverThread s inChan
63
+ pure $ Connection s tid outChan
58
64
59
65
close :: Connection -> IO ()
60
- close (Connection s tid) = do
66
+ close (Connection s tid chan ) = do
61
67
killThread tid
62
68
Socket. close s
63
69
@@ -79,8 +85,8 @@ readAuthMessage s =
79
85
_ -> error " Invalid auth"
80
86
f -> error $ show s
81
87
82
- receiverThread :: UnixSocket -> IO ()
83
- receiverThread sock = forever $ do
88
+ receiverThread :: UnixSocket -> InChan ServerMessage -> IO ()
89
+ receiverThread sock chan = forever $ do
84
90
r <- receive sock 4096 mempty
85
91
print " Receive time"
86
92
getPOSIXTime >>= print
@@ -91,21 +97,23 @@ receiverThread sock = forever $ do
91
97
go str = case pushChunk decoder str of
92
98
BG. Done rest _ v -> do
93
99
print v
100
+ writeChan chan v
94
101
unless (B. null rest) $ go rest
95
102
BG. Partial _ -> error " Partial"
96
103
BG. Fail _ _ e -> error e
97
104
98
105
data QQuery a = QQuery
99
- { qStmt :: B. ByteString
106
+ { qName :: B. ByteString
107
+ , qStmt :: B. ByteString
100
108
, qOids :: V. Vector Oid
101
109
, qValues :: V. Vector B. ByteString
102
110
} deriving Show
103
111
104
- -- query1 = QQuery "test1" "SELECT $1 + $2" [23, 23] ["1", "3"]
105
- -- query2 = QQuery "test2" "SELECT $1 + $2" [23, 23] ["2", "3"]
106
- -- query3 = QQuery "test3" "SELECT $1 + $2" [23, 23] ["3", "3"]
107
- -- query4 = QQuery "test4" "SELECT $1 + $2" [23, 23] ["4", "3"]
108
- -- query5 = QQuery "test5" "SELECT $1 + $2" [23, 23] ["5", "3"]
112
+ query1 = QQuery " test1" " SELECT $1 + $2" [23 , 23 ] [" 1" , " 3" ]
113
+ query2 = QQuery " test2" " SELECT $1 + $2" [23 , 23 ] [" 2" , " 3" ]
114
+ query3 = QQuery " test3" " SELECT $1 + $2" [23 , 23 ] [" 3" , " 3" ]
115
+ query4 = QQuery " test4" " SELECT $1 + $2" [23 , 23 ] [" 4" , " 3" ]
116
+ query5 = QQuery " test5" " SELECT $1 + $2" [23 , 23 ] [" 5" , " 3" ]
109
117
-- query1 = QQuery "test1" "select sum(v) from a" [] []
110
118
-- query2 = QQuery "test2" "select sum(v) from a" [] []
111
119
-- query3 = QQuery "test3" "select sum(v) from a" [] []
@@ -153,82 +161,29 @@ data QQuery a = QQuery
153
161
-- sendBatch :: IsQuery a => [a] -> Connection -> IO ()
154
162
-- sendBatch = undefined
155
163
156
- -- Session Monad
164
+ -- readNextData :: Connection -> IO Data?
165
+ -- readNextData = undefined
157
166
--
167
+ -- Simple Queries support or maybe dont support it
168
+ -- because single text query may be send through extended protocol
169
+ -- may be support for all standalone queries
158
170
159
171
data Request = forall a . Request (QQuery a )
160
172
161
173
query :: Decode a => QQuery a -> Session a
162
174
query q = Send One [Request q] $ Receive Done
163
175
164
- data Count = One | Many
165
- deriving (Eq , Show )
166
-
167
- data Session a
168
- = Done a
169
- | forall r . Decode r => Receive (r -> Session a )
170
- | Send Count [Request ] (Session a )
171
-
172
- instance Functor Session where
173
- f `fmap` (Done a) = Done $ f a
174
- f `fmap` (Receive g) = Receive $ fmap f . g
175
- f `fmap` (Send n br c) = Send n br (f <$> c)
176
-
177
- instance Applicative Session where
178
- pure = Done
179
-
180
- f <*> x = case (f, x) of
181
- (Done g, Done y) -> Done (g y)
182
- (Done g, Receive next) -> Receive $ fmap g . next
183
- (Done g, Send n br c) -> Send n br (g <$> c)
184
-
185
- (Send n br c, Done y) -> Send n br (c <*> pure y)
186
- (Send n br c, Receive next)
187
- -> Send n br $ c <*> Receive next
188
- (Send n1 br1 c1, Send n2 br2 c2)
189
- -> if n1 == One
190
- then Send n2 (br1 <> br2) (c1 <*> c2)
191
- else Send n1 br1 (c1 <*> Send n2 br2 c2)
192
-
193
- (Receive next1, Receive next2) ->
194
- Receive $ (\ g -> Receive $ (g <*> ) . next2) . next1
195
- (Receive next, Done y) -> Receive $ (<*> Done y) . next
196
- (Receive next, Send n br c)
197
- -> Receive $ (<*> Send n br c) . next
198
-
199
- instance Monad Session where
200
- return = pure
201
-
202
- m >>= f = case m of
203
- Done a -> f a
204
- Receive g -> Receive $ (>>= f) . g
205
- Send _n br c -> Send Many br (c >>= f)
206
-
207
- (>>) = (*>)
208
-
209
- runSession :: Show a => Session a -> IO a
210
- runSession (Done x) = do
211
- putStrLn $ " Return " ++ show x
212
- pure x
213
- runSession (Receive f) = do
214
- putStrLn " Receiving"
215
- -- TODO receive here
216
- -- x <- receive
217
- -- runProgram (f $ decode x)
218
- undefined
219
- runSession (Send _ rs c) = do
220
- putStrLn " Sending requests "
221
- -- TODO send requests here in batch
222
- runSession c
223
-
224
-
225
- -- Type classes
226
- class Decode a where
227
- decode :: String -> a
228
-
229
- instance Decode Integer where
230
- decode = read
231
-
232
- instance Decode String where
233
- decode = id
176
+
177
+
178
+ sendBatch :: Connection -> [Request ] -> IO ()
179
+ sendBatch (Connection s _ _) rs = do
180
+ traverse sendSingle rs
181
+ sendMessage s $ encodeClientMessage Sync
182
+ where
183
+ sendSingle (Request q) = do
184
+ sendMessage s $ encodeClientMessage $
185
+ Parse (qName q) (qStmt q) (qOids q)
186
+ sendMessage s $ encodeClientMessage $
187
+ Bind (qName q) (qName q) Text (qValues q) Text
188
+ sendMessage s $ encodeClientMessage $ Execute (qName q)
234
189
0 commit comments