Moved loopExtractDataRow to separate module · postgres-haskell/postgres-wire@cbf6613 · GitHub
[go: up one dir, main page]

Skip to content

Commit cbf6613

Browse files
Moved loopExtractDataRow to separate module
1 parent 321aea4 commit cbf6613

File tree

5 files changed

+141
-120
lines changed

5 files changed

+141
-120
lines changed

postgres-wire.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ library
2828
, Database.PostgreSQL.Protocol.Types
2929
, Database.PostgreSQL.Protocol.Encoders
3030
, Database.PostgreSQL.Protocol.Decoders
31+
, Database.PostgreSQL.Protocol.ExtractDataRows
3132
, Database.PostgreSQL.Protocol.Store.Encode
3233
, Database.PostgreSQL.Protocol.Store.Decode
3334
other-modules: Database.PostgreSQL.Protocol.Utils

src/Database/PostgreSQL/Driver/Connection.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import Crypto.Hash (hash, Digest, MD5)
2424

2525
import Database.PostgreSQL.Protocol.Encoders
2626
import Database.PostgreSQL.Protocol.Decoders
27+
import Database.PostgreSQL.Protocol.ExtractDataRows
2728
import Database.PostgreSQL.Protocol.Types
2829
import Database.PostgreSQL.Protocol.Store.Encode (runEncode, Encode)
2930
import Database.PostgreSQL.Protocol.Store.Decode (runDecode)
@@ -200,7 +201,7 @@ parseParameters action str = Right <$> do
200201
}
201202
where
202203
parseDict bs dict = do
203-
(rest, v) <- parseServerMessages bs action
204+
(rest, v) <- decodeNextServerMessage bs action
204205
case v of
205206
ParameterStatus name value
206207
-> parseDict rest $ HM.insert name value dict
@@ -239,7 +240,7 @@ receiverThreadCommon
239240
receiverThreadCommon rawConn chan msgFilter ntfHandler = go ""
240241
where
241242
go bs = do
242-
(rest, msg) <- parseServerMessages bs readMoreAction
243+
(rest, msg) <- decodeNextServerMessage bs readMoreAction
243244
handler msg >> go rest
244245

245246
-- TODO

src/Database/PostgreSQL/Protocol/Decoders.hs

Lines changed: 8 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
{-# language RecordWildCards #-}
22

33
module Database.PostgreSQL.Protocol.Decoders
4-
( decodeAuthResponse
4+
(
5+
-- * High-level decoder
6+
decodeNextServerMessage
7+
-- * Decoders
8+
, decodeAuthResponse
59
, decodeHeader
610
, decodeServerMessage
711
-- * Helpers
812
, parseServerVersion
913
, parseIntegerDatetimes
10-
, loopExtractDataRows
11-
, parseServerMessages
14+
, parseErrorDesc
1215
) where
1316

1417
import Control.Applicative
@@ -31,126 +34,15 @@ import Database.PostgreSQL.Protocol.Types
3134
import Database.PostgreSQL.Protocol.Store.Decode
3235
import Database.PostgreSQL.Protocol.Utils
3336

34-
-- Optimized loop for extracting chunks of DataRows.
35-
-- Ignores all messages from database that do not relate to data.
36-
-- Does not throw exceptions.
37-
loopExtractDataRows
38-
-- Action that returs more data with `ByteString` prepended.
39-
:: (B.ByteString -> IO B.ByteString)
40-
-- Will be called on every DataMessage.
41-
-> (DataMessage -> IO ())
42-
-> IO ()
43-
loopExtractDataRows readMoreAction callback = go "" ""
44-
where
45-
go :: B.ByteString -> BL.ByteString -> IO ()
46-
go bs acc
47-
-- 5 - header size, defined by PostgreSQL
48-
| B.length bs < 5 = readMoreAndGo bs acc
49-
| otherwise = do
50-
ScanRowResult ch rest r <- scanDataRows bs
51-
-- We should force accumulator
52-
-- Note: `BL.chunk` should not prepend empty bytestring as chunk.
53-
let !newAcc = BL.chunk ch acc
54-
55-
case r of
56-
-- Following happened:
57-
-- not enough bytes to read header
58-
-- or header is for `DataRow`, not enough bytes to read body
59-
1 -> readMoreAndGo rest newAcc
60-
-- Header was read, it is not for `DataRow`. We can safely
61-
-- call `parseHeader`, because scanDataRows already checked
62-
-- that there are enough bytes to read header.
63-
2 -> do
64-
Header mt len <- parseHeader rest
65-
dispatchHeader mt len (B.drop 5 rest) newAcc
66-
67-
{-# INLINE dispatchHeader #-}
68-
dispatchHeader :: Word8 -> Int -> B.ByteString -> BL.ByteString -> IO ()
69-
dispatchHeader mt len bs acc = case mt of
70-
-- 'C' - CommandComplete.
71-
-- Command is completed, return the result.
72-
67 -> do
73-
callback $
74-
DataMessage . DataRows $
75-
BL.foldlChunks (flip BL.chunk) BL.empty acc
76-
77-
newBs <- skipBytes bs len
78-
go newBs BL.empty
79-
80-
-- 'I' - EmptyQueryResponse.
81-
-- PostgreSQL sends this if query string was empty and datarows
82-
-- should be empty, but anyway we return data collected in `acc`.
83-
73 -> do
84-
callback $
85-
DataMessage . DataRows $
86-
BL.foldlChunks (flip BL.chunk) BL.empty acc
87-
88-
go bs BL.empty
89-
90-
-- 'E' - ErrorResponse.
91-
-- On ErrorResponse we should discard all the collected datarows.
92-
69 -> do
93-
(b, newBs) <- readAtLeast bs len
94-
-- TODO handle errors
95-
desc <- either (error. BS.unpack) pure $ parseErrorDesc b
96-
callback (DataError desc)
97-
98-
go newBs BL.empty
99-
100-
-- 'Z' - ReadyForQuery.
101-
-- To know when command processing is finished
102-
90 -> do
103-
callback DataReady
104-
105-
newBs <- skipBytes bs len
106-
go newBs acc
107-
108-
-- Skip any other message.
109-
_ -> do
110-
newBs <- skipBytes bs len
111-
go newBs acc
112-
113-
{-# INLINE readMoreAndGo #-}
114-
readMoreAndGo :: B.ByteString -> BL.ByteString -> IO ()
115-
readMoreAndGo bs acc = do
116-
newBs <- readMoreAction bs
117-
go newBs acc
118-
119-
-- | Returns a bytestring that contain exactly @len@ bytes and the rest.
120-
{-# INLINE readAtLeast #-}
121-
readAtLeast :: B.ByteString -> Int -> IO (B.ByteString, B.ByteString)
122-
readAtLeast bs len
123-
| B.length bs >= len = pure $ B.splitAt len bs
124-
| otherwise = do
125-
newBs <- readMoreAction bs
126-
readAtLeast newBs len
127-
128-
-- | Skips exactly @toSkip@ bytes.
129-
{-# INLINE skipBytes #-}
130-
skipBytes :: B.ByteString -> Int -> IO B.ByteString
131-
skipBytes bs toSkip
132-
| toSkip <= 0 = pure bs
133-
| B.length bs < toSkip = do
134-
newBs <- readMoreAction B.empty
135-
skipBytes newBs (toSkip - B.length bs)
136-
| otherwise = pure $ B.drop toSkip bs
137-
138-
{-# INLINE parseHeader #-}
139-
parseHeader :: B.ByteString -> IO Header
140-
parseHeader bs =
141-
B.unsafeUseAsCStringLen bs $ \(ptr, len) -> do
142-
b <- peek (castPtr ptr)
143-
w <- byteSwap32 <$> peekByteOff (castPtr ptr) 1
144-
pure $ Header b $ fromIntegral (w - 4)
14537

14638
-- | Parses and dispatches all server messages except `DataRow`.
147-
parseServerMessages
39+
decodeNextServerMessage
14840
-- Initial buffer to parse from
14941
:: B.ByteString
15042
-- Action that returns more data with `ByteString` prepended.
15143
-> (B.ByteString -> IO B.ByteString)
15244
-> IO (B.ByteString, ServerMessage)
153-
parseServerMessages bs readMoreAction = go Nothing bs
45+
decodeNextServerMessage bs readMoreAction = go Nothing bs
15446
where
15547
-- Parse header
15648
go Nothing bs
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
module Database.PostgreSQL.Protocol.ExtractDataRows
2+
( loopExtractDataRows
3+
) where
4+
5+
import Data.Word (Word8, byteSwap32)
6+
import Foreign (peek, peekByteOff, castPtr)
7+
import qualified Data.ByteString as B
8+
import qualified Data.ByteString.Unsafe as B
9+
import qualified Data.ByteString.Lazy as BL
10+
import qualified Data.ByteString.Lazy.Internal as BL
11+
12+
import Database.PostgreSQL.Driver.Error
13+
import Database.PostgreSQL.Protocol.Types
14+
import Database.PostgreSQL.Protocol.Decoders
15+
import Database.PostgreSQL.Protocol.Utils
16+
17+
-- Optimized loop for extracting chunks of DataRows.
18+
-- Ignores all messages from database that do not relate to data.
19+
-- Does not throw exceptions.
20+
loopExtractDataRows
21+
-- Action that returns more data with `ByteString` prepended.
22+
:: (B.ByteString -> IO B.ByteString)
23+
-- Will be called on every DataMessage.
24+
-> (DataMessage -> IO ())
25+
-> IO ()
26+
loopExtractDataRows readMoreAction callback = go "" ""
27+
where
28+
go :: B.ByteString -> BL.ByteString -> IO ()
29+
go bs acc
30+
-- 5 - header size, defined by PostgreSQL
31+
| B.length bs < 5 = readMoreAndGo bs acc
32+
| otherwise = do
33+
ScanRowResult ch rest r <- scanDataRows bs
34+
-- We should force accumulator
35+
-- Note: `BL.chunk` should not prepend empty bytestring as chunk.
36+
let !newAcc = BL.chunk ch acc
37+
38+
case r of
39+
-- Following happened:
40+
-- not enough bytes to read header
41+
-- or header is for `DataRow`, not enough bytes to read body
42+
1 -> readMoreAndGo rest newAcc
43+
-- Header was read, it is not for `DataRow`. We can safely
44+
-- call `parseHeader`, because scanDataRows already checked
45+
-- that there are enough bytes to read header.
46+
2 -> do
47+
Header mt len <- parseHeader rest
48+
dispatchHeader mt len (B.drop 5 rest) newAcc
49+
50+
{-# INLINE dispatchHeader #-}
51+
dispatchHeader :: Word8 -> Int -> B.ByteString -> BL.ByteString -> IO ()
52+
dispatchHeader mt len bs acc = case mt of
53+
-- 'C' - CommandComplete.
54+
-- Command is completed, return the result.
55+
67 -> do
56+
callback $
57+
DataMessage . DataRows $
58+
BL.foldlChunks (flip BL.chunk) BL.empty acc
59+
60+
newBs <- skipBytes bs len
61+
go newBs BL.empty
62+
63+
-- 'I' - EmptyQueryResponse.
64+
-- PostgreSQL sends this if query string was empty and datarows
65+
-- should be empty, but anyway we return data collected in `acc`.
66+
73 -> do
67+
callback $
68+
DataMessage . DataRows $
69+
BL.foldlChunks (flip BL.chunk) BL.empty acc
70+
71+
go bs BL.empty
72+
73+
-- 'E' - ErrorResponse.
74+
-- On ErrorResponse we should discard all the collected datarows.
75+
69 -> do
76+
(b, newBs) <- readAtLeast bs len
77+
-- TODO handle errors
78+
desc <- eitherToProtocolEx $ parseErrorDesc b
79+
callback (DataError desc)
80+
81+
go newBs BL.empty
82+
83+
-- 'Z' - ReadyForQuery.
84+
-- To know when command processing is finished
85+
90 -> do
86+
callback DataReady
87+
88+
newBs <- skipBytes bs len
89+
go newBs acc
90+
91+
-- Skip any other message.
92+
_ -> do
93+
newBs <- skipBytes bs len
94+
go newBs acc
95+
96+
{-# INLINE readMoreAndGo #-}
97+
readMoreAndGo :: B.ByteString -> BL.ByteString -> IO ()
98+
readMoreAndGo bs acc = do
99+
newBs <- readMoreAction bs
100+
go newBs acc
101+
102+
-- | Returns a bytestring that contains exactly @len@ bytes and the rest.
103+
{-# INLINE readAtLeast #-}
104+
readAtLeast :: B.ByteString -> Int -> IO (B.ByteString, B.ByteString)
105+
readAtLeast bs len
106+
| B.length bs >= len = pure $ B.splitAt len bs
107+
| otherwise = do
108+
newBs <- readMoreAction bs
109+
readAtLeast newBs len
110+
111+
-- | Skips exactly @toSkip@ bytes.
112+
{-# INLINE skipBytes #-}
113+
skipBytes :: B.ByteString -> Int -> IO B.ByteString
114+
skipBytes bs toSkip
115+
| toSkip <= 0 = pure bs
116+
| B.length bs < toSkip = do
117+
newBs <- readMoreAction B.empty
118+
skipBytes newBs (toSkip - B.length bs)
119+
| otherwise = pure $ B.drop toSkip bs
120+
121+
{-# INLINE parseHeader #-}
122+
parseHeader :: B.ByteString -> IO Header
123+
parseHeader bs =
124+
B.unsafeUseAsCStringLen bs $ \(ptr, len) -> do
125+
b <- peek (castPtr ptr)
126+
w <- byteSwap32 <$> peekByteOff (castPtr ptr) 1
127+
pure $ Header b $ fromIntegral (w - 4)

src/Database/PostgreSQL/Protocol/Utils.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
module Database.PostgreSQL.Protocol.Utils where
33

44
import Foreign.C.Types (CInt, CSize(..), CChar)
5-
import Foreign (Ptr, peek, alloca)
5+
import Foreign (Ptr, peek, alloca)
66
import qualified Data.ByteString as B
77
import qualified Data.ByteString.Unsafe as B
88

@@ -11,8 +11,8 @@ data ScanRowResult = ScanRowResult
1111
{-# UNPACK #-} !B.ByteString -- the rest of string
1212
{-# UNPACK #-} !Int -- reason code
1313

14-
{-# INLINE scanDataRows #-}
1514
-- | Scans `ByteString` for a chunk of `DataRow`s.
15+
{-# INLINE scanDataRows #-}
1616
scanDataRows :: B.ByteString -> IO ScanRowResult
1717
scanDataRows bs =
1818
alloca $ \reasonPtr ->

0 commit comments

Comments
 (0)
0