8000 Extractor DataRows · postgres-haskell/postgres-wire@4957a61 · GitHub
[go: up one dir, main page]

Skip to content

Commit 4957a61

Browse files
Extractor DataRows
1 parent f610b57 commit 4957a61

File tree

3 files changed

+117
-3
lines changed

3 files changed

+117
-3
lines changed

bench/Bench.hs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,16 @@ benchRequests connectAction queryAction = do
3535

3636
benchMultiPw :: IO ()
3737
benchMultiPw = benchRequests createConnection $ \c -> do
38-
sendBatchAndSync c [q]
38+
sendBatchAndSync c [q, q, q, q, q, q, q, q, q, q]
39+
readNextData c
40+
readNextData c
41+
readNextData c
42+
readNextData c
43+
readNextData c
44+
readNextData c
45+
readNextData c
46+
readNextData c
47+
readNextData c
3948
readNextData c
4049
waitReadyForQuery c
4150
where

src/Database/PostgreSQL/Driver/Connection.hs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,10 @@ receiverThread
229229
:: RawConnection
230230
-> InDataChan
231231
-> IO ()
232-
receiverThread rawConn dataChan = receiveLoop Nothing "" []
232+
receiverThread rawConn dataChan =
233+
loopExtractDataRows
234+
(rReceive rawConn 4096)
235+
(writeChan dataChan . Right)
233236
where
234237
receiveLoop
235238
:: Maybe Header

src/Database/PostgreSQL/Protocol/Decoders.hs

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,124 @@ module Database.PostgreSQL.Protocol.Decoders
77
-- * Helpers
88
, parseServerVersion
99
, parseIntegerDatetimes
10+
, loopExtractDataRows
1011
) where
1112

1213
import Control.Applicative
1314
import Control.Monad
1415
import Data.Monoid ((<>))
1516
import Data.Maybe (fromMaybe)
1617
import Data.Char (chr)
18+
import Data.Word
1719
import Text.Read (readMaybe)
1820
import qualified Data.Vector as V
1921
import qualified Data.ByteString as B
22+
import qualified Data.ByteString.Lazy as BL
2023
import Data.ByteString.Char8 as BS(readInteger, readInt, unpack, pack)
2124
import qualified Data.HashMap.Strict as HM
2225

2326
import Database.PostgreSQL.Protocol.Types
2427
import Database.PostgreSQL.Protocol.Store.Decode
2528

29+
-- Extracts DataRows
30+
--
31+
data ExtractorResult = NeedMore | OtherHeader
32+
33+
loopExtractDataRows
34+
:: IO B.ByteString -- read more action
35+
-> (DataMessage -> IO ()) -- callback on every DataMessage
36+
-> IO ()
37+
loopExtractDataRows readMoreAction callback = go "" []
38+
where
39+
go :: B.ByteString -> [B.ByteString] -> IO ()
40+
-- header size
41+
go bs acc
42+
| B.length bs < 5 = readMoreAndGo bs acc
43+
| otherwise = do
44+
-- print "Main branch"
45+
let (offset, r) = scanDataRows 0 bs
46+
let (ch, nbs) = B.splitAt offset bs
47+
let (newAcc, newBs) = if B.null ch
48+
then (acc, bs)
49+
else (ch:acc, nbs)
50+
case r of
51+
NeedMore -> readMoreAndGo newBs newAcc
52+
OtherHeader -> do
53+
let (Header mt len) = parseHeader newBs
54+
goOther mt len (B.drop 5 newBs) newAcc
55+
56+
goOther :: Word8 -> Int -> B.ByteString -> [B.ByteString] -> IO ()
57+
goOther mt len bs acc = case chr (fromIntegral mt) of
58+
'C' -> do
59+
newBs <- skipBytes bs len
60+
callback $ DataMessage . DataRows $ BL.fromChunks $ reverse acc
61+
go newBs []
62+
'I' -> do
63+
callback $ DataMessage . DataRows $ BL.fromChunks $ reverse acc
64+
go bs []
65+
'E' -> do
66+
(b, newBs) <- readAtLeast bs len
67+
desc <- eitherToDecode $ parseErrorDesc b
68+
callback (DataError desc)
69+
go newBs []
70+
'Z' -> do
71+
newBs <- skipBytes bs len
72+
callback DataReady
73+
go newBs acc
74+
c -> do
75+
-- print $ "Unexpected: " ++ show c
76+
newBs <- skipBytes bs len
77+
go newBs acc
78+
79+
readMoreAndGo bs acc = do
80+
-- print "Read more and go"
81+
newBs <- readMoreAction
82+
go (bs <> newBs) acc
83+
84+
readAtLeast :: B.ByteString -> Int -> IO (B.ByteString, B.ByteString)
85+
readAtLeast bs len | B.length bs >= len = pure $ B.splitAt len bs
86+
| otherwise = do
87+
newBs <- readMoreAction
88+
readAtLeast (bs <> newBs) len
89+
90+
skipBytes :: B.ByteString -> Int -> IO B.ByteString
91+
skipBytes bs toSkip | toSkip <= 0 = pure bs
92+
| B.length bs < toSkip = do
93+
print $ "to skip " ++ show toSkip
94+
newBs <- readMoreAction
95+
skipBytes newBs (toSkip - B.length bs)
96+
| otherwise = pure $ B.drop toSkip bs
97+
98+
scanDataRows :: Int -> B.ByteString -> (Int, ExtractorResult)
99+
scanDataRows !acc !bs | B.length bs < 5 = (acc, NeedMore)
100+
| otherwise =
101+
let (Header mt len) = parseHeader bs
102+
in if chr (fromIntegral mt) == 'D'
103+
then if B.length bs < len + 5
104+
then (acc, NeedMore)
105+
else scanDataRows (acc + len + 5)
106+
$ B.drop (len + 5) bs
107+
else (acc, OtherHeader)
108+
109+
parseHeader :: B.ByteString -> Header
110+
parseHeader bs =
111+
let w1 = B.index bs 1
112+
w2 = B.index bs 2
113+
w3 = B.index bs 3
114+
w4 = B.index bs 4
115+
w = fromIntegral w1 * 256 * 256 * 256 +
116+
fromIntegral w2 * 256 * 256 +
117+
fromIntegral w3 * 256 +
118+
fromIntegral w4
119+
in Header (B.index bs 0) (w - 4)
120+
121+
122+
-- MT_COMMAND_COMPLETE 'C'
123+
-- MT_EMPTY_QUERY_RESPONSE 'I'
124+
-- MT_ERROR_RESPONSE 'E'
125+
-- MT_READY_FOR_QUERY 'Z'
126+
127+
26128
decodeAuthResponse :: Decode AuthResponse
27129
decodeAuthResponse = do
28130
c <- getWord8
@@ -252,6 +354,6 @@ parseNoticeDesc s = do
252354
"is not presented in NoticeResponse message")
253355
Right . HM.lookup c
254356

255-
eitherToDecode :: Either B.ByteString a -> Decode a
357+
eitherToDecode :: Monad m => Either B.ByteString a -> m a
256358
eitherToDecode = either (fail . BS.unpack) pure
257359

0 commit comments

Comments
 (0)
0