10000 Close connections cleanly. · juliombb/postgres-async-driver@8dfa5f3 · GitHub
[go: up one dir, main page]

Skip to content

Commit 8dfa5f3

Browse files
committed
Close connections cleanly.
1 parent f0c7eae commit 8dfa5f3

File tree

7 files changed

+22
-9
lines changed

7 files changed

+22
-9
lines changed

src/main/java/com/github/pgasync/impl/PgConnection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,7 @@ public Observable<String> listen(String channel) {
9090

9191
@Override
9292
public Observable<Void> close() {
93-
return Observable.concat(stream.send(Terminate.INSTANCE), stream.close())
94-
.first()
95-
.map(__ -> null);
93+
return stream.close();
9694
}
9795

9896
private Observable<Message> sendQuery(String sql, Object[] params) {

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import java.net.InetSocketAddress;
2525
import java.util.LinkedList;
2626
import java.util.Queue;
27+
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.locks.Condition;
2930
import java.util.concurrent.locks.ReentrantLock;
31+
import java.util.logging.Logger;
3032
import javax.annotation.concurrent.GuardedBy;
3133
import static java.util.concurrent.TimeUnit.SECONDS;
3234

@@ -109,6 +111,7 @@ public Observable<String> listen(String channel) {
109111
@Override
110112
public Observable<Void> close() {
111113
return Observable.create(subscriber -> {
114+
112115
lock.lock();
113116
try {
114117
closed = true;
@@ -132,7 +135,11 @@ public Observable<Void> close() {
132135
continue;
133136
}
134137
currentSize--;
135-
connection.close().toBlocking().single();
138+
CountDownLatch wait = new CountDownLatch(1);
139+
connection.close().subscribe(__ -> wait.countDown(), __ -> wait.countDown());
140+
if(!wait.await(5, SECONDS)) {
141+
Logger.getLogger(getClass().getName()).warning("Closing connection timed out");
142+
}
136143
}
137144
} catch (InterruptedException e) { /* ignore */ }
138145

src/main/java/com/github/pgasync/impl/netty/NettyPgConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public Observable<Void> close() {
6262

6363
private <T> GenericFutureListener<Future<T>> close(Subscriber<?> subscriber, Throwable exception) {
6464
return f -> {
65-
if (exception == null && !f.isSuccess()) {
65+
if (exception == null && f.isSuccess()) {
6666
subscriber.onNext(null);
6767
subscriber.onCompleted();
6868
return;

src/test/java/com/github/pgasync/impl/DatabaseRule.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.Map.Entry;
1313
import java.util.concurrent.ArrayBlockingQueue;
1414
import java.util.concurrent.BlockingQueue;
15+
import java.util.concurrent.CountDownLatch;
1516
import java.util.concurrent.TimeUnit;
1617

1718
/**
@@ -40,7 +41,14 @@ protected void before() {
4041
@Override
4142
protected void after() {
4243
if(pool != null) {
43-
pool.close();
44+
CountDownLatch latch = new CountDownLatch(1);
45+
pool.close().subscribe(__ -> latch.countDown(), ex -> {
46+
ex.printStackTrace();
47+
latch.countDown();
48+
});
49+
try {
50+
latch.await(30, TimeUnit.SECONDS);
51+
} catch (InterruptedException e) { /* ignore */ }
4452
}
4553
}
4654

src/test/java/com/github/pgasync/impl/ListenNotifyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,6 @@ public void shouldReceiveNotificationsOnListenedChannel() throws Exception {
4444

4545
@AfterClass
4646
public static void close() {
47-
pool.close();
47+
pool.close().toBlocking().single();
4848
}
4949
}

src/test/java/com/github/pgasync/impl/PerformanceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public PerformanceTest(int poolSize, int numThreads, boolean pipeline) {
9090
@After
9191
public void close() {
9292
threadPool.shutdownNow();
93-
dbPool.close();
93+
dbPool.close().toBlocking().single();
9494
}
9595

9696
@Test(timeout = 1000)

src/test/java/com/github/pgasync/impl/PipelineTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void closeConnection() {
5252
pool.release(c);
5353
}
5454
if (pool != null) {
55-
pool.close();
55+
pool.close().toBlocking().single();
5656
}
5757
}
5858

0 commit comments

Comments
 (0)
0