8000 Merge branch 'dhakimTRX-stable' into stable · yezijiang/rabbitmq-java-client@3530c2b · GitHub
[go: up one dir, main page]

Skip to content

Commit 3530c2b

Browse files
Merge branch 'dhakimTRX-stable' into stable
2 parents 6cd0fda + f18c1ad commit 3530c2b

File tree

1 file changed

+64
-16
lines changed

1 file changed

+64
-16
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 64 additions & 16 deletions
< 8000 td data-grid-cell-id="diff-8779ac5c3a838b1dd6b6741530b715a3cf46a9f85dd8c1a814c378cff2a10c5d-214-236-2" data-line-anchor="diff-8779ac5c3a838b1dd6b6741530b715a3cf46a9f85dd8c1a814c378cff2a10c5dR236" data-selected="false" role="gridcell" style="background-color:var(--bgColor-default);padding-right:24px" tabindex="-1" valign="top" class="focusable-grid-cell diff-text-cell right-side-diff-cell left-side">

Original file line numberDiff line numberDiff line change
@@ -67,7 +67,14 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
6767
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>();
6868
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
6969
private final List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
70-
70+
71+
// Used to block connection recovery attempts after close() is invoked.
72+
private volatile boolean manuallyClosed = false;
73+
74+
// This lock guards the manuallyClosed flag and the delegate connection. Guarding these two ensures that a new connection can never
75+
// be created after application code has initiated shutdown.
76+
private Object recoveryLock = new Object();
77+
7178
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, Address[] addrs) {
7279
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs);
7380
this.params = params;
@@ -181,48 +188,69 @@ public boolean isOpen() {
181188
* @see com.rabbitmq.client.Connection#close()
182189
*/
183190
public void close() throws IOException {
191+
synchronized(recoveryLock) {
192+
this.manuallyClosed = true;
193+
}
184194
delegate.close();
185195
}
186196

187197
/**
188198
* @see Connection#close(int)
189199
*/
190200
public void close(int timeout) throws IOException {
201+
synchronized(recoveryLock) {
202+
this.manuallyClosed = true;
203+
}
191204
delegate.close(timeout);
192205
}
193206

194207
/**
195208
* @see Connection#close(int, String, int)
196209
*/
197210
public void close(int closeCode, String closeMessage, int timeout) throws IOException {
211+
synchronized(recoveryLock) {
212+
this.manuallyClosed = true;
213+
}
198214
delegate.close(closeCode, closeMessage, timeout);
199215
}
200216

201217
/**
202218
* @see com.rabbitmq.client.Connection#abort()
203219
*/
204220
public void abort() {
221+
synchronized(recoveryLock) {
222+
this.manuallyClosed = true;
223+
}
205224
delegate.abort();
206225
}
207226

208227
/**
209228
* @see Connection#abort(int, String, int)
210229
*/
211230
public void abort(int closeCode, String closeMessage, int timeout) {
231+
synchronized(recoveryLock) {
232+
this.manuallyClosed = true;
233+
}
212234
delegate.abort(closeCode, closeMessage, timeout);
213235
}
214236
215237
/**
216238
* @see Connection#abort(int, String)
217239
*/
218240
public void abort(int closeCode, String closeMessage) {
241+
synchronized(recoveryLock) {
242+
this.manuallyClosed = true;
243+
}
219244
delegate.abort(closeCode, closeMessage);
220245
}
221246

222247
/**
223248
* @see Connection#abort(int)
224249
*/
225250
public void abort(int timeout) {
251+
synchronized(recoveryLock) {
252+
this.manuallyClosed = true;
253+
}
226254
delegate.abort(timeout);
227255
}
228256

@@ -261,7 +289,10 @@ public void clearBlockedListeners() {
261289
* @see com.rabbitmq.client.Connection#close(int, String)
262290
*/
263291
public void close(int closeCode, String closeMessage) throws IOException {
264-
delegate.close(closeCode, closeMessage);
292+
synchronized(recoveryLock) {
293+
this.manuallyClosed = true;
294+
}
295+
delegate.close(closeCode, closeMessage);
265296
}
266297

267298
/**
@@ -404,16 +435,18 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
404435

405436
synchronized private void beginAutomaticRecovery() throws InterruptedException, IOException, TopologyRecoveryException {
406437
Thread.sleep(this.params.getNetworkRecoveryInterval());
407-
this.recoverConnection();
408-
this.recoverShutdownListeners();
409-
this.recoverBlockedListeners();
410-
this.recoverChannels();
411-
if(this.params.isTopologyRecoveryEnabled()) {
412-
this.recoverEntities();
413-
this.recoverConsumers();
414-
}
438+
if (!this.recoverConnection())
439+
return;
440+
441+
this.recoverShutdownListeners();
442+
this.recoverBlockedListeners();
443+
this.recoverChannels();
444+
if(this.params.isTopologyRecoveryEnabled()) {
445+
this.recoverEntities();
446+
this.recoverConsumers();
447+
}
415448

416-
this.notifyRecoveryListeners();
449+
this.notifyRecoveryListeners();
417450
}
418451

419452
private void recoverShutdownListeners() {
@@ -428,18 +461,33 @@ private void recoverBlockedListeners() {
428461
}
429462
}
430463

431-
private void recoverConnection() throws IOException, InterruptedException {
432-
boolean recovering = true;
433-
while (recovering) {
464+
// Returns true if the connection was recovered,
465+
// false if application initiated shutdown while attempting recovery.
466+
private boolean recoverConnection() throws IOException, InterruptedException {
467+
while (!manuallyClosed)
468+
{
434469
try {
435-
this.delegate = this.cf.newConnection();
436-
recovering = false;
470+
RecoveryAwareAMQConnection newConn = this.cf.newConnection();
471+
synchronized(recoveryLock) {
472+
if (!manuallyClosed) {
473+
// This is the standard case.
474+
this.delegate = newConn;
475+
return true;
476+
}
477+
}
478+
// This is the once in a blue moon case.
479+
// Application code just called close as the connection
480+
// was being re-established. So we attempt to close the newly created connection.
481+
newConn.abort();
482+
return false;
437483
} catch (Exception e) {
438484
// TODO: exponential back-off
439485
Thread.sleep(this.params.getNetworkRecoveryInterval());
440486
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
441487
}
442488
}
489+
490+
return false;
443491
}
444492

445493
private void recoverChannels() {

0 commit comments

Comments
 (0)
0