8000 CQ: Improve reading from shared message store · rabbitmq/rabbitmq-server@32816c0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 32816c0

Browse files
author
Loïc Hoguin
committed
CQ: Improve reading from shared message store
This commit replaces file combining with single-file compaction where data is moved near the beginning of the file before updating the index entries. The file is then truncated when all existing readers are gone. This allows removing the lock that existed before and enables reading multiple messages at once from the shared files. This also helps us avoid many ets operations and simplify the code greatly. This commit still has some issues: reading a single message is currently slow due to the removal of FHC in the client code. This will be resolved by implementing read buffering in a similar way as FHC but without keeping files open more than necessary. The dirty recovery code also likely has a number of issues because of the compaction changes.
1 parent 6bee349 commit 32816c0

File tree

7 files changed

+681
-733
lines changed

7 files changed

+681
-733
lines changed

deps/rabbit/src/rabbit_msg_file.erl

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
-module(rabbit_msg_file).
99

10-
-export([append/3, read/2, scan/4]).
10+
-export([append/3, read/2, pread/2, pread/3, scan/4]).
1111

1212
%%----------------------------------------------------------------------------
1313

@@ -39,6 +39,9 @@
3939

4040
append(FileHdl, MsgId, MsgBody)
4141
when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
42+
%% @todo I think we are actually writing MsgId TWICE: once in
43+
%% the header, once in the body. Might be better to reduce
44+
%% the size of the body...
4245
MsgBodyBin = term_to_binary(MsgBody),
4346
MsgBodyBinSize = size(MsgBodyBin),
4447
Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
@@ -67,6 +70,44 @@ read(FileHdl, TotalSize) ->
6770
KO -> KO
6871
end.
6972

73+
-spec pread(io_device(), position(), msg_size()) ->
74+
rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
75+
any()).
76+
77+
pread(FileHdl, Offset, TotalSize) ->
78+
Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
79+
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
80+
case file:pread(FileHdl, Offset, TotalSize) of
81+
{ok, <<Size:?INTEGER_SIZE_BITS,
82+
MsgId:?MSG_ID_SIZE_BYTES/binary,
83+
MsgBodyBin:BodyBinSize/binary,
84+
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
85+
{ok, {MsgId, binary_to_term(MsgBodyBin)}};
86+
KO -> KO
87+
end.
88+
89+
-spec pread(io_device(), [{position(), msg_size()}]) ->
90+
{ok, [msg()]} | {error, any()} | eof.
91+
92+
pread(FileHdl, LocNums) ->
93+
case file:pread(FileHdl, LocNums) of
94+
{ok, DataL} -> {ok, pread_parse(DataL)};
95+
KO -> KO
96+
end.
97+
98+
pread_parse([<<Size:?INTEGER_SIZE_BITS,
99+
_MsgId:?MSG_ID_SIZE_BYTES/binary,
100+
Rest0/bits>>|Tail]) ->
101+
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
102+
<<MsgBodyBin:BodyBinSize/binary,
103+
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS,
104+
Rest/bits>> = Rest0,
105+
[binary_to_term(MsgBodyBin)|pread_parse([Rest|Tail])];
106+
pread_parse([<<>>]) ->
107+
[];
108+
pread_parse([<<>>|Tail]) ->
109+
pread_parse(Tail).
110+
70111
-spec scan(io_device(), file_size(), message_accumulator(A), A) ->
71112
{'ok', A, position()}.
72113

0 commit comments

Comments
 (0)
0