8000 Add strategy interface to resolve list of hosts · CassOnMars/rabbitmq-java-client@d7980da · GitHub
[go: up one dir, main page]

Skip to content

Commit d7980da

Browse files
committed
Add strategy interface to resolve list of hosts
Issue rabbitmq#153. Could help for rabbitmq#104 and rabbitmq#138.
1 parent 2dcec45 commit d7980da

File tree

5 files changed

+73
-18
lines changed

5 files changed

+73
-18
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Strategy interface to get the potential servers to connect to.
7+
*/
8+
public interface AddressResolver {
9+
10+
List<Address> getAddresses();
11+
12+
}

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,30 @@ public Connection newConnection(ExecutorService executor, List<Address> addrs) t
801801
*/
802802
public Connection newConnection(ExecutorService executor, List<Address> addrs, String clientProvidedName)
803803
throws IOException, TimeoutException {
804+
return newConnection(executor, new ListAddressResolver(addrs), clientProvidedName);
805+
}
806+
807+
/**
808+
* Create a new broker connection with a client-provided name, picking the first available address from
809+
* the list provided by the {@link AddressResolver}.
810+
*
811+
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
812+
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
813+
* reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}.
814+
*
815+
* @param executor thread execution service for consumers on the connection
816+
* @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to
817+
* @param clientProvidedName application-specific connection name, will be displayed
818+
* in the management UI if RabbitMQ server supports it.
819+
* This value doesn't have to be unique and cannot be used
820+
* as a connection identifier e.g. in HTTP API requests.
821+
* This value is supposed to be human-readable.
822+
* @return
823+
* @throws IOException
824+
* @throws TimeoutException
825+
*/
826+
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
827+
throws IOException, TimeoutException {
804828
// make sure we respect the provided thread factory
805829
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
806830
ConnectionParams params = params(executor);
@@ -813,11 +837,12 @@ public Connection newConnection(ExecutorService executor, List<Address> addrs, S
813837

814838
if (isAutomaticRecoveryEnabled()) {
815839
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
816-
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
840+
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver);
817841

818842
conn.init();
819843
return conn;
820844
} else {
845+
List<Address> addrs = addressResolver.getAddresses();
821846
IOException lastException = null;
822847
for (Address addr : addrs) {
823848
try {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Simple implementation of {@link AddressResolver} that returns a fixed list.
7+
*/
8+
public class ListAddressResolver implements AddressResolver {
9+
10+
private final List<Address> addresses;
11+
12+
public ListAddressResolver(List<Address> addresses) {
13+
this.addresses = addresses;
14+
}
15+
16+
@Override
17+
public List<Address> getAddresses() {
18+
return addresses;
19+
}
20+
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,9 @@
1515

1616
package com.rabbitmq.client.impl.recovery;
1717

18-
import com.rabbitmq.client.AMQP;
19-
import com.rabbitmq.client.Address;
20-
import com.rabbitmq.client.BlockedListener;
21-
import com.rabbitmq.client.Channel;
22-
import com.rabbitmq.client.Connection;
23-
import com.rabbitmq.client.ConnectionFactory;
24-
import com.rabbitmq.client.MissedHeartbeatException;
25-
import com.rabbitmq.client.Recoverable;
26-
import com.rabbitmq.client.RecoveryListener;
27-
import com.rabbitmq.client.ShutdownListener;
28-
import com.rabbitmq.client.ShutdownSignalException;
29-
import com.rabbitmq.client.TopologyRecoveryException;
18+
import com.rabbitmq.client.*;
3019
import com.rabbitmq.client.impl.AMQConnection;
3120
import com.rabbitmq.client.impl.ConnectionParams;
32-
import com.rabbitmq.client.ExceptionHandler;
3321
import com.rabbitmq.client.impl.FrameHandlerFactory;
3422
import com.rabbitmq.client.impl.NetworkConnection;
3523

@@ -94,7 +82,11 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
9482
private final Object recoveryLock = new Object();
9583

9684
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
97-
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs);
85+
this(params, f, new ListAddressResolver(addrs));
86+
}
87+
88+
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, AddressResolver addressResolver) {
89+
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver);
9890
this.params = params;
9991

10092
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.rabbitmq.client.impl.recovery;
1717

1818
import com.rabbitmq.client.Address;
19+
import com.rabbitmq.client.AddressResolver;
20+
import com.rabbitmq.client.ListAddressResolver;
1921
import com.rabbitmq.client.impl.ConnectionParams;
2022
import com.rabbitmq.client.impl.FrameHandler;
2123
import com.rabbitmq.client.impl.FrameHandlerFactory;
@@ -29,12 +31,16 @@
2931
public class RecoveryAwareAMQConnectionFactory {
3032
private final ConnectionParams params;
3133
private final FrameHandlerFactory factory;
32-
private final List<Address> addrs;
34+
private final AddressResolver addressResolver;
3335

3436
public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, List<Address> addrs) {
37+
this(params, factory, new ListAddressResolver(addrs));
38+
}
39+
40+
public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, AddressResolver addressResolver) {
3541
this.params = params;
3642
this.factory = factory;
37-
this.addrs = addrs;
43+
this.addressResolver = addressResolver;
3844
}
3945

4046
/**
@@ -43,7 +49,7 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa
4349
*/
4450
RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
4551
IOException lastException = null;
46-
List<Address> shuffled = shuffle(addrs);
52+
List<Address> shuffled = shuffle(addressResolver.getAddresses());
4753

4854
for (Address addr : shuffled) {
4955
try {

0 commit comments

Comments
 (0)
0