@@ -48,6 +48,8 @@ public class RpcClient {
48
48
private final String _exchange ;
49
49
/** Routing key to use for requests */
50
50
private final String _routingKey ;
51
+ /** Queue where the server should put the reply */
52
+ private final String _replyTo ;
51
53
/** timeout to use on call responses */
52
54
private final int _timeout ;
53
55
/** NO_TIMEOUT value must match convention on {@link BlockingCell#uninterruptibleGet(int)} */
@@ -65,17 +67,20 @@ public class RpcClient {
65
67
* Construct a new RpcClient that will communicate on the given channel, sending
66
68
* requests to the given exchange with the given routing key.
67
69
* <p/>
68
- * Causes the creation of a temporary private autodelete queue.
70
+ * Causes the creation of a temporary private autodelete queue. The name of this queue can be specified.
69
71
* @param channel the channel to use for communication
70
72
* @param exchange the exchange to connect to
71
73
* @param routingKey the routing key
74
+ * @param replyTo the queue where the server should put the reply
72
75
* @param timeout milliseconds before timing out on wait for response
73
76
* @throws IOException if an error is encountered
74
77
*/
75
- public RpcClient (Channel channel , String exchange , String routingKey , int timeout ) throws IOException {
78
+ public RpcClient (Channel channel , String exchange , String routingKey , String replyTo , int timeout ) throws
79
+ IOException {
76
80
_channel = channel ;
77
81
_exchange = exchange ;
78
82
_routingKey = routingKey ;
83
+ _replyTo = replyTo ;
79
84
if (timeout < NO_TIMEOUT ) throw new IllegalArgumentException ("Timeout arguument must be NO_TIMEOUT(-1) or non-negative." );
80
85
_timeout = timeout ;
81
86
_correlationId = 0 ;
@@ -87,7 +92,24 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
87
92
* Construct a new RpcClient that will communicate on the given channel, sending
88
93
* requests to the given exchange with the given routing key.
89
94
* <p/>
90
- * Causes the creation of a temporary private autodelete queue.
95
+ * Causes the creation of a temporary private autodelete queue. The name of the queue can be provided
96
+ * <p/>
97
+ * Waits forever for responses (that is, no timeout).
98
+ * @param channel the channel to use for communication
99
+ * @param exchange the exchange to connect to
100
+ * @param routingKey the routing key
101
+ * @param replyTo the queue where the server should put the reply
102
+ * @throws IOException if an error is encountered
103
+ */
104
+ public RpcClient (Channel channel , String exchange , String routingKey , String replyTo ) throws IOException {
105
+ this (channel , exchange , routingKey , replyTo , NO_TIMEOUT );
106
+ }
107
+
108
+ /**
109
+ * Construct a new RpcClient that will communicate on the given channel, sending
110
+ * requests to the given exchange with the given routing key.
111
+ * <p/>
112
+ * Causes the creation of a temporary private autodelete queue. This queue will be named "amq.rabbitmq.reply-to".
91
113
* <p/>
92
114
* Waits forever for responses (that is, no timeout).
93
115
* @param channel the channel to use for communication
@@ -96,9 +118,27 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
96
118
* @throws IOException if an error is encountered
97
119
*/
98
120
public RpcClient (Channel channel , String exchange , String routingKey ) throws IOException {
99
- this (channel , exchange , routingKey , NO_TIMEOUT );
121
+ this (channel , exchange , routingKey , "amq.rabbitmq.reply-to" , NO_TIMEOUT );
100
122
}
101
123
124
+
125
+ /**
126
+ * Construct a new RpcClient that will communicate on the given channel, sending
127
+ * requests to the given exchange with the given routing key.
128
+ * <p/>
129
+ * Causes the creation of a temporary private autodelete queue. The name of this queue will be
130
+ * "amq.rabbitmq.reply-to".
131
+ * @param channel the channel to use for communication
132
+ * @param exchange the exchange to connect to
133
+ * @param routingKey the routing key
134
+ * @param timeout milliseconds before timing out on wait for response
135
+ * @throws IOException if an error is encountered
136
+ */
137
+ public RpcClient (Channel channel , String exchange , String routingKey , int timeout ) throws IOException {
138
+ this (channel , exchange , routingKey , "amq.rabbitmq.reply-to" , timeout )
139
+ }
140
+
141
+
102
142
/**
103
143
* Private API - ensures the RpcClient is correctly open.
104
144
* @throws IOException if an error is encountered
@@ -152,7 +192,7 @@ public void handleDelivery(String consumerTag,
152
192
}
153
193
}
154
194
};
155
- _channel .basicConsume ("amq.rabbitmq.reply-to" , true , consumer );
195
+ _channel .basicConsume (_replyTo , true , consumer );
156
196
return consumer ;
157
197
}
158
198
@@ -171,7 +211,7 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
171
211
_correlationId ++;
172
212
String replyId = "" + _correlationId ;
173
213
props = ((props ==null ) ? new AMQP .BasicProperties .Builder () : props .builder ())
174
- .correlationId (replyId ).replyTo ("amq.rabbitmq.reply-to" ).build ();
214
+ .correlationId (replyId ).replyTo (_replyTo ).build ();
175
215
_continuationMap .put (replyId , k );
176
216
}
177
217
publish (props , message );
0 commit comments