8000 Merge branch 'stable' · wljcom/rabbitmq-java-client@fc49d55 · GitHub
[go: up one dir, main page]

Skip to content

Commit fc49d55

Browse files
committed
Merge branch 'stable'
2 parents 5328f9f + e60adca commit fc49d55

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed

test/src/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.rabbitmq.client.test;
1919

2020
import java.io.IOException;
21+
import java.util.Map;
2122
import java.util.UUID;
2223

2324
import junit.framework.TestCase;
@@ -226,6 +227,14 @@ protected void declareDurableQueue(String q) throws IOException {
226227
channel.queueDeclare(q, true, false, false, null);
227228
}
228229

230+
protected void declareTransientQueue(String q) throws IOException {
231+
channel.queueDeclare(q, false, false, false, null);
232+
}
233+
234+
protected void declareTransientQueue(String q, Map<String, Object> args) throws IOException {
235+
channel.queueDeclare(q, false, false, false, args);
236+
}
237+
229238
protected void declareDurableTopicExchange(String x) throws IOException {
230239
channel.exchangeDeclare(x, "topic", true);
231240
}
@@ -234,6 +243,10 @@ protected void declareTransientTopicExchange(String x) throws IOException {
234243
channel.exchangeDeclare(x, "topic", false);
235244
}
236245

246+
protected void declareTransientFanoutExchange(String x) throws IOException {
247+
channel.exchangeDeclare(x, "fanout", false);
248+
}
249+
237250
protected void deleteExchange(String x) throws IOException {
238251
channel.exchangeDelete(x);
239252
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.DefaultConsumer;
6+
import com.rabbitmq.client.Envelope;
7+
import com.rabbitmq.client.test.BrokerTestCase;
8+
9+
import java.io.IOException;
10+
import java.util.ArrayList;
11+
import java.util.Arrays;
12+
import java.util.Collections;
13+
import java.util.HashMap;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.TimeUnit;
18+
19+
class RejectingConsumer extends DefaultConsumer {
20+
private CountDownLatch latch;
21+
private Map<String, Object> headers;
22+
23+
public RejectingConsumer(Channel channel, CountDownLatch latch) {
24+
super(channel);
25+
this.latch = latch;
26+
}
27+
28+
@Override
29+
public void handleDelivery(String consumerTag, Envelope envelope,
30+
AMQP.BasicProperties properties, byte[] body)
31+
throws IOException {
32+
if(this.latch.getCount() > 0) {
33+
this.getChannel().basicReject(envelope.getDeliveryTag(), false);
34+
} else {
35+
if(this.getChannel().isOpen()) {
36+
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
37+
}
38+
}
39+
this.headers = properties.getHeaders();
40+
latch.countDown();
41+
}
42+
43+
public Map<String, Object> getHeaders() {
44+
return headers;
45+
}
46+
}
47+
48+
public class XDeathHeaderGrowth extends BrokerTestCase {
49+
@SuppressWarnings("unchecked")
50+
public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedException {
51+
final String x1 = "issues.rabbitmq-server-78.fanout1";
52+
declareTransientFanoutExchange(x1);
53+
final String x2 = "issues.rabbitmq-server-78.fanout2";
54+
declareTransientFanoutExchange(x2);
55+
final String x3 = "issues.rabbitmq-server-78.fanout3";
56+
declareTransientFanoutExchange(x3);
57+
58+
final String q1 = "issues.rabbitmq-server-78.queue1";
59+
Map<String, Object> args1 = argumentsForDeadLetteringTo(x1);
60+
declareTransientQueue(q1, args1);
61+
62+
final String q2 = "issues.rabbitmq-server-78.queue2";
63+
Map<String, Object> args2 = argumentsForDeadLetteringTo(x2);
64+
declareTransientQueue(q2, args2);
65+
this.channel.queueBind(q2, x1, "");
66+
67+
final String q3 = "issues.rabbitmq-server-78.queue3";
68+
Map<String, Object> args3 = argumentsForDeadLetteringTo(x3);
69+
declareTransientQueue(q3, args3);
70+
this.channel.queueBind(q3, x2, "");
71+
72+
final String qz = "issues.rabbitmq-server-78.destination";
73+
Map<String, Object> args4 = argumentsForDeadLetteringTo(x3);
74+
declareTransientQueue(qz, args4);
75+
this.channel.queueBind(qz, x3, "");
76+
77+
CountDownLatch latch = new CountDownLatch(10);
78+
RejectingConsumer cons = new RejectingConsumer(this.channel, latch);
79+
this.channel.basicConsume(qz, cons);
80+
81+
this.channel.basicPublish("", q1, null, "msg".getBytes());
82+
assertTrue(latch.await(5, TimeUnit.SECONDS));
83+
List<Map<String, Object>> events = (List<Map<String, Object>>)cons.getHeaders().get("x-death");
84+
assertEquals(4, events.size());
85+
86+
List<String> qs = new ArrayList<String>();
87+
for (Map<String, Object> evt : events) {
88+
qs.add(evt.get("queue").toString());
89+
}
90+
Collections.sort(qs);
91+
assertEquals(Arrays.asList(qz, q1, q2, q3), qs);
92+
List<Long> cs = new ArrayList<Long>();
93+
for (Map<String, Object> evt : events) {
94+
cs.add((Long)evt.get("count"));
95+
}
96+
Collections.sort(cs);
97+
assertEquals(Arrays.asList(1L, 1L, 1L, 9L), cs);
98+
}
99+
100+
private Map<String, Object> argumentsForDeadLetteringTo(String dlx) {
101+
return argumentsForDeadLetteringTo(dlx, 1);
102+
}
103+
104+
private Map<String, Object> argumentsForDeadLetteringTo(String dlx, int ttl) {
105+
Map<String, Object> m = new HashMap<String, Object>();
106+
m.put("x-dead-letter-exchange", dlx);
107+
m.put("x-dead-letter-routing-key", "some-routing-key");
108+
m.put("x-message-ttl", ttl);
109+
return m;
110+
}
111+
}

0 commit comments

Comments
 (0)
0