@@ -32,7 +32,9 @@ public void handleDelivery(String consumerTag, Envelope envelope,
32
32
if (this .latch .getCount () > 0 ) {
33
33
this .getChannel ().basicReject (envelope .getDeliveryTag (), false );
34
34
} else {
35
- this .getChannel ().basicAck (envelope .getDeliveryTag (), false );
35
+ if (this .getChannel ().isOpen ()) {
36
+ this .getChannel ().basicAck (envelope .getDeliveryTag (), false );
37
+ }
36
38
}
37
39
this .headers = properties .getHeaders ();
38
40
latch .countDown ();
@@ -44,6 +46,7 @@ public Map<String, Object> getHeaders() {
44
46
}
45
47
46
48
public class XDeathHeaderGrowth extends BrokerTestCase {
49
+ @ SuppressWarnings ("unchecked" )
47
50
public void testBoundedXDeathHeaderGrowth () throws IOException , InterruptedException {
48
51
final String x1 = "issues.rabbitmq-server-78.fanout1" ;
49
52
declareTransientFanoutExchange (x1 );
@@ -71,7 +74,7 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep
71
74
declareTransientQueue (qz , args4 );
72
75
this .channel .queueBind (qz , x3 , "" );
73
76
74
- CountDownLatch latch = new CountDownLatch (5 );
77
+ CountDownLatch latch = new CountDownLatch (10 );
75
78
RejectingConsumer cons = new RejectingConsumer (this .channel , latch );
76
79
this .channel .basicConsume (qz , cons );
77
80
@@ -86,6 +89,12 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep
86
89
}
87
90
Collections .sort (qs );
88
91
assertEquals (Arrays .asList (qz , q1 , q2 , q3 ), qs );
92
+ List <Long > cs = new ArrayList <Long >();
93
+ for (Map <String , Object > evt : events ) {
94
+ cs .add ((Long )evt .get ("counter" ));
95
+ }
96
+ Collections .sort (cs );
97
+ assertEquals (Arrays .asList (1L , 1L , 1L , 9L ), cs );
89
98
}
90
99
91
100
private Map <String , Object > argumentsForDeadLetteringTo (String dlx ) {
0 commit comments