29
29
import java .util .concurrent .Executor ;
30
30
import java .util .concurrent .locks .Lock ;
31
31
import java .util .concurrent .locks .ReentrantLock ;
32
- import java .util .function .BiConsumer ;
33
- import java .util .function .Consumer ;
34
- import java .util .function .Function ;
35
- import java .util .function .Supplier ;
32
+ import java .util .function .*;
36
33
import java .util .logging .Level ;
37
34
import java .util .logging .Logger ;
35
+ import java .util .stream .Collectors ;
38
36
39
37
/**
40
38
* Resource pool for backend connections.
@@ -141,7 +139,8 @@ CompletableFuture<Void> shutdown() {
141
139
142
140
@ Override
143
141
public CompletableFuture <Void > close () {
144
- return release (this );
142
+ release (this );
143
+ return CompletableFuture .completedFuture (null );
145
144
}
146
145
147
146
@ Override
@@ -244,15 +243,11 @@ public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColu
244
243
@ GuardedBy ("guard" )
245
244
private int size ;
246
245
@ GuardedBy ("guard" )
247
- private boolean closed ;
248
- @ GuardedBy ("guard" )
249
- private final Queue <CompletableFuture <? super Connection >> uponAvailableSubscribers = new LinkedList <>();
246
+ private final Queue <CompletableFuture <? super Connection >> pending = new LinkedList <>();
250
247
@ GuardedBy ("guard" )
251
- private final Queue <PooledPgConnection > availableConnections = new LinkedList <>();
248
+ private final Queue <PooledPgConnection > connections = new LinkedList <>();
252
249
@ GuardedBy ("guard" )
253
- private CompletableFuture <Void > uponFullyAvailable ;
254
- @ GuardedBy ("guard" )
255
- private final List <Runnable > executed = new ArrayList <>(32 );
250
+ private CompletableFuture <Void > closing ;
256
251
257
252
public PgConnectionPool (ConnectibleBuilder .ConnectibleProperties properties , Supplier <CompletableFuture <ProtocolStream >> obtainStream , Executor futuresExecutor ) {
258
253
super (properties , obtainStream , futuresExecutor );
@@ -261,186 +256,144 @@ public PgConnectionPool(ConnectibleBuilder.ConnectibleProperties properties, Sup
261
256
}
262
257
263
258
private <T > T locked (Supplier <T > action ) {
264
- Runnable [] copy = new Runnable []{};
265
259
guard .lock ();
266
260
try {
267
261
return action .get ();
268
262
} finally {
269
- try {
270
- copy = executed .toArray (copy );
271
- executed .clear ();
272
- } finally {
273
- guard .unlock ();
274
- }
275
- execute (copy );
263
+ guard .unlock ();
276
264
}
277
265
}
278
266
279
- private void locked (Runnable action ) {
280
- Runnable [] copy = new Runnable []{};
281
- guard .lock ();
282
- try {
283
- action .run ();
284
- } finally {
285
- try {
286
- copy = executed .toArray (copy );
287
- executed .clear ();
288
- } finally {
289
- guard .unlock ();
290
- }
291
- execute (copy );
267
+ private void release (PooledPgConnection connection ) {
268
+ if (connection == null ) {
269
+ throw new IllegalArgumentException ("'connection' should be not null" );
292
270
}
293
- }
294
-
295
- private void execute (Runnable [] targets ) {
296
- for (Runnable r : targets ) {
297
- try {
298
- futuresExecutor .execute (r );
299
- } catch (Throwable th ) { // Catch here because of simple executors like (r -> { r.run(); })
300
- Logger .getLogger (PgConnectionPool .class .getName ()).log (Level .SEVERE , th .getMessage (), th );
271
+ Runnable lucky = locked (() -> {
272
+ CompletableFuture <? super Connection > nextUser = pending .poll ();
273
+ if (nextUser != null ) {
274
+ return () -> nextUser .complete (connection );
275
+ } else {
276
+ connections .add (connection );
277
+ return checkClosed ();
301
278
}
302
- }
303
- }
304
-
305
- private void toExecute (Runnable r ) {
306
- executed .add (r );
279
+ });
280
+ futuresExecutor .execute (lucky );
307
281
}
308
282
309
- private CompletableFuture <Void > fullyAvailable () {
310
- if (uponFullyAvailable == null ) {
311
- if (size <= availableConnections .size ()) {
312
- return CompletableFuture .completedFuture (null );
283
+ @ Override
284
+ public CompletableFuture <Connection > getConnection () {
285
+ if (locked (() -> closing != null )) {
286
+ return CompletableFuture .failedFuture (new SqlException ("Connection pool is closed" ));
287
+ } else {
288
+ Connection cached = locked (this ::firstAliveConnection );
289
+ if (cached != null ) {
290
+ return CompletableFuture .completedFuture (cached );
313
291
} else {
314
- uponFullyAvailable = new CompletableFuture <>();
315
- return uponFullyAvailable ;
292
+ CompletableFuture <Connection > deferred = new CompletableFuture <>();
293
+ boolean makeNewConnection = locked (() -> {
294
+ pending .add (deferred );
295
+ if (size < maxConnections ) {
296
+ size ++;
297
+ return true ;
298
+ } else {
299
+ return false ;
300
+ }
301
+ });
302
+ if (makeNewConnection ) {
303
+ obtainStream .get ()
304
+ .thenApply (stream -> new PooledPgConnection (new PgConnection (stream , dataConverter ))
305
+ .connect (username , password , database ))
306
+ .thenCompose (Function .identity ())
307
+ .thenApply (pooledConnection -> {
308
+ if (validationQuery != null && !validationQuery .isBlank ()) {
309
+ return pooledConnection .completeScript (validationQuery )
310
+ .handle ((rss , th ) -> {
311
+ if (th != null ) {
312
+ return ((PooledPgConnection ) pooledConnection ).delegate .close ()
313
+ .thenApply (v -> CompletableFuture .<Connection >failedFuture (th ))
314
+ .thenCompose (Function .identity ());
315
+ } else {
316
+ return CompletableFuture .completedFuture (pooledConnection );
317
+ }
318
+ })
319
+ .thenCompose (Function .identity ());
320
+ } else {
321
+ return CompletableFuture .completedFuture (pooledConnection );
322
+ }
323
+ })
324
+ .thenCompose (Function .identity ())
325
+ .whenComplete ((connected , th ) -> {
326
+ if (th == null ) {
327
+ release ((PooledPgConnection ) connected );
328
+ } else {
329
+ Collection <Runnable > actions = locked (() -> {
330
+ size --;
331
+ List <Runnable > unlucky = pending .stream ()
332
+ .<Runnable >map (item -> () ->
333
+ item .completeExceptionally (th ))
334
+ .collect (Collectors .toList ());
335
+ unlucky .add (checkClosed ());
336
+ pending .clear ();
337
+ return unlucky ;
338
+ });
339
+ actions .forEach (futuresExecutor ::execute );
340
+ }
341
+ });
342
+ }
343
+ return deferred ;
316
344
}
317
- } else {
318
- return CompletableFuture .failedFuture (new IllegalStateException ("Only a single 'fullyAvailable' request at a time is supported" ));
319
345
}
320
346
}
321
347
322
- private void discardAvailableSubscribers (String reason ) {
323
- while (!uponAvailableSubscribers .isEmpty ()) {
324
- CompletableFuture <? super Connection > queued = uponAvailableSubscribers .poll ();
325
- toExecute (() -> queued .completeExceptionally (new SqlException (reason )));
348
+ private static class CloseTuple {
349
+ private final CompletableFuture <Void > closing ;
350
+ private final Runnable immediate ;
351
+
352
+ public CloseTuple (CompletableFuture <Void > closing , Runnable immediate ) {
353
+ this .closing = closing ;
354
+ this .immediate = immediate ;
326
355
}
327
356
E377
}
328
357
329
358
@ Override
330
359
public CompletableFuture <Void > close () {
331
- return locked (() -> {
332
- closed = true ;
333
- discardAvailableSubscribers ("Connection pool is closing" );
334
- return fullyAvailable ()
335
- .thenApply (v -> locked (() -> {
336
- uponFullyAvailable = null ;
337
- Collection <CompletableFuture <Void >> shutdownTasks = new ArrayList <>();
338
- while (!availableConnections .isEmpty ()) {
339
- PooledPgConnection connection = availableConnections .poll ();
340
- shutdownTasks .add (connection .shutdown ());
341
- size --;
342
- }
343
- return CompletableFuture .allOf (shutdownTasks .toArray (CompletableFuture <?>[]::new ));
344
- }))
345
- .thenCompose (Function .identity ());
346
- });
347
- }
348
-
349
- @ Override
350
- public CompletableFuture <Connection > getConnection () {
351
- return locked (() -> {
352
- CompletableFuture <Connection > uponAvailable = new CompletableFuture <>();
353
- if (closed ) {
354
- toExecute (() -> uponAvailable .completeExceptionally (new SqlException ("Connection pool is closed" )));
360
+ CloseTuple tuple = locked (() -> {
361
+ if (closing == null ) {
362
+ closing = new CompletableFuture <>()
363
+ .thenApply (v -> locked (() ->
364
+ CompletableFuture .allOf (connections .stream ()
365
+ .map (PooledPgConnection ::shutdown )
366
+ .toArray (CompletableFuture []::new )
367
+ )
368
+ ))
369
+ .thenCompose (Function .identity ());
370
+ return new CloseTuple (closing , checkClosed ());
355
371
} else {
356
- Connection connection = firstAliveConnection ();
357
- if (connection != null ) {
358
- toExecute (() -> uponAvailable .complete (connection ));
359
- } else {
360
- if (tryIncreaseSize ()) {
361
- obtainStream .get ()
362
- .thenApply (stream -> new PooledPgConnection (new PgConnection (stream , dataConverter ))
363
- .connect (username , password , database ))
364
- .thenCompose (Function .identity ())
365
- .thenApply (pooledConnection -> {
366
- if (validationQuery != null && !validationQuery .isBlank ()) {
367
- return pooledConnection .completeScript (validationQuery )
368
- .handle ((rss , th ) -> {
369
- if (th != null ) {
370
- return ((PooledPgConnection ) pooledConnection ).delegate .close ()
371
- .thenApply (v -> CompletableFuture .<Connection >failedFuture (th ))
372
- .thenCompose (Function .identity ());
373
- } else {
374
- return CompletableFuture .completedFuture (pooledConnection );
375
- }
376
- })
377
- .thenCompose (Function .identity ());
378
- } else {
379
- return CompletableFuture .completedFuture (pooledConnection );
380
- }
381
- })
382
- .thenCompose (Function .identity ())
383
- .thenAccept (pooledConnection -> locked (() -> toExecute (() -> uponAvailable .complete (pooledConnection ))))
384
- .exceptionally (th -> locked (() -> {
385
- size --;
386
- toExecute (() -> uponAvailable .completeExceptionally (th ));
387
- discardAvailableSubscribers ("Unable to connect" );
388
- checkFullyAvailable ();
389
- return null ;
390
- }));
391
- } else {
392
- // Pool is full now and all connections are busy
393
- uponAvailableSubscribers .offer (uponAvailable );
394
- }
395
- }
372
+ return new CloseTuple (CompletableFuture .failedFuture (new IllegalStateException ("PG pool is already shutting down" )), NO_OP );
396
373
}
397
- return uponAvailable ;
398
374
});
375
+ futuresExecutor .execute (tuple .immediate );
376
+ return tuple .closing ;
399
377
}
400
378
401
- private void checkFullyAvailable () {
402
- if (uponFullyAvailable != null && size <= availableConnections .size ()) {
403
- toExecute (() -> uponFullyAvailable .complete (null ));
379
+ private static final Runnable NO_OP = () -> {
380
+ };
381
+
382
+ private Runnable checkClosed () {
383
+ if (closing != null && size <= connections .size ()) {
384
+ assert pending .isEmpty ();
385
+ return () -> closing .complete (null );
386
+ } else {
387
+ return NO_OP ;
404
388
}
405
389
}
406
390
407
391
private Connection firstAliveConnection () {
408
- Connection connection = availableConnections .poll ();
392
+ Connection connection = connections .poll ();
409
393
while (connection != null && !connection .isConnected ()) {
410
394
size --;
411
- connection = availableConnections .poll ();
395
+ connection = connections .poll ();
412
396
}
413
397
return connection ;
414
398
}
415
-
416
- private boolean tryIncreaseSize () {
417
- if (size < maxConnections ) {
418
- size ++;
419
- return true ;
420
- } else {
421
- return false ;
422
- }
423
- }
424
-
425
- private CompletableFuture <Void > release (PooledPgConnection connection ) {
426
- if (connection == null ) {
427
- throw new IllegalArgumentException ("'connection' should be not null" );
428
- }
429
- return locked (() -> {
430
- if (connection .isConnected ()) {
431
- if (!uponAvailableSubscribers .isEmpty ()) {
432
- CompletableFuture <? super Connection > subscriber = uponAvailableSubscribers .poll ();
433
- toExecute (() -> subscriber .complete (connection ));
434
-
7572
} else {
435
- availableConnections .offer (connection );
436
- checkFullyAvailable ();
437
- }
438
- } else {
439
- size --;
440
- discardAvailableSubscribers ("Connection lost" );
441
- checkFullyAvailable ();
442
- }
443
- return CompletableFuture .completedFuture (null );
444
- });
445
- }
446
399
}
0 commit comments