@@ -101,6 +101,9 @@ public class ConnectionFactory implements Cloneable {
101
101
private SaslConfig saslConfig = DefaultSaslConfig .PLAIN ;
102
102
private ExecutorService sharedExecutor ;
103
103
private ThreadFactory threadFactory = Executors .defaultThreadFactory ();
104
+ // minimises the number of threads rapid closure of many
105
+ // connections uses, see rabbitmq/rabbitmq-java-client#86
106
+ private ExecutorService shutdownExecutor ;
104
107
private SocketConfigurator socketConf = new DefaultSocketConfigurator ();
105
108
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler ();
106
109
@@ -469,18 +472,34 @@ public void setSocketConfigurator(SocketConfigurator socketConfigurator) {
469
472
}
470
473
471
474
/**
472
- * Set the executor to use by default for newly created connections.
475
+ * Set the executor to use for consumer operation dispatch
476
+ * by default for newly created connections.
473
477
* All connections that use this executor share it.
474
478
*
475
479
* It's developer's responsibility to shut down the executor
476
480
* when it is no longer needed.
477
481
*
478
- * @param executor
482
+ * @param executor executor service to be used for
483
+ * consumer operation
479
484
*/
480
485
public void setSharedExecutor (ExecutorService executor ) {
481
486
this .sharedExecutor = executor ;
482
487
}
483
488
489
+ /**
490
+ * Set the executor to use for connection shutdown.
491
+ * All connections that use this executor share it.
492
+ *
493
+ * It's developer's responsibility to shut down the executor
494
+ * when it is no longer needed.
495
+ *
496
+ * @param executor executor service to be used for
497
+ * connection shutdown
498
+ */
499
+ public void setShutdownExecutor (ExecutorService executor ) {
500
+ this .shutdownExecutor = executor ;
501
+ }
502
+
484
503
/**
485
504
* Retrieve the thread factory used to instantiate new threads.
486
505
* @see ThreadFactory
@@ -640,6 +659,8 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
640
659
*/
641
660
public Connection newConnection (ExecutorService executor , Address [] addrs )
642
661
throws IOException , TimeoutException {
662
+ // make sure we respect the provided thread factory
663
+ maybeInitializeShutdownExecutor ();
643
664
FrameHandlerFactory fhFactory = createFrameHandlerFactory ();
644
665
ConnectionParams params = params (executor );
645
666
@@ -682,6 +703,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
682
703
result .setThreadFactory (threadFactory );
683
704
result .setHandshakeTimeout (handshakeTimeout );
684
705
result .setRequestedHeartbeat (requestedHeartbeat );
706
+ result .setShutdownExecutor (shutdownExecutor );
685
707
return result ;
686
708
}
687
709
@@ -718,6 +740,18 @@ public Connection newConnection(ExecutorService executor) throws IOException, Ti
718
740
);
719
741
}
720
742
743
+ /**
744
+ * Lazily initializes shutdown executor service. This is necessary
745
+ * to make sure the default executor uses the thread factory that
746
+ * may be user-provided and crucially important in certain environments,
747
+ * e.g. Google App Engine or JEE application servers.
748
+ */
749
+ protected void maybeInitializeShutdownExecutor () {
750
+ if (shutdownExecutor == null ) {
751
+ shutdownExecutor = Executors .newFixedThreadPool (4 , threadFactory );
752
+ }
753
+ }
754
+
721
755
@ Override public ConnectionFactory clone (){
722
756
try {
723
757
return (ConnectionFactory )super .clone ();
0 commit comments