8000 Allow to set an optional Executor to be used to send heartbeats · yezijiang/rabbitmq-java-client@1fdb294 · GitHub
[go: up one dir, main page]

Skip to content

Commit 1fdb294

Browse files
committed
Allow to set an optional Executor to be used to send heartbeats
1 parent 8d5dd23 commit 1fdb294

File tree

4 files changed

+38
-15
lines changed

4 files changed

+38
-15
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@
2020
import java.security.KeyManagementException;
2121
import java.security.NoSuchAlgorithmException;
2222
import java.util.Map;
23-
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.*;
2424

2525
import java.net.URI;
2626
import java.net.URISyntaxException;
2727
import java.net.URLDecoder;
28-
import java.util.concurrent.Executors;
29-
import java.util.concurrent.ThreadFactory;
30-
import java.util.concurrent.TimeoutException;
3128

3229
import javax.net.SocketFactory;
3330
import javax.net.ssl.SSLSocketFactory;
@@ -104,6 +101,7 @@ public class ConnectionFactory implements Cloneable {
104101
// minimises the number of threads rapid closure of many
105102
// connections uses, see rabbitmq/rabbitmq-java-client#86
106103
private ExecutorService shutdownExecutor;
104+
private ScheduledExecutorService heartbeatExecutor;
107105
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
108106
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
109107

@@ -492,6 +490,19 @@ public void setShutdownExecutor(ExecutorService executor) {
492490
this.shutdownExecutor = executor;
493491
}
494492

493+
/**
494+
* Set the executor to use to send the heartbeat
495+
* All connections that use this executor share it.
496+
*
497+
* It's developer's responsibility to shut down the executor
498+
* when it is no longer needed.
499+
*
500+
* @param executor executor service to be used to send heartbeat
501+
*/
502+
public void setHeartbeatExecutor(ScheduledExecutorService executor) {
503+
this.heartbeatExecutor = executor;
504+
}
505+
495506
/**
496507
* Retrieve the thread factory used to instantiate new threads.
497508
* @see ThreadFactory
@@ -694,6 +705,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
694705
result.setHandshakeTimeout(handshakeTimeout);
695706
result.setRequestedHeartbeat(requestedHeartbeat);
696707
result.setShutdownExecutor(shutdownExecutor);
708+
result.setHeartbeatExecutor(heartbeatExecutor);
697709
return result;
698710
}
699711

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,7 @@
2525
import java.util.Collections;
2626
import java.util.HashMap;
2727
import java.util.Map;
28-
import java.util.concurrent.CopyOnWriteArrayList;
29-
import java.util.concurrent.ExecutorService;
30-
import java.util.concurrent.Executors;
31-
import java.util.concurrent.ThreadFactory;
32-
import java.util.concurrent.TimeoutException;
28+
import java.util.concurrent.*;
3329

3430
import com.rabbitmq.client.AMQP;
3531
import com.rabbitmq.client.AuthenticationFailureException;
@@ -64,6 +60,7 @@ final class Copyright {
6460
*/
6561
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
6662
private final ExecutorService consumerWorkServiceExecutor;
63+
private final ScheduledExecutorService heartbeatExecutor;
6764
private final ExecutorService shutdownExecutor;
6865
private Thread mainLoopThread;
6966
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
@@ -222,6 +219,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
222219
this.shutdownTimeout = params.getShutdownTimeout();
223220
this.saslConfig = params.getSaslConfig();
224221
this.consumerWorkServiceExecutor = params.getConsumerWorkServiceExecutor();
222+
this.heartbeatExecutor = params.getHeartbeatExecutor();
225223
this.shutdownExecutor = params.getShutdownExecutor();
226224
this.threadFactory = params.getThreadFactory();
227225

@@ -237,7 +235,7 @@ private void initializeConsumerWorkService() {
237235
}
238236

239237
private void initializeHeartbeatSender() {
240-
this._heartbeatSender = new HeartbeatSender(_frameHandler, threadFactory);
238+
this._heartbeatSender = new HeartbeatSender(_frameHandler, heartbeatExecutor, threadFactory);
241239
}
242240

243241
/**

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66
import java.util.Map;
77
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.ScheduledExecutorService;
89
import java.util.concurrent.ThreadFactory;
910

1011
public class ConnectionParams {
1112
private String username;
1213
private String password;
1314
private ExecutorService consumerWorkServiceExecutor;
15+
private ScheduledExecutorService heartbeatExecutor;
1416
private ExecutorService shutdownExecutor;
1517
private String virtualHost;
1618
private Map<String, Object> clientProperties;
@@ -155,4 +157,12 @@ public ExecutorService getShutdownExecutor() {
155157
public void setShutdownExecutor(ExecutorService shutdownExecutor) {
156158
this.shutdownExecutor = shutdownExecutor;
157159
}
160+
161+
public ScheduledExecutorService getHeartbeatExecutor() {
162+
return heartbeatExecutor;
163+
}
164+
165+
public void setHeartbeatExecutor(ScheduledExecutorService heartbeatExecutor) {
166+
this.heartbeatExecutor = heartbeatExecutor;
167+
}
158168
}

src/com/rabbitmq/client/impl/HeartbeatSender.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,18 @@ final class HeartbeatSender {
4343
private final ThreadFactory threadFactory;
4444

4545
private ScheduledExecutorService executor;
46+
private final boolean privateExecutor;
4647

4748
private ScheduledFuture<?> future;
4849

4950
private boolean shutdown = false;
5051

5152
private volatile long lastActivityTime;
5253

53-
HeartbeatSender(FrameHandler frameHandler, ThreadFactory threadFactory) {
54+
HeartbeatSender(FrameHandler frameHandler, ScheduledExecutorService heartbeatExecutor, ThreadFactory threadFactory) {
5455
this.frameHandler = frameHandler;
56+
this.privateExecutor = (heartbeatExecutor == null);
57+
this.executor = heartbeatExecutor;
5558
this.threadFactory = threadFactory;
5659
}
5760

@@ -106,14 +109,14 @@ public void shutdown() {
106109
this.future = null;
107110
}
108111

109-
if (this.executor != null) {
112+
if (this.privateExecutor) {
110113
// to be safe, we shouldn't call shutdown holding the
111114
// monitor.
112115
executorToShutdown = this.executor;
113-
114-
this.shutdown = true;
115-
this.executor = null;
116116
}
117+
118+
this.executor = null;
119+
this.shutdown = true;
117120
}
118121
if(executorToShutdown != null) {
119122
executorToShutdown.shutdown();

0 commit comments

Comments
 (0)
0