8000 Support connection validation. · emrul/postgres-async-driver@5e6176e · GitHub
[go: up one dir, main page]

Skip to content

Commit 5e6176e

Browse files
committed
Support connection validation.
1 parent f6f0afb commit 5e6176e

File tree

4 files changed

+30
-45
lines changed

4 files changed

+30
-45
lines changed

src/main/java/com/github/pgasync/ConnectionPoolBuilder.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import com.github.pgasync.impl.ConnectionValidator;
1818
import com.github.pgasync.impl.conversion.DataConverter;
1919
import com.github.pgasync.impl.netty.NettyPgConnectionPool;
20+
import rx.Observable;
21+
import rx.functions.Func1;
2022

2123
import java.util.ArrayList;
2224
import java.util.Collections;
@@ -108,7 +110,7 @@ public static class PoolProperties {
108110
List<Converter<?>> converters = new ArrayList<>();
109111
boolean useSsl;
110112
boolean usePipelining;
111-
String validationQuery = "SELECT 1";
113+
String validationQuery;
112114

113115
public String getHostname() {
114116
return hostname;
@@ -137,8 +139,10 @@ public boolean getUsePipelining() {
137139
public DataConverter getDataConverter() {
138140
return dataConverter != null ? dataConverter : new DataConverter(converters);
139141
}
140-
public ConnectionValidator getValidator() {
141-
return new ConnectionValidator(validationQuery);
142+
public Func1<Connection,Observable<Connection>> getValidator() {
143+
return validationQuery == null || validationQuery.trim().isEmpty()
144+
? Observable::just
145+
: new ConnectionValidator(validationQuery)::validate;
142146
}
143147
}
144148
}

src/main/java/com/github/pgasync/impl/ConnectionValidator.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717

1818
import com.github.pgasync.Connection;
19+
import com.github.pgasync.Row;
1920
import rx.Observable;
21+
import rx.Subscriber;
2022

2123
/**
2224
* @author Antti Laisi
@@ -29,15 +31,21 @@ public ConnectionValidator(String validationQuery) {
2931
this.validationQuery = validationQuery;
3032
}
3133

32-
Observable<Connection> validate(Connection connection) {
33-
if(validationQuery == null) {
34-
return Observable.just(connection);
35-
}
36-
return Observable.create(subscriber -> connection.queryRows(validationQuery)
37-
.subscribe(row -> { }, subscriber::onError,
38-
() -> {
39-
subscriber.onNext(connection);
40-
subscriber.onCompleted();
41-
}));
34+
public Observable<Connection> validate(Connection connection) {
35+
return connection.queryRows(validationQuery)
36+
.lift(subscriber -> new Subscriber<Row>() {
37+
@Override
38+
public void onError(Throwable e) {
39+
subscriber.onError(e);
40+
}
41+
@Override
42+
public void onCompleted() {
43+
subscriber.onNext(connection);
44+
subscriber.onCompleted();
45+
}
46+
@Override
47+
public void onNext(Row row) { }
48+
});
4249
}
50+
4351
}

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.github.pgasync.impl.conversion.DataConverter;
2020
import rx.Observable;
2121
import rx.Subscriber;
22+
import rx.functions.Func1;
2223
import rx.observers.Subscribers;
2324

2425
import javax.annotation.concurrent.GuardedBy;
@@ -57,7 +58,7 @@ public abstract class PgConnectionPool implements ConnectionPool {
5758
final String password;
5859
final String database;
5960
final DataConverter dataConverter;
60-
final ConnectionValidator validator;
61+
final Func1<Connection, Observable<Connection>> validator;
6162
final boolean pipeline;
6263

6364
public PgConnectionPool(PoolProperties properties) {
@@ -141,7 +142,7 @@ public void close() throws Exception {
141142

142143
@Override
143144
public Observable<Connection> getConnection() {
144-
return Observable.create(subscriber -> {
145+
return Observable.<Connection>create(subscriber -> {
145146
boolean locked = true;
146147
lock.lock();
147148
try {
@@ -177,7 +178,8 @@ public Observable<Connection> getConnection() {
177178
lock.unlock();
178179
}
179180
}
180-
});
181+
}).flatMap(conn -> validator.call(conn).doOnError(err -> release(conn)))
182+
.retry(poolSize + 1);
181183
}
182184

183185
private boolean tryIncreaseSize() {
@@ -236,34 +238,6 @@ public void release(Connection connection) {
236238
*/
237239
protected abstract PgProtocolStream openStream(InetSocketAddress address);
238240

239-
/*
240-
void validateAndApply(Connection connection, Consumer<Connection> onConnection, Consumer<Throwable> onError, int attempt) {
241-
242-
Runnable onValid = () -> {
243-
try {
244-
onConnection.accept(connection);
245-
} catch (Throwable t) {
246-
release(connection);
247-
onError.accept(t);
248-
}
249-
};
250-
251-
Consumer<Throwable> onValidationFailed = err -> {
252-
if(attempt > poolSize) {
253-
onError.accept(err);
254-
return;
255-
}
256-
try {
257-
connection.close();
258-
} catch (Throwable t) { /* ignored / }
259-
release(connection);
260-
getConnection(onConnection, onError, attempt + 1);
261-
};
262-
263-
validator.validate(connection, onValid, onValidationFailed);
264-
}
265-
*/
266-
267241
/**
268242
* Transaction that chains releasing the connection after COMMIT/ROLLBACK.
269243
*/

src/test/java/com/github/pgasync/impl/DatabaseRule.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import java.util.Map.Entry;
1313
import java.util.concurrent.ArrayBlockingQueue;
1414
import java.util.concurrent.BlockingQueue;
15-
import java.util.concurrent.CountDownLatch;
1615
import java.util.concurrent.TimeUnit;
1716

1817
/**

0 commit comments

Comments
 (0)
0