13
13
import java .util .HashMap ;
14
14
import java .util .List ;
15
15
import java .util .Map ;
16
+ import java .util .UUID ;
16
17
import java .util .concurrent .CountDownLatch ;
17
18
import java .util .concurrent .TimeUnit ;
18
19
@@ -56,22 +57,18 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep
56
57
declareTransientFanoutExchange (x3 );
57
58
58
59
final String q1 = "issues.rabbitmq-server-78.queue1" ;
59
- Map <String , Object > args1 = argumentsForDeadLetteringTo (x1 );
60
- declareTransientQueue (q1 , args1 );
60
+ declareTransientQueue (q1 , argumentsForDeadLetteringTo (x1 ));
61
61
62
62
final String q2 = "issues.rabbitmq-server-78.queue2" ;
63
- Map <String , Object > args2 = argumentsForDeadLetteringTo (x2 );
64
- declareTransientQueue (q2 , args2 );
63
+ declareTransientQueue (q2 , argumentsForDeadLetteringTo (x2 ));
65
64
this .channel .queueBind (q2 , x1 , "" );
66
65
67
66
final String q3 = "issues.rabbitmq-server-78.queue3" ;
68
- Map <String , Object > args3 = argumentsForDeadLetteringTo (x3 );
69
- declareTransientQueue (q3 , args3 );
67
+ declareTransientQueue (q3 , argumentsForDeadLetteringTo (x3 ));
70
68
this .channel .queueBind (q3 , x2 , "" );
71
69
72
70
final String qz = "issues.rabbitmq-server-78.destination" ;
73
- Map <String , Object > args4 = argumentsForDeadLetteringTo (x3 );
74
- declareTransientQueue (qz , args4 );
71
+ declareTransientQueue (qz , argumentsForDeadLetteringTo (x3 ));
75
72
this .channel .queueBind (qz , x3 , "" );
76
73
77
74
CountDownLatch latch = new CountDownLatch (10 );
@@ -95,6 +92,103 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep
95
92
}
96
93
Collections .sort (cs );
97
94
assertEquals (Arrays .asList (1L , 1L , 1L , 9L ), cs );
95
+
96
+ cleanUpExchanges (x1 , x2 , x3 );
97
+ cleanUpQueues (q1 , q2 , q3 , qz );
98
+ }
99
+
100
+ private void cleanUpExchanges (String ... xs ) throws IOException {
101
+ for (String x : xs ) {
102
+ this .channel .exchangeDelete (x );
103
+ }
104
+ }
105
+ private void cleanUpQueues (String ... qs ) throws IOException {
106
+ for (String q : qs ) {
107
+ this .channel .queueDelete (q );
108
+ }
109
+ }
110
+
111
+ @ SuppressWarnings ("unchecked" )
112
+ public void testHandlingOfXDeathHeadersFromEarlierVersions () throws IOException , InterruptedException {
113
+ final String x1 = "issues.rabbitmq-server-152.fanout1" ;
114
+ declareTransientFanoutExchange (x1 );
115
+ final String x2 = "issues.rabbitmq-server-152.fanout2" ;
116
+ declareTransientFanoutExchange (x2 );
117
+
118
+ final String q1 = "issues.rabbitmq-server-152.queue1" ;
119
+ declareTransientQueue (q1 , argumentsForDeadLetteringTo (x1 ));
120
+
121
+ final String q2 = "issues.rabbitmq-server-152.queue2" ;
122
+ declareTransientQueue (q2 , argumentsForDeadLetteringTo (x2 ));
123
+ this .channel .queueBind (q2 , x1 , "" );
124
+
125
+ final String qz = "issues.rabbitmq-server-152.destination" ;
126
+ declareTransientQueue (qz , argumentsForDeadLetteringTo (x2 ));
127
+ this .channel .queueBind (qz , x2 , "" );
128
+
129
+ CountDownLatch latch = new CountDownLatch (10 );
130
+ RejectingConsumer cons = new RejectingConsumer (this .channel , latch );
131
+ this .channel .basicConsume (qz , cons );
132
+
133
+ final AMQP .BasicProperties .Builder bldr = new AMQP .BasicProperties .Builder ();
134
+ AMQP .BasicProperties props = bldr .headers (
135
+ propsWithLegacyXDeathsInHeaders ("issues.rabbitmq-server-152.queue97" ,
136
+ "issues.rabbitmq-server-152.queue97" ,
137
+ "issues.rabbitmq-server-152.queue97" ,
138
+ "issues.rabbitmq-server-152.queue98" ,
139
+ "issues.rabbitmq-server-152.queue99" )).build ();
140
+ this .channel .basicPublish ("" , q1 , props , "msg" .getBytes ());
141
+
142
+ assertTrue (latch .await (5 , TimeUnit .SECONDS ));
143
+ List <Map <String , Object >> events = (List <Map <String , Object >>)cons .getHeaders ().get ("x-death" );
144
+ assertEquals (6 , events .size ());
145
+
146
+ List <String > qs = new ArrayList <String >();
147
+ for (Map <String , Object > evt : events ) {
148
+ qs .add (evt .get ("queue" ).toString ());
149
+ }
150
+ Collections .sort (qs );
151
+ assertEquals (Arrays .asList (qz , q1 , q2 ,
152
+ "issues.rabbitmq-server-152.queue97" ,
153
+ "issues.rabbitmq-server-152.queue98" ,
154
+ "issues.rabbitmq-server-152.queue99" ), qs );
155
+ List <Long > cs = new ArrayList <Long >();
156
+ for (Map <String , Object > evt : events ) {
157
+ cs .add ((Long )evt .get ("count" ));
158
+ }
159
+ Collections .sort (cs );
160
+ assertEquals (Arrays .asList (1L , 1L , 4L , 4L , 9L , 12L ), cs );
161
+
162
+ cleanUpExchanges (x1 , x2 );
163
+ cleanUpQueues (q1 , q2 , qz ,
164
+ "issues.rabbitmq-server-152.queue97" ,
165
+ "issues.rabbitmq-server-152.queue98" ,
166
+ "issues.rabbitmq-server-152.queue99" );
167
+ }
168
+
169
+ private Map <String , Object > propsWithLegacyXDeathsInHeaders (String ... qs ) {
170
+ Map <String , Object > m = new HashMap <String , Object >();
171
+ List <Map <String , Object >> xDeaths = new ArrayList <Map <String , Object >>();
172
+ for (String q : qs ) {
173
+ xDeaths .add (newXDeath (q ));
174
+ xDeaths .add (newXDeath (q ));
175
+ xDeaths .add (newXDeath (q ));
176
+ xDeaths .add (newXDeath (q ));
177
+ }
178
+
179
+ m .put ("x-death" , xDeaths );
180
+ return m ;
181
+ }
182
+
183
+ private Map <String , Object > newXDeath (String q ) {
184
+ Map <String , Object > m = new HashMap <String , Object >();
185
+ m .put ("reason" , "expired" );
186
+ m .put ("queue" , q );
187
+ m .put ("exchange" , "issues.rabbitmq-server-152.fanout0" );
188
+ m .put ("routing-keys" , Arrays .asList ("routing-key-1" , "routing-key-2" ));
189
+ m .put ("random" , UUID .randomUUID ().toString ());
190
+
191
+ return m ;
98
192
}
99
193
100
194
private Map <String , Object > argumentsForDeadLetteringTo (String dlx ) {
0 commit comments