5
5
{-# language ExistentialQuantification #-}
6
6
{-# language TypeSynonymInstances #-}
7
7
{-# language FlexibleInstances #-}
8
- module Database.PostgreSQL.Protocol. Connection where
8
+ module Database.PostgreSQL.Connection where
9
9
10
10
11
11
import qualified Data.ByteString as B
@@ -30,23 +30,38 @@ import System.Socket.Family.Unix
30
30
import Data.Time.Clock.POSIX
31
31
import Control.Concurrent.Chan.Unagi
32
32
33
- import Database.PostgreSQL.Protocol.Settings
34
33
import Database.PostgreSQL.Protocol.Encoders
35
34
import Database.PostgreSQL.Protocol.Decoders
36
35
import Database.PostgreSQL.Protocol.Types
37
- import Database.PostgreSQL.Protocol.StatementStorage
36
+ import Database.PostgreSQL.Settings
37
+ import Database.PostgreSQL.StatementStorage
38
+ import Database.PostgreSQL.Types
38
39
39
40
40
41
type UnixSocket = Socket Unix Stream Unix
41
42
-- data Connecti
8000
on = Connection (Socket Inet6 Stream TCP)
42
43
data Connection = Connection
43
44
{ connSocket :: UnixSocket
44
45
, connReceiverThread :: ThreadId
45
- , connOutChan :: OutChan ServerMessage
46
+ -- Chan for only data messages
47
+ , connDataOutChan :: OutChan (Either Error DataMessage )
48
+ -- Chan for all messages that filter
49
+ , connAllOutChan :: OutChan ServerMessage
46
50
, connStatementStorage :: StatementStorage
47
51
, connParameters :: ConnectionParameters
48
52
}
49
53
54
+ newtype ServerMessageFilter = ServerMessageFilter (ServerMessage -> Bool )
55
+
56
+ type NotificationHandler = Notification -> IO ()
57
+
58
+ -- All possible errors
59
+ data Error
60
+ = PostgresError ErrorDesc
61
+ | ImpossibleError
62
+
63
+ data DataMessage = DataMessage B. ByteString
64
+
50
65
address :: SocketAddress Unix
51
66
address = fromJust $ socketAddressUnixPath " /var/run/postgresql/.s.PGSQL.5432"
52
67
@@ -85,15 +100,11 @@ consStartupMessage stg = StartupMessage
85
100
sendStartMessage :: UnixSocket -> StartMessage -> IO ()
86
101
sendStartMessage sock msg = void $ do
87
102
let smsg = toStrict . toLazyByteString $ encodeStartMessage msg
88
- -- putStrLn "sending message:"
89
- -- print smsg
90
103
send sock smsg mempty
91
104
92
105
sendMessage :: UnixSocket -> ClientMessage -> IO ()
93
106
sendMessage sock msg = void $ do
94
107
let smsg = toStrict . toLazyByteString $ encodeClientMessage msg
95
- -- putStrLn "sending message:"
96
- -- print smsg
97
108
send sock smsg mempty
98
109
99
110
readAuthMessage :: B. ByteString -> IO ()
@@ -107,33 +118,54 @@ readAuthMessage s =
107
118
receiverThread :: UnixSocket -> InChan ServerMessage -> IO ()
108
119
receiverThread sock chan = forever $ do
109
120
r <- receive sock 4096 mempty
121
+ print r
110
122
go r
111
123
where
112
124
decoder = runGetIncremental decodeServerMessage
113
125
go str = case pushChunk decoder str of
114
126
BG. Done rest _ v -> do
115
- print v
116
- writeChan chan v
127
+ putStrLn $ " Received: " ++ show v
117
128
unless (B. null rest) $ go rest
118
129
BG. Partial _ -> error " Partial"
119
130
BG. Fail _ _ e -> error e
131
+ dispatch :: ServerMessage -> IO ()
132
+ -- dont receiving at this phase
133
+ dispatch (BackendKeyData _ _) = pure ()
134
+ dispatch (BindComplete ) = pure ()
135
+ dispatch CloseComplete = pure ()
136
+ -- maybe return command result too
137
+ dispatch (CommandComplete _) = pure ()
138
+ dispatch r@ (DataRow _) = writeChan chan r
139
+ -- TODO throw error here
140
+ dispatch EmptyQueryResponse = pure ()
141
+ -- TODO throw error here
142
+ dispatch (ErrorResponse desc) = pure ()
143
+ -- TODO
144
+ dispatch NoData = pure ()
145
+ dispatch (NoticeResponse _) = pure ()
146
+ -- TODO handle notifications
147
+ dispatch (NotificationResponse n) = pure ()
148
+ -- Ignore here ?
149
+ dispatch (ParameterDescription _) = pure ()
150
+ dispatch (ParameterStatus _ _) = pure ()
151
+ dispatch (ParseComplete ) = pure ()
152
+ dispatch (PortalSuspended ) = pure ()
153
+ dispatch (ReadForQuery _) = pure ()
154
+ dispatch (RowDescription _) = pure ()
120
155
121
156
data Query = Query
122
157
{ qStatement :: B. ByteString
123
158
, qOids :: V. Vector Oid
124
159
, qValues :: V. Vector B. ByteString
160
+ , qParamsFormat :: Format
161
+ , qResultFormat :: Format
125
162
} deriving (Show )
126
163
127
- query1 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 1" , " 3" ]
128
- query2 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 2" , " 3" ]
129
- query3 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 3" , " 3" ]
130
- query4 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 4" , " 3" ]
131
- query5 = Query " SELECT * FROM a where v > $1 + $2 LIMIT 100" [Oid 23 , Oid 23 ] [" 5" , " 3" ]
132
- -- query1 = QQuery "test1" "select sum(v) from a" [] []
133
- -- query2 = QQuery "test2" "select sum(v) from a" [] []
134
- -- query3 = QQuery "test3" "select sum(v) from a" [] []
135
- -- query4 = QQuery "test4" "select sum(v) from a" [] []
136
- -- query5 = QQuery "test5" "select sum(v) from a" [] []
164
+ query1 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 1" , " 3" ] Text Text
165
+ query2 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 2" , " 3" ] Text Text
166
+ query3 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 3" , " 3" ] Text Text
167
+ query4 = Query " SELECT $1 + $2" [Oid 23 , Oid 23 ] [" 4" , " 3" ] Text Text
168
+ -- query5 = Query "SELECT * FROM a whereee v > $1 + $2 LIMIT 100" [Oid 23, Oid 23] ["5", "3"]
137
169
138
170
sendBatch :: Connection -> [Query ] -> IO ()
139
171
sendBatch conn qs = do
@@ -145,7 +177,8 @@ sendBatch conn qs = do
145
177
let sname = StatementName " "
146
178
pname = PortalName " "
147
179
sendMessage s $ Parse sname (StatementSQL $ qStatement q) (qOids q)
148
- sendMessage s $ Bind pname sname Text (qValues q) Text
180
+ sendMessage s $
181
+ Bind pname sname (qParamsFormat q) (qValues q) (qResultFormat q)
149
182
sendMessage s $ Execute pname noLimitToReceive
150
183
151
184
@@ -170,6 +203,9 @@ test = do
170
203
-- readNextData :: Connection -> IO Data?
171
204
-- readNextData = undefined
172
205
--
206
+ -- readNextServerMessage ?
207
+ --
208
+ --
173
209
-- Simple Queries support or maybe dont support it
174
210
-- because single text query may be send through extended protocol
175
211
-- may be support for all standalone queries
0 commit comments