8000 RpcClient: Make reply-to queue name configurable · Khazrak/rabbitmq-java-client@66f692b · GitHub
[go: up one dir, main page]

Skip to content

Commit 66f692b

Browse files
Dominik Bruhnmichaelklishin
authored andcommitted
RpcClient: Make reply-to queue name configurable
1 parent b74ab53 commit 66f692b

File tree

1 file changed

+46
-6
lines changed

1 file changed

+46
-6
lines changed

src/com/rabbitmq/client/RpcClient.java

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public class RpcClient {
4848
private final String _exchange;
4949
/** Routing key to use for requests */
5050
private final String _routingKey;
51+
/** Queue where the server should put the reply */
52+
private final String _replyTo;
5153
/** timeout to use on call responses */
5254
private final int _timeout;
5355
/** NO_TIMEOUT value must match convention on {@link BlockingCell#uninterruptibleGet(int)} */
@@ -65,17 +67,20 @@ public class RpcClient {
6567
* Construct a new RpcClient that will communicate on the given channel, sending
6668
* requests to the given exchange with the given routing key.
6769
* <p/>
68-
* Causes the creation of a temporary private autodelete queue.
70+
* Causes the creation of a temporary private autodelete queue. The name of this queue can be specified.
6971
* @param channel the channel to use for communication
7072
* @param exchange the exchange to connect to
7173
* @param routingKey the routing key
74+
* @param replyTo the queue where the server should put the reply
7275
* @param timeout milliseconds before timing out on wait for response
7376
* @throws IOException if an error is encountered
7477
*/
75-
public RpcClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException {
78+
public RpcClient(Channel channel, String exchange, String routingKey, String replyTo, int timeout) throws
79+
IOException {
7680
_channel = channel;
7781
_exchange = exchange;
7882
_routingKey = routingKey;
83+
_replyTo = replyTo;
7984
if (timeout < NO_TIMEOUT) throw new IllegalArgumentException("Timeout arguument must be NO_TIMEOUT(-1) or non-negative.");
8085
_timeout = timeout;
8186
_correlationId = 0;
@@ -87,7 +92,24 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
8792
* Construct a new RpcClient that will communicate on the given channel, sending
8893
* requests to the given exchange with the given routing key.
8994
* <p/>
90-
* Causes the creation of a temporary private autodelete queue.
95+
* Causes the creation of a temporary private autodelete queue. The name of the queue can be provided
96+
* <p/>
97+
* Waits forever for responses (that is, no timeout).
98+
* @param channel the channel to use for communication
99+
* @param exchange the exchange to connect to
100+
* @param routingKey the routing key
101+
* @param replyTo the queue where the server should put the reply
102+
* @throws IOException if an error is encountered
103+
*/
104+
public RpcClient(Channel channel, String exchange, String routingKey, String replyTo) throws IOException {
105+
this(channel, exchange, routingKey, replyTo, NO_TIMEOUT);
106+
}
107+
108+
/**
109+
* Construct a new RpcClient that will communicate on the given channel, sending
110+
* requests to the given exchange with the given routing key.
111+
* <p/>
112+
* Causes the creation of a temporary private autodelete queue. This queue will be named "amq.rabbitmq.reply-to".
91113
* <p/>
92114
* Waits forever for responses (that is, no timeout).
93115
* @param channel the channel to use for communication
@@ -96,9 +118,27 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
96118
* @throws IOException if an error is encountered
97119
*/
98120
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException {
99-
this(channel, exchange, routingKey, NO_TIMEOUT);
121+
this(channel, exchange, routingKey, "amq.rabbitmq.reply-to", NO_TIMEOUT);
100122
}
101123

124+
125+
/**
126+
* Construct a new RpcClient that will communicate on the given channel, sending
127+
* requests to the given exchange with the given routing key.
128+
* <p/>
129+
* Causes the creation of a temporary private autodelete queue. The name of this queue will be
130+
* "amq.rabbitmq.reply-to".
131+
* @param channel the channel to use for communication
132+
* @param exchange the exchange to connect to
133+
* @param routingKey the routing key
134+
* @param timeout milliseconds before timing out on wait for response
135+
* @throws IOException if an error is encountered
136+
*/
137+
public RpcClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException {
138+
this(channel, exchange, routingKey, "amq.rabbitmq.reply-to", timeout)
139+
}
140+
141+
102142
/**
103143
* Private API - ensures the RpcClient is correctly open.
104144
* @throws IOException if an error is encountered
@@ -152,7 +192,7 @@ public void handleDelivery(String consumerTag,
152192
}
153193
}
154194
};
155-
_channel.basicConsume("amq.rabbitmq.reply-to", true, consumer);
195+
_channel.basicConsume(_replyTo, true, consumer);
156196
return consumer;
157197
}
158198

@@ -171,7 +211,7 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
171211
_correlationId++;
172212
String replyId = "" + _correlationId;
173213
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
174-
.correlationId(replyId).replyTo("amq.rabbitmq.reply-to").build();
214+
.correlationId(replyId).replyTo(_replyTo).build();
175215
_continuationMap.put(replyId, k);
176216
}
177217
publish(props, message);

0 commit comments

Comments
 (0)
0