8000 fix(ds): Add configuration for RocksDB options by ieQu1 · Pull Request #15463 · emqx/emqx · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 68 additions & 7 deletions apps/emqx/src/emqx_ds_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ translate_builtin_raft(
n_shards := NShards,
n_sites := NSites,
replication_factor := ReplFactor,
transaction := Transaction
transaction := Transaction,
rocksdb := RocksDBOptions
}
) ->
%% NOTE: Undefined if `basic` schema is in use.
Expand All @@ -67,14 +68,16 @@ translate_builtin_raft(
replication_factor => ReplFactor,
replication_options => maps:get(replication_options, Backend, #{}),
storage => emqx_maybe:apply(fun translate_layout/1, Layout),
transaction => Transaction
transaction => Transaction,
rocksdb => RocksDBOptions
}.

translate_builtin_local(
Backend = #{
backend := builtin_local,
n_shards := NShards,
transaction := Transaction
transaction := Transaction,
rocksdb := RocksDBOptions
}
) ->
%% NOTE: Undefined if `basic` schema is in use.
Expand All @@ -87,7 +90,8 @@ translate_builtin_local(
storage => emqx_maybe:apply(fun translate_layout/1, Layout),
poll_workers_per_shard => NPollers,
poll_batch_size => BatchSize,
transaction => Transaction
transaction => Transaction,
rocksdb => RocksDBOptions
}.

%%================================================================================
Expand Down Expand Up @@ -194,6 +198,33 @@ fields(builtin_local_basic) ->
backend_fields(builtin_local, basic);
fields(builtin_raft_basic) ->
backend_fields(builtin_raft, basic);
fields(rocksdb_options) ->
[
{cache_size,
sc(
emqx_schema:bytesize(),
#{
default => <<"8MB">>,
desc => ?DESC(rocksdb_cache_size)
}
)},
{write_buffer_size,
sc(
emqx_schema:bytesize(),
#{
default => <<"10MB">>,
desc => ?DESC(rocksdb_write_buffer_size)
}
)},
{max_open_files,
sc(
pos_integer(),
#{
default => 100,
desc => ?DESC(rocksdb_max_open_files)
}
)}
];
fields(builtin_write_buffer) ->
[
{max_items,
Expand Down Expand Up @@ -295,9 +326,19 @@ fields(layout_builtin_wildcard_optimized_v2) ->
sc(
emqx_ds_msg_serializer:schema(),
#{
default => v1,
default => asn1,
importance => ?IMPORTANCE_HIDDEN
}
)},
{wildcard_thresholds,
sc(
hoconsc:array(hoconsc:union([non_neg_integer(), infinity])),
#{
default => [100, 10],
validator => fun validate_wildcard_thresholds/1,
importance => ?IMPORTANCE_LOW,
desc => ?DESC(lts_wildcard_thresholds)
}
)}
];
fields(layout_builtin_reference) ->
Expand Down Expand Up @@ -380,6 +421,14 @@ common_builtin_fields(basic) ->
importance => ?IMPORTANCE_LOW,
desc => ?DESC(builtin_optimistic_transaction)
}
)},
{rocksdb,
sc(
ref(rocksdb_options),
#{
importance => ?IMPORTANCE_MEDIUM,
desc => ?DESC(builtin_rocksdb_options)
}
)}
];
common_builtin_fields(layout) ->
Expand Down Expand Up @@ -436,6 +485,8 @@ desc(layout_builtin_reference) ->
?DESC(layout_builtin_reference);
desc(optimistic_transaction) ->
?DESC(optimistic_transaction);
desc(rocksdb_options) ->
?DESC(rocksdb_options);
desc(_) ->
undefined.

Expand All @@ -448,13 +499,15 @@ translate_layout(
type := wildcard_optimized_v2,
bytes_per_topic_level := BytesPerTopicLevel,
topic_index_bytes := TopicIndexBytes,
serialization_schema := SSchema
serialization_schema := SSchema,
wildcard_thresholds := WildcardThresholds
}
) ->
{emqx_ds_storage_skipstream_lts, #{
wildcard_hash_bytes => BytesPerTopicLevel,
topic_index_bytes => TopicIndexBytes,
serialization_schema => SSchema
serialization_schema => SSchema,
lts_threshold_spec => translate_lts_wildcard_thresholds(WildcardThresholds)
}};
translate_layout(
#{
Expand Down Expand Up @@ -486,3 +539,11 @@ builtin_layouts() ->
sc(Type, Meta) -> hoconsc:mk(Type, Meta).

ref(StructName) -> hoconsc:ref(?MODULE, StructName).

validate_wildcard_thresholds([]) ->
{error, "List should not be empty."};
validate_wildcard_thresholds([_ | _]) ->
ok.

translate_lts_wildcard_thresholds(L = [_ | _]) ->
{simple, list_to_tuple(L)}.
5CC0
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ open_db(DB, CreateOpts0) ->
flush_interval => 1_000,
idle_flush_interval => 1,
conflict_window => 5_000
}
},
replication_options => #{}
},
CreateOpts1
),
Expand Down
44 changes: 43 additions & 1 deletion apps/emqx_durable_storage/src/emqx_ds_lts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
info/2,
info/1,

insert_wildcard/2,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unused for now.


updated_topics/2,

threshold_fun/1,
Expand Down Expand Up @@ -379,6 +381,32 @@ updated_topics(#trie{rlookups = true}, Dump) ->
Dump
).

-spec insert_wildcard(trie(), [binary() | '+']) -> ok.
insert_wildcard(Trie, TF) ->
%% Craft a special threshold function that returns 0 for topic
%% levels corresponding to '+'s in the filter or `infinity'
%% otherwise:
LevelThresholds = [
case I of
'+' -> 0;
_ when is_binary(I) -> infinity
end
|| I <- TF
],
ThresholdFun = fun(Depth, _Parent) ->
lists:nth(Depth + 1, LevelThresholds)
end,
%% Insert a dummy topic that corresponds to the given topic filter:
DummyTopic = [
case I of
'+' -> <<>>;
_ -> I
end
|| I <- TF
],
_ = topic_key(Trie, ThresholdFun, DummyTopic),
ok.

%%================================================================================
%% Internal exports
%%================================================================================
Expand Down Expand Up @@ -862,6 +890,19 @@ topic_key_test() ->
dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
end.

insert_wildcard_test() ->
T = trie_create(),
?assertMatch(ok, insert_wildcard(T, ['+', <<"pub">>, '+'])),
ThresholdFun = fun(_, _) -> infinity end,
?assertMatch(
{_, [<<"foo">>, <<"bar">>]},
test_key(T, ThresholdFun, [<<"foo">>, <<"pub">>, <<"bar">>, <<"quux">>])
),
?assertMatch(
{_, [<<"foo">>]},
test_key(T, ThresholdFun, [<<"foo">>, <<"bar">>])
).

%% erlfmt-ignore
topic_match_test() ->
T = trie_create(),
Expand Down Expand Up @@ -1064,7 +1105,8 @@ assert_match_topics(Trie, Filter0, Expected) ->
%% erlfmt-ignore
test_key(Trie, Threshold, Topic0) ->
Topic = lists:map(fun('') -> '';
(I) -> integer_to_binary(I)
(I) when is_integer(I) -> integer_to_binary(I);
(Bin) when is_binary(Bin) -> Bin
end,
Topic0),
Ret = topic_key(Trie, Threshold, Topic),
Expand Down
Loading
0