Merge pull request #1889 from rabbitmq/poison-handling-qq · rabbitmq/rabbitmq-server@b987346 · GitHub
[go: up one dir, main page]

Skip to content

Commit b987346

Browse files
Merge pull request #1889 from rabbitmq/poison-handling-qq
Poison handling in quorum queues
2 parents 20d3d6a + 4fd34a4 commit b987346

9 files changed

+2383
-2213
lines changed

src/rabbit_dead_letter.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
%%----------------------------------------------------------------------------
2525

26-
-type reason() :: 'expired' | 'rejected' | 'maxlen'.
26+
-type reason() :: 'expired' | 'rejected' | 'maxlen' | delivery_limit.
2727

2828
%%----------------------------------------------------------------------------
2929

src/rabbit_fifo.erl

Lines changed: 314 additions & 1569 deletions
Large diffs are not rendered by default.

src/rabbit_fifo.hrl

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
2+
-type raw_msg() :: term().
3+
%% The raw message. It is opaque to rabbit_fifo.
4+
5+
-type msg_in_id() :: non_neg_integer().
6+
% a queue scoped monotonically incrementing integer used to enforce order
7+
% in the unassigned messages map
8+
9+
-type msg_id() :: non_neg_integer().
10+
%% A consumer-scoped monotonically incrementing integer included with a
11+
%% {@link delivery/0.}. Used to settle deliveries using
12+
%% {@link rabbit_fifo_client:settle/3.}
13+
14+
-type msg_seqno() :: non_neg_integer().
15+
%% A sender process scoped monotonically incrementing integer included
16+
%% in enqueue messages. Used to ensure ordering of messages sent from the
17+
%% same process
18+
19+
-type msg_header() :: #{size := msg_size(),
20+
delivery_count => non_neg_integer()}.
21+
%% The message header map:
22+
%% delivery_count: the number of unsuccessful delivery attempts.
23+
%% A non-zero value indicates a previous attempt.
24+
25+
-type msg() :: {msg_header(), raw_msg()}.
26+
%% message with a header map.
27+
28+
-type msg_size() :: non_neg_integer().
29+
%% the size in bytes of the msg payload
30+
31+
-type indexed_msg() :: {ra_index(), msg()}.
32+
33+
-type prefix_msg() :: {'$prefix_msg', msg_header()}.
34+
35+
-type delivery_msg() :: {msg_id(), msg()}.
36+
%% A tuple consisting of the message id and the headered message.
37+
38+
-type consumer_tag() :: binary().
39+
%% An arbitrary binary tag used to distinguish between different consumers
40+
%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}
41+
42+
-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
43+
%% Represents the delivery of one or more rabbit_fifo messages.
44+
45+
-type consumer_id() :: {consumer_tag(), pid()}.
46+
%% The entity that receives messages. Uniquely identifies a consumer.
47+
48+
-type credit_mode() :: simple_prefetch | credited.
49+
%% determines how credit is replenished
50+
51+
-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
52+
credit_mode()} |
53+
{dequeue, settled | unsettled} |
54+
cancel.
55+
56+
-type consumer_meta() :: #{ack => boolean(),
57+
username => binary(),
58+
prefetch => non_neg_integer(),
59+
args => list()}.
60+
%% static meta data associated with a consumer
61+
62+
63+
-type applied_mfa() :: {module(), atom(), list()}.
64+
% represents a partially applied module call
65+
66+
-define(RELEASE_CURSOR_EVERY, 64000).
67+
-define(USE_AVG_HALF_LIFE, 10000.0).
68+
69+
-record(consumer, % per-consumer state; values of the #rabbit_fifo.consumers map
70+
{meta = #{} :: consumer_meta(),
71+
checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
72+
next_msg_id = 0 :: msg_id(), % part of snapshot data
73+
%% max number of messages that can be sent
74+
%% decremented for each delivery
75+
credit = 0 :: non_neg_integer(), % `::' types the field; a lone `:' would make the default a remote call
76+
%% total number of checked out messages - ever
77+
%% incremented for each delivery
78+
delivery_count = 0 :: non_neg_integer(),
79+
%% the mode of how credit is incremented
80+
%% simple_prefetch: credit is re-filled as deliveries are settled
81+
%% or returned.
82+
%% credited: credit can only be changed by receiving a consumer_credit
83+
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
84+
credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
85+
lifetime = once :: once | auto,
86+
status = up :: up | suspected_down | cancelled
87+
}).
88+
89+
-type consumer() :: #consumer{}.
90+
91+
-record(enqueuer,
92+
{next_seqno = 1 :: msg_seqno(),
93+
% out of order enqueues - sorted list
94+
pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
95+
status = up :: up | suspected_down
96+
}).
97+
98+
-record(cfg,
99+
{name :: atom(),
100+
resource :: rabbit_types:r('queue'),
101+
release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
102+
dead_letter_handler :: maybe(applied_mfa()),
103+
become_leader_handler :: maybe(applied_mfa()),
104+
max_length :: maybe(non_neg_integer()),
105+
max_bytes :: maybe(non_neg_integer()),
106+
%% whether single active consumer is on or not for this queue
107+
consumer_strategy = default :: default | single_active,
108+
delivery_limit :: maybe(non_neg_integer())
109+
}).
110+
111+
-record(rabbit_fifo,
112+
{cfg :: #cfg{},
113+
% unassigned messages
114+
messages = #{} :: #{msg_in_id() => indexed_msg()},
115+
% defines the lowest message in id available in the messages map
116+
% that isn't a return
117+
low_msg_num :: msg_in_id() | undefined,
118+
% defines the next message in id to be added to the messages map
119+
next_msg_num = 1 :: msg_in_id(),
120+
% list of returned msg_in_ids - when checking out it picks from
121+
% this list first before taking low_msg_num
122+
returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
123+
{msg_in_id(), indexed_msg()}),
124+
% a counter of enqueues - used to trigger shadow copy points
125+
enqueue_count = 0 :: non_neg_integer(),
126+
% a map containing all the live processes that have ever enqueued
127+
% a message to this queue as well as a cached value of the smallest
128+
% ra_index of all pending enqueues
129+
enqueuers = #{} :: #{pid() => #enqueuer{}},
130+
% master index of all enqueue raft indexes including pending
131+
% enqueues
132+
% rabbit_fifo_index can be slow when calculating the smallest
133+
% index when there are large gaps but should be faster than gb_trees
134+
% for normal appending operations as it's backed by a map
135+
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
136+
release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
137+
ra_index(), #rabbit_fifo{}}),
138+
% consumers need to reflect consumer state at time of snapshot
139+
% needs to be part of snapshot
140+
consumers = #{} :: #{consumer_id() => #consumer{}},
141+
% consumers that require further service are queued here
142+
% needs to be part of snapshot
143+
service_queue = queue:new() :: queue:queue(consumer_id()),
144+
%% This is a special field that is only used for snapshots
145+
%% It represents the queued messages at the time the
146+
%% dehydrated snapshot state was cached.
147+
%% As release_cursors are only emitted for raft indexes where all
148+
%% prior messages no longer contribute to the current state we can
149+
%% replace all message payloads with their sizes (to be used for
150+
%% overflow calculations).
151+
%% This is done so that consumers are still served in a deterministic
152+
%% order on recovery.
153+
prefix_msgs = {[], []} :: {Return :: [msg_header()],
154+
PrefixMsgs :: [msg_header()]},
155+
msg_bytes_enqueue = 0 :: non_neg_integer(),
156+
msg_bytes_checkout = 0 :: non_neg_integer(),
157+
%% waiting consumers, one is picked when the active consumer is cancelled or dies
158+
%% used only when single active consumer is on
159+
waiting_consumers = [] :: [{consumer_id(), consumer()}]
160+
}).
161+
162+
-type config() :: #{name := atom(),
163+
queue_resource := rabbit_types:r('queue'),
164+
dead_letter_handler => applied_mfa(),
165+
become_leader_handler => applied_mfa(),
166+
release_cursor_interval => non_neg_integer(),
167+
max_length => non_neg_integer(),
168+
max_bytes => non_neg_integer(),
169+
single_active_consumer_on => boolean(),
170+
delivery_limit => non_neg_integer()}.

src/rabbit_policies.erl

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,17 @@ register() ->
4343
{policy_validator, <<"max-length-bytes">>},
4444
{policy_validator, <<"queue-mode">>},
4545
{policy_validator, <<"overflow">>},
46+
{policy_validator, <<"delivery-limit">>},
4647
{operator_policy_validator, <<"expires">>},
4748
{operator_policy_validator, <<"message-ttl">>},
4849
{operator_policy_validator, <<"max-length">>},
4950
{operator_policy_validator, <<"max-length-bytes">>},
51+
{operator_policy_validator, <<"delivery-limit">>},
5052
{policy_merge_strategy, <<"expires">>},
5153
{policy_merge_strategy, <<"message-ttl">>},
5254
{policy_merge_strategy, <<"max-length">>},
53-
{policy_merge_strategy, <<"max-length-bytes">>}]],
55+
{policy_merge_strategy, <<"max-length-bytes">>},
56+
{policy_merge_strategy, <<"delivery-limit">>}]],
5457
ok.
5558

5659
validate_policy(Terms) ->
@@ -111,9 +114,16 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) ->
111114
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
112115
ok;
113116
validate_policy0(<<"overflow">>, Value) ->
114-
{error, "~p is not a valid overflow value", [Value]}.
117+
{error, "~p is not a valid overflow value", [Value]};
118+
119+
validate_policy0(<<"delivery-limit">>, Value)
120+
when is_integer(Value), Value >= 0 ->
121+
ok;
122+
validate_policy0(<<"delivery-limit">>, Value) ->
123+
{error, "~p is not a valid delivery limit", [Value]}.
115124

116125
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
117126
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
118127
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
119-
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal).
128+
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
129+
merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).

src/rabbit_quorum_queue.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,16 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
163163
%% take the minimum value of the policy and the queue arg if present
164164
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
165165
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
166+
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
166167
#{name => Name,
167168
queue_resource => QName,
168169
dead_letter_handler => dlx_mfa(Q),
169170
become_leader_handler => {?MODULE, become_leader, [QName]},
170171
max_length => MaxLength,
171172
max_bytes => MaxBytes,
172-
single_active_consumer_on => single_active_consumer_on(Q)}.
173+
single_active_consumer_on => single_active_consumer_on(Q),
174+
delivery_limit => DeliveryLimit
175+
}.
173176

174177
single_active_consumer_on(Q) ->
175178
QArguments = amqqueue:get_arguments(Q),
@@ -680,14 +683,12 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
680683
rabbit_misc:execute_mnesia_transaction(
681684
fun() -> rabbit_amqqueue:update(QName, Fun) end),
682685
ok;
683-
timeout ->
686+
{timeout, _} ->
684687
{error, timeout};
685688
E ->
686689
%% TODO should we stop the ra process here?
687690
E
688691
end;
689-
timeout ->
690-
{error, timeout};
691692
E ->
692693
E
693694
end.

0 commit comments

Comments
 (0)
0