@@ -2,10 +2,12 @@ module Database.PostgreSQL.Driver.Query where
2
2
3
3
import Control.Concurrent.Chan.Unagi
4
4
import Data.Foldable
5
+ import Data.Monoid
5
6
import qualified Data.Vector as V
6
7
import qualified Data.ByteString as B
7
8
8
9
import Database.PostgreSQL.Protocol.Encoders
10
+ import Database.PostgreSQL.Protocol.Store.Encode
9
11
import Database.PostgreSQL.Protocol.Decoders
10
12
import Database.PostgreSQL.Protocol.Types
11
13
@@ -23,47 +25,45 @@ data Query = Query
23
25
} deriving (Show )
24
26
25
27
-- | Public
26
- sendBatch :: Connection -> [Query ] -> IO ()
27
- sendBatch conn = traverse_ sendSingle
28
+ sendBatchAndSync :: Connection -> [Query ] -> IO ()
29
+ sendBatchAndSync = sendBatchEndBy Sync
30
+
31
+ -- | Public
32
+ sendBatchAndFlush :: Connection -> [Query ] -> IO ()
33
+ sendBatchAndFlush = sendBatchEndBy Flush
34
+
35
+ -- Helper
36
+ sendBatchEndBy :: ClientMessage -> Connection -> [Query ] -> IO ()
37
+ sendBatchEndBy msg conn qs = do
38
+ batch <- constructBatch conn qs
39
+ sendEncode (connRawConnection conn) $ batch <> encodeClientMessage msg
40
+
41
+ constructBatch :: Connection -> [Query ] -> IO Encode
42
+ constructBatch conn = fmap fold . traverse constructSingle
28
43
where
29
- s = connRawConnection conn
30
44
storage = connStatementStorage conn
31
45
pname = PortalName " "
32
- sendSingle q = do
46
+ constructSingle q = do
33
47
let stmtSQL = StatementSQL $ qStatement q
34
- sname <- case qCachePolicy q of
48
+ ( sname, parseMessage) <- case qCachePolicy q of
35
49
AlwaysCache -> do
36
50
mName <- lookupStatement storage stmtSQL
37
51
case mName of
38
52
Nothing -> do
39
53
newName <- storeStatement storage stmtSQL
40
- sendMessage s $
41
- Parse newName stmtSQL (fst <$> qValues q)
42
- pure newName
43
- Just name -> pure name
54
+ pure (newName, encodeClientMessage $
55
+ Parse newName stmtSQL (fst <$> qValues q))
56
+ Just name -> pure (name, mempty )
44
57
NeverCache -> do
45
58
let newName = defaultStatementName
46
- sendMessage s $
47
- Parse newName stmtSQL (fst <$> qValues q)
48
- pure newName
49
- sendMessage s $
50
- Bind pname sname (qParamsFormat q) (snd <$> qValues q)
51
- (qResultFormat q)
52
- sendMessage s $ Execute pname noLimitToReceive
53
-
54
- -- | Public
55
- sendBatchAndSync :: Connection -> [Query ] -> IO ()
56
- sendBatchAndSync conn qs = sendBatch conn qs >> sendSync conn
57
-
58
- -- | Public
59
- sendBatchAndFlush :: Connection -> [Query ] -> IO ()
60
- sendBatchAndFlush conn qs = sendBatch conn qs >> sendFlush conn
61
-
62
- sendSync :: Connection -> IO ()
63
- sendSync conn = sendMessage (connRawConnection conn) Sync
64
-
65
- sendFlush :: Connection -> IO ()
66
- sendFlush conn = sendMessage (connRawConnection conn) Flush
59
+ pure (newName, encodeClientMessage $
60
+ Parse newName stmtSQL (fst <$> qValues q))
61
+ let bindMessage = encodeClientMessage $
62
+ Bind pname sname (qParamsFormat q) (snd <$> qValues q)
63
+ (qResultFormat q)
64
+ executeMessage = encodeClientMessage $
65
+ Execute pname noLimitToReceive
66
+ pure $ parseMessage <> bindMessage <> executeMessage
67
67
68
68
-- | Public
69
69
readNextData :: Connection -> IO (Either Error DataMessage )
@@ -104,9 +104,10 @@ describeStatement
104
104
-> B. ByteString
105
105
-> IO (Either Error (V. Vector Oid , V. Vector FieldDescription ))
106
106
describeStatement conn stmt = do
107
- sendMessage s $ Parse sname (StatementSQL stmt) V. empty
108
- sendMessage s $ DescribeStatement sname
109
- sendMessage s Sync
107
+ sendEncode s $
108
+ encodeClientMessage (Parse sname (StatementSQL stmt) V. empty)
109
+ <> encodeClientMessage (DescribeStatement sname)
110
+ <> encodeClientMessage Sync
110
111
parseMessages <$> collectBeforeReadyForQuery conn
111
112
where
112
113
s = connRawConnection conn
0 commit comments