16
16
package com .rabbitmq .client .test .functional ;
17
17
18
18
import static org .junit .Assert .assertEquals ;
19
+ import static org .junit .Assert .assertTrue ;
19
20
import static org .junit .Assert .fail ;
20
21
21
22
import java .io .IOException ;
22
23
import java .util .Arrays ;
23
24
import java .util .HashMap ;
24
25
import java .util .Map ;
26
+ import java .util .concurrent .BlockingQueue ;
27
+ import java .util .concurrent .CountDownLatch ;
28
+ import java .util .concurrent .LinkedBlockingQueue ;
29
+ import java .util .concurrent .TimeUnit ;
25
30
31
+ import com .rabbitmq .client .DefaultConsumer ;
32
+ import com .rabbitmq .client .Envelope ;
26
33
import org .junit .Test ;
27
34
28
35
import com .rabbitmq .client .AMQP ;
@@ -51,24 +58,33 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
51
58
}
52
59
53
60
private static final int COUNT = 10 ;
61
+ private static final long DELIVERY_TIMEOUT_MS = 100 ;
62
+ private static final long CANCEL_OK_TIMEOUT_MS = 10 * 1000 ;
54
63
55
64
@ Test public void consumerPriorities () throws Exception {
56
65
String queue = channel .queueDeclare ().getQueue ();
57
- QueueingConsumer highConsumer = new QueueingConsumer (channel );
58
- QueueingConsumer medConsumer = new QueueingConsumer (channel );
59
- QueueingConsumer lowConsumer = new QueueingConsumer (channel );
66
+ QueueMessageConsumer highConsumer = new QueueMessageConsumer (channel );
67
+ QueueMessageConsumer medConsumer = new QueueMessageConsumer (channel );
68
+ QueueMessageConsumer lowConsumer = new QueueMessageConsumer (channel );
60
69
String high = channel .basicConsume (queue , true , args (1 ), highConsumer );
61
70
String med = channel .basicConsume (queue , true , medConsumer );
62
71
channel .basicConsume (queue , true , args (-1 ), lowConsumer );
63
72
64
73
publish (queue , COUNT , "high" );
74
+ assertContents (highConsumer , COUNT , "high" );
65
75
channel .basicCancel (high );
76
+ assertTrue (
77
+ "High priority consumer should have been cancelled" ,
78
+ highConsumer .cancelLatch .await (CANCEL_OK_TIMEOUT_MS , TimeUnit .MILLISECONDS )
79
+ );
66
80
publish (queue , COUNT , "med" );
81
+ assertContents (medConsumer , COUNT , "med" );
67
82
channel .basicCancel (med );
83
+ assertTrue (
84
+ "Medium priority consumer should have been cancelled" ,
85
+ medConsumer .cancelLatch .await (CANCEL_OK_TIMEOUT_MS , TimeUnit .MILLISECONDS )
86
+ );
68
87
publish (queue , COUNT , "low" );
69
-
70
- assertContents (highConsumer , COUNT , "high" );
71
- assertContents (medConsumer , COUNT , "med" );
72
88
assertContents (lowConsumer , COUNT , "low" );
73
89
}
74
90
@@ -78,17 +94,43 @@ private Map<String, Object> args(Object o) {
78
94
return map ;
79
95
}
80
96
81
- private void assertContents (QueueingConsumer qc , int count , String msg ) throws InterruptedException {
97
+ private void assertContents (QueueMessageConsumer qc , int count , String msg ) throws InterruptedException {
82
98
for (int i = 0 ; i < count ; i ++) {
83
- QueueingConsumer . Delivery d = qc .nextDelivery ();
84
- assertEquals (msg , new String (d . getBody () ));
99
+ byte [] body = qc .nextDelivery (DELIVERY_TIMEOUT_MS );
100
+ assertEquals (msg , new String (body ));
85
101
}
86
- assertEquals (null , qc .nextDelivery (0 ));
102
+ assertEquals (null , qc .nextDelivery (DELIVERY_TIMEOUT_MS ));
87
103
}
88
104
89
105
private void publish (String queue , int count , String msg ) throws IOException {
90
106
for (int i = 0 ; i < count ; i ++) {
91
107
channel .basicPublish ("" , queue , MessageProperties .MINIMAL_BASIC , msg .getBytes ());
92
108
}
93
109
}
110
+
111
+ private static class QueueMessageConsumer extends DefaultConsumer {
112
+
113
+ BlockingQueue <byte []> messages = new LinkedBlockingQueue <byte []>();
114
+
115
+ CountDownLatch cancelLatch = new CountDownLatch (1 );
116
+
117
+ public QueueMessageConsumer (Channel channel ) {
118
+ super (channel );
119
+ }
120
+
121
+ @ Override
122
+ public void handleDelivery (String consumerTag , Envelope envelope , AMQP .BasicProperties properties , byte [] body ) throws IOException {
123
+ messages .add (body );
124
+ }
125
+
126
+ @ Override
127
+ public void handleCancelOk (String consumerTag ) {
128
+ cancelLatch .countDown ();
129
+ }
130
+
131
+ byte [] nextDelivery (long timeoutInMs ) throws InterruptedException {
132
+ return messages .poll (timeoutInMs , TimeUnit .MILLISECONDS );
133
+ }
134
+
135
+ }
94
136
}
0 commit comments