8000 Use an executor for shutdown monitors by michaelklishin · Pull Request #88 · rabbitmq/rabbitmq-java-client · GitHub
[go: up one dir, main page]

Skip to content

Use an executor for shutdown monitors #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions src/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class ConnectionFactory implements Cloneable {
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
private ExecutorService sharedExecutor;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
// minimises the number of threads rapid closure of many
// connections uses, see rabbitmq/rabbitmq-java-client#86
private ExecutorService shutdownExecutor;
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();

Expand Down Expand Up @@ -469,18 +472,34 @@ public void setSocketConfigurator(SocketConfigurator socketConfigurator) {
}

/**
* Set the executor to use by default for newly created connections.
* Set the executor to use for consumer operation dispatch
* by default for newly created connections.
* All connections that use this executor share it.
*
* It's developer's responsibility to shut down the executor
* when it is no longer needed.
*
* @param executor
* @param executor executor service to be used for
* consumer operation
*/
public void setSharedExecutor(ExecutorService executor) {
this.sharedExecutor = executor;
}

/**
* Set the executor to use for connection shutdown.
* All connections that use this executor share it.
*
* It's developer's responsibility to shut down the executor
* when it is no longer needed.
*
* @param executor executor service to be used for
* connection shutdown
*/
public void setShutdownExecutor(ExecutorService executor) {
this.shutdownExecutor = executor;
}

/**
* Retrieve the thread factory used to instantiate new threads.
* @see ThreadFactory
Expand Down Expand Up @@ -640,6 +659,8 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
*/
public Connection newConnection(ExecutorService executor, Address[] addrs)
throws IOException, TimeoutException {
// make sure we respect the provided thread factory
maybeInitializeShutdownExecutor();
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
ConnectionParams params = params(executor);

Expand All @@ -664,12 +685,25 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
}
}

public ConnectionParams params(ExecutorService executor) {
// TODO: switch to use setters for all fields
ConnectionParams result = new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
requestedFrameMax, requestedChannelMax, requestedHeartbeat, shutdownTimeout, saslConfig,
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory);
public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
ConnectionParams result = new ConnectionParams();

result.setUsername(username);
result.setPassword(password);
result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor);
result.setVirtualHost(virtualHost);
result.setClientProperties(getClientProperties());
result.setRequestedFrameMax(requestedFrameMax);
result.setRequestedChannelMax(requestedChannelMax);
result.setShutdownTimeout(shutdownTimeout);
result.setSaslConfig(saslConfig);
result.setNetworkRecoveryInterval(networkRecoveryInterval);
result.setTopologyRecovery(topologyRecovery);
result.setExceptionHandler(exceptionHandler);
result.setThreadFactory(threadFactory);
result.setHandshakeTimeout(handshakeTimeout);
result.setRequestedHeartbeat(requestedHeartbeat);
result.setShutdownExecutor(shutdownExecutor);
return result;
}

Expand Down Expand Up @@ -706,6 +740,18 @@ public Connection newConnection(ExecutorService executor) throws IOException, Ti
);
}

/**
* Lazily initializes shutdown executor service. This is necessary
* to make sure the default executor uses the thread factory that
* may be user-provided and crucially important in certain environments,
* e.g. Google App Engine or JEE application servers.
*/
protected void maybeInitializeShutdownExecutor() {
if(shutdownExecutor == null) {
shutdownExecutor = Executors.newFixedThreadPool(4, threadFactory);
}
}

@Override public ConnectionFactory clone(){
try {
return (ConnectionFactory)super.clone();
Expand Down
23 changes: 14 additions & 9 deletions src/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ final class Copyright {
* for an example.
*/
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
private final ExecutorService executor;
private final ExecutorService consumerWorkServiceExecutor;
private final ExecutorService shutdownExecutor;
private Thread mainLoopThread;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();

Expand Down Expand Up @@ -220,7 +221,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
this.handshakeTimeout = params.getHandshakeTimeout();
this.shutdownTimeout = params.getShutdownTimeout();
this.saslConfig = params.getSaslConfig();
this.executor = params.getExecutor();
this.consumerWorkServiceExecutor = params.getConsumerWorkServiceExecutor();
this.shutdownExecutor = params.getShutdownExecutor();
this.threadFactory = params.getThreadFactory();

this._channelManager = null;
Expand All @@ -231,7 +233,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
}

private void initializeConsumerWorkService() {
this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, shutdownTimeout);
}

private void initializeHeartbeatSender() {
Expand Down Expand Up @@ -478,7 +480,7 @@ public ExceptionHandler getExceptionHandler() {

/** Public API
*
* @return true if this work service instance uses its own executor (as opposed to a shared one)
* @return true if this work service instance uses its own consumerWorkServiceExecutor (as opposed to a shared one)
*/
public boolean willShutDownConsumerExecutor() {
return this._workService.usesPrivateExecutor();
Expand Down Expand Up @@ -669,12 +671,12 @@ public void handleConnectionClose(Command closeCommand) {
} catch (IOException ignored) { } // ignore
_brokerInitiatedShutdown = true;
SocketCloseWait scw = new SocketCloseWait(sse);
final String name = "AMQP Connection Closing Monitor " +
getHostAddress() + ":" + getPort();
Thread waiter = Environment.newThread(threadFactory, scw, name);
waiter.start();
shutdownExecutor.execute(scw);
}

// same as ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT
private static long SOCKET_CLOSE_TIMEOUT = 10000;

private class SocketCloseWait implements Runnable {
private final ShutdownSignalException cause;

Expand All @@ -684,9 +686,12 @@ public SocketCloseWait(ShutdownSignalException sse) {

public void run() {
try {
_appContinuation.get();
// TODO: use a sensible timeout here
_appContinuation.get(SOCKET_CLOSE_TIMEOUT);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException ignored) {
// this releases the thread
} finally {
_running = false;
_channel0.notifyOutstandingRpc(cause);
Expand Down
139 changes: 85 additions & 54 deletions src/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,58 +8,25 @@
import java.util.concurrent.ThreadFactory;

public class ConnectionParams {
private final String username;
private final String password;
private final ExecutorService executor;
private final String virtualHost;
private final Map<String, Object> clientProperties;
private final int requestedFrameMax;
private final int requestedChannelMax;
private final int requestedHeartbeat;
private String username;
private String password;
private ExecutorService consumerWorkServiceExecutor;
private ExecutorService shutdownExecutor;
private String virtualHost;
private Map<String, Object> clientProperties;
private int requestedFrameMax;
private int requestedChannelMax;
private int requestedHeartbeat;
private int handshakeTimeout;
private final int shutdownTimeout;
private final SaslConfig saslConfig;
private final long networkRecoveryInterval;
private final boolean topologyRecovery;

private final ExceptionHandler exceptionHandler;
private final ThreadFactory threadFactory;

/**
* @param username name used to establish connection
* @param password for <code><b>username</b></code>
* @param executor thread pool service for consumer threads for channels on this connection
* @param virtualHost virtual host of this connection
* @param clientProperties client info used in negotiating with the server
* @param requestedFrameMax max size of frame offered
* @param requestedChannelMax max number of channels offered
* @param requestedHeartbeat heart-beat in seconds offered
* @param saslConfig sasl configuration hook
* @param networkRecoveryInterval interval used when recovering from network failure
* @param topologyRecovery should topology (queues, exchanges, bindings, consumers) recovery be performed?
* @param threadFactory factory that instantiates threads used by the client
* @param exceptionHandler handles unhandled consumer exceptions
*/
public ConnectionParams(String username, String password, ExecutorService executor,
String virtualHost, Map<String, Object> clientProperties,
int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat,
int shutdownTimeout, SaslConfig saslConfig, long networkRecoveryInterval,
boolean topologyRecovery, ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
this.username = username;
this.password = password;
this.executor = executor;
this.virtualHost = virtualHost;
this.clientProperties = clientProperties;
this.requestedFrameMax = requestedFrameMax;
this.requestedChannelMax = requestedChannelMax;
this.requestedHeartbeat = requestedHeartbeat;
this.shutdownTimeout = shutdownTimeout;
this.saslConfig = saslConfig;
this.networkRecoveryInterval = networkRecoveryInterval;
this.topologyRecovery = topologyRecovery;
this.exceptionHandler = exceptionHandler;
this.threadFactory = threadFactory;
}
private int shutdownTimeout;
private SaslConfig saslConfig;
private long networkRecoveryInterval;
private boolean topologyRecovery;

private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;

public ConnectionParams() {}

public String getUsername() {
return username;
Expand All @@ -69,8 +36,8 @@ public String getPassword() {
return password;
}

public ExecutorService getExecutor() {
return executor;
public ExecutorService getConsumerWorkServiceExecutor() {
return consumerWorkServiceExecutor;
}

public String getVirtualHost() {
Expand Down Expand Up @@ -121,7 +88,71 @@ public boolean isTopologyRecoveryEnabled() {
return topologyRecovery;
}

public ThreadFactory getThreadFactory() {
public ThreadFactory getThreadFactory() {
return threadFactory;
}

public void setUsername(String username) {
this.username = username;
}

public void setPassword(String password) {
this.password = password;
}

public void setConsumerWorkServiceExecutor(ExecutorService consumerWorkServiceExecutor) {
this.consumerWorkServiceExecutor = consumerWorkServiceExecutor;
}

public void setVirtualHost(String virtualHost) {
this.virtualHost = virtualHost;
}

public void setClientProperties(Map<String, Object> clientProperties) {
this.clientProperties = clientProperties;
}

public void setRequestedFrameMax(int requestedFrameMax) {
this.requestedFrameMax = requestedFrameMax;
}

public void setRequestedChannelMax(int requestedChannelMax) {
this.requestedChannelMax = requestedChannelMax;
}

public void setRequestedHeartbeat(int requestedHeartbeat) {
this.requestedHeartbeat = requestedHeartbeat;
}

public void setShutdownTimeout(int shutdownTimeout) {
this.shutdownTimeout = shutdownTimeout;
}

public void setSaslConfig(SaslConfig saslConfig) {
this.saslConfig = saslConfig;
}

public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
this.networkRecoveryInterval = networkRecoveryInterval;
}

public void setTopologyRecovery(boolean topologyRecovery) {
this.topologyRecovery = topologyRecovery;
}

public void setExceptionHandler(ExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}

public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

public ExecutorService getShutdownExecutor() {
return shutdownExecutor;
}

public void setShutdownExecutor(ExecutorService shutdownExecutor) {
this.shutdownExecutor = shutdownExecutor;
}
}
0