8000 Test for rabbitmq/rabbitmq-server#78 · wljcom/rabbitmq-java-client@4782751 · GitHub
[go: up one dir, main page]

Skip to content

Commit 4782751

Browse files
1 parent 6aed7c8 commit 4782751

File tree

5 files changed

+141
-0
lines changed

5 files changed

+141
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,17 @@ void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolea
563563
*/
564564
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
565565

566+
/**
567+
* Bind a queue to an exchange, with a blank routing key and no extra arguments.
568+
* @see com.rabbitmq.client.AMQP.Queue.Bind
569+
* @see com.rabbitmq.client.AMQP.Queue.BindOk
570+
* @param queue the name of the queue
571+
* @param exchange the name of the exchange
572+
* @return a binding-confirm method if the binding was successfully created
573+
* @throws java.io.IOException if an error is encountered
574+
*/
575+
Queue.BindOk queueBind(String queue, String exchange) throws IOException;
576+
566577
/**
567578
* Bind a queue to an exchange, with no extra arguments.
568579
* @see com.rabbitmq.client.AMQP.Queue.Bind

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,13 @@ public Queue.BindOk queueBind(String queue, String exchange,
925925
.getMethod();
926926
}
927927

928+
/** Public API - {@inheritDoc} */
929+
public Queue.BindOk queueBind(String queue, String exchange)
930+
throws IOException
931+
{
932+
return queueBind(queue, exchange, "", null);
933+
}
934+
928935
/** Public API - {@inheritDoc} */
929936
public Queue.BindOk queueBind(String queue, String exchange, String routingKey)
930937
throws IOException

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.rabbitmq.client.ReturnListener;
1515
import com.rabbitmq.client.ShutdownListener;
1616
import com.rabbitmq.client.ShutdownSignalException;
17+
import com.rabbitmq.client.impl.AMQImpl;
1718

1819
import java.io.IOException;
1920
import java.util.ArrayList;
@@ -301,6 +302,12 @@ public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) t
301302
delegate.queueDeleteNoWait(queue, ifUnused, ifEmpty);
302303
}
303304

305+
public AMQP.Queue.BindOk queueBind(String queue, String exchange)
306+
throws IOException
307+
{
308+
return queueBind(queue, exchange, "", null);
309+
}
310+
304311
public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException {
305312
return queueBind(queue, exchange, routingKey, null);
306313
}

test/src/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.rabbitmq.client.test;
1919

2020
import java.io.IOException;
21+
import java.util.Map;
2122
import java.util.UUID;
2223

2324
import junit.framework.TestCase;
@@ -226,6 +227,14 @@ protected void declareDurableQueue(String q) throws IOException {
226227
channel.queueDeclare(q, true, false, false, null);
227228
}
228229

230+
protected void declareTransientQueue(String q) throws IOException {
231+
channel.queueDeclare(q, false, false, false, null);
232+
}
233+
234+
protected void declareTransientQueue(String q, Map<String, Object> args) throws IOException {
235+
channel.queueDeclare(q, false, false, false, args);
236+
}
237+
229238
protected void declareDurableTopicExchange(String x) throws IOException {
230239
channel.exchangeDeclare(x, "topic", true);
231240
}
@@ -234,6 +243,10 @@ protected void declareTransientTopicExchange(String x) throws IOException {
234243
channel.exchangeDeclare(x, "topic", false);
235244
}
236245

246+
protected void declareTransientFanoutExchange(String x) throws IOException {
247+
channel.exchangeDeclare(x, "fanout", false);
248+
}
249+
237250
protected void deleteExchange(String x) throws IOException {
238251
channel.exchangeDelete(x);
239252
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.Consumer;
6+
import com.rabbitmq.client.DefaultConsumer;
7+
import com.rabbitmq.client.Envelope;
8+
import com.rabbitmq.client.test.BrokerTestCase;
9+
10+
import java.io.IOException;
11+
import java.util.ArrayList;
12+
import java.util.Arrays;
13+
import java.util.Collections;
14+
import java.util.HashMap;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.TimeUnit;
19+
20+
class RejectingConsumer extends DefaultConsumer {
21+
private CountDownLatch latch;
22+
private Map<String, Object> headers;
23+
24+
public RejectingConsumer(Channel channel, CountDownLatch latch) {
25+
super(channel);
26+
this.latch = latch;
27+
}
28+
29+
@Override
30+
public void handleDelivery(String consumerTag, Envelope envelope,
31+
AMQP.BasicProperties properties, byte[] body)
32+
throws IOException {
33+
if(this.latch.getCount() > 0) {
34+
this.getChannel().basicReject(envelope.getDeliveryTag(), false);
35+
} else {
36+
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
37+
}
38+
this.headers = properties.getHeaders();
39+
latch.countDown();
40+
}
41+
42+
public Map<String, Object> getHea F438 ders() {
43+
return headers;
44+
}
45+
}
46+
47+
public class XDeathHeaderGrowth extends BrokerTestCase {
48+
public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedException {
49+
final String x1 = "issues.rabbitmq-server-78.fanout1";
50+
declareTransientFanoutExchange(x1);
51+
final String x2 = "issues.rabbitmq-server-78.fanout2";
52+
declareTransientFanoutExchange(x2);
53+
final String x3 = "issues.rabbitmq-server-78.fanout3";
54+
declareTransientFanoutExchange(x3);
55+
56+
final String q1 = "issues.rabbitmq-server-78.queue1";
57+
Map<String, Object> args1 = argumentsForDeadLetteringTo(x1);
58+
declareTransientQueue(q1, args1);
59+
60+
final String q2 = "issues.rabbitmq-server-78.queue2";
61+
Map<String, Object> args2 = argumentsForDeadLetteringTo(x2);
62+
declareTransientQueue(q2, args2);
63+
this.channel.queueBind(q2, x1);
64+
65+
final String q3 = "issues.rabbitmq-server-78.queue3";
66+
Map<String, Object> args3 = argumentsForDeadLetteringTo(x3);
67+
declareTransientQueue(q3, args3);
68+
this.channel.queueBind(q3, x2);
69+
70+
final String qz = "issues.rabbitmq-server-78.destination";
71+
Map<String, Object> args4 = argumentsForDeadLetteringTo(x3);
72+
declareTransientQueue(qz, args4);
73+
this.channel.queueBind(qz, x3);
74+
75+
CountDownLatch latch = new CountDownLatch(5);
76+
RejectingConsumer cons = new RejectingConsumer(this.channel, latch);
77+
this.channel.basicConsume(qz, cons);
78+
79+
this.channel.basicPublish("", q1, null, "msg".getBytes());
80+
assertTrue(latch.await(5, TimeUnit.SECONDS));
81+
List<Map<String, Object>> events = (List<Map<String, Object>>)cons.getHeaders().get("x-death");
82+
assertEquals(4, events.size());
83+
84+
List<String> qs = new ArrayList<String>();
85+
for (Map<String, Object> evt : events) {
86+
qs.add(evt.get("queue").toString());
87+
}
88+
Collections.sort(qs);
89+
assertEquals(Arrays.asList(qz, q1, q2, q3), qs);
90+
}
91+
92+
private Map<String, Object> argumentsForDeadLetteringTo(String dlx) {
93+
return argumentsForDeadLetteringTo(dlx, 1);
94+
}
95+
96+
private Map<String, Object> argumentsForDeadLetteringTo(String dlx, int ttl) {
97+
Map<String, Object> m = new HashMap<String, Object>();
98+
m.put("x-dead-letter-exchange", dlx);
99+
m.put("x-dead-letter-routing-key", "some-routing-key");
100+
m.put("x-message-ttl", ttl);
101+
return m;
102+
}
103+
}

0 commit comments

Comments
 (0)
0