8000 Merge pull request #82 from rabbitmq/rabbitmq-java-client-81 · yezijiang/rabbitmq-java-client@b93263d · GitHub
[go: up one dir, main page]

Skip to content

Commit b93263d

Browse files
Merge pull request rabbitmq#82 from rabbitmq/rabbitmq-java-client-81
Make handshake timeout configurable
2 parents 3530c2b + c7008fa commit b93263d

File tree

3 files changed

+53
-15
lines changed

3 files changed

+53
-15
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,14 @@ public class ConnectionFactory implements Cloneable {
7070
public static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT;
7171
/** The default ssl port */
7272
public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
73-
/** The default connection timeout;
73+
/** The default TCP connection timeout;
7474
* zero means wait indefinitely */
7575
public static final int DEFAULT_CONNECTION_TIMEOUT = 0;
76+
/**
77+
* The default AMQP 0-9-1 connection handshake timeout. See DEFAULT_CONNECTION_TIMEOUT
78+
* for TCP (socket) connection timeout.
79+
*/
80+
public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10000;
7681
/** The default shutdown timeout;
7782
* zero means wait indefinitely */
7883
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
@@ -89,6 +94,7 @@ public class ConnectionFactory implements Cloneable {
8994
private int requestedFrameMax = DEFAULT_FRAME_MAX;
9095
private int requestedHeartbeat = DEFAULT_HEARTBEAT;
9196
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
97+
private int handshakeTimeout = DEFAULT_HANDSHAKE_TIMEOUT;
9298
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
9399
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
94100
private SocketFactory factory = SocketFactory.getDefault();
@@ -308,21 +314,41 @@ public int getRequestedHeartbeat() {
308314
}
309315

310316
/**
311-
* Set the connection timeout.
312-
* @param connectionTimeout connection establishment timeout in milliseconds; zero for infinite
317+
* Set the TCP connection timeout.
318+
* @param connectionTimeout connection TCP establishment timeout in milliseconds; zero for infinite
313319
*/
314320
public void setConnectionTimeout(int connectionTimeout) {
315321
this.connectionTimeout = connectionTimeout;
316322
}
317323

318324
/**
319-
* Retrieve the connection timeout.
320-
* @return the connection timeout, in milliseconds; zero for infinite
325+
* Retrieve the TCP connection timeout.
326+
* @return the TCP connection timeout, in milliseconds; zero for infinite
321327
*/
322328
public int getConnectionTimeout() {
323329
return this.connectionTimeout;
324330
}
325331

332+
/**
333+
* Retrieve the AMQP 0-9-1 protocol handshake timeout.
334+
* @return the AMQP0-9-1 protocol handshake timeout, in milliseconds
335+
*/
336+
public int getHandshakeTimeout() {
337+
return handshakeTimeout;
338+
}
339+
340+
/**
341+
* Set the AMQP0-9-1 protocol handshake timeout.
342+
* @param handshakeTimeout the AMQP0-9-1 protocol handshake timeout, in milliseconds
343+
*/
344+
public void setHandshakeTimeout(int handshakeTimeout) {
345+
if(handshakeTimeout < connectionTimeout) {
346+
this.handshakeTimeout = handshakeTimeout;
347+
} else {
348+
throw new IllegalArgumentException("handshake timeout cannot be lower than TCP connection timeout");
349+
}
350+
}
351+
326352
/**
327353
* Set the shutdown timeout. This is the amount of time that Consumer implementations have to
328354
* continue working through deliveries (and other Consumer callbacks) <b>after</b> the connection
@@ -629,9 +655,12 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
629655
}
630656

631657
public ConnectionParams params(ExecutorService executor) {
632-
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
633-
requestedFrameMax, requestedChannelMax, requestedHeartbeat, shutdownTimeout, saslConfig,
634-
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory);
658+
// TODO: switch to use setters for all fields
659+
ConnectionParams result = new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
660+
requestedFrameMax, requestedChannelMax, requestedHeartbeat, shutdownTimeout, saslConfig,
661+
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory);
662+
result.setHandshakeTimeout(handshakeTimeout);
663+
return result;
635664
}
636665

637666
/**

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 5 additions & 5 deletions
F438
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ final class Copyright {
6363
* for an example.
6464
*/
6565
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
66-
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
67-
public static final int HANDSHAKE_TIMEOUT = 10000;
6866
private final ExecutorService executor;
6967
private Thread mainLoopThread;
7068
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
@@ -139,6 +137,7 @@ public static final Map<String, Object> defaultClientProperties() {
139137
private final int requestedHeartbeat;
140138
private final int requestedChannelMax;
141139
private final int requestedFrameMax;
140+
private final int handshakeTimeout;
142141
private final int shutdownTimeout;
143142
private final String username;
144143
private final String password;
@@ -218,6 +217,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
218217
this.requestedFrameMax = params.getRequestedFrameMax();
219218
this.requestedChannelMax = params.getRequestedChannelMax();
220219
this.requestedHeartbeat = params.getRequestedHeartbeat();
220+
this.handshakeTimeout = params.getHandshakeTimeout();
221221
this.shutdownTimeout = params.getShutdownTimeout();
222222
this.saslConfig = params.getSaslConfig();
223223
this.executor = params.getExecutor();
@@ -273,7 +273,7 @@ public void start()
273273
try {
274274
// The following two lines are akin to AMQChannel's
275275
// transmit() method for this pseudo-RPC.
276-
_frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
276+
_frameHandler.setTimeout(handshakeTimeout);
277277
_frameHandler.sendHeader();
278278
} catch (IOException ioe) {
279279
_frameHandler.close();
@@ -291,7 +291,7 @@ public void start()
291291
AMQP.Connection.Tune connTune = null;
292292
try {
293293
connStart =
294-
(AMQP.Connection.Start) connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2).getMethod();
294+
(AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();
295295

296296
_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
297297

@@ -324,7 +324,7 @@ public void start()
324324
: new AMQP.Connection.SecureOk.Builder().response(response).build();
325325

326326
try {
327-
Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();
327+
Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
328328
if (serverResponse instanceof AMQP.Connection.Tune) {
329329
connTune = (AMQP.Connection.Tune) serverResponse;
330330
} else {

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class ConnectionParams {
1616
private final int requestedFrameMax;
1717
private final int requestedChannelMax;
1818
private final int requestedHeartbeat;
19+
private int handshakeTimeout;
1920
private final int shutdownTimeout;
2021
private final SaslConfig saslConfig;
2122
private final long networkRecoveryInterval;
@@ -36,8 +37,8 @@ public class ConnectionParams {
3637
* @param saslConfig sasl configuration hook
3738
* @param networkRecoveryInterval interval used when recovering from network failure
3839
* @param topologyRecovery should topology (queues, exchanges, bindings, consumers) recovery be performed?
39-
* @param threadFactory
40-
* @param exceptionHandler
40+
* @param threadFactory factory that instantiates threads used by the client
41+
* @param exceptionHandler handles unhandled consumer exceptions
4142
*/
4243
public ConnectionParams(String username, String password, ExecutorService executor,
4344
String virtualHost, Map<String, Object> clientProperties,
@@ -92,6 +93,14 @@ public int getRequestedHeartbeat() {
9293
return requestedHeartbeat;
9394
}
9495

96+
public int getHandshakeTimeout() {
97+
return handshakeTimeout;
98+
}
99+
100+
public void setHandshakeTimeout(int timeout) {
101+
this.handshakeTimeout = timeout;
102+
}
103+
95104
public int getShutdownTimeout() {
96105
return shutdownTimeout;
97106
}

0 commit comments

Comments
 (0)
0