|
| 1 | + |
| 2 | +-type raw_msg() :: term(). |
| 3 | +%% The raw message. It is opaque to rabbit_fifo. |
| 4 | + |
| 5 | +-type msg_in_id() :: non_neg_integer(). |
| 6 | +% a queue scoped monotonically incrementing integer used to enforce order |
| 7 | +% in the unassigned messages map |
| 8 | + |
| 9 | +-type msg_id() :: non_neg_integer(). |
| 10 | +%% A consumer-scoped monotonically incrementing integer included with a |
| 11 | +%% {@link delivery/0.}. Used to settle deliveries using |
| 12 | +%% {@link rabbit_fifo_client:settle/3.} |
| 13 | + |
| 14 | +-type msg_seqno() :: non_neg_integer(). |
| 15 | +%
8000
% A sender process scoped monotonically incrementing integer included |
| 16 | +%% in enqueue messages. Used to ensure ordering of messages send from the |
| 17 | +%% same process |
| 18 | + |
| 19 | +-type msg_header() :: #{size := msg_size(), |
| 20 | + delivery_count => non_neg_integer()}. |
| 21 | +%% The message header map: |
| 22 | +%% delivery_count: the number of unsuccessful delivery attempts. |
| 23 | +%% A non-zero value indicates a previous attempt. |
| 24 | + |
| 25 | +-type msg() :: {msg_header(), raw_msg()}. |
| 26 | +%% message with a header map. |
| 27 | + |
| 28 | +-type msg_size() :: non_neg_integer(). |
| 29 | +%% the size in bytes of the msg payload |
| 30 | + |
| 31 | +-type indexed_msg() :: {ra_index(), msg()}. |
| 32 | + |
| 33 | +-type prefix_msg() :: {'$prefix_msg', msg_header()}. |
| 34 | + |
| 35 | +-type delivery_msg() :: {msg_id(), msg()}. |
| 36 | +%% A tuple consisting of the message id and the headered message. |
| 37 | + |
| 38 | +-type consumer_tag() :: binary(). |
| 39 | +%% An arbitrary binary tag used to distinguish between different consumers |
| 40 | +%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.} |
| 41 | + |
| 42 | +-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}. |
| 43 | +%% Represents the delivery of one or more rabbit_fifo messages. |
| 44 | + |
| 45 | +-type consumer_id() :: {consumer_tag(), pid()}. |
| 46 | +%% The entity that receives messages. Uniquely identifies a consumer. |
| 47 | + |
| 48 | +-type credit_mode() :: simple_prefetch | credited. |
| 49 | +%% determines how credit is replenished |
| 50 | + |
| 51 | +-type checkout_spec() :: {once | auto, Num :: non_neg_integer(), |
| 52 | + credit_mode()} | |
| 53 | + {dequeue, settled | unsettled} | |
| 54 | + cancel. |
| 55 | + |
| 56 | +-type consumer_meta() :: #{ack => boolean(), |
| 57 | + username => binary(), |
| 58 | + prefetch => non_neg_integer(), |
| 59 | + args => list()}. |
| 60 | +%% static meta data associated with a consumer |
| 61 | + |
| 62 | + |
| 63 | +-type applied_mfa() :: {module(), atom(), list()}. |
| 64 | +% represents a partially applied module call |
| 65 | + |
| 66 | +-define(RELEASE_CURSOR_EVERY, 64000). |
| 67 | +-define(USE_AVG_HALF_LIFE, 10000.0). |
| 68 | + |
| 69 | +-record(consumer, |
| 70 | + {meta = #{} :: consumer_meta(), |
| 71 | + checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}}, |
| 72 | + next_msg_id = 0 :: msg_id(), % part of snapshot data |
| 73 | + %% max number of messages that can be sent |
| 74 | + %% decremented for each delivery |
| 75 | + credit = 0 : non_neg_integer(), |
| 76 | + %% total number of checked out messages - ever |
| 77 | + %% incremented for each delivery |
| 78 | + delivery_count = 0 :: non_neg_integer(), |
| 79 | + %% the mode of how credit is incremented |
| 80 | + %% simple_prefetch: credit is re-filled as deliveries are settled |
| 81 | + %% or returned. |
| 82 | + %% credited: credit can only be changed by receiving a consumer_credit |
| 83 | + %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' |
| 84 | + credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data |
| 85 | + lifetime = once :: once | auto, |
| 86 | + status = up :: up | suspected_down | cancelled |
| 87 | + }). |
| 88 | + |
| 89 | +-type consumer() :: #consumer{}. |
| 90 | + |
| 91 | +-record(enqueuer, |
| 92 | + {next_seqno = 1 :: msg_seqno(), |
| 93 | + % out of order enqueues - sorted list |
| 94 | + pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}], |
| 95 | + status = up :: up | suspected_down |
| 96 | + }). |
| 97 | + |
| 98 | +-record(cfg, |
| 99 | + {name :: atom(), |
| 100 | + resource :: rabbit_types:r('queue'), |
| 101 | + release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(), |
| 102 | + dead_letter_handler :: maybe(applied_mfa()), |
| 103 | + become_leader_handler :: maybe(applied_mfa()), |
| 104 | + max_length :: maybe(non_neg_integer()), |
| 105 | + max_bytes :: maybe(non_neg_integer()), |
| 106 | + %% whether single active consumer is on or not for this queue |
| 107 | + consumer_strategy = default :: default | single_active, |
| 108 | + delivery_limit :: maybe(non_neg_integer()) |
| 109 | + }). |
| 110 | + |
| 111 | +-record(rabbit_fifo, |
| 112 | + {cfg :: #cfg{}, |
| 113 | + % unassigned messages |
| 114 | + messages = #{} :: #{msg_in_id() => indexed_msg()}, |
| 115 | + % defines the lowest message in id available in the messages map |
| 116 | + % that isn't a return |
| 117 | + low_msg_num :: msg_in_id() | undefined, |
| 118 | + % defines the next message in id to be added to the messages map |
| 119 | + next_msg_num = 1 :: msg_in_id(), |
| 120 | + % list of returned msg_in_ids - when checking out it picks from |
| 121 | + % this list first before taking low_msg_num |
| 122 | + returns = lqueue:new() :: lqueue:lqueue(prefix_msg() | |
| 123 | + {msg_in_id(), indexed_msg()}), |
| 124 | + % a counter of enqueues - used to trigger shadow copy points |
| 125 | + enqueue_count = 0 :: non_neg_integer(), |
| 126 | + % a map containing all the live processes that have ever enqueued |
| 127 | + % a message to this queue as well as a cached value of the smallest |
| 128 | + % ra_index of all pending enqueues |
| 129 | + enqueuers = #{} :: #{pid() => #enqueuer{}}, |
| 130 | + % master index of all enqueue raft indexes including pending |
| 131 | + % enqueues |
| 132 | + % rabbit_fifo_index can be slow when calculating the smallest |
| 133 | + % index when there are large gaps but should be faster than gb_trees |
| 134 | + % for normal appending operations as it's backed by a map |
| 135 | + ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), |
| 136 | + release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, |
| 137 | + ra_index(), #rabbit_fifo{}}), |
| 138 | + % consumers need to reflect consumer state at time of snapshot |
| 139 | + % needs to be part of snapshot |
| 140 | + consumers = #{} :: #{consumer_id() => #consumer{}}, |
| 141 | + % consumers that require further service are queued here |
| 142 | + % needs to be part of snapshot |
| 143 | + service_queue = queue:new() :: queue:queue(consumer_id()), |
| 144 | + %% This is a special field that is only used for snapshots |
| 145 | + %% It represents the queued messages at the time the |
| 146 | + %% dehydrated snapshot state was cached. |
| 147 | + %% As release_cursors are only emitted for raft indexes where all |
| 148 | + %% prior messages no longer contribute to the current state we can |
| 149 | + %% replace all message payloads with their sizes (to be used for |
| 150 | + %% overflow calculations). |
| 151 | + %% This is done so that consumers are still served in a deterministic |
| 152 | + %% order on recovery. |
| 153 | + prefix_msgs = {[], []} :: {Return :: [msg_header()], |
| 154 | + PrefixMsgs :: [msg_header()]}, |
| 155 | + msg_bytes_enqueue = 0 :: non_neg_integer(), |
| 156 | + msg_bytes_checkout = 0 :: non_neg_integer(), |
| 157 | + %% waiting consumers, one is picked active consumer is cancelled or dies |
| 158 | + %% used only when single active consumer is on |
| 159 | + waiting_consumers = [] :: [{consumer_id(), consumer()}] |
| 160 | + }). |
| 161 | + |
| 162 | +-type config() :: #{name := atom(), |
| 163 | + queue_resource := rabbit_types:r('queue'), |
| 164 | + dead_letter_handler => applied_mfa(), |
| 165 | + become_leader_handler => applied_mfa(), |
| 166 | + release_cursor_interval => non_neg_integer(), |
| 167 | + max_length => non_neg_integer(), |
| 168 | + max_bytes => non_neg_integer(), |
| 169 | + single_active_consumer_on => boolean(), |
| 170 | + delivery_limit => non_neg_integer()}. |
0 commit comments