8000 Make consumer priorities test more reliable · ftt1024/rabbitmq-java-client@0767120 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0767120

Browse files
committed
Make consumer priorities test more reliable
The test didn't check higher priority consumers were actually cancelled before checking the content of lower priority consumers. Higher priority consumers could then "steal" expected messages from the lower priority consumers, making the test waiting forever some expecting messages for the lower priority consumers. Problem appeared on CI, after 50fa0efc03b284c986b15426c404861470d3226d on rabbitmq-common and 9e7cea3ae6dfb89bc14e562812420ee846486220 on rabbitmq-server (stable branch after 3.6.12 release).
1 parent f80e0a6 commit 0767120

File tree

1 file changed

+52
-10
lines changed

1 file changed

+52
-10
lines changed

src/test/java/com/rabbitmq/client/test/functional/ConsumerPriorities.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,20 @@
1616
package com.rabbitmq.client.test.functional;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920
import static org.junit.Assert.fail;
2021

2122
import java.io.IOException;
2223
import java.util.Arrays;
2324
import java.util.HashMap;
2425
import java.util.Map;
26+
import java.util.concurrent.BlockingQueue;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.LinkedBlockingQueue;
29+
import java.util.concurrent.TimeUnit;
2530

31+
import com.rabbitmq.client.DefaultConsumer;
32+
import com.rabbitmq.client.Envelope;
2633
import org.junit.Test;
2734

2835
import com.rabbitmq.client.AMQP;
@@ -51,24 +58,33 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
5158
}
5259

5360
private static final int COUNT = 10;
61+
private static final long DELIVERY_TIMEOUT_MS = 100;
62+
private static final long CANCEL_OK_TIMEOUT_MS = 10 * 1000;
5463

5564
@Test public void consumerPriorities() throws Exception {
5665
String queue = channel.queueDeclare().getQueue();
57-
QueueingConsumer highConsumer = new QueueingConsumer(channel);
58-
QueueingConsumer medConsumer = new QueueingConsumer(channel);
59-
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
66+
QueueMessageConsumer highConsumer = new QueueMessageConsumer(channel);
67+
QueueMessageConsumer medConsumer = new QueueMessageConsumer(channel);
68+
QueueMessageConsumer lowConsumer = new QueueMessageConsumer(channel);
6069
String high = channel.basicConsume(queue, true, args(1), highConsumer);
6170
String med = channel.basicConsume(queue, true, medConsumer);
6271
channel.basicConsume(queue, true, args(-1), lowConsumer);
6372

6473
publish(queue, COUNT, "high");
74+
assertContents(highConsumer, COUNT, "high");
6575
channel.basicCancel(high);
76+
assertTrue(
77+
"High priority consumer should have been cancelled",
78+
highConsumer.cancelLatch.await(CANCEL_OK_TIMEOUT_MS, TimeUnit.MILLISECONDS)
79+
);
6680
publish(queue, COUNT, "med");
81+
assertContents(medConsumer, COUNT, "med");
6782
channel.basicCancel(med);
83+
assertTrue(
84+
"Medium priority consumer should have been cancelled",
85+
medConsumer.cancelLatch.await(CANCEL_OK_TIMEOUT_MS, TimeUnit.MILLISECONDS)
86+
);
6887
publish(queue, COUNT, "low");
69-
70-
assertContents(highConsumer, COUNT, "high");
71-
assertContents(medConsumer, COUNT, "med");
7288
assertContents(lowConsumer, COUNT, "low");
7389
}
7490

@@ -78,17 +94,43 @@ private Map<String, Object> args(Object o) {
7894
return map;
7995
}
8096

81-
private void assertContents(QueueingConsumer qc, int count, String msg) throws InterruptedException {
97+
private void assertContents(QueueMessageConsumer qc, int count, String msg) throws InterruptedException {
8298
for (int i = 0; i < count; i++) {
83-
QueueingConsumer.Delivery d = qc.nextDelivery();
84-
assertEquals(msg, new String(d.getBody()));
99+
byte[] body = qc.nextDelivery(DELIVERY_TIMEOUT_MS);
100+
assertEquals(msg, new String(body));
85101
}
86-
assertEquals(null, qc.nextDelivery(0));
102+
assertEquals(null, qc.nextDelivery(DELIVERY_TIMEOUT_MS));
87103
}
88104

89105
private void publish(String queue, int count, String msg) throws IOException {
90106
for (int i = 0; i < count; i++) {
91107
channel.basicPublish("", queue, MessageProperties.MINIMAL_BASIC, msg.getBytes());
92108
}
93109
}
110+
111+
private static class QueueMessageConsumer extends DefaultConsumer {
112+
113+
BlockingQueue<byte[]> messages = new LinkedBlockingQueue<byte[]>();
114+
115+
CountDownLatch cancelLatch = new CountDownLatch(1);
116+
117+
public QueueMessageConsumer(Channel channel) {
118+
super(channel);
119+
}
120+
121+
@Override
122+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
123+
messages.add(body);
124+
}
125+
126+
@Override
127+
public void handleCancelOk(String consumerTag) {
128+
cancelLatch.countDown();
129+
}
130+
131+
byte[] nextDelivery(long timeoutInMs) throws InterruptedException {
132+
return messages.poll(timeoutInMs, TimeUnit.MILLISECONDS);
133+
}
134+
135+
}
94136
}

0 commit comments

Comments
 (0)
0