8000 Scan datarows in C · postgres-haskell/postgres-wire@1f47567 · GitHub
[go: up one dir, main page]

Skip to content

Commit 1f47567

Browse files
Scan datarows in C
1 parent b5fe575 commit 1f47567

File tree

7 files changed

+168
-34
lines changed

7 files changed

+168
-34
lines changed

bench/Bench.hs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
{-# language BangPatterns #-}
12
module Main where
23

34
import Data.ByteString.Lazy (toStrict)
5+
import qualified Data.ByteString.Lazy as BL
6+
import qualified Data.ByteString as B
47
import Data.ByteString.Builder (toLazyByteString)
58
import Data.ByteString (ByteString)
69
import Data.Vector as V(fromList, empty)
@@ -12,10 +15,35 @@ import Data.Monoid
1215

1316
import Database.PostgreSQL.Protocol.Types
1417
import Database.PostgreSQL.Protocol.Encoders
18+
import Database.PostgreSQL.Protocol.Decoders
1519
import Database.PostgreSQL.Driver
1620
import Criterion.Main
1721

18-
main = benchMultiPw
22+
main = benchLoop
23+
24+
benchLoop :: IO ()
25+
benchLoop = do
26+
ref <- newIORef 0 :: IO (IORef Word)
27+
rbs <- newIORef "" :: IO (IORef BL.ByteString)
28+
!bs <- B.readFile "1.txt"
29+
let str = BL.cycle $ BL.fromStrict bs
30+
writeIORef rbs str
31+
32+
let handler dm = case dm of
33+
DataMessage _ -> modifyIORef' ref (+1)
34+
_ -> pure ()
35+
newChunk preBs = do
36+
b <- readIORef rbs
37+
let (nb, rest) = BL.splitAt 4096 b
38+
writeIORef rbs rest
39+
-- let res = preBs <> (B.copy $ BL.toStrict nb)
40+
let res = preBs <> ( BL.toStrict nb)
41+
res `seq` pure res
42+
tid <- forkIO $ forever $ loopExtractDataRows newChunk handler
43+
threadDelay 1000000
44+
killThread tid
45+
s <- readIORef ref
46+
print $ "Requests: " ++ show s
1947

2048
benchRequests :: IO c -> (c -> IO a) -> IO ()
2149
benchRequests connectAction queryAction = do

cbits/include/pw_utils.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#include <stdint.h>
2+
#include <stdlib.h>
3+
#include <arpa/inet.h>
4+
5+
#define DATAROW_HEADER 'D'
6+
#define HEADER_SIZE 5
7+
#define HEADER_TYPE_SIZE 1
8+
9+
#define NEED_MORE_INPUT 0x01
10+
#define OTHER_HEADER 0x02
11+
12+
size_t scan_datarows(char *buffer, size_t len, int *reason);

cbits/src/pw_utils.c

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#include <pw_utils.h>
2+
3+
/* pointer to the next data message
4+
length of buffer
5+
ptr where result reasond would be put
6+
returns offset of buffer when datarows end.
7+
*/
8+
size_t scan_datarows(char *buffer, size_t len, int *reason)
9+
{
10+
size_t offset = 0;
11+
uint32_t message_len = 0;
12+
13+
while (1)
14+
{
15+
if (len - offset < HEADER_SIZE) {
16+
*reason = NEED_MORE_INPUT;
17+
break;
18+
}
19+
if (*(buffer + offset)!= DATAROW_HEADER) {
20+
*reason = OTHER_HEADER;
21+
break;
22+
}
23+
message_len = *(uint32_t*)(buffer + offset + HEADER_TYPE_SIZE);
24+
message_len = ntohl(message_len);
25+
if (len - offset - HEADER_TYPE_SIZE < (size_t)message_len) {
26+
*reason = NEED_MORE_INPUT;
27+
break;
28+
}
29+
offset = offset + HEADER_TYPE_SIZE + (size_t)message_len;
30+
}
31+
return offset;
32+
}

cbits/src/pw_utils.s

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
.file "pw_utils.c"
2+
.text
3+
.p2align 4,,15
4+
.globl scan_datarows
5+
.type scan_datarows, @function
6+
scan_datarows:
7+
.LFB18:
8+
.cfi_startproc
9+
cmpq $4, %rsi
10+
jbe .L9
11+
xorl %eax, %eax
12+
cmpb $68, (%rdi)
13+
movq %rsi, %r8
14+
je .L6
15+
jmp .L16
16+
.p2align 4,,10
17+
.p2align 3
18+
.L7:
19+
leaq 1(%rcx,%rax), %rax
20+
movq %rsi, %r8
21+
subq %rax, %r8
22+
cmpq $4, %r8
23+
jbe .L13
24+
cmpb $68, (%rdi,%rax)
25+
jne .L3
26+
.L6:
27+
movl 1(%rdi,%rax), %ecx
28+
subq $1, %r8
29+
bswap %ecx
30+
movl %ecx, %ecx
31+
cmpq %rcx, %r8
32+
jnb .L7
33+
.L13:
34+
movl $1, (%rdx)
35+
ret
36+
.L16:
37+
xorl %eax, %eax
38+
.p2align 4,,10
39+
.p2align 3
40+
.L3:
41+
movl $2, (%rdx)
42+
ret
43+
.L9:
44+
xorl %eax, %eax
45+
jmp .L13
46+
.cfi_endproc
47+
.LFE18:
48+
.size scan_datarows, .-scan_datarows
49+
.ident "GCC: (Ubuntu 4.8.4-2ubuntu1~14.04.3) 4.8.4"
50+
.section .note.GNU-stack,"",@progbits

postgres-wire.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ cabal-version: >=1.10
1515

1616
library
1717
hs-source-dirs: src
18+
include-dirs: cbits/include
19+
c-sources: cbits/src/pw_utils.c
1820
exposed-modules: Database.PostgreSQL.Driver
1921
, Database.PostgreSQL.Driver.Connection
2022
, Database.PostgreSQL.Driver.RawConnection
@@ -50,6 +52,7 @@ library
5052
BangPatterns
5153
OverloadedStrings
5254
GeneralizedNewtypeDeriving
55+
cpp-options: -O2
5356

5457
test-suite postgres-wire-test-connection
5558
type: exitcode-stdio-1.0

src/Database/PostgreSQL/Protocol/Decoders.hs

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# language RecordWildCards #-}
2+
{-# language ForeignFunctionInterface #-}
23

34
module Database.PostgreSQL.Protocol.Decoders
45
( decodeAuthResponse
@@ -19,14 +20,21 @@ import Data.Word
1920
import Text.Read (readMaybe)
2021
import qualified Data.Vector as V
2122
import qualified Data.ByteString as B
23+
import qualified Data.ByteString.Unsafe as B
2224
import qualified Data.ByteString.Lazy as BL
2325
import qualified Data.ByteString.Lazy.Internal as BL
2426
import Data.ByteString.Char8 as BS(readInteger, readInt, unpack, pack)
2527
import qualified Data.HashMap.Strict as HM
2628

29+
import Foreign.C.Types (CInt, CSize(..), CChar)
30+
import Foreign
31+
2732
import Database.PostgreSQL.Protocol.Types
2833
import Database.PostgreSQL.Protocol.Store.Decode
2934

35+
foreign import ccall unsafe "static pw_utils.h scan_datarows" c_scan_datarows
36+
:: Ptr CChar -> CSize -> Ptr CInt -> IO CSize
37+
3038
-- Extracts DataRows
3139
--
3240
data ExtractorResult = NeedMore | OtherHeader
@@ -37,81 +45,82 @@ loopExtractDataRows
3745
-> IO ()
3846
loopExtractDataRows readMoreAction callback = go "" ""
3947
where
40-
{-# NOINLINE go #-}
4148
go :: B.ByteString -> BL.ByteString -> IO ()
4249
-- header size
4350
go !bs !acc
4451
| B.length bs < 5 = readMoreAndGo bs acc
4552
| otherwise = do
4653
-- print "Main branch"
47-
let (offset, r) = scanDataRows 0 bs
54+
(offset, r) <- alloca $ \reasonPtr ->
55+
B.unsafeUseAsCStringLen bs $ \(ptr, len) -> do
56+
offset <- fromIntegral <$>
57+
c_scan_datarows ptr (fromIntegral len) reasonPtr
58+
r <- peek reasonPtr
59+
pure (offset, r)
4860
let (ch, nbs) = B.splitAt offset bs
4961
let (!newAcc, !newBs) = if B.null ch
5062
then (acc, bs)
5163
else (BL.chunk ch acc, nbs)
5264
case r of
53-
NeedMore -> readMoreAndGo newBs newAcc
54-
OtherHeader -> do
65+
1 -> readMoreAndGo newBs newAcc
66+
2 -> do
5567
let (Header mt len) = parseHeader newBs
5668
goOther mt len (B.drop 5 newBs) newAcc
5769

5870
{-# INLINE goOther #-}
5971
goOther :: Word8 -> Int -> B.ByteString -> BL.ByteString -> IO ()
6072
goOther !mt !len !bs !acc = case chr (fromIntegral mt) of
6173
'C' -> do
62-
let !msg = DataMessage . DataRows $ BL.foldlChunks (flip BL.chunk) "" acc
63-
callback msg
74+
callback $
75+
DataMessage . DataRows $
76+
BL.foldlChunks (flip BL.chunk) BL.empty acc
6477

65-
newBs <- skipBytes readMoreAction bs len
78+
newBs <- skipBytes bs len
6679
go newBs BL.empty
6780
'I' -> do
68-
let !msg = DataMessage . DataRows $ BL.foldlChunks (flip BL.chunk) "" acc
69-
callback msg
81+
callback $
82+
DataMessage . DataRows $
83+
BL.foldlChunks (flip BL.chunk) BL.empty acc
7084

7185
go bs BL.empty
7286
'E' -> do
7387
(b, newBs) <- readAtLeast bs len
7488
desc <- eitherToDecode $ parseErrorDesc b
75-
-- callback (DataError desc)
89+
callback (DataError desc)
90+
7691
go newBs BL.empty
7792
'Z' -> do
7893
callback DataReady
7994

80-
newBs <- skipBytes readMoreAction bs len
95+
newBs <- skipBytes bs len
8196
go newBs acc
8297
c -> do
83-
newBs <- skipBytes readMoreAction bs len
98+
newBs <- skipBytes bs len
8499
go newBs acc
85100

101+
{-# INLINE readMoreAndGo #-}
102+
readMoreAndGo :: B.ByteString -> BL.ByteString -> IO ()
86103
readMoreAndGo !bs !acc = do
87104
-- print "Read more and go"
88105
newBs <- readMoreAction bs
89106
go newBs acc
90107

108+
{-# INLINE readAtLeast #-}
91109
readAtLeast :: B.ByteString -> Int -> IO (B.ByteString, B.ByteString)
92-
readAtLeast !bs !len | B.length bs >= len = pure $ B.splitAt len bs
93-
| otherwise = do
110+
readAtLeast !bs !len
111+
| B.length bs >= len = pure $ B.splitAt len bs
112+
| otherwise = do
94113
newBs <- readMoreAction bs
95114
readAtLeast newBs len
96115

97-
{-# NOINLINE skipBytes #-}
98-
-- skipBytes :: IO B.ByteString -> B.ByteString -> Int -> IO B.ByteString
99-
skipBytes readMoreAction !bs !toSkip | toSkip <= 0 = pure bs
100-
| B.length bs < toSkip = do
101-
newBs <- readMoreAction ""
102-
skipBytes readMoreAction newBs (toSkip - B.length bs)
103-
| otherwise = pure $ B.drop toSkip bs
104-
105-
scanDataRows :: Int -> B.ByteString -> (Int, ExtractorResult)
106-
scanDataRows !acc !bs | B.length bs < 5 = (acc, NeedMore)
107-
| otherwise =
108-
let (Header mt len) = parseHeader bs
109-
in if chr (fromIntegral mt) == 'D'
110-
then if B.length bs < len + 5
111-
then (acc, NeedMore)
112-
else scanDataRows (acc + len + 5)
113-
$ B.drop (len + 5) bs
114-
else (acc, OtherHeader)
116+
{-# INLINE skipBytes #-}
117+
skipBytes :: B.ByteString -> Int -> IO B.ByteString
118+
skipBytes !bs !toSkip | toSkip <= 0 = pure bs
119+
| B.length bs < toSkip = do
120+
newBs <- readMoreAction ""
121+
skipBytes newBs (toSkip - B.length bs)
122+
| otherwise = pure $ B.drop toSkip bs
123+
115124

116125
{-# INLINE parseHeader #-}
117126
parseHeader :: B.ByteString -> Header

src/Database/PostgreSQL/Protocol/Types.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ newtype DataRows = DataRows BL.ByteString
5353
-- | Ad-hoc type only for data rows.
5454
data DataMessage
5555
= DataError ErrorDesc
56-
| DataMessage DataRows
56+
| DataMessage !DataRows
5757
-- ReadyForQuery received.
5858
| DataReady
5959
deriving (Show)

0 commit comments

Comments
 (0)
0