8000 Named connection tests · liborange/rabbitmq-java-client@f04af95 · GitHub
[go: up one dir, main page]

Skip to content

Commit f04af95

Browse files
author
Daniil Fedotov
committed
Named connection tests
1 parent 5066bcc commit f04af95

File tree

3 files changed

+90
-0
lines changed

3 files changed

+90
-0
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,21 @@ public Connection newConnection() throws IOException, TimeoutException {
841841
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
842842
}
843843

844+
/**
845+
* Create a new broker connection.
846+
*
847+
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
848+
* is enabled, the connection returned by this method will be {@link Recoverable}. Reconnection
849+
* attempts will always use the address configured on {@link ConnectionFactory}.
850+
*
851+
* @param connectionName arbitrary sring for connection name client property
852+
* @return an interface to the connection
853+
* @throws IOException if it encounters a problem
854+
*/
855+
public Connection newConnection(String connectionName) throws IOException, TimeoutException {
856+
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())), connectionName);
857+
}
858+
844859
/**
845860
* Create a new broker connection.
846861
*
@@ -856,6 +871,22 @@ public Connection newConnection(ExecutorService executor) throws IOException, Ti
856871
return newConnection(executor, Collections.singletonList(new Address(getHost(), getPort())));
857872
}
858873

874+
/**
875+
* Create a new broker connection.
876+
*
877+
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
878+
* is enabled, the connection returned by this method will be {@link Recoverable}. Reconnection
879+
* attempts will always use the address configured on {@link ConnectionFactory}.
880+
*
881+
* @param executor thread execution service for consumers on the connection
882+
* @param connectionName arbitrary sring for connection name client property
883+
* @return an interface to the connection
884+
* @throws IOException if it encounters a problem
885+
*/
886+
public Connection newConnection(ExecutorService executor, String connectionName) throws IOException, TimeoutException {
887+
return newConnection(executor, Collections.singletonList(new Address(getHost(), getPort())), connectionName);
888+
}
889+
859890
@Override public ConnectionFactory clone(){
860891
try {
861892
return (ConnectionFactory)super.clone();

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import java.util.List;
2626
import java.util.concurrent.Executors;
2727
import java.util.concurrent.TimeoutException;
28+
import java.util.concurrent.ExecutorService;
29+
import com.rabbitmq.client.Address;
30+
import java.util.Arrays;
2831

2932
import com.rabbitmq.client.impl.ConnectionParams;
3033
import com.rabbitmq.client.TopologyRecoveryException;
@@ -174,6 +177,36 @@ public void testConnectionHangInNegotiation() {
174177
assertEquals("Wrong type of exception returned.", SocketTimeoutException.class, exceptionList.get(0).getClass());
175178
}
176179

180+
public void testConnectionName() throws IOException, TimeoutException {
181+
String connectionName = "custom name";
182+
Connection connection = factory.newConnection(connectionName);
183+
assertEquals(connectionName, connection.getConnectionName());
184+
connection.close();
185+
186+
List<Address> addresses_list = Arrays.asList(new Address("127.0.0.1"), new Address("127.0.0.1", 5672));
187+
connection = factory.newConnection(addresses_list, connectionName);
188+
assertEquals(connectionName, connection.getConnectionName());
189+
connection.close();
190+
191+
Address[] addresses_arr = {new Address("127.0.0.1"), new Address("127.0.0.1", 5672)};
192+
connection = factory.newConnection(addresses_arr, connectionName);
193+
assertEquals(connectionName, connection.getConnectionName());
194+
connection.close();
195+
196+
ExecutorService executor = Executors.newSingleThreadExecutor();
197+
connection = factory.newConnection(executor, connectionName);
198+
assertEquals(connectionName, connection.getConnectionName());
199+
connection.close();
200+
201+
connection = factory.newConnection(executor, addresses_list, connectionName);
202+
assertEquals(connectionName, connection.getConnectionName());
203+
connection.close();
204+
205+
connection = factory.newConnection(executor, addresses_arr, connectionName);
206+
assertEquals(connectionName, connection.getConnectionName());
207+
connection.close();
208+
}
209+
177210
/** Mock frame handler to facilitate testing. */
178211
private static class MockFrameHandler implements FrameHandler {
179212
/** How many times has sendHeader() been called? */

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,21 @@ public void testConnectionRecovery() throws IOException, InterruptedException {
3030
assertTrue(connection.isOpen());
3131
}
3232

33+
public void testNamedConnectionRecovery()
34+
throws IOException, InterruptedException, TimeoutException {
35+
String connectionName = "custom name";
36+
AutorecoveringConnection c = newRecoveringConnection(connectionName);
37+
try {
38+
assertTrue(c.isOpen());
39+
assertEquals(connectionName, c.getConnectionName());
40+
closeAndWaitForRecovery(c);
41+
assertTrue(c.isOpen());
42+
assertEquals(connectionName, c.getConnectionName());
43+
} finally {
44+
c.abort();
45+
}
46+
}
47+
3348
public void testConnectionRecoveryWithServerRestart() throws IOException, InterruptedException {
3449
assertTrue(connection.isOpen());
3550
restartPrimaryAndWaitForRecovery();
@@ -739,6 +754,17 @@ private AutorecoveringConnection newRecoveringConnection(List<Address> addresses
739754
return newRecoveringConnection(false, addresses);
740755
}
741756

757+
private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, String connectionName)
758+
throws IOException, TimeoutException {
759+
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
760+
return (AutorecoveringConnection) cf.newConnection(connectionName);
761+
}
762+
763+
private AutorecoveringConnection newRecoveringConnection(String connectionName)
764+
throws IOException, TimeoutException {
765+
return newRecoveringConnection(false, connectionName);
766+
}
767+
742768
private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) {
743769
ConnectionFactory cf = new ConnectionFactory();
744770
cf.setNetworkRecoveryInterval(RECOVERY_INTERVAL);

0 commit comments

Comments
 (0)
0