From 907ddfbdba5f20eaf148879a0721ff5df4d7e92d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 18 Sep 2015 17:03:25 +0300 Subject: [PATCH 1/4] Refactor ConnectionParams to setters Before we introduce another field. --- .../rabbitmq/client/ConnectionFactory.java | 19 ++- .../client/impl/ConnectionParams.java | 126 ++++++++++-------- 2 files changed, 89 insertions(+), 56 deletions(-) diff --git a/src/com/rabbitmq/client/ConnectionFactory.java b/src/com/rabbitmq/client/ConnectionFactory.java index 8573d3e137..f5a6668703 100644 --- a/src/com/rabbitmq/client/ConnectionFactory.java +++ b/src/com/rabbitmq/client/ConnectionFactory.java @@ -665,10 +665,21 @@ public Connection newConnection(ExecutorService executor, Address[] addrs) } public ConnectionParams params(ExecutorService executor) { - // TODO: switch to use setters for all fields - ConnectionParams result = new ConnectionParams(username, password, executor, virtualHost, getClientProperties(), - requestedFrameMax, requestedChannelMax, requestedHeartbeat, shutdownTimeout, saslConfig, - networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory); + ConnectionParams result = new ConnectionParams(); + + result.setUsername(username); + result.setPassword(password); + result.setExecutor(executor); + result.setVirtualHost(virtualHost); + result.setClientProperties(getClientProperties()); + result.setRequestedFrameMax(requestedFrameMax); + result.setRequestedChannelMax(requestedChannelMax); + result.setShutdownTimeout(shutdownTimeout); + result.setSaslConfig(saslConfig); + result.setNetworkRecoveryInterval(networkRecoveryInterval); + result.setTopologyRecovery(topologyRecovery); + result.setExceptionHandler(exceptionHandler); + result.setThreadFactory(threadFactory); result.setHandshakeTimeout(handshakeTimeout); return result; } diff --git a/src/com/rabbitmq/client/impl/ConnectionParams.java b/src/com/rabbitmq/client/impl/ConnectionParams.java index 2dfeba745f..d487f19fcd 100644 --- a/src/com/rabbitmq/client/impl/ConnectionParams.java +++ b/src/com/rabbitmq/client/impl/ConnectionParams.java @@ -8,58 +8,24 @@ import java.util.concurrent.ThreadFactory; public class ConnectionParams { - private final String username; - private final String password; - private final ExecutorService executor; - private final String virtualHost; - private final Map clientProperties; - private final int requestedFrameMax; - private final int requestedChannelMax; - private final int requestedHeartbeat; + private String username; + private String password; + private ExecutorService executor; + private String virtualHost; + private Map clientProperties; + private int requestedFrameMax; + private int requestedChannelMax; + private int requestedHeartbeat; private int handshakeTimeout; - private final int shutdownTimeout; - private final SaslConfig saslConfig; - private final long networkRecoveryInterval; - private final boolean topologyRecovery; - - private final ExceptionHandler exceptionHandler; - private final ThreadFactory threadFactory; - - /** - * @param username name used to establish connection - * @param password for username - * @param executor thread pool service for consumer threads for channels on this connection - * @param virtualHost virtual host of this connection - * @param clientProperties client info used in negotiating with the server - * @param requestedFrameMax max size of frame offered - * @param requestedChannelMax max number of channels offered - * @param requestedHeartbeat heart-beat in seconds offered - * @param saslConfig sasl configuration hook - * @param networkRecoveryInterval interval used when recovering from network failure - * @param topologyRecovery should topology (queues, exchanges, bindings, consumers) recovery be performed? - * @param threadFactory factory that instantiates threads used by the client - * @param exceptionHandler handles unhandled consumer exceptions - */ - public ConnectionParams(String username, String password, ExecutorService executor, - String virtualHost, Map clientProperties, - int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat, - int shutdownTimeout, SaslConfig saslConfig, long networkRecoveryInterval, - boolean topologyRecovery, ExceptionHandler exceptionHandler, ThreadFactory threadFactory) { - this.username = username; - this.password = password; - this.executor = executor; - this.virtualHost = virtualHost; - this.clientProperties = clientProperties; - this.requestedFrameMax = requestedFrameMax; - this.requestedChannelMax = requestedChannelMax; - this.requestedHeartbeat = requestedHeartbeat; - this.shutdownTimeout = shutdownTimeout; - this.saslConfig = saslConfig; - this.networkRecoveryInterval = networkRecoveryInterval; - this.topologyRecovery = topologyRecovery; - this.exceptionHandler = exceptionHandler; - this.threadFactory = threadFactory; - } + private int shutdownTimeout; + private SaslConfig saslConfig; + private long networkRecoveryInterval; + private boolean topologyRecovery; + + private ExceptionHandler exceptionHandler; + private ThreadFactory threadFactory; + + public ConnectionParams() {} public String getUsername() { return username; @@ -121,7 +87,63 @@ public boolean isTopologyRecoveryEnabled() { return topologyRecovery; } - public ThreadFactory getThreadFactory() { + public ThreadFactory getThreadFactory() { return threadFactory; } + + public void setUsername(String username) { + this.username = username; + } + + public void setPassword(String password) { + this.password = password; + } + + public void setExecutor(ExecutorService executor) { + this.executor = executor; + } + + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + public void setClientProperties(Map clientProperties) { + this.clientProperties = clientProperties; + } + + public void setRequestedFrameMax(int requestedFrameMax) { + this.requestedFrameMax = requestedFrameMax; + } + + public void setRequestedChannelMax(int requestedChannelMax) { + this.requestedChannelMax = requestedChannelMax; + } + + public void setRequestedHeartbeat(int requestedHeartbeat) { + this.requestedHeartbeat = requestedHeartbeat; + } + + public void setShutdownTimeout(int shutdownTimeout) { + this.shutdownTimeout = shutdownTimeout; + } + + public void setSaslConfig(SaslConfig saslConfig) { + this.saslConfig = saslConfig; + } + + public void setNetworkRecoveryInterval(long networkRecoveryInterval) { + this.networkRecoveryInterval = networkRecoveryInterval; + } + + public void setTopologyRecovery(boolean topologyRecovery) { + this.topologyRecovery = topologyRecovery; + } + + public void setExceptionHandler(ExceptionHandler exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + + public void setThreadFactory(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + } } From 2f60a8b15dce0aa059acf9866f48af7b96fd1475 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 18 Sep 2015 17:05:22 +0300 Subject: [PATCH 2/4] Set requested heartbeat, too --- src/com/rabbitmq/client/ConnectionFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/com/rabbitmq/client/ConnectionFactory.java b/src/com/rabbitmq/client/ConnectionFactory.java index f5a6668703..392882e892 100644 --- a/src/com/rabbitmq/client/ConnectionFactory.java +++ b/src/com/rabbitmq/client/ConnectionFactory.java @@ -681,6 +681,7 @@ public ConnectionParams params(ExecutorService executor) { result.setExceptionHandler(exceptionHandler); result.setThreadFactory(threadFactory); result.setHandshakeTimeout(handshakeTimeout); + result.setRequestedHeartbeat(requestedHeartbeat); return result; } From f6b969e0b4664d00e62e6aea22703f24468808b3 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 18 Sep 2015 17:11:42 +0300 Subject: [PATCH 3/4] More specific ConnectionParams field name --- src/com/rabbitmq/client/ConnectionFactory.java | 4 ++-- src/com/rabbitmq/client/impl/AMQConnection.java | 2 +- src/com/rabbitmq/client/impl/ConnectionParams.java | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/com/rabbitmq/client/ConnectionFactory.java b/src/com/rabbitmq/client/ConnectionFactory.java index 392882e892..6653982676 100644 --- a/src/com/rabbitmq/client/ConnectionFactory.java +++ b/src/com/rabbitmq/client/ConnectionFactory.java @@ -664,12 +664,12 @@ public Connection newConnection(ExecutorService executor, Address[] addrs) } } - public ConnectionParams params(ExecutorService executor) { + public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { ConnectionParams result = new ConnectionParams(); result.setUsername(username); result.setPassword(password); - result.setExecutor(executor); + result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor); result.setVirtualHost(virtualHost); result.setClientProperties(getClientProperties()); result.setRequestedFrameMax(requestedFrameMax); diff --git a/src/com/rabbitmq/client/impl/AMQConnection.java b/src/com/rabbitmq/client/impl/AMQConnection.java index afe65dad80..365c4506f8 100644 --- a/src/com/rabbitmq/client/impl/AMQConnection.java +++ b/src/com/rabbitmq/client/impl/AMQConnection.java @@ -220,7 +220,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler) this.handshakeTimeout = params.getHandshakeTimeout(); this.shutdownTimeout = params.getShutdownTimeout(); this.saslConfig = params.getSaslConfig(); - this.executor = params.getExecutor(); + this.executor = params.getConsumerWorkServiceExecutor(); this.threadFactory = params.getThreadFactory(); this._channelManager = null; diff --git a/src/com/rabbitmq/client/impl/ConnectionParams.java b/src/com/rabbitmq/client/impl/ConnectionParams.java index d487f19fcd..d42679b906 100644 --- a/src/com/rabbitmq/client/impl/ConnectionParams.java +++ b/src/com/rabbitmq/client/impl/ConnectionParams.java @@ -10,7 +10,7 @@ public class ConnectionParams { private String username; private String password; - private ExecutorService executor; + private ExecutorService consumerWorkServiceExecutor; private String virtualHost; private Map clientProperties; private int requestedFrameMax; @@ -35,8 +35,8 @@ public String getPassword() { return password; } - public ExecutorService getExecutor() { - return executor; + public ExecutorService getConsumerWorkServiceExecutor() { + return consumerWorkServiceExecutor; } public String getVirtualHost() { @@ -99,8 +99,8 @@ public void setPassword(String password) { this.password = password; } - public void setExecutor(ExecutorService executor) { - this.executor = executor; + public void setConsumerWorkServiceExecutor(ExecutorService consumerWorkServiceExecutor) { + this.consumerWorkServiceExecutor = consumerWorkServiceExecutor; } public void setVirtualHost(String virtualHost) { From 22ddfedea874ec2c6b99d96e60fcd8046b78d8d0 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 18 Sep 2015 18:57:27 +0300 Subject: [PATCH 4/4] Use an executor for shutdown monitors Fixes #87. --- .../rabbitmq/client/ConnectionFactory.java | 38 ++++++++++++++++++- .../rabbitmq/client/impl/AMQConnection.java | 23 ++++++----- .../client/impl/ConnectionParams.java | 9 +++++ 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/src/com/rabbitmq/client/ConnectionFactory.java b/src/com/rabbitmq/client/ConnectionFactory.java index 6653982676..b74556ca52 100644 --- a/src/com/rabbitmq/client/ConnectionFactory.java +++ b/src/com/rabbitmq/client/ConnectionFactory.java @@ -101,6 +101,9 @@ public class ConnectionFactory implements Cloneable { private SaslConfig saslConfig = DefaultSaslConfig.PLAIN; private ExecutorService sharedExecutor; private ThreadFactory threadFactory = Executors.defaultThreadFactory(); + // minimises the number of threads rapid closure of many + // connections uses, see rabbitmq/rabbitmq-java-client#86 + private ExecutorService shutdownExecutor; private SocketConfigurator socketConf = new DefaultSocketConfigurator(); private ExceptionHandler exceptionHandler = new DefaultExceptionHandler(); @@ -469,18 +472,34 @@ public void setSocketConfigurator(SocketConfigurator socketConfigurator) { } /** - * Set the executor to use by default for newly created connections. + * Set the executor to use for consumer operation dispatch + * by default for newly created connections. * All connections that use this executor share it. * * It's developer's responsibility to shut down the executor * when it is no longer needed. * - * @param executor + * @param executor executor service to be used for + * consumer operation */ public void setSharedExecutor(ExecutorService executor) { this.sharedExecutor = executor; } + /** + * Set the executor to use for connection shutdown. + * All connections that use this executor share it. + * + * It's developer's responsibility to shut down the executor + * when it is no longer needed. + * + * @param executor executor service to be used for + * connection shutdown + */ + public void setShutdownExecutor(ExecutorService executor) { + this.shutdownExecutor = executor; + } + /** * Retrieve the thread factory used to instantiate new threads. * @see ThreadFactory @@ -640,6 +659,8 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce */ public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException { + // make sure we respect the provided thread factory + maybeInitializeShutdownExecutor(); FrameHandlerFactory fhFactory = createFrameHandlerFactory(); ConnectionParams params = params(executor); @@ -682,6 +703,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setThreadFactory(threadFactory); result.setHandshakeTimeout(handshakeTimeout); result.setRequestedHeartbeat(requestedHeartbeat); + result.setShutdownExecutor(shutdownExecutor); return result; } @@ -718,6 +740,18 @@ public Connection newConnection(ExecutorService executor) throws IOException, Ti ); } + /** + * Lazily initializes shutdown executor service. This is necessary + * to make sure the default executor uses the thread factory that + * may be user-provided and crucially important in certain environments, + * e.g. Google App Engine or JEE application servers. + */ + protected void maybeInitializeShutdownExecutor() { + if(shutdownExecutor == null) { + shutdownExecutor = Executors.newFixedThreadPool(4, threadFactory); + } + } + @Override public ConnectionFactory clone(){ try { return (ConnectionFactory)super.clone(); diff --git a/src/com/rabbitmq/client/impl/AMQConnection.java b/src/com/rabbitmq/client/impl/AMQConnection.java index 365c4506f8..0fc0224d98 100644 --- a/src/com/rabbitmq/client/impl/AMQConnection.java +++ b/src/com/rabbitmq/client/impl/AMQConnection.java @@ -63,7 +63,8 @@ final class Copyright { * for an example. */ public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection { - private final ExecutorService executor; + private final ExecutorService consumerWorkServiceExecutor; + private final ExecutorService shutdownExecutor; private Thread mainLoopThread; private ThreadFactory threadFactory = Executors.defaultThreadFactory(); @@ -220,7 +221,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler) this.handshakeTimeout = params.getHandshakeTimeout(); this.shutdownTimeout = params.getShutdownTimeout(); this.saslConfig = params.getSaslConfig(); - this.executor = params.getConsumerWorkServiceExecutor(); + this.consumerWorkServiceExecutor = params.getConsumerWorkServiceExecutor(); + this.shutdownExecutor = params.getShutdownExecutor(); this.threadFactory = params.getThreadFactory(); this._channelManager = null; @@ -231,7 +233,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler) } private void initializeConsumerWorkService() { - this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout); + this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, shutdownTimeout); } private void initializeHeartbeatSender() { @@ -478,7 +480,7 @@ public ExceptionHandler getExceptionHandler() { /** Public API * - * @return true if this work service instance uses its own executor (as opposed to a shared one) + * @return true if this work service instance uses its own consumerWorkServiceExecutor (as opposed to a shared one) */ public boolean willShutDownConsumerExecutor() { return this._workService.usesPrivateExecutor(); @@ -669,12 +671,12 @@ public void handleConnectionClose(Command closeCommand) { } catch (IOException ignored) { } // ignore _brokerInitiatedShutdown = true; SocketCloseWait scw = new SocketCloseWait(sse); - final String name = "AMQP Connection Closing Monitor " + - getHostAddress() + ":" + getPort(); - Thread waiter = Environment.newThread(threadFactory, scw, name); - waiter.start(); + shutdownExecutor.execute(scw); } + // same as ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT + private static long SOCKET_CLOSE_TIMEOUT = 10000; + private class SocketCloseWait implements Runnable { private final ShutdownSignalException cause; @@ -684,9 +686,12 @@ public SocketCloseWait(ShutdownSignalException sse) { public void run() { try { - _appContinuation.get(); + // TODO: use a sensible timeout here + _appContinuation.get(SOCKET_CLOSE_TIMEOUT); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + } catch (TimeoutException ignored) { + // this releases the thread } finally { _running = false; _channel0.notifyOutstandingRpc(cause); diff --git a/src/com/rabbitmq/client/impl/ConnectionParams.java b/src/com/rabbitmq/client/impl/ConnectionParams.java index d42679b906..a7eacd0391 100644 --- a/src/com/rabbitmq/client/impl/ConnectionParams.java +++ b/src/com/rabbitmq/client/impl/ConnectionParams.java @@ -11,6 +11,7 @@ public class ConnectionParams { private String username; private String password; private ExecutorService consumerWorkServiceExecutor; + private ExecutorService shutdownExecutor; private String virtualHost; private Map clientProperties; private int requestedFrameMax; @@ -146,4 +147,12 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) { public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } + + public ExecutorService getShutdownExecutor() { + return shutdownExecutor; + } + + public void setShutdownExecutor(ExecutorService shutdownExecutor) { + this.shutdownExecutor = shutdownExecutor; + } }