8000 Make Db.close() blocking. · emrul/postgres-async-driver@45474a7 · GitHub
[go: up one dir, main page]

Skip to content
8000

Commit 45474a7

Browse files
committed
Make Db.close() blocking.
1 parent e9fc76f commit 45474a7

File tree

11 files changed

+69
-176
lines changed

11 files changed

+69
-176
lines changed

src/main/java/com/github/pgasync/Connection.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
package com.github.pgasync;
1616

17-
import rx.Observable;
18-
1917
/**
2018
* A single physical connection to PostgreSQL backend.
2119
*
@@ -24,8 +22,8 @@
2422
public interface Connection extends Db {
2523

2624
/**
27-
* Closes the connection.
25+
* Closes the connection, blocks the calling thread until the connection is closed.
2826
*/
29-
Observable<Void> close();
27+
void close() throws Exception;
3028

3129
}
Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
package com.github.pgasync;
22

3-
import rx.Observable;
4-
53
/**
64
* Main interface to PostgreSQL backend.
75
*
86
* @author Antti Laisi
97
*/
10-
public interface Db extends QueryExecutor, TransactionExecutor, Listenable {
8+
public interface Db extends QueryExecutor, TransactionExecutor, Listenable, AutoCloseable {
119

1210
/**
13-
* Closes the pool.
11+
* Closes the pool, blocks the calling thread until connections are closed.
1412
*/
15-
Observable<Void> close();
13+
@Override
14+
void close() throws Exception;
1615

1716
}

src/main/java/com/github/pgasync/impl/PgConnection.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@
2424
import rx.Subscriber;
2525
import rx.observers.Subscribers;
2626

27-
import java.util.*;
27+
import java.util.ArrayList;
28+
import java.util.HashMap;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.logging.Logger;
2834

2935
import static com.github.pgasync.impl.message.RowDescription.ColumnDescription;
3036

@@ -89,8 +95,13 @@ public Observable<String> listen(String channel) {
8995
}
9096

9197
@Override
92-
public Observable<Void> close() {
93-
return stream.close();
98+
public void close() throws Exception {
99+
CountDownLatch latch = new CountDownLatch(1);
100+
stream.close().subscribe(__ -> latch.countDown(), ex -> {
101+
Logger.getLogger(getClass().getName()).warning("Exception closing connection: " + ex);
102+
latch.countDown();
103+
});
104+
latch.await(1000, TimeUnit.MILLISECONDS);
94105
}
95106

96107
private Observable<Message> sendQuery(String sql, Object[] params) {
@@ -177,13 +188,13 @@ class PgConnectionTransaction implements Transaction {
177188
public Observable<Void> commit() {
178189
return PgConnection.this.querySet("COMMIT")
179190
.map(rs -> (Void) null)
180-
.doOnError(exception -> close());
191+
.doOnError(exception -> stream.close().subscribe());
181192
}
182193
@Override
183194
public Observable<Void> rollback() {
184195
return PgConnection.this.querySet("ROLLBACK")
185196
.map(rs -> (Void) null)
186-
.doOnError(exception -> close());
197+
.doOnError(exception -> stream.close().subscribe());
187198
}
188199
@Override
189200
public Observable<Row> queryRows(String sql, Object... params) {

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@
2121
import rx.Subscriber;
2222
import rx.observers.Subscribers;
2323

24+
import javax.annotation.concurrent.GuardedBy;
2425
import java.net.InetSocketAddress;
2526
import java.util.LinkedList;
2627
import java.util.Queue;
27-
import java.util.concurrent.CountDownLatch;
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import java.util.concurrent.locks.Condition;
3030
import java.util.concurrent.locks.ReentrantLock;
31-
import java.util.logging.Logger;
32-
import javax.annotation.concurrent.GuardedBy;
31+
3332
import static java.util.concurrent.TimeUnit.SECONDS;
3433

3534
/**
@@ -109,47 +108,35 @@ public Observable<String> listen(String channel) {
109108
}
110109

111110
@Override
112-
public Observable<Void> close() {
113-
return Observable.create(subscriber -> {
114-
115-
lock.lock();
116-
try {
117-
closed = true;
118-
119-
// TODO: this currently blocks the thread
111+
public void close() throws Exception {
112+
lock.lock();
113+
try {
114+
closed = true;
120115

121-
while(!subscribers.isEmpty()) {
122-
Subscriber<? super Connection> queued = subscribers.poll();
123-
if(queued != null) {
124-
queued.onError(new SqlException("Connection pool is closing"));
125-
}
116+
while(!subscribers.isEmpty()) {
117+
Subscriber<? super Connection> queued = subscribers.poll();
118+
if(queued != null) {
119+
queued.onError(new SqlException("Connection pool is closing"));
126120
}
121+
}
127122

128-
try {
129-
while (currentSize > 0) {
130-
Connection connection = connections.poll();
131-
if(connection == null) {
132-
if (closingConnectionReleased.await(10, SECONDS)) {
133-
break;
134-
}
135-
continue;
136-
}
137-
currentSize--;
138-
CountDownLatch wait = new CountDownLatch(1);
139-
connection.close().subscribe(__ -> wait.countDown(), __ -> wait.countDown());
140-
if(!wait.await(5, SECONDS)) {
141-
Logger.getLogger(getClass().getName()).warning("Closing connection timed out");
123+
try {
124+
while (currentSize > 0) {
125+
Connection connection = connections.poll();
126+
if(connection == null) {
127+
if (closingConnectionReleased.await(10, SECONDS)) {
128+
break;
142129
}
130+
continue;
143131
}
144-
} catch (InterruptedException e) { /* ignore */ }
145-
146-
subscriber.onNext(null);
147-
subscriber.onCompleted();
132+
currentSize--;
133+
connection.close();
134+
}
135+
} catch (InterruptedException e) { /* ignore */ }
148136

149-
} finally {
150-
lock.unlock();
151-
}
152-
});
137+
} finally {
138+
lock.unlock();
139+
}
153140
}
154141

155142
@Override

src/main/java/com/github/pgasync/impl/netty/NettyPgConnectionPool.java

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,9 @@
1919
import com.github.pgasync.impl.PgProtocolStream;
2020
import io.netty.channel.EventLoopGroup;
2121
import io.netty.channel.nio.NioEventLoopGroup;
22-
import io.netty.util.concurrent.Future;
23-
import io.netty.util.concurrent.GenericFutureListener;
24-
import io.netty.util.concurrent.Promise;
25-
import rx.Observable;
26-
import rx.Subscriber;
27-
import rx.functions.Action1;
28-
import rx.observers.Subscribers;
2922

3023
import java.net.InetSocketAddress;
24+
import java.util.concurrent.TimeUnit;
3125

3226
/**
3327
* {@link PgConnectionPool} that uses {@link NettyPgProtocolStream}. Each pool
@@ -53,21 +47,8 @@ protected PgProtocolStream openStream(InetSocketAddress address) {
5347
}
5448

5549
@Override
56-
public Observable<Void> close() {
57-
return super.close()
58-
.lift(subscriber -> Subscribers.create(
59-
__ -> group.shutdownGracefully().addListener(close(subscriber, null)),
60-
ex -> group.shutdownGracefully().addListener(close(subscriber, ex))));
61-
}
62-
63-
private <T> GenericFutureListener<Future<T>> close(Subscriber<?> subscriber, Throwable exception) {
64-
return f -> {
65-
if (exception == null && f.isSuccess()) {
66-
subscriber.onNext(null);
67-
subscriber.onCompleted();
68-
return;
69-
}
70-
subscriber.onError(f.isSuccess() ? exception : f.cause());
71-
};
50+
public void close() throws Exception {
51+
super.close();
52+
group.shutdownGracefully().await(10, TimeUnit.SECONDS);
7253
}
7354
}

src/main/java/com/github/pgasync/impl/reactive/AsyncPublisher.java

Lines changed: 0 additions & 75 deletions
This file was deleted.

src/test/java/com/github/pgasync/impl/AuthenticationTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.github.pgasync.ConnectionPool;
44
import com.github.pgasync.SqlException;
5-
import org.junit.Ignore;
65
import org.junit.Test;
76

87
import static com.github.pgasync.impl.DatabaseRule.createPoolBuilder;
@@ -13,14 +12,11 @@ public class AuthenticationTest {
1312

1413
@Test
1514
public void shouldThrowExceptionOnInvalidCredentials() throws Exception {
16-
ConnectionPool pool = createPoolBuilder(1).password("_invalid_").build();
17-
try {
15+
try (ConnectionPool pool = createPoolBuilder(1).password("_invalid_").build()) {
1816
pool.queryRows("SELECT 1").toBlocking().first();
1917
fail();
2018
} catch (SqlException sqle) {
2119
assertEquals("28P01", sqle.getCode());
22-
} finally {
23-
pool.close().toBlocking().single();
2420
}
2521
}
2622

src/test/java/com/github/pgasync/impl/DatabaseRule.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,11 @@ protected void before() {
4141
@Override
4242
protected void after() {
4343
if(pool != null) {
44-
CountDownLatch latch = new CountDownLatch(1);
45-
pool.close().subscribe(__ -> latch.countDown(), ex -> {
46-
ex.printStackTrace();
47-
latch.countDown();
48-
});
4944
try {
50-
latch.await(30, TimeUnit.SECONDS);
51-
} catch (InterruptedException e) { /* ignore */ }
45+
pool.close();
46+
} catch (Exception e) {
47+
throw new RuntimeException(e);
48+
}
5249
}
5350
}
5451

src/test/java/com/github/pgasync/impl/ListenNotifyTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
import java.util.concurrent.LinkedBlockingQueue;
1010
import java.util.concurrent.TimeUnit;
1111

12-
import static org.junit.Assert.assertEquals;
13-
import static org.junit.Assert.assertNull;
14-
import static org.junit.Assert.assertTrue;
12+
import static org.junit.Assert.*;
1513

1614
/**
1715
* @author Antti Laisi
@@ -43,7 +41,7 @@ public void shouldReceiveNotificationsOnListenedChannel() throws Exception {
4341
}
4442

4543
@AfterClass
46-
public static void close() {
47-
pool.close().toBlocking().single();
44+
public static void close() throws Exception {
45+
pool.close();
4846
}
4947
}

src/test/java/com/github/pgasync/impl/PerformanceTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ public PerformanceTest(int poolSize, int numThreads, boolean pipeline) {
8888
}
8989

9090
@After
91-
public void close() {
91+
public void close() throws Exception {
9292
threadPool.shutdownNow();
93-
dbPool.close().toBlocking().single();
93+
dbPool.close();
9494
}
9595

9696
@Test(timeout = 1000)
@@ -102,7 +102,7 @@ public void t1_preAllocatePool() throws InterruptedException {
102102
while (connections.size() < poolSize) {
103103
MILLISECONDS.sleep(5);
104104
}
105-
connections.forEach(c -> dbPool.release(c));
105+
connections.forEach(dbPool::release);
106106
}
107107

108108
@Test

0 commit comments

Comments
 (0)
0