8000 Merge pull request #88 from rabbitmq/rabbitmq-java-client-87 · rabbitmq/rabbitmq-java-client@e762711 · GitHub
[go: up one dir, main page]

Skip to content

Commit e762711

Browse files
Merge pull request #88 from rabbitmq/rabbitmq-java-client-87
Use an executor for shutdown monitors
2 parents b145f02 + 22ddfed commit e762711

File tree

3 files changed

+152
-70
lines changed

3 files changed

+152
-70
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ public class ConnectionFactory implements Cloneable {
101101
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
102102
private ExecutorService sharedExecutor;
103103
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
104+
// minimises the number of threads rapid closure of many
105+
// connections uses, see rabbitmq/rabbitmq-java-client#86
106+
private ExecutorService shutdownExecutor;
104107
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
105108
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
106109

@@ -469,18 +472,34 @@ public void setSocketConfigurator(SocketConfigurator socketConfigurator) {
469472
}
470473

471474
/**
472-
* Set the executor to use by default for newly created connections.
475+
* Set the executor to use for consumer operation dispatch
476+
* by default for newly created connections.
473477
* All connections that use this executor share it.
474478
*
475479
* It's developer's responsibility to shut down the executor
476480
* when it is no longer needed.
477481
*
478-
* @param executor
482+
* @param executor executor service to be used for
483+
* consumer operation
479484
*/
480485
public void setSharedExecutor(ExecutorService executor) {
481486
this.sharedExecutor = executor;
482487
}
483488

489+
/**
490+
* Set the executor to use for connection shutdown.
491+
* All connections that use this executor share it.
492+
*
493+
* It's developer's responsibility to shut down the executor
494+
* when it is no longer needed.
495+
*
496+
* @param executor executor service to be used for
497+
* connection shutdown
498+
*/
499+
public void setShutdownExecutor(ExecutorService executor) {
500+
this.shutdownExecutor = executor;
501+
}
502+
484503
/**
485504
* Retrieve the thread factory used to instantiate new threads.
486505
* @see ThreadFactory
@@ -640,6 +659,8 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
640659
*/
641660
public Connection newConnection(ExecutorService executor, Address[] addrs)
642661
throws IOException, TimeoutException {
662+
// make sure we respect the provided thread factory
663+
maybeInitializeShutdownExecutor();
643664
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
644665
ConnectionParams params = params(executor);
645666

@@ -664,12 +685,25 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
664685
}
665686
}
666687

667-
public ConnectionParams params(ExecutorService executor) {
668-
// TODO: switch to use setters for all fields
669-
ConnectionParams result = new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
670-
requestedFrameMax, requestedChannelMax, requestedHeartbeat, shutdownTimeout, saslConfig,
671-
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory);
688+
public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
689+
ConnectionParams result = new ConnectionParams();
690+
691+
result.setUsername(username);
692+
result.setPassword(password);
693+
result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor);
694+
result.setVirtualHost(virtualHost);
695+
result.setClientProperties(getClientProperties());
696+
result.setRequestedFrameMax(requestedFrameMax);
697+
result.setRequestedChannelMax(requestedChannelMax);
698+
result.setShutdownTimeout(shutdownTimeout);
699+
result.setSaslConfig(saslConfig);
700+
result.setNetworkRecoveryInterval(networkRecoveryInterval);
701+
result.setTopologyRecovery(topologyRecovery);
702+
result.setExceptionHandler(exceptionHandler);
703+
result.setThreadFactory(threadFactory);
672704
result.setHandshakeTimeout(handshakeTimeout);
705+
result.setRequestedHeartbeat(requestedHeartbeat);
706+
result.setShutdownExecutor(shutdownExecutor);
673707
return result;
674708
}
675709

@@ -706,6 +740,18 @@ public Connection newConnection(ExecutorService executor) throws IOException, Ti
706740
);
707741
}
708742

743+
/**
744+
* Lazily initializes shutdown executor service. This is necessary
745+
* to make sure the default executor uses the thread factory that
746+
* may be user-provided and crucially important in certain environments,
747+
* e.g. Google App Engine or JEE application servers.
748+
*/
749+
protected void maybeInitializeShutdownExecutor() {
750+
if(shutdownExecutor == null) {
751+
shutdownExecutor = Executors.newFixedThreadPool(4, threadFactory);
752+
}
753+
}
754+
709755
@Override public ConnectionFactory clone(){
710756
try {
711757
return (ConnectionFactory)super.clone();

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ final class Copyright {
6363
* for an example.
6464
*/
6565
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
66-
private final ExecutorService executor;
66+
private final ExecutorService consumerWorkServiceExecutor;
67+
private final ExecutorService shutdownExecutor;
6768
private Thread mainLoopThread;
6869
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
6970

@@ -220,7 +221,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
220221
this.handshakeTimeout = params.getHandshakeTimeout();
221222
this.shutdownTimeout = params.getShutdownTimeout();
222223
this.saslConfig = params.getSaslConfig();
223-
this.executor = params.getExecutor();
224+
this.consumerWorkServiceExecutor = params.getConsumerWorkServiceExecutor();
225+
this.shutdownExecutor = params.getShutdownExecutor();
224226
this.threadFactory = params.getThreadFactory();
225227

226228
this._channelManager = null;
@@ -231,7 +233,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
231233
}
232234

233235
private void initializeConsumerWorkService() {
234-
this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
236+
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, shutdownTimeout);
235237
}
236238

237239
private void initializeHeartbeatSender() {
@@ -478,7 +480,7 @@ public ExceptionHandler getExceptionHandler() {
478480

479481
/** Public API
480482
*
481-
* @return true if this work service instance uses its own executor (as opposed to a shared one)
483+
* @return true if this work service instance uses its own consumerWorkServiceExecutor (as opposed to a shared one)
482484
*/
483485
public boolean willShutDownConsumerExecutor() {
484486
return this._workService.usesPrivateExecutor();
@@ -669,12 +671,12 @@ public void handleConnectionClose(Command closeCommand) {
669671
} catch (IOException ignored) { } // ignore
670672
_brokerInitiatedShutdown = true;
671673
SocketCloseWait scw = new SocketCloseWait(sse);
672-
final String name = "AMQP Connection Closing Monitor " +
673-
getHostAddress() + ":" + getPort();
674-
Thread waiter = Environment.newThread(threadFactory, scw, name);
675-
waiter.start();
674+
shutdownExecutor.execute(scw);
676675
}
677676

677+
// same as ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT
678+
private static long SOCKET_CLOSE_TIMEOUT = 10000;
679+
678680
private class SocketCloseWait implements Runnable {
679681
private final ShutdownSignalException cause;
680682

@@ -684,9 +686,12 @@ public SocketCloseWait(ShutdownSignalException sse) {
684686

685687
public void run() {
686688
try {
687-
_appContinuation.get();
689+
// TODO: use a sensible timeout here
690+
_appContinuation.get(SOCKET_CLOSE_TIMEOUT);
688691
} catch (InterruptedException e) {
689692
Thread.currentThread().interrupt();
693+
} catch (TimeoutException ignored) {
694+
// this releases the thread
690695
} finally {
691696
_running = false;
692697
_channel0.notifyOutstandingRpc(cause);

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 85 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,58 +8,25 @@
88
import java.util.concurrent.ThreadFactory;
99

1010
public class ConnectionParams {
11-
private final String username;
12-
private final String password;
13-
private final ExecutorService executor;
14-
private final String virtualHost;
15-
private final Map<String, Object> clientProperties;
16-
private final int requestedFrameMax;
17-
private final int requestedChannelMax;
18-
private final int requestedHeartbeat;
11+
private String username;
12+
private String password;
13+
private ExecutorService consumerWorkServiceExecutor;
14+
private ExecutorService shutdownExecutor;
15+
private String virtualHost;
16+
private Map<String, Object> clientProperties;
17+
private int requestedFrameMax;
18+
private int requestedChannelMax;
19+
private int requestedHeartbeat;
1920
private int handshakeTimeout;
20-
private final int shutdownTimeout;
21-
private final SaslConfig saslConfig;
22-
private final long networkRecoveryInterval;
23-
private final boolean topologyRecovery;
24-
25-
private final ExceptionHandler exceptionHandler;
26-
private final ThreadFactory threadFactory;
27-
28-
/**
29-
* @param username name used to establish connection
30-
* @param password for <code><b>username</b></code>
31-
* @param executor thread pool service for consumer threads for channels on this connection
32-
* @param virtualHost virtual host of this connection
33-
* @param clientProperties client info used in negotiating with the server
34-
* @param requestedFrameMax max size of frame offered
35-
* @param requestedChannelMax max number of channels offered
36-
* @param requestedHeartbeat heart-beat in seconds offered
37-
* @param saslConfig sasl configuration hook
38-
* @param networkRecoveryInterval interval used when recovering from network failure
39-
* @param topologyRecovery should topology (queues, exchanges, bindings, consumers) recovery be performed?
40-
* @param threadFactory factory that instantiates threads used by the client
41-
* @param exceptionHandler handles unhandled consumer exceptions
42-
*/
43-
public ConnectionParams(String username, String password, ExecutorService executor,
44-
String virtualHost, Map<String, Object> clientProperties,
45-
int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat,
46-
int shutdownTimeout, SaslConfig saslConfig, long networkRecoveryInterval,
47-
boolean topologyRecovery, ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
48-
this.username = username;
49-
this.password = password;
50-
this.executor = executor;
51-
this.virtualHost = virtualHost;
52-
this.clientProperties = clientProperties;
53-
this.requestedFrameMax = requestedFrameMax;
54-
this.requestedChannelMax = requestedChannelMax;
55-
this.requestedHeartbeat = requestedHeartbeat;
56-
this.shutdownTimeout = shutdownTimeout;
57-
this.saslConfig = saslConfig;
58-
this.networkRecoveryInterval = networkRecoveryInterval;
59-
this.topologyRecovery = topologyRecovery;
60-
this.exceptionHandler = exceptionHandler;
61-
this.threadFactory = threadFactory;
62-
}
21+
private int shutdownTimeout;
22+
private SaslConfig saslConfig;
23+
private long networkRecoveryInterval;
24+
private boolean topologyRecovery;
25+
26+
private ExceptionHandler exceptionHandler;
27+
private ThreadFactory threadFactory;
28+
29+
public ConnectionParams() {}
6330

6431
public String getUsername() {
6532
return username;
@@ -69,8 +36,8 @@ public String getPassword() {
6936
return password;
7037
}
7138

72-
public ExecutorService getExecutor() {
73-
return executor;
39+
public ExecutorService getConsumerWorkServiceExecutor() {
40+
return consumerWorkServiceExecutor;
7441
}
7542

7643
public String getVirtualHost() {
@@ -121,7 +88,71 @@ public boolean isTopologyRecoveryEnabled() {
12188
return topologyRecovery;
12289
}
12390

124-
public ThreadFactory getThreadFactory() {
91+
public ThreadFactory getThreadFactory() {
12592
return threadFactory;
12693
}
94+
95+
public void setUsername(String username) {
96+
this.username = username;
97+
}
98+
99+
public void setPassword(String password) {
100+
this.password = password;
101+
}
102+
103+
public void setConsumerWorkServiceExecutor(ExecutorService consumerWorkServiceExecutor) {
104+
this.consumerWorkServiceExecutor = consumerWorkServiceExecutor;
105+
}
106+
107+
public void setVirtualHost(String virtualHost) {
108+
this.virtualHost = virtualHost;
109+
}
110+
111+
public void setClientProperties(Map<String, Object> clientProperties) {
112+
this.clientProperties = clientProperties;
113+
}
114+
115+
public void setRequestedFrameMax(int requestedFrameMax) {
116+
this.requestedFrameMax = requestedFrameMax;
117+
}
118+
119+
public void setRequestedChannelMax(int requestedChannelMax) {
120+
this.requestedChannelMax = requestedChannelMax;
121+
}
122+
123+
public void setRequestedHeartbeat(int requestedHeartbeat) {
124+
this.requestedHeartbeat = requestedHeartbeat;
125+
}
126+
127+
public void setShutdownTimeout(int shutdownTimeout) {
128+
this.shutdownTimeout = shutdownTimeout;
129+
}
130+
131+
public void setSaslConfig(SaslConfig saslConfig) {
132+
this.saslConfig = saslConfig;
133+
}
134+
135+
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
136+
this.networkRecoveryInterval = networkRecoveryInterval;
137+
}
138+
139+
public void setTopologyRecovery(boolean topologyRecovery) {
140+
this.topologyRecovery = topologyRecovery;
141+
}
142+
143+
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
144+
this.exceptionHandler = exceptionHandler;
145+
}
146+
147+
public void setThreadFactory(ThreadFactory threadFactory) {
148+
this.threadFactory = threadFactory;
149+
}
150+
151+
public ExecutorService getShutdownExecutor() {
152+
return shutdownExecutor;
153+
}
154+
155+
public void setShutdownExecutor(ExecutorService shutdownExecutor) {
156+
this.shutdownExecutor = shutdownExecutor;
157+
}
127158
}

0 commit comments

Comments
 (0)
0