|
7 | 7 |
|
8 | 8 | -module(rabbit_msg_file).
|
9 | 9 |
|
10 |
| --export([append/3, read/2, scan/4]). |
| 10 | +-export([append/3, read/2, pread/2, pread/3, scan/4]). |
11 | 11 |
|
12 | 12 | %%----------------------------------------------------------------------------
|
13 | 13 |
|
|
39 | 39 |
|
40 | 40 | append(FileHdl, MsgId, MsgBody)
|
41 | 41 | when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
|
| 42 | + %% @todo I think we are actually writing MsgId TWICE: once in |
| 43 | + %% the header, once in the body. Might be better to reduce |
| 44 | + %% the size of the body... |
42 | 45 | MsgBodyBin = term_to_binary(MsgBody),
|
43 | 46 | MsgBodyBinSize = size(MsgBodyBin),
|
44 | 47 | Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
|
@@ -67,6 +70,44 @@ read(FileHdl, TotalSize) ->
|
67 | 70 | KO -> KO
|
68 | 71 | end.
|
69 | 72 |
|
| 73 | +-spec pread(io_device(), position(), msg_size()) -> |
| 74 | + rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()}, |
| 75 | + any()). |
| 76 | + |
| 77 | +pread(FileHdl, Offset, TotalSize) -> |
| 78 | + Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, |
| 79 | + BodyBinSize = Size - ?MSG_ID_SIZE_BYTES, |
| 80 | + case file:pread(FileHdl, Offset, TotalSize) of |
| 81 | + {ok, <<Size:?INTEGER_SIZE_BITS, |
| 82 | + MsgId:?MSG_ID_SIZE_BYTES/binary, |
| 83 | + MsgBodyBin:BodyBinSize/binary, |
| 84 | + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> |
| 85 | + {ok, {MsgId, binary_to_term(MsgBodyBin)}}; |
| 86 | + KO -> KO |
| 87 | + end. |
| 88 | + |
| 89 | +-spec pread(io_device(), [{position(), msg_size()}]) -> |
| 90 | + {ok, [msg()]} | {error, any()} | eof. |
| 91 | + |
| 92 | +pread(FileHdl, LocNums) -> |
| 93 | + case file:pread(FileHdl, LocNums) of |
| 94 | + {ok, DataL} -> {ok, pread_parse(DataL)}; |
| 95 | + KO -> KO |
| 96 | + end. |
| 97 | + |
| 98 | +pread_parse([<<Size:?INTEGER_SIZE_BITS, |
| 99 | + _MsgId:?MSG_ID_SIZE_BYTES/binary, |
| 100 | + Rest0/bits>>|Tail]) -> |
| 101 | + BodyBinSize = Size - ?MSG_ID_SIZE_BYTES, |
| 102 | + <<MsgBodyBin:BodyBinSize/binary, |
| 103 | + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS, |
| 104 | + Rest/bits>> = Rest0, |
| 105 | + [binary_to_term(MsgBodyBin)|pread_parse([Rest|Tail])]; |
| 106 | +pread_parse([<<>>]) -> |
| 107 | + []; |
| 108 | +pread_parse([<<>>|Tail]) -> |
| 109 | + pread_parse(Tail). |
| 110 | + |
70 | 111 | -spec scan(io_device(), file_size(), message_accumulator(A), A) ->
|
71 | 112 | {'ok', A, position()}.
|
72 | 113 |
|
|
0 commit comments