10BC0 fix: refresh resources and rules asynchronously by terry-xiaoyu · Pull Request #9199 · emqx/emqx · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion apps/emqx_rule_engine/include/rule_engine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,12 @@
end
end()).

-define(RPC_TIMEOUT, 30000).

-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).

-define(CLUSTER_CALL(Func, Args, ResParttern),
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, ?RPC_TIMEOUT) of
{ResL, []} ->
case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
[] -> ResL;
Expand All @@ -192,6 +194,37 @@
throw({Func, {failed_on_nodes, BadNodes}})
end end()).

%% like CLUSTER_CALL/3, but recall the remote node using FallbackFunc if Func is undefined
-define(CLUSTER_CALL(Func, Args, ResParttern, FallbackFunc, FallbackArgs),
fun() ->
RNodes = ekka_mnesia:running_nodes(),
ResL = erpc:multicall(RNodes, ?MODULE, Func, Args, ?RPC_TIMEOUT),
Res = lists:zip(RNodes, ResL),
BadRes = lists:filtermap(fun
({_Node, {ok, ResParttern}}) ->
false;
({Node, {error, {exception, undef, _}}}) ->
try erpc:call(Node, ?MODULE, FallbackFunc, FallbackArgs, ?RPC_TIMEOUT) of
ResParttern ->
false;
OtherRes ->
{true, #{rpc_type => call, func => FallbackFunc,
result => OtherRes, node => Node}}
catch
Err:Reason ->
{true, #{rpc_type => call, func => FallbackFunc,
exception => {Err, Reason}, node => Node}}
end;
({Node, OtherRes}) ->
{true, #{rpc_type => multicall, func => FallbackFunc,
result => OtherRes, node => Node}}
end, Res),
case BadRes of
[] -> Res;
_ -> throw(BadRes)
end
end()).

%% Tables
-define(RULE_TAB, emqx_rule).
-define(ACTION_TAB, emqx_rule_action).
Expand Down
196 changes: 196 additions & 0 deletions apps/emqx_rule_engine/src/emqx_rule_engine.appup.src

Large diffs are not rendered by default.

47 changes: 34 additions & 13 deletions apps/emqx_rule_engine/src/emqx_rule_engine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
, refresh_resources/0
, refresh_resource/1
, refresh_rule/1
, refresh_rules/0
, refresh_rules_when_boot/0
, refresh_actions/1
, refresh_actions/2
, refresh_resource_status/0
Expand All @@ -47,6 +47,7 @@
]).

-export([ init_resource/4
, init_resource_with_retrier/4
, init_action/4
, clear_resource/4
, clear_rule/1
Expand Down Expand Up @@ -255,15 +256,20 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
created_at = erlang:system_time(millisecond)
},
ok = emqx_rule_registry:add_resource(Resource),
InitArgs = [M, F, ResId, Config],
case Retry of
with_retry ->
%% Note that we will return OK in case of resource creation failure,
%% A timer is started to re-start the resource later.
_ = (catch (?CLUSTER_CALL(init_resource, [M, F, ResId, Config]))),
_ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok,
init_resource, InitArgs)
catch throw : Reason ->
?LOG(error, "create_resource failed: ~0p", [Reason])
end,
{ok, Resource};
no_retry ->
try
_ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]),
_ = ?CLUSTER_CALL(init_resource, InitArgs),
{ok, Resource}
catch throw : Reason ->
{error, Reason}
Expand Down Expand Up @@ -327,7 +333,7 @@ start_resource(ResId) ->
{ok, #resource_type{on_create = {Mod, Create}}}
= emqx_rule_registry:find_resource_type(ResType),
try
init_resource(Mod, Create, ResId, Config),
init_resource_with_retrier(Mod, Create, ResId, Config),
refresh_actions_of_a_resource(ResId)
catch
throw:Reason -> {error, Reason}
Expand Down Expand Up @@ -476,20 +482,22 @@ refresh_resource(Type) when is_atom(Type) ->
emqx_rule_registry:get_resources_by_type(Type));

refresh_resource(#resource{id = ResId, type = Type, config = Config}) ->
try
{ok, #resource_type{on_create = {M, F}}} =
emqx_rule_registry:find_resource_type(Type),
ok = emqx_rule_engine:init_resource(M, F, ResId, Config)
catch _:_ ->
emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY)
end.
{ok, #resource_type{on_create = {M, F}}} =
emqx_rule_registry:find_resource_type(Type),
ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config).

-spec(refresh_rules() -> ok).
refresh_rules() ->
-spec(refresh_rules_when_boot() -> ok).
refresh_rules_when_boot() ->
lists:foreach(fun
(#rule{enabled = true} = Rule) ->
try refresh_rule(Rule)
catch _:_ ->
%% We set the enable = false when rule init failed to avoid bad rules running
%% without actions created properly.
%% The init failure might be caused by a disconnected resource, in this case the
%% actions can not be created, so the rules won't work.
%% After the user fixed the problem he can enable it manually,
%% doing so will also recreate the actions.
emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup})
end;
(_) -> ok
Expand Down Expand Up @@ -655,6 +663,19 @@ init_resource(Module, OnCreate, ResId, Config) ->
status = #{is_alive => true}},
emqx_rule_registry:add_resource_params(ResParams).

init_resource_with_retrier(Module, OnCreate, ResId, Config) ->
try
Params = Module:OnCreate(ResId, Config),
ResParams = #resource_params{id = ResId,
params = Params,
status = #{is_alive => true}},
emqx_rule_registry:add_resource_params(ResParams)
catch Class:Reason:ST ->
Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY),
emqx_rule_monitor:ensure_resource_retrier(ResId, Interval),
erlang:raise(Class, {init_resource, Reason}, ST)
end.

init_action(Module, OnCreate, ActionInstId, Params) ->
ok = emqx_rule_metrics:create_metrics(ActionInstId),
case ?RAISE(Module:OnCreate(ActionInstId, Params),
Expand Down
3 changes: 1 addition & 2 deletions apps/emqx_rule_engine/src/emqx_rule_engine_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ start(_Type, _Args) ->
{ok, Sup} = emqx_rule_engine_sup:start_link(),
_ = emqx_rule_engine_sup:start_locker(),
ok = emqx_rule_engine:load_providers(),
ok = emqx_rule_engine:refresh_resources(),
ok = emqx_rule_engine:refresh_rules(),
ok = emqx_rule_monitor:async_refresh_resources_rules(),
ok = emqx_rule_engine_cli:load(),
{ok, Sup}.

Expand Down
23 changes: 23 additions & 0 deletions apps/emqx_rule_engine/src/emqx_rule_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

-export([ start_link/0
, stop/0
, async_refresh_resources_rules/0
, ensure_resource_retrier/2
, retry_loop/3
]).
Expand All @@ -45,12 +46,22 @@ init([]) ->
_ = erlang:process_flag(trap_exit, true),
{ok, #{retryers => #{}}}.

async_refresh_resources_rules() ->
gen_server:cast(?MODULE, async_refresh).

ensure_resource_retrier(ResId, Interval) ->
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).

handle_call(_Msg, _From, State) ->
{reply, ok, State}.

handle_cast(async_refresh, #{boot_refresh_pid := Pid} = State) when is_pid(Pid) ->
%% the refresh task is already in progress, we discard the duplication
{noreply, State};
handle_cast(async_refresh, State) ->
Pid = spawn_link(fun do_async_refresh/0),
{noreply, State#{boot_refresh_pid => Pid}};

handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
Objects = maps:get(Tag, State, #{}),
NewState = case maps:find(Obj, Objects) of
Expand All @@ -65,7 +76,13 @@ handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.


handle_info({'EXIT', Pid, _Reason}, State = #{boot_refresh_pid := Pid}) ->
{noreply, State#{boot_refresh_pid => undefined}};
handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) ->
%% We won't try to restart the 'retryers' event if the 'EXIT' Reason is not 'normal'.
%% Instead we rely on the user to trigger a manual retry for the resources, and then enable
%% the rules after resources are connected.
case maps:take(Pid, Retryers) of
{{Tag, Obj}, Retryers2} ->
Objects = maps:get(Tag, State, #{}),
Expand Down Expand Up @@ -117,6 +134,12 @@ retry_loop(resource, ResId, Interval) ->
ok
end.

do_async_refresh() ->
%% NOTE: the order matters.
%% We should always refresh the resources first and then the rules.
ok = emqx_rule_engine:refresh_resources(),
ok = emqx_rule_engine:refresh_rules_when_boot().

refresh_and_enable_rules_of_resource(ResId) ->
lists:foreach(
fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) ->
Expand Down
55 changes: 49 additions & 6 deletions apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl
C026
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ suite() ->
groups() ->
[{resource, [sequence],
[ t_restart_resource
, t_refresh_resources_rules
]}
].

Expand All @@ -47,24 +48,53 @@ end_per_suite(_Config) ->
ok.

init_per_testcase(t_restart_resource, Config) ->
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100),
Opts = [public, named_table, set, {read_concurrency, true}],
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
ets:new(t_restart_resource, [named_table, public]),
ets:insert(t_restart_resource, {failed_count, 0}),
ets:insert(t_restart_resource, {succ_count, 0}),
common_init_per_testcase(),
Config;
init_per_testcase(t_refresh_resources_rules, Config) ->
meck:unload(),
ets:new(t_refresh_resources_rules, [named_table, public]),
ok = meck:new(emqx_rule_engine, [no_link, passthrough]),
meck:expect(emqx_rule_engine, refresh_resources, fun() ->
timer:sleep(500),
ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}),
ok
end),
meck:expect(emqx_rule_engine, refresh_rules_when_boot, fun() ->
timer:sleep(500),
ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}),
ok
end),
common_init_per_testcase(),
Config;

init_per_testcase(_, Config) ->
common_init_per_testcase(),
Config.

end_per_testcase(t_restart_resource, Config) ->
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000),
ets:delete(t_restart_resource),
common_end_per_testcases(),
Config;
end_per_testcase(t_refresh_resources_rules, Config) ->
meck:unload(),
common_end_per_testcases(),
Config;
end_per_testcase(_, Config) ->
common_end_per_testcases(),
Config.

common_init_per_testcase() ->
{ok, _} = emqx_rule_monitor:start_link().
common_end_per_testcases() ->
emqx_rule_monitor:stop().

t_restart_resource(_) ->
{ok, _} = emqx_rule_monitor:start_link(),
ok = emqx_rule_registry:register_resource_types(
[#resource_type{
name = test_res_1,
Expand All @@ -79,21 +109,34 @@ t_restart_resource(_) ->
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
#{type => test_res_1,
config => #{},
restart_interval => 100,
description => <<"debug resource">>}),
[{_, 1}] = ets:lookup(t_restart_resource, failed_count),
[{_, 0}] = ets:lookup(t_restart_resource, succ_count),
?assertMatch([{_, 0}], ets:lookup(t_restart_resource, succ_count)),
?assertMatch([{_, N}] when N == 1 orelse N == 2 orelse N == 3,
ets:lookup(t_restart_resource, failed_count)),
ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]),
emqx_rule_monitor:ensure_resource_retrier(ResId, 100),
timer:sleep(1000),
[{_, 5}] = ets:lookup(t_restart_resource, failed_count),
[{_, 1}] = ets:lookup(t_restart_resource, succ_count),
#{retryers := Pids} = sys:get_state(whereis(emqx_rule_monitor)),
?assertEqual(0, map_size(Pids)),
ok = emqx_rule_engine:unload_providers(),
emqx_rule_registry:remove_resource(ResId),
emqx_rule_monitor:stop(),
ok.

t_refresh_resources_rules(_) ->
ok = emqx_rule_monitor:async_refresh_resources_rules(),
ok = emqx_rule_monitor:async_refresh_resources_rules(),
%% there should be only one refresh handler at the same time
?assertMatch(#{boot_refresh_pid := Pid} when is_pid(Pid), sys:get_state(whereis(emqx_rule_monitor))),
timer:sleep(1200),
?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)),
?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)),
ok = emqx_rule_monitor:async_refresh_resources_rules(),
timer:sleep(1200),
?assertEqual([{refresh_resources, 2}], ets:lookup(t_refresh_resources_rules, refresh_resources)),
?assertEqual([{refresh_rules, 2}], ets:lookup(t_refresh_resources_rules, refresh_rules)).

on_resource_create(Id, _) ->
case ets:lookup(t_restart_resource, failed_count) of
[{_, 5}] ->
Expand Down
3 changes: 3 additions & 0 deletions changes/v4.3.22-en.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
### Enhancements

- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
This is to avoid slowing down the boot if some resources spend long time establishing the connection.

- Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124).
This is to make the ACL deny logging for subscription behave the same as for publish.

Expand Down
3 changes: 3 additions & 0 deletions changes/v4.3.22-zh.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
### 增强

- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。

- 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。
该行为的改变主要是为了跟发布失败时的行为保持一致。

Expand Down
0