8000 Merge pull request #11 from alaisi/stream · rasayana/postgres-async-driver@92978e8 · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit 92978e8

Browse files
committed
Merge pull request alaisi#11 from alaisi/stream
Merge reactive stream API to master
2 parents 5e68b38 + 35b7846 commit 92978e8

28 files changed

+618
-589
lines changed

pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,16 @@
5959
<artifactId>netty-handler</artifactId>
6060
<version>4.0.24.Final</version>
6161
</dependency>
62-
62+
<dependency>
63+
<groupId>org.reactivestreams</groupId>
64+
<artifactId>reactive-streams</artifactId>
65+
<version>1.0.0</version>
66+
</dependency>
67+
<dependency>
68+
<groupId>io.reactivex</groupId>
69+
<artifactId>rxjava</artifactId>
70+
<version>1.0.14</version>
71+
</dependency>
6372
<dependency>
6473
<groupId>junit</groupId>
6574
<artifactId>junit</artifactId>

src/main/java/com/github/pgasync/ConnectionPool.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
package com.github.pgasync;
1616

17-
import java.util.function.Consumer;
17+
import rx.Observable;
1818

1919
/**
2020
* Pool of backend {@link Connection}s. Pools implement {@link Db} so
@@ -29,11 +29,8 @@ public interface ConnectionPool extends Db {
2929
* Executes a {@link java.util.function.Consumer} callback when a connection is
3030
* available. Connection passed to callback must be freed with
3131
* {@link com.github.pgasync.ConnectionPool#release(Connection)}
32-
*
33-
* @param handler Called when a connection is acquired
34-
* @param onError Called on exception thrown
3532
*/
36-
void getConnection(Consumer<Connection> handler, Consumer<Throwable> onError);
33+
Observable<Connection> getConnection();
3734

3835
/**
3936
* Releases a connection back to the pool.
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package com.github.pgasync;
22

3-
import java.util.function.Consumer;
3+
import rx.Observable;
44

55
/**
66
* @author Antti Laisi
77
*/
88
public interface Listenable {
99

10-
void listen(String channel, Consumer<String> onNotification, Consumer<String> onListenStarted, Consumer<Throwable> onError);
11-
12-
void unlisten(String channel, String unlistenToken, Runnable onListenStopped, Consumer<Throwable> onError);
10+
Observable<String> listen(String channel);
1311

1412
}

src/main/java/com/github/pgasync/QueryExecutor.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.github.pgasync;
22

3+
import rx.Observable;
4+
35
import java.util.List;
46
import java.util.function.Consumer;
57

@@ -10,14 +12,58 @@
1012
*/
1113
public interface QueryExecutor {
1214

15+
/**
16+
* Executes an anonymous prepared statement. Uses native PostgreSQL syntax with $arg instead of ?
17+
* to mark parameters. Supported parameter types are String, Character, Number, Time, Date, Timestamp
18+
* and byte[].
19+
*
20+
* @param sql SQL to execute
21+
* @param params Parameter values
22+
* @return Cold observable that emits 0-n rows.
23+
*/
24+
Observable<Row> queryRows(String sql, Object... params);
25+
26+
/**
27+
* Executes a simple query.
28+
*
29+
* @param sql SQL to execute.
30+
* @return Cold observable that emits 0-n rows.
31+
*/
32+
default Observable<Row> queryRows(String sql) {
33+
return queryRows(sql, (Object[]) null);
34+
}
35+
36+
/**
37+
* Executes an anonymous prepared statement. Uses native PostgreSQL syntax with $arg instead of ?
38+
* to mark parameters. Supported parameter types are String, Character, Number, Time, Date, Timestamp
39+
* and byte[].
40+
*
41+
* @param sql SQL to execute
42+
* @param params Parameter values
43+
* @return Cold observable that emits a single result set.
44+
*/
45+
Observable<ResultSet> querySet(String sql, Object... params);
46+
47+
/**
48+
* Executes a simple query.
49+
*
50+
* @param sql SQL to execute.
51+
* @return Cold observable that emits a single result set.
52+
*/
53+
default Observable<ResultSet> querySet(String sql) {
54+
return querySet(sql, (Object[]) null);
55+
}
56+
1357
/**
1458
* Executes a simple query.
1559
*
1660
* @param sql SQL to execute.
1761
* @param onResult Called when query is completed
1862
* @param onError Called on exception thrown
1963
*/
20-
void query(String sql, Consumer<ResultSet> onResult, Consumer<Throwable> onError);
64+
default void query(String sql, Consumer<ResultSet> onResult, Consumer<Throwable> onError) {
65+
query(sql, null, onResult, onError);
66+
}
2167

2268
/**
2369
* Executes an anonymous prepared statement. Uses native PostgreSQL syntax with $arg instead of ?
@@ -29,6 +75,7 @@ public interface QueryExecutor {
2975
* @param onResult Called when query is completed
3076
* @param onError Called on exception thrown
3177
*/
32-
void query(String sql, List/*<Object>*/ params, Consumer<ResultSet> onResult, Consumer<Throwable> onError);
33-
78+
default void query(String sql, List/*<Object>*/ params, Consumer<ResultSet> onResult, Consumer<Throwable> onError) {
79+
querySet(sql, params != null ? params.toArray() : null).subscribe(onResult::accept, onError::accept);
80+
}
3481
}

src/main/java/com/github/pgasync/Row.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.sql.Timestamp;
2222

2323
/**
24-
* Row in a query result set. A row consist of 0-n columns of a single type.
24+
* Row in a queryRows result set. A row consist of 0-n columns of a single type.
2525
* Column values can be accessed with a 0-based index or column label.
2626
*
2727
* @author Antti Laisi

src/main/java/com/github/pgasync/Transaction.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.github.pgasync;
1616

17+
import rx.Observable;
18+
1719
import java.util.function.Consumer;
1820

1921
/**
@@ -25,20 +27,34 @@
2527
*/
2628
public interface Transaction extends QueryExecutor {
2729

30+
/**
31+
* Commits a transaction
32+
*/
33+
Observable<Void> commit();
34+
35+
/**
36+
* Rollbacks a transaction.
37+
*/
38+
Observable<Void> rollback();
39+
2840
/**
2941
* Commits a transaction.
30-
*
42+
*
3143
* @param onCompleted Called when commit completes
3244
* @param onError Called on exception thrown
3345
*/
34-
void commit(Runnable onCompleted, Consumer<Throwable> onError);
46+
default void commit(Runnable onCompleted, Consumer<Throwable> onError) {
47+
commit().subscribe(__ -> onCompleted.run(), onError::accept);
48+
}
3549

3650
/**
3751
* Rollbacks a transaction.
38-
*
52+
*
3953
* @param onCompleted Called when rollback completes
4054
* @param onError Called on exception thrown
4155
*/
42-
void rollback(Runnable onCompleted, Consumer<Throwable> onError);
56+
default void rollback(Runnable onCompleted, Consumer<Throwable> onError) {
57+
rollback().subscribe(__ -> onCompleted.run(), onError::accept);
58+
}
4359

4460
}
Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.github.pgasync;
22

3-
import java.util.function.Consumer;
3+
import rx.Observable;
44

55
/**
66
* TransactionExecutor begins backend transactions.
@@ -11,10 +11,7 @@ public interface TransactionExecutor {
1111

1212
/**
1313
* Begins a transaction.
14-
*
15-
* @param onTransaction Called when transaction is successfully started.
16-
* @param onError Called on exception thrown
1714
*/
18-
void begin(Consumer<Transaction> onTransaction, Consumer<Throwable> onError);
15+
Observable<Transaction> begin();
1916

2017
}

src/main/java/com/github/pgasync/impl/ConnectionValidator.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717

1818
import com.github.pgasync.Connection;
19-
20-
import java.util.function.Consumer;
19+
import rx.Observable;
2120

2221
/**
2322
* @author Antti Laisi
@@ -30,11 +29,15 @@ public ConnectionValidator(String validationQuery) {
3029
this.validationQuery = validationQuery;
3130
}
3231

33-
void validate(Connection connection, Runnable onValid, Consumer<Throwable> onError) {
32+
Observable<Connection> validate(Connection connection) {
3433
if(validationQuery == null) {
35-
onValid.run();
36-
return;
34+
return Observable.just(connection);
3735
}
38-
connection.query(validationQuery, rs 789C -> onValid.run(), onError);
36+
return Observable.create(subscriber -> connection.queryRows(validationQuery)
37+
.subscribe(row -> { }, subscriber::onError,
38+
() -> {
39+
subscriber.onNext(connection);
40+
subscriber.onCompleted();
41+
}));
3942
}
4043
}

src/main/java/com/github/pgasync/impl/Functions.java

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)
0