10000 More relazed locks · LarsG/postgres-async-driver@cba56a0 · GitHub
[go: up one dir, main page]

Skip to content

Commit cba56a0

Browse files
Marat GainullinMarat Gainullin
authored andcommitted
More relazed locks
1 parent 282d553 commit cba56a0

File tree

1 file changed

+112
-159
lines changed

1 file changed

+112
-159
lines changed

src/main/java/com/github/pgasync/PgConnectionPool.java

Lines changed: 112 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,10 @@
2929
import java.util.concurrent.Executor;
3030
import java.util.concurrent.locks.Lock;
3131
import java.util.concurrent.locks.ReentrantLock;
32-
import java.util.function.BiConsumer;
33-
import java.util.function.Consumer;
34-
import java.util.function.Function;
35-
import java.util.function.Supplier;
32+
import java.util.function.*;
3633
import java.util.logging.Level;
3734
import java.util.logging.Logger;
35+
import java.util.stream.Collectors;
3836

3937
/**
4038
* Resource pool for backend connections.
@@ -141,7 +139,8 @@ CompletableFuture<Void> shutdown() {
141139

142140
@Override
143141
public CompletableFuture<Void> close() {
144-
return release(this);
142+
release(this);
143+
return CompletableFuture.completedFuture(null);
145144
}
146145

147146
@Override
@@ -244,15 +243,11 @@ public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColu
244243
@GuardedBy("guard")
245244
private int size;
246245
@GuardedBy("guard")
247-
private boolean closed;
248-
@GuardedBy("guard")
249-
private final Queue<CompletableFuture<? super Connection>> uponAvailableSubscribers = new LinkedList<>();
246+
private final Queue<CompletableFuture<? super Connection>> pending = new LinkedList<>();
250247
@GuardedBy("guard")
251-
private final Queue<PooledPgConnection> availableConnections = new LinkedList<>();
248+
private final Queue<PooledPgConnection> connections = new LinkedList<>();
252249
@GuardedBy("guard")
253-
private CompletableFuture<Void> uponFullyAvailable;
254-
@GuardedBy("guard")
255-
private final List<Runnable> executed = new ArrayList<>(32);
250+
private CompletableFuture<Void> closing;
256251

257252
public PgConnectionPool(ConnectibleBuilder.ConnectibleProperties properties, Supplier<CompletableFuture<ProtocolStream>> obtainStream, Executor futuresExecutor) {
258253
super(properties, obtainStream, futuresExecutor);
@@ -261,186 +256,144 @@ public PgConnectionPool(ConnectibleBuilder.ConnectibleProperties properties, Sup
261256
}
262257

263258
private <T> T locked(Supplier<T> action) {
264-
Runnable[] copy = new Runnable[]{};
265259
guard.lock();
266260
try {
267261
return action.get();
268262
} finally {
269-
try {
270-
copy = executed.toArray(copy);
271-
executed.clear();
272-
} finally {
273-
guard.unlock();
274-
}
275-
execute(copy);
263+
guard.unlock();
276264
}
277265
}
278266

279-
private void locked(Runnable action) {
280-
Runnable[] copy = new Runnable[]{};
281-
guard.lock();
282-
try {
283-
action.run();
284-
} finally {
285-
try {
286-
copy = executed.toArray(copy);
287-
executed.clear();
288-
} finally {
289-
guard.unlock();
290-
}
291-
execute(copy);
267+
private void release(PooledPgConnection connection) {
268+
if (connection == null) {
269+
throw new IllegalArgumentException("'connection' should be not null");
292270
}
293-
}
294-
295-
private void execute(Runnable[] targets) {
296-
for (Runnable r : targets) {
297-
try {
298-
futuresExecutor.execute(r);
299-
} catch (Throwable th) { // Catch here because of simple executors like (r -> { r.run(); })
300-
Logger.getLogger(PgConnectionPool.class.getName()).log(Level.SEVERE, th.getMessage(), th);
271+
Runnable lucky = locked(() -> {
272+
CompletableFuture<? super Connection> nextUser = pending.poll();
273+
if (nextUser != null) {
274+
return () -> nextUser.complete(connection);
275+
} else {
276+
connections.add(connection);
277+
return checkClosed();
301278
}
302-
}
303-
}
304-
305-
private void toExecute(Runnable r) {
306-
executed.add(r);
279+
});
280+
futuresExecutor.execute(lucky);
307281
}
308282

309-
private CompletableFuture<Void> fullyAvailable() {
310-
if (uponFullyAvailable == null) {
311-
if (size <= availableConnections.size()) {
312-
return CompletableFuture.completedFuture(null);
283+
@Override
284+
public CompletableFuture<Connection> getConnection() {
285+
if (locked(() -> closing != null)) {
286+
return CompletableFuture.failedFuture(new SqlException("Connection pool is closed"));
287+
} else {
288+
Connection cached = locked(this::firstAliveConnection);
289+
if (cached != null) {
290+
return CompletableFuture.completedFuture(cached);
313291
} else {
314-
uponFullyAvailable = new CompletableFuture<>();
315-
return uponFullyAvailable;
292+
CompletableFuture<Connection> deferred = new CompletableFuture<>();
293+
boolean makeNewConnection = locked(() -> {
294+
pending.add(deferred);
295+
if (size < maxConnections) {
296+
size++;
297+
return true;
298+
} else {
299+
return false;
300+
}
301+
});
302+
if (makeNewConnection) {
303+
obtainStream.get()
304+
.thenApply(stream -> new PooledPgConnection(new PgConnection(stream, dataConverter))
305+
.connect(username, password, database))
306+
.thenCompose(Function.identity())
307+
.thenApply(pooledConnection -> {
308+
if (validationQuery != null && !validationQuery.isBlank()) {
309+
return pooledConnection.completeScript(validationQuery)
310+
.handle((rss, th) -> {
311+
if (th != null) {
312+
return ((PooledPgConnection) pooledConnection).delegate.close()
313+
.thenApply(v -> CompletableFuture.<Connection>failedFuture(th))
314+
.thenCompose(Function.identity());
315+
} else {
316+
return CompletableFuture.completedFuture(pooledConnection);
317+
}
318+
})
319+
.thenCompose(Function.identity());
320+
} else {
321+
return CompletableFuture.completedFuture(pooledConnection);
322+
}
323+
})
324+
.thenCompose(Function.identity())
325+
.whenComplete((connected, th) -> {
326+
if (th == null) {
327+
release((PooledPgConnection) connected);
328+
} else {
329+
Collection<Runnable> actions = locked(() -> {
330+
size--;
331+
List<Runnable> unlucky = pending.stream()
332+
.<Runnable>map(item -> () ->
333+
item.completeExceptionally(th))
334+
.collect(Collectors.toList());
335+
unlucky.add(checkClosed());
336+
pending.clear();
337+
return unlucky;
338+
});
339+
actions.forEach(futuresExecutor::execute);
340+
}
341+
});
342+
}
343+
return deferred;
316344
}
317-
} else {
318-
return CompletableFuture.failedFuture(new IllegalStateException("Only a single 'fullyAvailable' request at a time is supported"));
319345
}
320346
}
321347

322-
private void discardAvailableSubscribers(String reason) {
323-
while (!uponAvailableSubscribers.isEmpty()) {
324-
CompletableFuture<? super Connection> queued = uponAvailableSubscribers.poll();
325-
toExecute(() -> queued.completeExceptionally(new SqlException(reason)));
348+
private static class CloseTuple {
349+
private final CompletableFuture<Void> closing;
350+
private final Runnable immediate;
351+
352+
public CloseTuple(CompletableFuture<Void> closing, Runnable immediate) {
353+
this.closing = closing;
354+
this.immediate = immediate;
326355
}
327356 E377
}
328357

329358
@Override
330359
public CompletableFuture<Void> close() {
331-
return locked(() -> {
332-
closed = true;
333-
discardAvailableSubscribers("Connection pool is closing");
334-
return fullyAvailable()
335-
.thenApply(v -> locked(() -> {
336-
uponFullyAvailable = null;
337-
Collection<CompletableFuture<Void>> shutdownTasks = new ArrayList<>();
338-
while (!availableConnections.isEmpty()) {
339-
PooledPgConnection connection = availableConnections.poll();
340-
shutdownTasks.add(connection.shutdown());
341-
size--;
342-
}
343-
return CompletableFuture.allOf(shutdownTasks.toArray(CompletableFuture<?>[]::new));
344-
}))
345-
.thenCompose(Function.identity());
346-
});
347-
}
348-
349-
@Override
350-
public CompletableFuture<Connection> getConnection() {
351-
return locked(() -> {
352-
CompletableFuture<Connection> uponAvailable = new CompletableFuture<>();
353-
if (closed) {
354-
toExecute(() -> uponAvailable.completeExceptionally(new SqlException("Connection pool is closed")));
360+
CloseTuple tuple = locked(() -> {
361+
if (closing == null) {
362+
closing = new CompletableFuture<>()
363+
.thenApply(v -> locked(() ->
364+
CompletableFuture.allOf(connections.stream()
365+
.map(PooledPgConnection::shutdown)
366+
.toArray(CompletableFuture[]::new)
367+
)
368+
))
369+
.thenCompose(Function.identity());
370+
return new CloseTuple(closing, checkClosed());
355371
} else {
356-
Connection connection = firstAliveConnection();
357-
if (connection != null) {
358-
toExecute(() -> uponAvailable.complete(connection));
359-
} else {
360-
if (tryIncreaseSize()) {
361-
obtainStream.get()
362-
.thenApply(stream -> new PooledPgConnection(new PgConnection(stream, dataConverter))
363-
.connect(username, password, database))
364-
.thenCompose(Function.identity())
365-
.thenApply(pooledConnection -> {
366-
if (validationQuery != null && !validationQuery.isBlank()) {
367-
return pooledConnection.completeScript(validationQuery)
368-
.handle((rss, th) -> {
369-
if (th != null) {
370-
return ((PooledPgConnection) pooledConnection).delegate.close()
371-
.thenApply(v -> CompletableFuture.<Connection>failedFuture(th))
372-
.thenCompose(Function.identity());
373-
} else {
374-
return CompletableFuture.completedFuture(pooledConnection);
375-
}
376-
})
377-
.thenCompose(Function.identity());
378-
} else {
379-
return CompletableFuture.completedFuture(pooledConnection);
380-
}
381-
})
382-
.thenCompose(Function.identity())
383-
.thenAccept(pooledConnection -> locked(() -> toExecute(() -> uponAvailable.complete(pooledConnection))))
384-
.exceptionally(th -> locked(() -> {
385-
size--;
386-
toExecute(() -> uponAvailable.completeExceptionally(th));
387-
discardAvailableSubscribers("Unable to connect");
388-
checkFullyAvailable();
389-
return null;
390-
}));
391-
} else {
392-
// Pool is full now and all connections are busy
393-
uponAvailableSubscribers.offer(uponAvailable);
394-
}
395-
}
372+
return new CloseTuple(CompletableFuture.failedFuture(new IllegalStateException("PG pool is already shutting down")), NO_OP);
396373
}
397-
return uponAvailable;
398374
});
375+
futuresExecutor.execute(tuple.immediate);
376+
return tuple.closing;
399377
}
400378

401-
private void checkFullyAvailable() {
402-
if (uponFullyAvailable != null && size <= availableConnections.size()) {
403-
toExecute(() -> uponFullyAvailable.complete(null));
379+
private static final Runnable NO_OP = () -> {
380+
};
381+
382+
private Runnable checkClosed() {
383+
if (closing != null && size <= connections.size()) {
384+
assert pending.isEmpty();
385+
return () -> closing.complete(null);
386+
} else {
387+
return NO_OP;
404388
}
405389
}
406390

407391
private Connection firstAliveConnection() {
408-
Connection connection = availableConnections.poll();
392+
Connection connection = connections.poll();
409393
while (connection != null && !connection.isConnected()) {
410394
size--;
411-
connection = availableConnections.poll();
395+
connection = connections.poll();
412396
}
413397
return connection;
414398
}
415-
416-
private boolean tryIncreaseSize() {
417-
if (size < maxConnections) {
418-
size++;
419-
return true;
420-
} else {
421-
return false;
422-
}
423-
}
424-
425-
private CompletableFuture<Void> release(PooledPgConnection connection) {
426-
if (connection == null) {
427-
throw new IllegalArgumentException("'connection' should be not null");
428-
}
429-
return locked(() -> {
430-
if (connection.isConnected()) {
431-
if (!uponAvailableSubscribers.isEmpty()) {
432-
CompletableFuture<? super Connection> subscriber = uponAvailableSubscribers.poll();
433-
toExecute(() -> subscriber.complete(connection));
434-
7572 } else {
435-
availableConnections.offer(connection);
436-
checkFullyAvailable();
437-
}
438-
} else {
439-
size--;
440-
discardAvailableSubscribers("Connection lost");
441-
checkFullyAvailable();
442-
}
443-
return CompletableFuture.completedFuture(null);
444-
});
445-
}
446399
}

0 commit comments

Comments
 (0)
0