1
F438
5
15
package com .github .pgasync .impl ;
16
16
17
17
import com .github .pgasync .Connection ;
18
- import com .github .pgasync .ResultSet ;
19
18
import com .github .pgasync .Row ;
20
19
import com .github .pgasync .Transaction ;
21
20
import com .github .pgasync .impl .conversion .DataConverter ;
22
21
import com .github .pgasync .impl .message .*;
23
22
import rx .Observable ;
24
23
import rx .functions .Func1 ;
25
24
26
- import java .util .*;
27
25
import java .util .AbstractMap .SimpleImmutableEntry ;
26
+ import java .util .ArrayList ;
27
+ import java .util .HashMap ;
28
+ import java .util .List ;
29
+ import java .util .Map ;
28
30
import java .util .Map .Entry ;
29
31
import java .util .function .Consumer ;
30
32
31
- import static com .github .pgasync .impl .Functions .applyConsumer ;
32
- import static com .github .pgasync .impl .Functions .applyRunnable ;
33
33
import static com .github .pgasync .impl .message .RowDescription .ColumnDescription ;
34
34
35
35
/**
@@ -57,46 +57,42 @@ Observable<Connection> connect(String username, String password, String database
57
57
58
58
Observable <? extends Message > authenticate (String username , String password , Message message ) {
59
59
return message instanceof Authentication
60
- ? stream .send (new PasswordMessage (username , password , ((Authentication ) message ).getMd5Salt ()))
61
- : Observable .just (message );
60
+ ? stream .send (new PasswordMessage (username , password , ((Authentication ) message ).getMd5Salt ()))
61
+ : Observable .just (message );
62
62
}
63
63
64
64
boolean isConnected () {
65
65
return stream .isConnected ();
66
66
}
67
67
68
68
@ Override
69
- public void query (String sql , Consumer <ResultSet > onQuery , Consumer <Throwable > onError ) {
70
- stream .send (new Query (sql ))
71
- .reduce (new QueryResponse (), this ::toResponse )
72
- .map (this ::toResultSet )
73
- .subscribe (onQuery ::accept , onError ::accept );
69
+ public Observable <Row > query (String sql ) {
70
+ return stream .send (new Query (sql ))
71
+ .flatMap (toDataRowFn ())
72
+ .map (row -> new PgRow (row .getKey (), row .getValue (), dataConverter ));
74
73
}
75
74
76
75
@ Override
77
- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
78
- public void query (String sql , List params , Consumer <ResultSet > onQuery , Consumer <Throwable > onError ) {
79
- if (params == null || params .isEmpty ()) {
80
- query (sql , onQuery , onError );
81
- return ;
82
- }
83
- stream .send (new Parse (sql ), new Bind (dataConverter .fromParameters (params )),
84
- ExtendedQuery .DESCRIBE , ExtendedQuery .EXECUTE , ExtendedQuery .CLOSE , ExtendedQuery .SYNC )
85
- .reduce (new QueryResponse (), this ::toResponse )
86
- .map (this ::toResultSet )
87
- .subscribe (onQuery ::accept , onError ::accept );
76
+ public Observable <Row > query (String sql , Object ... params ) {
77
+ return stream .send ( new Parse (sql ),
78
+ new Bind (dataConverter .fromParameters (params )),
79
+ ExtendedQuery .DESCRIBE ,
80
+ ExtendedQuery .EXECUTE ,
81
+ ExtendedQuery .CLOSE ,
82
+ ExtendedQuery .SYNC )
83
+ .flatMap (toDataRowFn ())
84
+ .map (row -> new PgRow (row .getKey (), row .getValue (), dataConverter ));
88
85
}
89
86
90
87
@ Override
91
- public void begin ( Consumer <Transaction > handler , Consumer < Throwable > onError ) {
92
- query ("BEGIN" , beginRs -> applyConsumer ( handler , new ConnectionTx (), onError ), onError );
88
+ public Observable <Transaction > begin ( ) {
89
+ return query ("BEGIN" ). map ( row -> new ConnectionTx ());
93
90
}
94
91
95
92
@ Override
96
93
public void listen (String channel , Consumer <String > onNotification , Consumer <String > onListenStarted , Consumer <Throwable > onError ) {
97
94
stream .send (new Query ("LISTEN " + channel ))
98
- .subscribe (msg -> {
99
- }, onError ::accept ,
95
+ .subscribe (msg -> {}, onError ::accept ,
100
96
() -> onListenStarted .accept (stream .registerNotificationHandler (channel , onNotification )));
101
97
}
102
98
@@ -115,23 +111,16 @@ public void close() {
115
111
stream .close ();
116
112
}
117
113
118
- public Observable <Row > query (String sql ) {
119
- return stream .send (new Query (sql ))
120
- .map (mapRowFn ())
121
- .filter (msg -> msg != null )
122
- .map (row -> new PgRow (row .getKey (), row .getValue (), dataConverter ));
123
- }
124
-
125
- Func1 <? super Message , Entry <DataRow ,Map <String ,PgColumn >>> mapRowFn () {
114
+ Func1 <? super Message , Observable <Entry <DataRow ,Map <String ,PgColumn >>>> toDataRowFn () {
126
115
Map <String , PgColumn > columns = new HashMap <>();
127
- return msg -> {
128
- if (msg instanceof RowDescription ) {
129
- columns .putAll (getColumns (((RowDescription ) msg ).getColumns ()));
130
- return null ;
116
+ return message -> {
117
+ if (message instanceof DataRow ) {
118
+ return Observable .just (new SimpleImmutableEntry <>((DataRow ) message , columns ));
119
+ }
120
+ if (message instanceof RowDescription ) {
121
+ columns .putAll (getColumns (((RowDescription ) message ).getColumns ()));
131
122
}
132
- return msg instanceof DataRow
133
- ? new SimpleImmutableEntry <>((DataRow ) msg , columns )
134
- : null ;
123
+ return Observable .empty ();
135
124
};
136
125
}
137
126
@@ -160,20 +149,20 @@ static Map<String,PgColumn> getColumns(ColumnDescription[] descriptions) {
160
149
161
150
class ConnectionTx implements Transaction {
162
151
@ Override
163
- public void commit ( Runnable onCompleted , Consumer < Throwable > onCommitError ) {
164
- PgConnection .this .query ("COMMIT" , ( rs ) -> applyRunnable ( onCompleted , onCommitError ), onCommitError );
152
+ public Observable < Void > commit ( ) {
153
+ return PgConnection .this .query ("COMMIT" ). map ( row -> null );
165
154
}
166
155
@ Override
167
- public void rollback ( Runnable onCompleted , Consumer < Throwable > onRollbackError ) {
168
- PgConnection .this .query ("ROLLBACK" , ( rs ) -> applyRunnable ( onCompleted , onRollbackError ), onRollbackError );
156
+ public Observable < Void > rollback ( ) {
157
+ return PgConnection .this .query ("ROLLBACK" ). map ( row -> null );
169
158
}
170
159
@ Override
171
- public void query (String sql , Consumer < ResultSet > onResult , Consumer < Throwable > onError ) {
172
- PgConnection .this .query (sql , onResult , onError );
160
+ public Observable < Row > query (String sql ) {
161
+ return PgConnection .this .query (sql );
173
162
}
174
163
@ Override
175
- public void query (String sql , List params , Consumer < ResultSet > onResult , Consumer < Throwable > onError ) {
176
- PgConnection .this .query (sql , params , onResult , onError );
164
+ public Observable < Row > query (String sql , Object ... params ) {
165
+ return PgConnection .this .query (sql , params );
177
166
}
178
167
}
179
168
0 commit comments