8000 Poison handling in quorum queues · rabbitmq/rabbitmq-server@affefe1 · GitHub
[go: up one dir, main page]

Skip to content

Commit affefe1

Browse files
committed
Poison handling in quorum queues
Drop messages that exceed the configured number of delivery attemps. If a DLX is configured, dead letter them. [#163513253]
1 parent f6d9368 commit affefe1

File tree

4 files changed

+208
-33
lines changed

4 files changed

+208
-33
lines changed

src/rabbit_fifo.erl

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@
252252
consumer_strategy = default :: default | single_active,
253253
%% waiting consumers, one is picked active consumer is cancelled or dies
254254
%% used only when single active consumer is on
255-
waiting_consumers = [] :: [{consumer_id(), consumer()}]
255+
waiting_consumers = [] :: [{consumer_id(), consumer()}],
256+
delivery_limit :: maybe(non_neg_integer())
256257
}).
257258

258259
-opaque state() :: #state{}.
@@ -264,7 +265,8 @@
264265
release_cursor_interval => non_neg_integer(),
265266
max_length => non_neg_integer(),
266267
max_bytes => non_neg_integer(),
267-
single_active_consumer_on => boolean()}.
268+
single_active_consumer_on => boolean(),
269+
delivery_limit => non_neg_integer()}.
268270

269271
-export_type([protocol/0,
270272
delivery/0,
@@ -293,6 +295,7 @@ update_config(Conf, State) ->
293295
SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
294296
MaxLength = maps:get(max_length, Conf, undefined),
295297
MaxBytes = maps:get(max_bytes, Conf, undefined),
298+
DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
296299
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
297300
true ->
298301
single_active;
@@ -304,7 +307,8 @@ update_config(Conf, State) ->
304307
release_cursor_interval = SHI,
305308
max_length = MaxLength,
306309
max_bytes = MaxBytes,
307-
consumer_strategy = ConsumerStrategy}.
310+
consumer_strategy = ConsumerStrategy,
311+
delivery_limit = DeliveryLimit}.
308312

309313
zero(_) ->
310314
0.
@@ -468,10 +472,10 @@ apply(_, {down, ConsumerPid, noconnection},
468472
#consumer{checked_out = Checked0} = C,
469473
{Co, St0, Eff}) when (node(P) =:= Node) and
470474
(C#consumer.status =/= cancelled)->
471-
St = return_all(St0, Checked0),
475+
{St, Eff0} = return_all(St0, Checked0, Eff),
472476
Credit = increase_credit(C, maps:size(Checked0)),
473477
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
474-
suspected_down, Eff),
478+
suspected_down, Eff0),
475479
{maps:put(K,
476480
C#consumer{status = suspected_down,
477481
credit = Credit,
@@ -952,8 +956,8 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
952956
C0, SQ0, Effects0),
953957
{S0#state{consumers = Cons, service_queue = SQ}, Effects1};
954958
down ->
955-
S1 = return_all(S0, Checked0),
956-
{S1#state{consumers = Cons1}, Effects0}
959+
{S1, Effects1} = return_all(S0, Checked0, Effects0),
960+
{S1#state{consumers = Cons1}, Effects1}
957961
end.
958962

959963
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
@@ -1073,16 +1077,16 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
10731077
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
10741078
Con = Con0#consumer{checked_out = Checked,
10751079
credit = increase_credit(Con0, length(MsgNumMsgs))},
1076-
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
1080+
{Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
10771081
SQ0, Effects0),
1078-
State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) ->
1079-
return_one(0, Msg, S0);
1080-
({MsgNum, Msg}, S0) ->
1081-
return_one(MsgNum, Msg, S0)
1082-
end, State0, MsgNumMsgs),
1082+
{State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
1083+
return_one(0, Msg, S0, E0);
1084+
({MsgNum, Msg}, {S0, E0}) ->
1085+
return_one(MsgNum, Msg, S0, E0)
1086+
end, {State0, Effects1}, MsgNumMsgs),
10831087
checkout(Meta, State1#state{consumers = Cons,
10841088
service_queue = SQ},
1085-
Effects).
1089+
Effects2).
10861090

10871091
% used to processes messages that are finished
10881092
complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
@@ -1188,28 +1192,35 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
11881192
end.
11891193

11901194
return_one(0, {'$prefix_msg', _} = Msg,
1191-
#state{returns = Returns} = State0) ->
1192-
add_bytes_return(Msg,
1193-
State0#state{returns = lqueue:in(Msg, Returns)});
1195+
#state{returns = Returns} = State0, Effects) ->
1196+
{add_bytes_return(Msg,
1197+
State0#state{returns = lqueue:in(Msg, Returns)}), Effects};
11941198
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
1195-
#state{returns = Returns} = State0) ->
1199+
#state{returns = Returns,
1200+
delivery_limit = DeliveryLimit} = State0, Effects0) ->
11961201
Header = maps:update_with(delivery_count,
1197 10000 1202
fun (C) -> C+1 end,
11981203
1, Header0),
1199-
Msg = {RaftId, {Header, RawMsg}},
1200-
% this should not affect the release cursor in any way
1201-
add_bytes_return(RawMsg,
1202-
State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}).
1204+
case maps:get(delivery_count, Header) of
1205+
DeliveryCount when DeliveryCount > DeliveryLimit ->
1206+
Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0),
1207+
{add_bytes_settle(RawMsg, State0), Effects};
1208+
_ ->
1209+
Msg = {RaftId, {Header, RawMsg}},
1210+
%% this should not affect the release cursor in any way
1211+
{add_bytes_return(RawMsg,
1212+
State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
1213+
end.
12031214

1204-
return_all(State0, Checked0) ->
1215+
return_all(State0, Checked0, Effects0) ->
12051216
%% need to sort the list so that we return messages in the order
12061217
%% they were checked out
12071218
Checked = lists:sort(maps:to_list(Checked0)),
1208-
lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) ->
1209-
return_one(0, Msg, S);
1210-
({_, {MsgNum, Msg}}, S) ->
1211-
return_one(MsgNum, Msg, S)
1212-
end, State0, Checked).
1219+
lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
1220+
return_one(0, Msg, S, E);
1221+
({_, {MsgNum, Msg}}, {S, E}) ->
1222+
return_one(MsgNum, Msg, S, E)
1223+
end, {State0, Effects0}, Checked).
12131224

12141225
%% checkout new messages to consumers
12151226
%% reverses the effects list

src/rabbit_policies.erl

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,17 @@ register() ->
4343
{policy_validator, <<"max-length-bytes">>},
4444
{policy_validator, <<"queue-mode">>},
4545
{policy_validator, <<"overflow">>},
46+
{policy_validator, <<"delivery-limit">>},
4647
{operator_policy_validator, <<"expires">>},
4748
{operator_policy_validator, <<"message-ttl">>},
4849
{operator_policy_validator, <<"max-length">>},
4950
{operator_policy_validator, <<"max-length-bytes">>},
51+
{operator_policy_validator, <<"delivery-limit">>},
5052
{policy_merge_strategy, <<"expires">>},
5153
{policy_merge_strategy, <<"message-ttl">>},
5254
{policy_merge_strategy, <<"max-length">>},
53-
{policy_merge_strategy, <<"max-length-bytes">>}]],
55+
{policy_merge_strategy, <<"max-length-bytes">>},
56+
{policy_merge_strategy, <<"delivery-limit">>}]],
5457
ok.
5558

5659
validate_policy(Terms) ->
@@ -111,9 +114,16 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) ->
111114
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
112115
ok;
113116
validate_policy0(<<"overflow">>, Value) ->
114-
{error, "~p is not a valid overflow value", [Value]}.
117+
{error, "~p is not a valid overflow value", [Value]};
118+
119+
validate_policy0(<<"delivery-limit">>, Value)
120+
when is_integer(Value), Value >= 0 ->
121+
ok;
122+
validate_policy0(<<"delivery-limit">>, Value) ->
123+
{error, "~p is not a valid delivery limit", [Value]}.
115124

116125
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
117126
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
118127
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
119-
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal).
128+
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
129+
merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).

src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,16 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
159159
%% take the minimum value of the policy and the queue arg if present
160160
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
161161
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
162+
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
162163
#{name => Name,
163164
queue_resource => QName,
164165
dead_letter_handler => dlx_mfa(Q),
165166
become_leader_handler => {?MODULE, become_leader, [QName]},
166167
max_length => MaxLength,
167168
max_bytes => MaxBytes,
168-
single_active_consumer_on => single_active_consumer_on(Q)}.
169+
single_active_consumer_on => single_active_consumer_on(Q),
170+
delivery_limit => DeliveryLimit
171+
}.
169172

170173
single_active_consumer_on(Q) ->
171174
QArguments = amqqueue:get_arguments(Q),

test/quorum_queue_SUITE.erl

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ all_tests() ->
109109
consume_redelivery_count,
110110
subscribe_redelivery_count,
111111
message_bytes_metrics,
112-
queue_length_limit_drop_head
112+
queue_length_limit_drop_head,
113+
subscribe_redelivery_limit,
114+
subscribe_redelivery_policy,
115+
subscribe_redelivery_limit_with_dead_letter
113116
].
114117

115118
memory_tests() ->
@@ -1462,6 +1465,154 @@ subscribe_redelivery_count(Config) ->
14621465
wait_for_messages_pending_ack(Servers, RaName, 0)
14631466
end.
14641467

1468+
subscribe_redelivery_limit(Config) ->
1469+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1470+
1471+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1472+
QQ = ?config(queue_name, Config),
1473+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1474+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
1475+
{<<"x-delivery-limit">>, long, 1}])),
1476+
1477+
RaName = ra_name(QQ),
1478+
publish(Ch, QQ),
1479+
wait_for_messages_ready(Servers, RaName, 1),
1480+
wait_for_messages_pending_ack(Servers, RaName, 0),
1481+
subscribe(Ch, QQ, false),
1482+
1483+
DTag = <<"x-delivery-count">>,
1484+
receive
1485+
{#'basic.deliver'{delivery_tag = DeliveryTag,
1486+
redelivered = false},
1487+
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
1488+
?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
1489+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
1490+
multiple = false,
1491+
requeue = true})
1492+
end,
1493+
1494+
wait_for_messages_ready(Servers, RaName, 0),
1495+
wait_for_messages_pending_ack(Servers, RaName, 1),
1496+
receive
1497+
{#'basic.deliver'{delivery_tag = DeliveryTag1,
1498+
redelivered = true},
1499+
#amqp_msg{props = #'P_basic'{headers = H1}}} ->
1500+
?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
1501+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
1502+
multiple = false,
1503+
requeue = true})
1504+
end,
1505+
1506+
wait_for_messages_ready(Servers, RaName, 0),
1507+
wait_for_messages_pending_ack(Servers, RaName, 0),
1508+
receive
1509+
{#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
1510+
throw(unexpected_redelivery)
1511+
after 2000 ->
1512+
ok
1513+
end.
1514+
1515+
subscribe_redelivery_policy(Config) ->
1516+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1517+
1518+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1519+
QQ = ?config(queue_name, Config),
1520+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1521+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1522+
1523+
ok = rabbit_ct_broker_helpers:set_policy(
1524+
Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>,
1525+
[{<<"delivery-limit">>, 1}]),
1526+
1527+
RaName = ra_name(QQ),
1528+
publish(Ch, QQ),
1529+
wait_for_messages_ready(Servers, RaName, 1),
1530+
wait_for_messages_pending_ack(Servers, RaName, 0),
1531+
subscribe(Ch, QQ, false),
1532+
1533+
DTag = <<"x-delivery-count">>,
1534+
receive
1535+
{#'basic.deliver'{delivery_tag = DeliveryTag,
1536+
redelivered = false},
1537+
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
1538+
?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
1539+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
1540+
multiple = false,
1541+
requeue = true})
1542+
end,
1543+
1544+
wait_for_messages_ready(Servers, RaName, 0),
1545+
wait_for_messages_pending_ack(Servers, RaName, 1),
1546+
receive
1547+
{#'basic.deliver'{delivery_tag = DeliveryTag1,
1548+
redelivered = true},
1549+
#amqp_msg{props = #'P_basic'{headers = H1}}} ->
1550+
?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
1551+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
1552+
multiple = false,
1553+
requeue = true})
1554+
end,
1555+
1556+
wait_for_messages_ready(Servers, RaName, 0),
1557+
wait_for_messages_pending_ack(Servers, RaName, 0),
1558+
receive
1559+
{#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
1560+
throw(unexpected_redelivery)
1561+
after 2000 ->
1562+
ok
1563+
end,
1564+
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>).
1565+
1566+
subscribe_redelivery_limit_with_dead_letter(Config) ->
1567+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1568+
1569+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1570+
QQ = ?config(queue_name, Config),
1571+
DLX = <<"subcribe_redelivery_limit_with_dead_letter_dlx">>,
1572+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1573+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
1574+
{<<"x-delivery-limit">>, long, 1},
1575+
{<<"x-dead-letter-exchange">>, longstr, <<>>},
1576+
{<<"x-dead-letter-routing-key">>, longstr, DLX}
1577+
])),
1578+
?assertEqual({'queue.declare_ok', DLX, 0, 0},
1579+
declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1580+
1581+
RaName = ra_name(QQ),
1582+
RaDlxName = ra_name(DLX),
1583+
publish(Ch, QQ),
1584+
wait_for_messages_ready(Servers, RaName, 1),
1585+
wait_for_messages_pending_ack(Servers, RaName, 0),
1586+
subscribe(Ch, QQ, false),
1587+
1588+
DTag = <<"x-delivery-count">>,
1589+
receive
1590+
{#'basic.deliver'{delivery_tag = DeliveryTag,
1591+
redelivered = false},
1592+
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
1593+
?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
1594+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
1595+
multiple = false,
1596+
requeue = true})
1597+
end,
1598+
1599+
wait_for_messages_ready(Servers, RaName, 0),
1600+
wait_for_messages_pending_ack(Servers, RaName, 1),
1601+
receive
1602+
{#'basic.deliver'{delivery_tag = DeliveryTag1,
1603+
redelivered = 91FD true},
1604+
#amqp_msg{props = #'P_basic'{headers = H1}}} ->
1605+
?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
1606+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
1607+
multiple = false,
1608+
requeue = true})
1609+
end,
1610+
1611+
wait_for_messages_ready(Servers, RaName, 0),
1612+
wait_for_messages_pending_ack(Servers, RaName, 0),
1613+
wait_for_messages_ready(Servers, RaDlxName, 1),
1614+
wait_for_messages_pending_ack(Servers, RaDlxName, 0).
1615+
14651616
consume_redelivery_count(Config) ->
14661617
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
14671618

0 commit comments

Comments
 (0)
0