8000 Merge pull request #56 from rabbitmq/rabbitmq-server-152 · panchenko/rabbitmq-java-client@5706325 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5706325

Browse files
Merge pull request rabbitmq#56 from rabbitmq/rabbitmq-server-152
Test for rabbitmq/rabbitmq-server#152
2 parents c2df18f + 94f594e commit 5706325

File tree

1 file changed

+102
-8
lines changed

1 file changed

+102
-8
lines changed

test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.HashMap;
1414
import java.util.List;
1515
import java.util.Map;
16+
import java.util.UUID;
1617
import java.util.concurrent.CountDownLatch;
1718
import java.util.concurrent.TimeUnit;
1819

@@ -56,22 +57,18 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep
5657
declareTransientFanoutExchange(x3);
5758

5859
final String q1 = "issues.rabbitmq-server-78.queue1";
59-
Map<String, Object> args1 = argumentsForDeadLetteringTo(x1);
60-
declareTransientQueue(q1, args1);
60+
declareTransientQueue(q1, argumentsForDeadLetteringTo(x1));
6161

6262
final String q2 = "issues.rabbitmq-server-78.queue2";
63-
Map<String, Object> args2 = argumentsForDeadLetteringTo(x2);
64-
declareTransientQueue(q2, args2);
63+
declareTransientQueue(q2, argumentsForDeadLetteringTo(x2));
6564
this.channel.queueBind(q2, x1, "");
6665

6766
final String q3 = "issues.rabbitmq-server-78.queue3";
68-
Map<String, Object> args3 = argumentsForDeadLetteringTo(x3);
69-
declareTransientQueue(q3, args3);
67+
declareTransientQueue(q3, argumentsForDeadLetteringTo(x3));
7068
this.channel.queueBind(q3, x2, "");
7169

7270
final String qz = "issues.rabbitmq-server-78.destination";
73-
Map<String, Object> args4 = argumentsForDeadLetteringTo(x3);
74-
declareTransientQueue(qz, args4);
71+
declareTransientQueue(qz, argumentsForDeadLetteringTo(x3));
7572
this.channel.queueBind(qz, x3, "");
7673

7774
CountDownLatch latch = new CountDownLatch(10);
@@ -95,6 +92,103 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep
9592
}
9693
Collections.sort(cs);
9794
assertEquals(Arrays.asList(1L, 1L, 1L, 9L), cs);
95+
96+
cleanUpExchanges(x1, x2, x3);
97+
cleanUpQueues(q1, q2, q3, qz);
98+
}
99+
100+
private void cleanUpExchanges(String... xs) throws IOException {
101+
for(String x : xs) {
102+
this.channel.exchangeDelete(x);
103+
}
104+
}
105+
private void cleanUpQueues(String... qs) throws IOException {
106+
for(String q : qs) {
107+
this.channel.queueDelete(q);
108+
}
109+
}
110+
111+
@SuppressWarnings("unchecked")
112+
public void testHandlingOfXDeathHeadersFromEarlierVersions() throws IOException, InterruptedException {
113+
final String x1 = "issues.rabbitmq-server-152.fanout1";
114+
declareTransientFanoutExchange(x1);
115+
final String x2 = "issues.rabbitmq-server-152.fanout2";
116+
declareTransientFanoutExchange(x2);
117+
118+
final String q1 = "issues.rabbitmq-server-152.queue1";
119+
declareTransientQueue(q1, argumentsForDeadLetteringTo(x1));
120+
121+
final String q2 = "issues.rabbitmq-server-152.queue2";
122+
declareTransientQueue(q2, argumentsForDeadLetteringTo(x2));
123+
this.channel.queueBind(q2, x1, "");
124+
125+
final String qz = "issues.rabbitmq-server-152.destination";
126+
declareTransientQueue(qz, argumentsForDeadLetteringTo(x2));
127+
this.channel.queueBind(qz, x2, "");
128+
129+
CountDownLatch latch = new CountDownLatch(10);
130+
RejectingConsumer cons = new RejectingConsumer(this.channel, latch);
131+
this.channel.basicConsume(qz, cons);
132+
133+
final AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
134+
AMQP.BasicProperties props = bldr.headers(
135+
propsWithLegacyXDeathsInHeaders("issues.rabbitmq-server-152.queue97",
136+
"issues.rabbitmq-server-152.queue97",
137+
"issues.rabbitmq-server-152.queue97",
138+
"issues.rabbitmq-server-152.queue98",
139+
"issues.rabbitmq-server-152.queue99")).build();
140+
this.channel.basicPublish("", q1, props, "msg".getBytes());
141+
142+
assertTrue(latch.await(5, TimeUnit.SECONDS));
143+
List<Map<String, Object>> events = (List<Map<String, Object>>)cons.getHeaders().get("x-death");
144+
assertEquals(6, events.size());
145+
146+
List<String> qs = new ArrayList<String>();
147+
for (Map<String, Object> evt : events) {
148+
qs.add(evt.get("queue").toString());
149+
}
150+
Collections.sort(qs);
151+
assertEquals(Arrays.asList(qz, q1, q2,
152+
"issues.rabbitmq-server-152.queue97",
153+
"issues.rabbitmq-server-152.queue98",
154+
"issues.rabbitmq-server-152.queue99"), qs);
155+
List<Long> cs = new ArrayList<Long>();
156+
for (Map<String, Object> evt : events) {
157+
cs.add((Long)evt.get("count"));
158+
}
159+
Collections.sort(cs);
160+
assertEquals(Arrays.asList(1L, 1L, 4L, 4L, 9L, 12L), cs);
161+
162+
cleanUpExchanges(x1, x2);
163+
cleanUpQueues(q1, q2, qz,
164+
"issues.rabbitmq-server-152.queue97",
165+
"issues.rabbitmq-server-152.queue98",
166+
"issues.rabbitmq-server-152.queue99");
167+
}
168+
169+
private Map<String, Object> propsWithLegacyXDeathsInHeaders(String... qs) {
170+
Map<String, Object> m = new HashMap<String, Object>();
171+
List<Map<String, Object>> xDeaths = new ArrayList<Map<String, Object>>();
172+
for(String q : qs) {
173+
xDeaths.add(newXDeath(q));
174+
xDeaths.add(newXDeath(q));
175+
xDeaths.add(newXDeath(q));
176+
xDeaths.add(newXDeath(q));
177+
}
178+
179+
m.put("x-death", xDeaths);
180+
return m;
181+
}
182+
183+
private Map<String, Object> newXDeath(String q) {
184+
Map<String, Object> m = new HashMap<String, Object>();
185+
m.put("reason", "expired");
186+
m.put("queue", q);
187+
m.put("exchange", "issues.rabbitmq-server-152.fanout0");
188+
m.put("routing-keys", Arrays.asList("routing-key-1", "routing-key-2"));
189+
m.put("random", UUID.randomUUID().toString());
190+
191+
return m;
98192
}
99193

100194
private Map<String, Object> argumentsForDeadLetteringTo(String dlx) {

0 commit comments

Comments
 (0)
0