@@ -30,60 +30,80 @@ import Database.PostgreSQL.Protocol.Types
30
30
import Database.PostgreSQL.Protocol.Store.Decode
31
31
import Database.PostgreSQL.Protocol.Utils
32
32
33
- -- Extracts DataRows
34
- --
33
+ -- Optimized loop for extracting chunks of DataRows.
34
+ -- Ignores all messages from database that do not relate to data.
35
+ -- Does not throw exceptions.
35
36
loopExtractDataRows
36
- :: (B. ByteString -> IO B. ByteString ) -- read more action
37
- -> (DataMessage -> IO () ) -- callback on every DataMessage
37
+ -- Action that returs more data with `ByteString` prepended.
38
+ :: (B. ByteString -> IO B. ByteString )
39
+ -- Will be called on every DataMessage.
40
+ -> (DataMessage -> IO () )
38
41
-> IO ()
39
42
loopExtractDataRows readMoreAction callback = go " " " "
40
43
where
41
44
go :: B. ByteString -> BL. ByteString -> IO ()
42
45
go bs acc
46
+ -- 5 - header size, defined by PostgreSQL
43
47
| B. length bs < 5 = readMoreAndGo bs acc
44
48
| otherwise = do
45
49
ScanRowResult ch rest r <- scanDataRows bs
46
50
-- We should force accumulator
51
+ -- `BL.chunk` does not prepend empty bytestring as chunk.
47
52
let ! newAcc = BL. chunk ch acc
48
53
49
54
case r of
55
+ -- Following happened:
56
+ -- not enough bytes to read header
57
+ -- or header is for `DataRow`, not enough bytes to read body
50
58
1 -> readMoreAndGo rest newAcc
59
+ -- Header was read, it is not for `DataRow`. We can safely
60
+ -- call `parseHeader`, because scanDataRows already checked
61
+ -- that there are enough bytes to read header.
51
62
2 -> do
52
63
Header mt len <- parseHeader rest
53
64
dispatchHeader mt len (B. drop 5 rest) newAcc
54
65
55
66
{-# INLINE dispatchHeader #-}
56
67
dispatchHeader :: Word8 -> Int -> B. ByteString -> BL. ByteString -> IO ()
57
68
dispatchHeader mt len bs acc = case mt of
58
- -- 'C'
69
+ -- 'C' - CommandComplete.
70
+ -- Command is completed, return the result.
59
71
67 -> do
60
72
callback $
61
73
DataMessage . DataRows $
62
74
BL. foldlChunks (flip BL. chunk) BL. empty acc
63
75
64
76
newBs <- skipBytes bs len
65
77
go newBs BL. empty
66
- -- 'I'
78
+
79
+ -- 'I' - EmptyQueryResponse.
80
+ -- PostgreSQL sends this if query string was empty and datarows
81
+ -- should be empty, but anyway we return data collected in `acc`.
67
82
73 -> do
68
83
callback $
69
84
DataMessage . DataRows $
70
85
BL. foldlChunks (flip BL. chunk) BL. empty acc
71
86
72
87
go bs BL. empty
73
- -- 'E"
88
+
89
+ -- 'E' - ErrorResponse.
90
+ -- On ErrorResponse we should discard all the collected datarows.
74
91
69 -> do
75
92
(b, newBs) <- readAtLeast bs len
76
93
desc <- eitherToDecode $ parseErrorDesc b
77
94
callback (DataError desc)
78
95
79
96
go newBs BL. empty
80
- -- 'Z'
97
+
98
+ -- 'Z' - ReadyForQuery.
99
+ -- To know when command processing is finished
81
100
90 -> do
82
101
callback DataReady
83
102
84
103
newBs <- skipBytes bs len
85
104
go newBs acc
86
105
106
+ -- Skip any other message.
87
107
_ -> do
88
108
newBs <- skipBytes bs len
89
109
go newBs acc
@@ -94,6 +114,7 @@ loopExtractDataRows readMoreAction callback = go "" ""
94
114
newBs <- readMoreAction bs
95
115
go newBs acc
96
116
117
+ -- | Returns a bytestring that contain exactly @len@ bytes and the rest.
97
118
{-# INLINE readAtLeast #-}
98
119
readAtLeast :: B. ByteString -> Int -> IO (B. ByteString , B. ByteString )
99
120
readAtLeast bs len
@@ -102,6 +123,7 @@ loopExtractDataRows readMoreAction callback = go "" ""
102
123
newBs <- readMoreAction bs
103
124
readAtLeast newBs len
104
125
126
+ -- | Skips exactly @toSkip@ bytes.
105
127
{-# INLINE skipBytes #-}
106
128
skipBytes :: B. ByteString -> Int -> IO B. ByteString
107
129
skipBytes bs toSkip
@@ -113,7 +135,7 @@ loopExtractDataRows readMoreAction callback = go "" ""
113
135
114
136
{-# INLINE parseHeader #-}
115
137
parseHeader :: B. ByteString -> IO Header
116
- parseHeader ! bs =
138
+ parseHeader bs =
117
139
B. unsafeUseAsCStringLen bs $ \ (ptr, len) -> do
118
140
b <- peek (castPtr ptr)
119
141
w <- byteSwap32 <$> peekByteOff (castPtr ptr) 1
0 commit comments