perf(ruleeng): employ `emqx_topic_index` to speed up topic matching by keynslug · Pull Request #11396 · emqx/emqx · GitHub
Merged
test(topicidx): add property test
Co-Authored-By: JianBo He <heeejianbo@gmail.com>
keynslug and HJianBo committed Aug 14, 2023
commit 0c7bdbdab45ec2f9ecb5dc3f4f390afc5eae60f4
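For context, the suite below exercises the `emqx_topic_index` API this PR introduces. A minimal usage sketch, inferred only from the calls made in the tests (get_id/1 is assumed from the id/1 helper the suite uses), could look like:

%% Sketch only, based on the calls exercised in emqx_topic_index_SUITE.
usage_sketch() ->
    %% create a fresh index table
    Tab = emqx_topic_index:new(),
    %% insert a topic filter under a record id, with empty extra data
    _ = emqx_topic_index:insert(<<"a/+/#">>, id1, <<>>, Tab),
    %% find every filter record matching a concrete topic name
    Matches = emqx_topic_index:matches(<<"a/b/c">>, Tab, [unique]),
    %% extract the record ids from the match results
    [emqx_topic_index:get_id(M) || M <- Matches].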
178 changes: 161 additions & 17 deletions apps/emqx/test/emqx_topic_index_SUITE.erl
@@ -19,8 +19,11 @@
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").

-import(emqx_proper_types, [scaled/2]).

all() ->
emqx_common_test_helpers:all(?MODULE).

@@ -144,23 +147,122 @@ t_match_unique(_) ->
[id(M) || M <- emqx_topic_index:matches(<<"a/b/c">>, Tab, [unique])]
).

t_match_wildcards(_) ->
Tab = emqx_topic_index:new(),
emqx_topic_index:insert(<<"a/b">>, id1, <<>>, Tab),
emqx_topic_index:insert(<<"a/b/#">>, id2, <<>>, Tab),
emqx_topic_index:insert(<<"a/b/#">>, id3, <<>>, Tab),
emqx_topic_index:insert(<<"a/b/c">>, id4, <<>>, Tab),
emqx_topic_index:insert(<<"a/b/+">>, id5, <<>>, Tab),
emqx_topic_index:insert(<<"a/b/d">>, id6, <<>>, Tab),
emqx_topic_index:insert(<<"a/+/+">>, id7, <<>>, Tab),
emqx_topic_index:insert(<<"a/+/#">>, id8, <<>>, Tab),

Records = [id(M) || M <- matches(<<"a/b/c">>, Tab)],
?assertEqual([id2, id3, id4, id5, id7, id8], lists:sort(Records)),

Records1 = [id(M) || M <- matches(<<"a/b">>, Tab)],
?assertEqual([id1, id2, id3, id8], lists:sort(Records1)),
ok.
t_match_wildcard_edge_cases(_) ->
CommonTopics = [
<<"a/b">>,
<<"a/b/#">>,
<<"a/b/#">>,
<<"a/b/c">>,
<<"a/b/+">>,
<<"a/b/d">>,
<<"a/+/+">>,
<<"a/+/#">>
],
Datasets =
[
%% Topics, TopicName, Results
{CommonTopics, <<"a/b/c">>, [2, 3, 4, 5, 7, 8]},
{CommonTopics, <<"a/b">>, [1, 2, 3, 8]},
{[<<"+/b/c">>, <<"/">>], <<"a/b/c">>, [1]},
{[<<"#">>, <<"/">>], <<"a">>, [1]},
{[<<"/">>, <<"+">>], <<"a">>, [2]}
],
F = fun({Topics, TopicName, Expected}) ->
Tab = emqx_topic_index:new(),
_ = [emqx_topic_index:insert(T, N, <<>>, Tab) || {N, T} <- lists:enumerate(Topics)],
Results = [id(M) || M <- emqx_topic_index:matches(TopicName, Tab, [unique])],
?assertEqual(
Expected,
Results,
#{
"Base topics" => Topics,
"Topic name" => TopicName
}
)
end,
lists:foreach(F, Datasets).

t_prop_matches(_) ->
?assert(
proper:quickcheck(
topic_matches_prop(),
[{max_size, 100}, {numtests, 100}]
)
),
Statistics = [{C, account(C)} || C <- [filters, topics, matches, maxhits]],
ct:pal("Statistics: ~p", [maps:from_list(Statistics)]).

topic_matches_prop() ->
?FORALL(
% Generate a longer list of topics and a shorter list of topic filter patterns.
#{
topics := TTopics,
patterns := Pats
},
emqx_proper_types:fixedmap(#{
% NOTE
% Beware of adding a non-empty constraint; proper will have a hard time with `topic_t/1`
% for some reason.
topics => scaled(4, list(topic_t([1, 2, 3, 4]))),
patterns => list(topic_filter_pattern_t())
}),
begin
Tab = emqx_topic_index:new(),
Topics = [emqx_topic:join(T) || T <- TTopics],
% Produce topic filters from generated topics and patterns.
% Number of filters is equal to the number of patterns, most of the time.
Filters = lists:enumerate(mk_filters(Pats, TTopics)),
_ = [emqx_topic_index:insert(F, N, <<>>, Tab) || {N, F} <- Filters],
% Gather some basic statistics
_ = account(filters, length(Filters)),
_ = account(topics, NTopics = length(Topics)),
_ = account(maxhits, NTopics * NTopics),
% Verify that matching each topic against index returns the same results as
% matching it against the list of filters one by one.
lists:all(
fun(Topic) ->
Ids1 = [id(M) || M <- emqx_topic_index:matches(Topic, Tab, [unique])],
Ids2 = lists:filtermap(
fun({N, F}) ->
case emqx_topic:match(Topic, F) of
true -> {true, N};
false -> false
end
end,
Filters
),
% Account the number of matches to compute the hit rate later
_ = account(matches, length(Ids1)),
case (Ids1 -- Ids2) ++ (Ids2 -- Ids1) of
[] ->
true;
[_ | _] = _Differences ->
ct:pal(
"Topic name: ~p~n"
"Index results: ~p~n"
"Topic match results:: ~p~n",
[Topic, Ids1, Ids2]
),
false
end
end,
Topics
)
end
).

mk_filters([Pat | PRest], [Topic | TRest]) ->
[emqx_topic:join(mk_topic_filter(Pat, Topic)) | mk_filters(PRest, TRest)];
mk_filters(_, _) ->
[].

account(Counter, N) ->
put({?MODULE, Counter}, account(Counter) + N).

account(Counter) ->
emqx_maybe:define(get({?MODULE, Counter}), 0).

%%

match(T, Tab) ->
emqx_topic_index:match(T, Tab).
@@ -173,3 +275,45 @@ id(Match) ->

topic(Match) ->
emqx_topic_index:get_topic(Match).

%%

topic_t(EntropyWeights) ->
EWLast = lists:last(EntropyWeights),
?LET(L, scaled(1 / 4, list(EWLast)), begin
EWs = lists:sublist(EntropyWeights ++ L, length(L)),
?SIZED(S, [oneof([topic_level_t(S * EW), topic_level_fixed_t()]) || EW <- EWs])
end).

topic_level_t(Entropy) ->
S = floor(1 + math:log2(Entropy) / 4),
?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))).

topic_level_fixed_t() ->
oneof([
<<"foo">>,
<<"bar">>,
<<"baz">>,
<<"xyzzy">>
]).

topic_filter_pattern_t() ->
list(topic_level_pattern_t()).

topic_level_pattern_t() ->
frequency([
{5, level},
{2, '+'},
{1, '#'}
]).

mk_topic_filter([], _) ->
[];
mk_topic_filter(_, []) ->
[];
mk_topic_filter(['#' | _], _) ->
['#'];
mk_topic_filter(['+' | Rest], [_ | Levels]) ->
['+' | mk_topic_filter(Rest, Levels)];
mk_topic_filter([level | Rest], [L | Levels]) ->
[L | mk_topic_filter(Rest, Levels)].
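
To illustrate the generators above (a hand-worked trace, not part of the change): a filter pattern is applied positionally to a generated topic, keeping `level` positions verbatim, substituting `'+'`, and truncating at `'#'`.

%% Illustration only:
%% mk_topic_filter([level, '+', '#'], [<<"a">>, <<"b">>, <<"c">>, <<"d">>])
%% returns [<<"a">>, '+', '#'], which emqx_topic:join/1 renders as <<"a/+/#">>:
%% a filter that matches the topic <<"a/b/c/d">> it was derived from.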