@@ -109,7 +109,10 @@ all_tests() ->
109
109
consume_redelivery_count ,
110
110
subscribe_redelivery_count ,
111
111
message_bytes_metrics ,
112
- queue_length_limit_drop_head
112
+ queue_length_limit_drop_head ,
113
+ subscribe_redelivery_limit ,
114
+ subscribe_redelivery_policy ,
115
+ subscribe_redelivery_limit_with_dead_letter
113
116
].
114
117
115
118
memory_tests () ->
@@ -1462,6 +1465,154 @@ subscribe_redelivery_count(Config) ->
1462
1465
wait_for_messages_pending_ack (Servers , RaName , 0 )
1463
1466
end .
1464
1467
1468
+ subscribe_redelivery_limit (Config ) ->
1469
+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1470
+
1471
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
1472
+ QQ = ? config (queue_name , Config ),
1473
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1474
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
1475
+ {<<" x-delivery-limit" >>, long , 1 }])),
1476
+
1477
+ RaName = ra_name (QQ ),
1478
+ publish (Ch , QQ ),
1479
+ wait_for_messages_ready (Servers , RaName , 1 ),
1480
+ wait_for_messages_pending_ack (Servers , RaName , 0 ),
1481
+ subscribe (Ch , QQ , false ),
1482
+
1483
+ DTag = <<" x-delivery-count" >>,
1484
+ receive
1485
+ {# 'basic.deliver' {delivery_tag = DeliveryTag ,
1486
+ redelivered = false },
1487
+ # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
1488
+ ? assertMatch (undefined , rabbit_basic :header (DTag , H0 )),
1489
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
1490
+ multiple = false ,
1491
+ requeue = true })
1492
+ end ,
1493
+
1494
+ wait_for_messages_ready (Servers , RaName , 0 ),
1495
+ wait_for_messages_pending_ack (Servers , RaName , 1 ),
1496
+ receive
1497
+ {# 'basic.deliver' {delivery_tag = DeliveryTag1 ,
1498
+ redelivered = true },
1499
+ # amqp_msg {props = # 'P_basic' {headers = H1 }}} ->
1500
+ ? assertMatch ({DTag , _ , 1 }, rabbit_basic :header (DTag , H1 )),
1501
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
1502
+ multiple = false ,
1503
+ requeue = true })
1504
+ end ,
1505
+
1506
+ wait_for_messages_ready (Servers , RaName , 0 ),
1507
+ wait_for_messages_pending_ack (Servers , RaName , 0 ),
1508
+ receive
1509
+ {# 'basic.deliver' {redelivered = true }, # amqp_msg {}} ->
1510
+ throw (unexpected_redelivery )
1511
+ after 2000 ->
1512
+ ok
1513
+ end .
1514
+
1515
+ subscribe_redelivery_policy (Config ) ->
1516
+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1517
+
1518
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
1519
+ QQ = ? config (queue_name , Config ),
1520
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1521
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1522
+
1523
+ ok = rabbit_ct_broker_helpers :set_policy (
1524
+ Config , 0 , <<" delivery-limit" >>, <<" .*" >>, <<" queues" >>,
1525
+ [{<<" delivery-limit" >>, 1 }]),
1526
+
1527
+ RaName = ra_name (QQ ),
1528
+ publish (Ch , QQ ),
1529
+ wait_for_messages_ready (Servers , RaName , 1 ),
1530
+ wait_for_messages_pending_ack (Servers , RaName , 0 ),
1531
+ subscribe (Ch , QQ , false ),
1532
+
1533
+ DTag = <<" x-delivery-count" >>,
1534
+ receive
1535
+ {# 'basic.deliver' {delivery_tag = DeliveryTag ,
1536
+ redelivered = false },
1537
+ # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
1538
+ ? assertMatch (undefined , rabbit_basic :header (DTag , H0 )),
1539
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
1540
+ multiple = false ,
1541
+ requeue = true })
1542
+ end ,
1543
+
1544
+ wait_for_messages_ready (Servers , RaName , 0 ),
1545
+ wait_for_messages_pending_ack (Servers , RaName , 1 ),
1546
+ receive
1547
+ {# 'basic.deliver' {delivery_tag = DeliveryTag1 ,
1548
+ redelivered = true },
1549
+ # amqp_msg {props = # 'P_basic' {headers = H1 }}} ->
1550
+ ? assertMatch ({DTag , _ , 1 }, rabbit_basic :header (DTag , H1 )),
1551
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
1552
+ multiple = false ,
1553
+ requeue = true })
1554
+ end ,
1555
+
1556
+ wait_for_messages_ready (Servers , RaName , 0 ),
1557
+ wait_for_messages_pending_ack (Servers , RaName , 0 ),
1558
+ receive
1559
+ {# 'basic.deliver' {redelivered = true }, # amqp_msg {}} ->
1560
+ throw (unexpected_redelivery )
1561
+ after 2000 ->
1562
+ ok
1563
+ end ,
1564
+ ok = rabbit_ct_broker_helpers :clear_policy (Config , 0 , <<" delivery-limit" >>).
1565
+
1566
+ subscribe_redelivery_limit_with_dead_letter (Config ) ->
1567
+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1568
+
1569
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
1570
+ QQ = ? config (queue_name , Config ),
1571
+ DLX = <<" subcribe_redelivery_limit_with_dead_letter_dlx" >>,
1572
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1573
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
1574
+ {<<" x-delivery-limit" >>, long , 1 },
1575
+ {<<" x-dead-letter-exchange" >>, longstr , <<>>},
1576
+ {<<" x-dead-letter-routing-key" >>, longstr , DLX }
1577
+ ])),
1578
+ ? assertEqual ({'queue.declare_ok' , DLX , 0 , 0 },
1579
+ declare (Ch , DLX , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1580
+
1581
+ RaName = ra_name (QQ ),
1582
+ RaDlxName = ra_name (DLX ),
1583
+ publish (Ch , QQ ),
1584
+ wait_for_messages_ready (Servers , RaName , 1 ),
1585
+ wait_for_messages_pending_ack (Servers , RaName , 0 ),
1586
+ subscribe (Ch , QQ , false ),
1587
+
1588
+ DTag = <<" x-delivery-count" >>,
1589
+ receive
1590
+ {# 'basic.deliver' {delivery_tag = DeliveryTag ,
1591
+ redelivered = false },
1592
+ # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
1593
+ ? assertMatch (undefined , rabbit_basic :header (DTag , H0 )),
1594
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
1595
+ multiple = false ,
1596
+ requeue = true })
1597
+ end ,
1598
+
1599
+ wait_for_messages_ready (Servers , RaName , 0 ),
1600
+ wait_for_messages_pending_ack (Servers , RaName , 1 ),
1601
+ receive
1602
+ {# 'basic.deliver' {delivery_tag = DeliveryTag1 ,
1603
+ redelivered =
91FD
true },
1604
+ # amqp_msg {props = # 'P_basic' {headers = H1 }}} ->
1605
+ ? assertMatch ({DTag , _ , 1 }, rabbit_basic :header (DTag , H1 )),
1606
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
1607
+ multiple = false ,
1608
+ requeue = true })
1609
+ end ,
1610
+
1611
+ wait_for_messages_ready (Servers , RaName , 0 ),
1612
+ wait_for_messages_pending_ack (Servers , RaName , 0 ),
1613
+ wait_for_messages_ready (Servers , RaDlxName , 1 ),
1614
+ wait_for_messages_pending_ack (Servers , RaDlxName , 0 ).
1615
+
1465
1616
consume_redelivery_count (Config ) ->
1466
1617
[Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1467
1618
0 commit comments