8000 Merge pull request #172 from rabbitmq/rabbitmq-java-client-153-to-stable · CassOnMars/rabbitmq-java-client@cb1d363 · GitHub
[go: up one dir, main page]

Skip to content

Commit cb1d363

Browse files
Merge pull request rabbitmq#172 from rabbitmq/rabbitmq-java-client-153-to-stable
Add strategy interface to resolve list of hosts
2 parents 6660c1b + 49a7fcb commit cb1d363

File tree

5 files changed

+109
-18
lines changed

5 files changed

+109
-18
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Strategy interface to get the potential servers to connect to.
7+
*/
8+
public interface AddressResolver {
9+
10+
List<Address> getAddresses();
11+
12+
}

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,23 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
661661
return newConnection(this.sharedExecutor, Arrays.asList(addrs), null);
662662
}
663663

664+
/**
665+
* Create a new broker connection, picking the first available address from
666+
* the list provided by the {@link AddressResolver}.
667+
*
668+
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
669+
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
670+
* reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}.
671+
*
672+
* @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to
673+
* @return an interface to the connection
674+
* @throws IOException if it encounters a problem
675+
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
676+
*/
677+
public Connection newConnection(AddressResolver addressResolver) throws IOException, TimeoutException {
678+
return newConnection(this.sharedExecutor, addressResolver, null);
679+
}
680+
664681

665682
/**
666683
* Create a new broker connection with a client-provided name, picking the first available address from
@@ -780,6 +797,24 @@ public Connection newConnection(ExecutorService executor, List<Address> addrs) t
780797
return newConnection(executor, addrs, null);
781798
}
782799

800+
/**
801+
* Create a new broker connection, picking the first available address from
802+
* the list provided by the {@link AddressResolver}.
803+
*
804+
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
805+
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
806+
* reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}.
807+
*
808+
* @param executor thread execution service for consumers on the connection
809+
* @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to
810+
* @return an interface to the connection
811+
* @throws java.io.IOException if it encounters a problem
812+
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
813+
*/
814+
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver) throws IOException, TimeoutException {
815+
return newConnection(executor, addressResolver, null);
816+
}
817+
783818
/**
784819
* Create a new broker connection with a client-provided name, picking the first available address from
785820
* the list.
@@ -801,6 +836,30 @@ public Connection newConnection(ExecutorService executor, List<Address> addrs) t
801836
*/
802837
public Connection newConnection(ExecutorService executor, List<Address> addrs, String clientProvidedName)
803838
throws IOException, TimeoutException {
839+
return newConnection(executor, new ListAddressResolver(addrs), clientProvidedName);
840+
}
841+
842+
/**
843+
* Create a new broker connection with a client-provided name, picking the first available address from
844+
* the list provided by the {@link AddressResolver}.
845+
*
846+
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
847+
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
848+
* reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}.
849+
*
850+
* @param executor thread execution service for consumers on the connection
851+
* @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to
852+
* @param clientProvidedName application-specific connection name, will be displayed
853+
* in the management UI if RabbitMQ server supports it.
854+
* This value doesn't have to be unique and cannot be used
855+
* as a connection identifier e.g. in HTTP API requests.
856+
* This value is supposed to be human-readable.
857+
* @return an interface to the connection
858+
* @throws java.io.IOException if it encounters a problem
859+
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
860+
*/
861+
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
862+
throws IOException, TimeoutException {
804863
// make sure we respect the provided thread factory
805864
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
806865
ConnectionParams params = params(executor);
@@ -813,11 +872,12 @@ public Connection newConnection(ExecutorService executor, List<Address> addrs, S
813872

814873
if (isAutomaticRecoveryEnabled()) {
815874
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
816-
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
875+
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver);
817876

818877
conn.init();
819878
return conn;
820879
} else {
880+
List<Address> addrs = addressResolver.getAddresses();
821881
IOException lastException = null;
822882
for (Address addr : addrs) {
823883
try {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Simple implementation of {@link AddressResolver} that returns a fixed list.
7+
*/
8+
public class ListAddressResolver implements AddressResolver {
9+
10+
private final List<Address> addresses;
11+
12+
public ListAddressResolver(List<Address> addresses) {
13+
this.addresses = addresses;
14+
}
15+
16+
@Override
17+
public List<Address> getAddresses() {
18+
return addresses;
19+
}
20+
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,9 @@
1515

1616
package com.rabbitmq.client.impl.recovery;
1717

18-
import com.rabbitmq.client.AMQP;
19-
import com.rabbitmq.client.Address;
20-
import com.rabbitmq.client.BlockedListener;
21-
import com.rabbitmq.client.Channel;
22-
import com.rabbitmq.client.Connection;
23-
import com.rabbitmq.client.ConnectionFactory;
24-
import com.rabbitmq.client.MissedHeartbeatException;
25-
import com.rabbitmq.client.Recoverable;
26-
import com.rabbitmq.client.RecoveryListener;
27-
import com.rabbitmq.client.ShutdownListener;
28-
import com.rabbitmq.client.ShutdownSignalException;
29-
import com.rabbitmq.client.TopologyRecoveryException;
18+
import com.rabbitmq.client.*;
19+
3020
import com.rabbitmq.client.impl.ConnectionParams;
31-
import com.rabbitmq.client.ExceptionHandler;
3221
import com.rabbitmq.client.impl.FrameHandlerFactory;
3322
import com.rabbitmq.client.impl.NetworkConnection;
3423

@@ -93,7 +82,11 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
9382
private final Object recoveryLock = new Object();
9483

9584
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
96-
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs);
85+
this(params, f, new ListAddressResolver(addrs));
86+
}
87+
88+
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, AddressResolver addressResolver) {
89+
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver);
9790
this.params = params;
9891

9992
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.rabbitmq.client.impl.recovery;
1717

1818
import com.rabbitmq.client.Address;
19+
import com.rabbitmq.client.AddressResolver;
20+
import com.rabbitmq.client.ListAddressResolver;
1921
import com.rabbitmq.client.impl.ConnectionParams;
2022
import com.rabbitmq.client.impl.FrameHandler;
2123
import com.rabbitmq.client.impl.FrameHandlerFactory;
@@ -29,12 +31,16 @@
2931
public class RecoveryAwareAMQConnectionFactory {
3032
private final ConnectionParams params;
3133
private final FrameHandlerFactory factory;
32-
private final List<Address> addrs;
34+
private final AddressResolver addressResolver;
3335

3436
public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, List<Address> addrs) {
37+
this(params, factory, new ListAddressResolver(addrs));
38+
}
39+
40+
public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, AddressResolver addressResolver) {
3541
this.params = params;
3642
this.factory = factory;
37-
this.addrs = addrs;
43+
this.addressResolver = addressResolver;
3844
}
3945

4046
/**
@@ -43,7 +49,7 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa
4349
*/
4450
RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
4551
IOException lastException = null;
46-
List<Address> shuffled = shuffle(addrs);
52+
List<Address> shuffled = shuffle(addressResolver.getAddresses());
4753

4854
for (Address addr : shuffled) {
4955
try {

0 commit comments

Comments
 (0)
0