10000 Drop callback-based interfaces for now · rasayana/postgres-async-driver@557c9c3 · GitHub
[go: up one dir, main page]

Skip to content

Commit 557c9c3

Browse files
committed
Drop callback-based interfaces for now
1 parent 8f5d7ce commit 557c9c3

File tree

10 files changed

+170
-203
lines changed

10 files changed

+170
-203
lines changed

src/main/java/com/github/pgasync/ConnectionPool.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
package com.github.pgasync;
1616

17-
import java.util.function.Consumer;
17+
import rx.Observable;
1818

1919
/**
2020
* Pool of backend {@link Connection}s. Pools implement {@link Db} so
@@ -29,11 +29,8 @@ public interface ConnectionPool extends Db {
2929
* Executes a {@link java.util.function.Consumer} callback when a connection is
3030
* available. Connection passed to callback must be freed with
3131
* {@link com.github.pgasync.ConnectionPool#release(Connection)}
32-
*
33-
* @param handler Called when a connection is acquired
34-
* @param onError Called on exception thrown
3532
*/
36-
void getConnection(Consumer<Connection> handler, Consumer<Throwable> onError);
33+
Observable<Connection> getConnection();
3734

3835
/**
3936
* Releases a connection back to the pool.
Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.github.pgasync;
22

3-
import java.util.List;
4-
import java.util.function.Consumer;
3+
import rx.Observable;
54

65
/**
76
* QueryExecutor submits SQL for execution.
@@ -14,21 +13,17 @@ public interface QueryExecutor {
1413
* Executes a simple query.
1514
*
1615
* @param sql SQL to execute.
17-
* @param onResult Called when query is completed
18-
* @param onError Called on exception thrown
1916
*/
20-
void query(String sql, Consumer<ResultSet> onResult, Consumer<Throwable> onError);
17+
Observable<Row> query(String sql);
2118

2219
/**
2320
* Executes an anonymous prepared statement. Uses native PostgreSQL syntax with $arg instead of ?
2421
* to mark parameters. Supported parameter types are String, Character, Number, Time, Date, Timestamp
2522
* and byte[].
2623
*
2724
* @param sql SQL to execute
28-
* @param params List of parameters
29-
* @param onResult Called when query is completed
30-
* @param onError Called on exception thrown
25+
* @param params Parameter values
3126
*/
32-
void query(String sql, List/*<Object>*/ params, Consumer<ResultSet> onResult, Consumer<Throwable> onError);
27+
Observable<Row> query(String sql, Object... params);
3328

3429
}

src/main/java/com/github/pgasync/Transaction.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
package com.github.pgasync;
1616

17-
import java.util.function.Consumer;
17+
import rx.Observable;
1818

1919
/**
2020
* A unit of work. Transactions must be committed or rolled back, otherwise a
@@ -26,19 +26,13 @@
2626
public interface Transaction extends QueryExecutor {
2727

2828
/**
29-
* Commits a transaction.
30-
*
31-
* @param onCompleted Called when commit completes
32-
* @param onError Called on exception thrown
29+
* Commits a transaction
3330
*/
34-
void commit(Runnable onCompleted, Consumer<Throwable> onError);
31+
Observable<Void> commit();
3532

3633
/**
3734
* Rollbacks a transaction.
38-
*
39-
* @param onCompleted Called when rollback completes
40-
* @param onError Called on exception thrown
4135
*/
42-
void rollback(Runnable onCompleted, Consumer<Throwable> onError);
36+
Observable<Void> rollback();
4337

4438
}
Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.github.pgasync;
22

3-
import java.util.function.Consumer;
3+
import rx.Observable;
44

55
/**
66
* TransactionExecutor begins backend transactions.
@@ -11,10 +11,7 @@ public interface TransactionExecutor {
1111

1212
/**
1313
* Begins a transactio 92B6 n.
14-
*
15-
* @param onTransaction Called when transaction is successfully started.
16-
* @param onError Called on exception thrown
1714
*/
18-
void begin(Consumer<Transaction> onTransaction, Consumer<Throwable> onError);
15+
Observable<Transaction> begin();
1916

2017
}

src/main/java/com/github/pgasync/impl/ConnectionValidator.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717

1818
import com.github.pgasync.Connection;
19-
20-
import java.util.function.Consumer;
19+
import rx.Observable;
2120

2221
/**
2322
* @author Antti Laisi
@@ -30,11 +29,15 @@ public ConnectionValidator(String validationQuery) {
3029
this.validationQuery = validationQuery;
3130
}
3231

33-
void validate(Connection connection, Runnable onValid, Consumer<Throwable> onError) {
32+
Observable<Connection> validate(Connection connection) {
3433
if(validationQuery == null) {
35-
onValid.run();
36-
return;
34+
return Observable.just(connection);
3735
}
38-
connection.query(validationQuery, rs -> onValid.run(), onError);
36+
return Observable.create(subscriber -> connection.query(validationQuery)
37+
.subscribe(row -> { }, subscriber::onError,
38+
() -> {
39+
subscriber.onNext(connection);
40+
subscriber.onCompleted();
41+
}));
3942
}
4043
}

src/main/java/com/github/pgasync/impl/PgConnection.java

Lines changed: 38 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,21 @@
1 F438 515
package com.github.pgasync.impl;
1616

1717
import com.github.pgasync.Connection;
18-
import com.github.pgasync.ResultSet;
1918
import com.github.pgasync.Row;
2019
import com.github.pgasync.Transaction;
2120
import com.github.pgasync.impl.conversion.DataConverter;
2221
import com.github.pgasync.impl.message.*;
2322
import rx.Observable;
2423
import rx.functions.Func1;
2524

26-
import java.util.*;
2725
import java.util.AbstractMap.SimpleImmutableEntry;
26+
import java.util.ArrayList;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.Map;
2830
import java.util.Map.Entry;
2931
import java.util.function.Consumer;
3032

31-
import static com.github.pgasync.impl.Functions.applyConsumer;
32-
import static com.github.pgasync.impl.Functions.applyRunnable;
3333
import static com.github.pgasync.impl.message.RowDescription.ColumnDescription;
3434

3535
/**
@@ -57,46 +57,42 @@ Observable<Connection> connect(String username, String password, String database
5757

5858
Observable<? extends Message> authenticate(String username, String password, Message message) {
5959
return message instanceof Authentication
60-
? stream.send(new PasswordMessage(username, password, ((Authentication) message).getMd5Salt()))
61-
: Observable.just(message);
60+
? stream.send(new PasswordMessage(username, password, ((Authentication) message).getMd5Salt()))
61+
: Observable.just(message);
6262
}
6363

6464
boolean isConnected() {
6565
return stream.isConnected();
6666
}
6767

6868
@Override
69-
public void query(String sql, Consumer<ResultSet> onQuery, Consumer<Throwable> onError) {
70-
stream.send(new Query(sql))
71-
.reduce(new QueryResponse(), this::toResponse)
72-
.map(this::toResultSet)
73-
.subscribe(onQuery::accept, onError::accept);
69+
public Observable<Row> query(String sql) {
70+
return stream.send(new Query(sql))
71+
.flatMap(toDataRowFn())
72+
.map(row -> new PgRow(row.getKey(), row.getValue(), dataConverter));
7473
}
7574

7675
@Override
77-
@SuppressWarnings({ "unchecked", "rawtypes" })
78-
public void query(String sql, List params, Consumer<ResultSet> onQuery, Consumer<Throwable> onError) {
79-
if (params == null || params.isEmpty()) {
80-
query(sql, onQuery, onError);
81-
return;
82-
}
83-
stream.send(new Parse(sql), new Bind(dataConverter.fromParameters(params)),
84-
ExtendedQuery.DESCRIBE, ExtendedQuery.EXECUTE, ExtendedQuery.CLOSE, ExtendedQuery.SYNC)
85-
.reduce(new QueryResponse(), this::toResponse)
86-
.map(this::toResultSet)
87-
.subscribe(onQuery::accept, onError::accept);
76+
public Observable<Row> query(String sql, Object... params) {
77+
return stream.send( new Parse(sql),
78+
new Bind(dataConverter.fromParameters(params)),
79+
ExtendedQuery.DESCRIBE,
80+
ExtendedQuery.EXECUTE,
81+
ExtendedQuery.CLOSE,
82+
ExtendedQuery.SYNC)
83+
.flatMap(toDataRowFn())
84+
.map(row -> new PgRow(row.getKey(), row.getValue(), dataConverter));
8885
}
8986

9087
@Override
91-
public void begin(Consumer<Transaction> handler, Consumer<Throwable> onError) {
92-
query("BEGIN", beginRs -> applyConsumer(handler, new ConnectionTx(), onError), onError);
88+
public Observable<Transaction> begin() {
89+
return query("BEGIN").map(row -> new ConnectionTx());
9390
}
9491

9592
@Override
9693
public void listen(String channel, Consumer<String> onNotification, Consumer<String> onListenStarted, Consumer<Throwable> onError) {
9794
stream.send(new Query("LISTEN " + channel))
98-
.subscribe(msg -> {
99-
}, onError::accept,
95+
.subscribe(msg -> {}, onError::accept,
10096
() -> onListenStarted.accept(stream.registerNotificationHandler(channel, onNotification)));
10197
}
10298

@@ -115,23 +111,16 @@ public void close() {
115111
stream.close();
116112
}
117113

118-
public Observable<Row> query(String sql) {
119-
return stream.send(new Query(sql))
120-
.map(mapRowFn())
121-
.filter(msg -> msg != null)
122-
.map(row -> new PgRow(row.getKey(), row.getValue(), dataConverter));
123-
}
124-
125-
Func1<? super Message, Entry<DataRow,Map<String,PgColumn>>> mapRowFn() {
114+
Func1<? super Message, Observable<Entry<DataRow,Map<String,PgColumn>>>> toDataRowFn() {
126115
Map<String, PgColumn> columns = new HashMap<>();
127-
return msg -> {
128-
if (msg instanceof RowDescription) {
129-
columns.putAll(getColumns(((RowDescription) msg).getColumns()));
130-
return null;
116+
return message -> {
117+
if(message instanceof DataRow) {
118+
return Observable.just(new SimpleImmutableEntry<>((DataRow) message, columns));
119+
}
120+
if (message instanceof RowDescription) {
121+
columns.putAll(getColumns(((RowDescription) message).getColumns()));
131122
}
132-
return msg instanceof DataRow
133-
? new SimpleImmutableEntry<>((DataRow) msg, columns)
134-
: null;
123+
return Observable.empty();
135124
};
136125
}
137126

@@ -160,20 +149,20 @@ static Map<String,PgColumn> getColumns(ColumnDescription[] descriptions) {
160149

161150
class ConnectionTx implements Transaction {
162151
@Override
163-
public void commit(Runnable onCompleted, Consumer<Throwable> onCommitError) {
164-
PgConnection.this.query("COMMIT", (rs) -> applyRunnable(onCompleted, onCommitError), onCommitError);
152+
public Observable<Void> commit() {
153+
return PgConnection.this.query("COMMIT").map(row -> null);
165154
}
166155
@Override
167-
public void rollback(Runnable onCompleted, Consumer<Throwable> onRollbackError) {
168-
PgConnection.this.query("ROLLBACK", (rs) -> applyRunnable(onCompleted, onRollbackError), onRollbackError);
156+
public Observable<Void> rollback() {
157+
return PgConnection.this.query("ROLLBACK").map(row -> null);
169158
}
170159
@Override
171-
public void query(String sql, Consumer<ResultSet> onResult, Consumer<Throwable> onError) {
172-
PgConnection.this.query(sql, onResult, onError);
160+
public Observable<Row> query(String sql) {
161+
return PgConnection.this.query(sql);
173162
}
174163
@Override
175-
public void query(String sql, List params, Consumer<ResultSet> onResult, Consumer<Throwable> onError) {
176-
PgConnection.this.query(sql, params, onResult, onError);
164+
public Observable<Row> query(String sql, Object... params) {
165+
return PgConnection.this.query(sql, params);
177166
}
178167
}
179168

0 commit comments

Comments
 (0)
0