8000 Merge pull request #117 from rabbitmq/rabbitmq-server-488 · yezijiang/rabbitmq-java-client@613476c · GitHub
[go: up one dir, main page]

Skip to content

Commit 613476c

Browse files
Merge pull request rabbitmq#117 from rabbitmq/rabbitmq-server-488
Basic priority queue test
2 parents 7891d02 + fbd0d9e commit 613476c

File tree

1 file changed

+68
-0
lines changed

1 file changed

+68
-0
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.DefaultConsumer;
5+
import com.rabbitmq.client.Envelope;
6+
import com.rabbitmq.client.test.BrokerTestCase;
7+
8+
import java.io.IOException;
9+
import java.util.ArrayList;
10+
import java.util.HashMap;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.concurrent.CountDownLatch;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.TimeoutException;
16+
17+
public class PriorityQueues extends BrokerTestCase {
18+
public void testPrioritisingBasics() throws IOException, TimeoutException, InterruptedException {
19+
String q = "with-3-priorities";
20+
int n = 3;
21+
channel.queueDeclare(q, true, false, false, argsWithPriorities(n));
22+
publishWithPriorities(q, n);
23+
24+
List<Integer> xs = prioritiesOfEnqueuedMessages(q, n);
25+
assertEquals(Integer.valueOf(3), xs.get(0));
26+
assertEquals(Integer.valueOf(2), xs.get(1));
27+
assertEquals(Integer.valueOf(1), xs.get(2));
28+
29+
channel.queueDelete(q);
30+
}
31+
32+
private List<Integer> prioritiesOfEnqueuedMessages(String q, int n) throws InterruptedException, IOException {
33+
final List<Integer> xs = new ArrayList<Integer>();
34+
final CountDownLatch latch = new CountDownLatch(n);
35+
36+
channel.basicConsume(q, true, new DefaultConsumer(channel) {
37+
@Override
38+
public void handleDelivery(String consumerTag, Envelope envelope,
39+
AMQP.BasicProperties properties, byte[] body) throws IOException {
40+
xs.add(properties.getPriority());
41+
latch.countDown();
42+
}
43+
});
44+
45+
latch.await(5, TimeUnit.SECONDS);
46+
return xs;
47+
}
48+
49+
private void publishWithPriorities(String q, int n) throws IOException, TimeoutException, InterruptedException {
50+
channel.confirmSelect();
51+
for (int i = 1; i <= n; i++) {
52+
channel.basicPublish("", q, propsWithPriority(i), "msg".getBytes("UTF-8"));
53+
}
54+
channel.waitForConfirms(500);
55+
}
56+
57+
private AMQP.BasicProperties propsWithPriority(int n) {
58+
return (new AMQP.BasicProperties.Builder())
59+
.priority(n)
60+
.build();
61+
}
62+
63+
private Map<String, Object> argsWithPriorities(int n) {
64+
Map<String, Object> m = new HashMap<String, Object>();
65+
m.put("x-max-priority", n);
66+
return m;
67+
}
68+
}

0 commit comments

Comments
 (0)
0