@@ -7,22 +7,124 @@ module Database.PostgreSQL.Protocol.Decoders
7
7
-- * Helpers
8
8
, parseServerVersion
9
9
, parseIntegerDatetimes
10
+ , loopExtractDataRows
10
11
) where
11
12
12
13
import Control.Applicative
13
14
import Control.Monad
14
15
import Data.Monoid ((<>) )
15
16
import Data.Maybe (fromMaybe )
16
17
import Data.Char (chr )
18
+ import Data.Word
17
19
import Text.Read (readMaybe )
18
20
import qualified Data.Vector as V
19
21
import qualified Data.ByteString as B
22
+ import qualified Data.ByteString.Lazy as BL
20
23
import Data.ByteString.Char8 as BS (readInteger , readInt , unpack , pack )
21
24
import qualified Data.HashMap.Strict as HM
22
25
23
26
import Database.PostgreSQL.Protocol.Types
24
27
import Database.PostgreSQL.Protocol.Store.Decode
25
28
29
+ -- Extracts DataRows
30
+ --
31
+ data ExtractorResult = NeedMore | OtherHeader
32
+
33
+ loopExtractDataRows
34
+ :: IO B. ByteString -- read more action
35
+ -> (DataMessage -> IO () ) -- callback on every DataMessage
36
+ -> IO ()
37
+ loopExtractDataRows readMoreAction callback = go " " []
38
+ where
39
+ go :: B. ByteString -> [B. ByteString ] -> IO ()
40
+ -- header size
41
+ go bs acc
42
+ | B. length bs < 5 = readMoreAndGo bs acc
43
+ | otherwise = do
44
+ -- print "Main branch"
45
+ let (offset, r) = scanDataRows 0 bs
46
+ let (ch, nbs) = B. splitAt offset bs
47
+ let (newAcc, newBs) = if B. null ch
48
+ then (acc, bs)
49
+ else (ch: acc, nbs)
50
+ case r of
51
+ NeedMore -> readMoreAndGo newBs newAcc
52
+ OtherHeader -> do
53
+ let (Header mt len) = parseHeader newBs
54
+ goOther mt len (B. drop 5 newBs) newAcc
55
+
56
+ goOther :: Word8 -> Int -> B. ByteString -> [B. ByteString ] -> IO ()
57
+ goOther mt len bs acc = case chr (fromIntegral mt) of
58
+ ' C' -> do
59
+ newBs <- skipBytes bs len
60
+ callback $ DataMessage . DataRows $ BL. fromChunks $ reverse acc
61
+ go newBs []
62
+ ' I' -> do
63
+ callback $ DataMessage . DataRows $ BL. fromChunks $ reverse acc
64
+ go bs []
65
+ ' E' -> do
66
+ (b, newBs) <- readAtLeast bs len
67
+ desc <- eitherToDecode $ parseErrorDesc b
68
+ callback (DataError desc)
69
+ go newBs []
70
+ ' Z' -> do
71
+ newBs <- skipBytes bs len
72
+ callback DataReady
73
+ go newBs acc
74
+ c -> do
75
+ -- print $ "Unexpected: " ++ show c
76
+ newBs <- skipBytes bs len
77
+ go newBs acc
78
+
79
+ readMoreAndGo bs acc = do
80
+ -- print "Read more and go"
81
+ newBs <- readMoreAction
82
+ go (bs <> newBs) acc
83
+
84
+ readAtLeast :: B. ByteString -> Int -> IO (B. ByteString , B. ByteString )
85
+ readAtLeast bs len | B. length bs >= len = pure $ B. splitAt len bs
86
+ | otherwise = do
87
+ newBs <- readMoreAction
88
+ readAtLeast (bs <> newBs) len
89
+
90
+ skipBytes :: B. ByteString -> Int -> IO B. ByteString
91
+ skipBytes bs toSkip | toSkip <= 0 = pure bs
92
+ | B. length bs < toSkip = do
93
+ print $ " to skip " ++ show toSkip
94
+ newBs <- readMoreAction
95
+ skipBytes newBs (toSkip - B. length bs)
96
+ | otherwise = pure $ B. drop toSkip bs
97
+
98
+ scanDataRows :: Int -> B. ByteString -> (Int , ExtractorResult )
99
+ scanDataRows ! acc ! bs | B. length bs < 5 = (acc, NeedMore )
100
+ | otherwise =
101
+ let (Header mt len) = parseHeader bs
102
+ in if chr (fromIntegral mt) == ' D'
103
+ then if B. length bs < len + 5
104
+ then (acc, NeedMore )
105
+ else scanDataRows (acc + len + 5 )
106
+ $ B. drop (len + 5 ) bs
107
+ else (acc, OtherHeader )
108
+
109
+ parseHeader :: B. ByteString -> Header
110
+ parseHeader bs =
111
+ let w1 = B. index bs 1
112
+ w2 = B. index bs 2
113
+ w3 = B. index bs 3
114
+ w4 = B. index bs 4
115
+ w = fromIntegral w1 * 256 * 256 * 256 +
116
+ fromIntegral w2 * 256 * 256 +
117
+ fromIntegral w3 * 256 +
118
+ fromIntegral w4
119
+ in Header (B. index bs 0 ) (w - 4 )
120
+
121
+
122
+ -- MT_COMMAND_COMPLETE 'C'
123
+ -- MT_EMPTY_QUERY_RESPONSE 'I'
124
+ -- MT_ERROR_RESPONSE 'E'
125
+ -- MT_READY_FOR_QUERY 'Z'
126
+
127
+
26
128
decodeAuthResponse :: Decode AuthResponse
27
129
decodeAuthResponse = do
28
130
c <- getWord8
@@ -252,6 +354,6 @@ parseNoticeDesc s = do
252
354
" is not presented in NoticeResponse message" )
253
355
Right . HM. lookup c
254
356
255
- eitherToDecode :: Either B. ByteString a -> Decode a
357
+ eitherToDecode :: Monad m => Either B. ByteString a -> m a
256
358
eitherToDecode = either (fail . BS. unpack) pure
257
359
0 commit comments