8000 Use an executor for shutdown monitors · yezijiang/rabbitmq-java-client@22ddfed · GitHub
[go: up one dir, main page]

Skip to content

Commit 22ddfed

Browse files
Use an executor for shutdown monitors
Fixes rabbitmq#87.
1 parent f6b969e commit 22ddfed

File tree

3 files changed

+59
-11
lines changed

3 files changed

+59
-11
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ public class ConnectionFactory implements Cloneable {
101101
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
102102
private ExecutorService sharedExecutor;
103103
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
104+
// minimises the number of threads rapid closure of many
105+
// connections uses, see rabbitmq/rabbitmq-java-client#86
106+
private ExecutorService shutdownExecutor;
104107
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
105108
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
106109

@@ -469,18 +472,34 @@ public void setSocketConfigurator(SocketConfigurator socketConfigurator) {
469472
}
470473

471474
/**
472-
* Set the executor to use by default for newly created connections.
475+
* Set the executor to use for consumer operation dispatch
476+
* by default for newly created connections.
473477
* All connections that use this executor share it.
474478
*
475479
* It's developer's responsibility to shut down the executor
476480
* when it is no longer needed.
477481
*
478-
* @param executor
482+
* @param executor executor service to be used for
483+
* consumer operation
479484
*/
480485
public void setSharedExecutor(ExecutorService executor) {
481486
this.sharedExecutor = executor;
482487
}
483488

489+
/**
490+
* Set the executor to use for connection shutdown.
491+
* All connections that use this executor share it.
492+
*
493+
* It's developer's responsibility to shut down the executor
494+
* when it is no longer needed.
495+
*
496+
* @param executor executor service to be used for
497+
* connection shutdown
498+
*/
499+
public void setShutdownExecutor(ExecutorService executor) {
500+
this.shutdownExecutor = executor;
501+
}
502+
484503
/**
485504
* Retrieve the thread factory used to instantiate new threads.
486505
* @see ThreadFactory
@@ -640,6 +659,8 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
640659
*/
641660
public Connection newConnection(ExecutorService executor, Address[] addrs)
642661
throws IOException, TimeoutException {
662+
// make sure we respect the provided thread factory
663+
maybeInitializeShutdownExecutor();
643664
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
644665
ConnectionParams params = params(executor);
645666

@@ -682,6 +703,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
682703
result.setThreadFactory(threadFactory);
683704
result.setHandshakeTimeout(handshakeTimeout);
684705
result.setRequestedHeartbeat(requestedHeartbeat);
706+
result.setShutdownExecutor(shutdownExecutor);
685707
return result;
686708
}
687709

@@ -718,6 +740,18 @@ public Connection newConnection(ExecutorService executor) throws IOException, Ti
718740
);
719741
}
720742

743+
/**
744+
* Lazily initializes shutdown executor service. This is necessary
745+
* to make sure the default executor uses the thread factory that
746+
* may be user-provided and crucially important in certain environments,
747+
* e.g. Google App Engine or JEE application servers.
748+
*/
749+
protected void maybeInitializeShutdownExecutor() {
750+
if(shutdownExecutor == null) {
751+
shutdownExecutor = Executors.newFixedThreadPool(4, threadFactory);
752+
}
753+
}
754+
721755
@Override public ConnectionFactory clone(){
722756
try {
723757
return (ConnectionFactory)super.clone();

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ final class Copyright {
6363
* for an example.
6464
*/
6565
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
66-
private final ExecutorService executor;
66+
private final ExecutorService consumerWorkServiceExecutor;
67+
private final ExecutorService shutdownExecutor;
6768
private Thread mainLoopThread;
6869
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
6970

@@ -220,7 +221,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
220221
this.handshakeTimeout = params.getHandshakeTimeout();
221222
this.shutdownTimeout = params.getShutdownTimeout();
222223
this.saslConfig = params.getSaslConfig();
223-
this.executor = params.getConsumerWorkServiceExecutor();
224+
this.consumerWorkServiceExecutor = params.getConsumerWorkServiceExecutor();
225+
this.shutdownExecutor = params.getShutdownExecutor();
224226
this.threadFactory = params.getThreadFactory();
225227

226228
this._channelManager = null;
@@ -231,7 +233,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
231233
}
232234

233235
private void initializeConsumerWorkService() {
234-
this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
236+
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, shutdownTimeout);
235237
}
236238

237239
private void initializeHeartbeatSender() {
@@ -478,7 +480,7 @@ public ExceptionHandler getExceptionHandler() {
478480

479481
/** Public API
480482
*
481-
* @return true if this work service instance uses its own executor (as opposed to a shared one)
483+
* @return true if this work service instance uses its own consumerWorkServiceExecutor (as opposed to a shared one)
482484
*/
483485
public boolean willShutDownConsumerExecutor() {
484486
return this._workService.usesPrivateExecutor();
@@ -669,12 +671,12 @@ public void handleConnectionClose(Command closeCommand) {
669671
} catch (IOException ignored) { } // ignore
670672
_brokerInitiatedShutdown = true;
671673
SocketCloseWait scw = new SocketCloseWait(sse);
672-
final String name = "AMQP Connection Closing Monitor " +
673-
getHostAddress() + ":" + getPort();
674-
Thread waiter = Environment.newThread(threadFactory, scw, name);
675-
waiter.start();
674+
shutdownExecutor.execute(scw);
676675
}
677676

677+
// same as ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT
678+
private static long SOCKET_CLOSE_TIMEOUT = 10000;
679+
678680
private class SocketCloseWait implements Runnable {
679681
private final ShutdownSignalException cause;
680682

@@ -684,9 +686,12 @@ public SocketCloseWait(ShutdownSignalException sse) {
684686

685687
public void run() {
686688
try {
687-
_appContinuation.get();
689+
// TODO: use a sensible timeout here
690+
_appContinuation.get(SOCKET_CLOSE_TIMEOUT);
688691
} catch (InterruptedException e) {
689692
Thread.currentThread().interrupt();
693+
} catch (TimeoutException ignored) {
694+
// this releases the thread
690695
} finally {
691696
_running = false;
692697
_channel0.notifyOutstandingRpc(cause);

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public class ConnectionParams {
1111
private String username;
1212
private String password;
1313
private ExecutorService consumerWorkServiceExecutor;
14+
private ExecutorService shutdownExecutor;
1415
private String virtualHost;
1516
private Map<String, Object> clientProperties;
1617
private int requestedFrameMax;
@@ -146,4 +147,12 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
146147
public void setThreadFactory(ThreadFactory threadFactory) {
147148
this.threadFactory = threadFactory;
148149
}
150+
151+
public ExecutorService getShutdownExecutor() {
152+
return shutdownExecutor;
153+
}
154+
155+
public void setShutdownExecutor(ExecutorService shutdownExecutor) {
156+
this.shutdownExecutor = shutdownExecutor;
157+
}
149158
}

0 commit comments

Comments
 (0)
0