10BC0 feat(socket): implement `esockd_socket` connection module by keynslug · Pull Request #15451 · emqx/emqx · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0630af6
feat(parser): tell how much more bytes are expected when incomplete
keynslug Jun 30, 2025
7cf8259
chore: update `esockd` to 5.15.0 with `esockd_socket` backend
keynslug Jul 23, 2025
be4b994
feat(socket): implement `esockd_socket` connection module
keynslug Jun 30, 2025
28bc810
test(emqx): make peeking into connection state module-aware
keynslug Jun 30, 2025
3a6b020
test(emqx): adapt PP2 mocks to `esockd_socket` listeners
keynslug Jul 1, 2025
aad4e5a
fix(sessds): make `get_session_state/1` connmod-aware
keynslug Jul 1, 2025
e6edd25
fix(eviction): make aware of new connection module
keynslug Jul 1, 2025
0761af5
test(emqx): cleanup `emqx_client_SUITE` and unbreak 2 testcases
keynslug Jul 2, 2025
1f9dc79
feat(congestion): make module generic and connmod-aware
keynslug Jul 2, 2025
280ac5a
feat(socket): use async send APIs and respect `send_timeout` option
keynslug Jul 2, 2025
a43ee50
chore(conn): make `esockd_transport` connection emit `send_timeout` e…
keynslug Jul 2, 2025
63f64ec
test(conn): verify congestion and send timeout logic
keynslug Jul 2, 2025
e1312b6
feat(emqx): support `esockd_socket` backend through listener config
keynslug Jun 30, 2025
960f9fb
test(conn): verify connmods work well with socket close and keepalive
keynslug Jul 10, 2025
f69c877
perf(frame): optimize away and inline few serialization routines
keynslug Jul 10, 2025
13a081b
chore(socket): fix dialyzer complaints
keynslug Jul 10, 2025
3a3ce16
chore: add changelog entry
keynslug Jul 10, 2025
242148f
chore(socket): drop unnecessary directive
keynslug Jul 16, 2025
6dcae1c
fix(socket): avoid closing already closed socket
keynslug Jul 17, 2025
3a81374
fix(socket): anticipate > 1 socket `abort`s on socket close
keynslug Jul 18, 2025
3430943
chore(socket): correct typespec
keynslug Jul 18, 2025
b3d1f5e
fix(socket): preserve select handle + handle decongestion correctly
keynslug Jul 18, 2025
a724a65
test(client): add congested-then-decongested client testcase
keynslug Jul 18, 2025
8bb14d9
chore(socket): annotate log events with connmod for observability
keynslug Jul 18, 2025
7b8dcbc
chore(frame): avoid using 0 to signal "some more" bytes expected
keynslug Jul 22, 2025
e66c98c
chore(socket): drop dead copy-paste artifacts
keynslug Jul 22, 2025
9a26fb3
chore(conn): mention related `emqx_socket_connection` in header
keynslug Jul 22, 2025
8cf1ee0
chore(schema): mention `tcp_backend` change requires listener restart
keynslug Jul 22, 2025
b255ddd
fix(schema): do not announce `socket` TCP backend under Windows
keynslug Jul 22, 2025
367fd4b
chore(socket): address dialyzer concerns
keynslug Jul 22, 2025
80429fa
test(telemetry): fix testcase timeout due to dirty server state
keynslug Jul 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(emqx): support esockd_socket backend through listener config
  • Loading branch information
keynslug committed Jul 23, 2025
commit e1312b6d324a58eab6de370b6ece1c43f46c2ffb
44 changes: 36 additions & 8 deletions apps/emqx/src/emqx_listeners.erl
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,17 @@ console_print(_Fmt, _Args) -> ok.
-spec do_start_listener(listener_type(), atom(), listener_id(), map()) ->
{ok, pid() | {skipped, atom()}} | {error, term()}.
%% Start MQTT/TCP listener
do_start_listener(Type = tcp, Name, Id, #{bind := ListenOn, tcp_backend := socket} = Conf) ->
esockd:open_tcpsocket(
Id,
ListenOn,
esockd_opts(Id, Type, Name, emqx_socket_connection, Conf)
);
do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) ->
esockd:open(
Id,
ListenOn,
esockd_opts(Id, Type, Name, Opts)
esockd_opts(Id, Type, Name, emqx_connection, Opts)
);
%% Start MQTT/WS listener
do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
Expand All @@ -454,13 +460,35 @@ do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
{ok, {skipped, quic_app_missing}}
end.

do_update_listener(
Type = tcp, Name, OldConf, NewConf = #{bind := ListenOn, tcp_backend := Backend}
) ->
Id = listener_id(tcp, Name),
case OldConf of
#{bind := ListenOn, tcp_backend := Backend} ->
case Backend of
gen_tcp -> ConnMod = emqx_connection;
socket -> ConnMod = emqx_socket_connection
end,
esockd:reset_options(
{Id, ListenOn},
esockd_opts(Id, Type, Name, ConnMod, NewConf)
);
_Different ->
%% TODO
%% Again, we're not strictly required to drop live connections in this case.
{error, not_supported}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add some description to the config schema desc?
e.g. when choose to use socket (instead of gen_tcp), the socket options cannot be updated from Dashboard or API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of TCP options can be updated I think, it's tcp_backend that cannot be changed without running restart_listener/4. I'll clarify the latter, unless I'm missing something.

end;
do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
?ESOCKD_LISTENER(Type)
->
Id = listener_id(Type, Name),
case maps:get(bind, OldConf) of
ListenOn ->
esockd:reset_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
case OldConf of
#{bind := ListenOn} ->
esockd:reset_options(
{Id, ListenOn},
esockd_opts(Id, Type, Name, emqx_connection, NewConf)
);
_Different ->
%% TODO
%% Again, we're not strictly required to drop live connections in this case.
Expand Down Expand Up @@ -578,17 +606,17 @@ perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
perform_listener_change(stop, {Type, Name, Conf}) ->
stop_listener(Type, Name, Conf).

esockd_opts(ListenerId, Type, Name, Opts0) ->
esockd_opts(ListenerId, Type, Name, ConnMod, Opts0) ->
Zone = zone(Opts0),
PacketTcpOpts = choose_packet_opts(Opts0),
Limiter = emqx_limiter:create_esockd_limiter_client(Zone, ListenerId),
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
ESockdLimiter = emqx_limiter:create_esockd_limiter_client(Zone, ListenerId),
Opts2 = Opts1#{
limiter => ESockdLimiter,
limiter => Limiter,
access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])),
tune_fun => {emqx_olp, backoff_new_conn, [Zone]},
connection_mfargs =>
{emqx_connection, start_link, [
{ConnMod, start_link, [
#{
listener => {Type, Name},
zone => Zone,
Expand Down
8 changes: 8 additions & 0 deletions apps/emqx/src/emqx_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,14 @@ fields("crl_cache") ->
];
fields("mqtt_tcp_listener") ->
mqtt_listener(1883) ++
[
{"tcp_backend",
sc(hoconsc:enum([gen_tcp, socket]), #{
default => <<"gen_tcp">>,
desc => ?DESC(fields_mqtt_opts_tcp_backend),
importance => ?IMPORTANCE_LOW
})}
] ++
mqtt_parse_options() ++
[
{"tcp_options",
Expand Down
190 changes: 111 additions & 79 deletions apps/emqx/test/emqx_client_SUITE.erl
6D4E
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,52 @@

all() ->
[
{group, mqttv3},
{group, mqttv4},
{group, mqttv5},
{group, others},
{group, misbehaving}
{group, gen_tcp_listener},
{group, socket_listener}
].

groups() ->
[
{mqttv3, [], [t_basic_v3]},
{gen_tcp_listener, [], [
{group, mqttv3},
{group, mqttv4},
{group, mqttv5},
{group, others},
{group, misbehaving}
]},
{socket_listener, [], [
{group, sock_closed},
{group, misbehaving}
]},
{mqttv3, [], [
t_basic,
t_sock_closed_reason_normal,
t_sock_closed_force_closed_by_client
]},
{mqttv4, [], [
t_basic_v4,
t_basic,
t_cm,
t_cm_registry,
%% t_will_message,
t_offline_message_queueing,
t_overlapping_subscriptions,
%% t_keepalive,
t_redelivery_on_reconnect,
t_dollar_topics
t_dollar_topics,
t_sock_closed_reason_normal,
t_sock_closed_force_closed_by_client
]},
{mqttv5, [], [
t_basic_with_props_v5,
t_v5_receive_maximim_in_connack,
t_sock_closed_reason_normal,
t_sock_closed_force_closed_by_client
]},
{mqttv5, [], [t_basic_with_props_v5, t_v5_receive_maximim_in_connack]},
{others, [], [
t_username_as_clientid,
t_certcn_as_alias,
t_certdn_as_alias,
t_client_attr_from_user_property,
t_sock_closed_reason_normal,
t_sock_closed_force_closed_by_client,
t_certcn_as_clientid_default_config_tls,
t_certcn_as_clientid_tlsv1_3,
t_certcn_as_clientid_tlsv1_2,
Expand All @@ -58,14 +75,25 @@ groups() ->
{misbehaving, [], [
t_sub_non_utf8_topic,
t_congestion_send_timeout
< 9E72 /td> ]},
{sock_closed, [], [
t_sock_closed_reason_normal,
t_sock_closed_force_closed_by_client
]}
].

init_per_suite(Config) ->
Config.

end_per_suite(_Config) ->
ok.

init_per_group(gen_tcp_listener, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
%% t_congestion_send_timeout
"listeners.tcp.default.tcp_backend = gen_tcp\n"
"listeners.tcp.default.tcp_options.send_timeout = 2500\n"
"listeners.tcp.default.tcp_options.sndbuf = 4KB\n"
"listeners.tcp.default.tcp_options.recbuf = 4KB\n"
Expand All @@ -75,10 +103,37 @@ init_per_suite(Config) ->
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
[{group_apps, Apps} | Config];
init_per_group(socket_listener, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
%% t_congestion_send_timeout
"listeners.tcp.default.tcp_backend = socket\n"
"listeners.tcp.default.tcp_options.send_timeout = 2500\n"
"listeners.tcp.default.tcp_options.sndbuf = 4KB\n"
"listeners.tcp.default.tcp_options.recbuf = 4KB\n"
%% others
"listeners.ssl.default.ssl_options.verify = verify_peer\n"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{group_apps, Apps} | Config];
init_per_group(mqttv3, Config) ->
[{proto_ver, v3} | Config];
init_per_group(mqttv4, Config) ->
[{proto_ver, v4} | Config];
init_per_group(mqttv5, Config) ->
[{proto_ver, v5} | Config];
init_per_group(_GroupName, Config) ->
Config.

end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
end_per_group(gen_tcp_listener, Config) ->
emqx_cth_suite:stop(?config(group_apps, Config));
end_per_group(socket_listener, Config) ->
emqx_cth_suite:stop(?config(group_apps, Config));
end_per_group(_GroupName, _Config) ->
ok.

init_per_testcase(_Case, Config) ->
Config.
Expand All @@ -92,20 +147,10 @@ end_per_testcase(_Case, _Config) ->
emqx_config:put_zone_conf(default, [mqtt, clientid_override], disabled),
ok.

%%--------------------------------------------------------------------
%% Test cases for MQTT v3
%%--------------------------------------------------------------------

t_basic_v3(_) ->
run_basic([{proto_ver, v3}]).

%%--------------------------------------------------------------------
%% Test cases for MQTT v4
%%--------------------------------------------------------------------

t_basic_v4(_Config) ->
run_basic([{proto_ver, v4}]).

t_cm(_) ->
emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 1000),
ClientId = atom_to_binary(?FUNCTION_NAME),
Expand Down Expand Up @@ -271,18 +316,15 @@ t_dollar_topics(_) ->
%% Test cases for MQTT v5
%%--------------------------------------------------------------------

v5_conn_props(ReceiveMaximum) ->
[
{proto_ver, v5},
{properties, #{'Receive-Maximum' => ReceiveMaximum}}
].
v5_conn_props(ReceiveMaximum, Config) ->
[{properties, #{'Receive-Maximum' => ReceiveMaximum}} | Config].

t_basic_with_props_v5(_) ->
run_basic(v5_conn_props(4)).
t_basic_with_props_v5(Config) ->
t_basic(v5_conn_props(4, Config)).

t_v5_receive_maximim_in_connack(_) ->
t_v5_receive_maximim_in_connack(Config) ->
ReceiveMaximum = 7,
{ok, C} = emqtt:start_link(v5_conn_props(ReceiveMaximum)),
{ok, C} = emqtt:start_link(v5_conn_props(ReceiveMaximum, Config)),
{ok, Props} = emqtt:connect(C),
?assertMatch(#{'Receive-Maximum' := ReceiveMaximum}, Props),
ok = emqtt:disconnect(C),
Expand All @@ -292,7 +334,7 @@ t_v5_receive_maximim_in_connack(_) ->
%% General test cases.
%%--------------------------------------------------------------------

run_basic(Opts) ->
t_basic(Opts) ->
Topic = <<"TopicA">>,
{ok, C} = emqtt:start_link(Opts),
{ok, _} = emqtt:connect(C),
Expand Down Expand Up @@ -378,56 +420,46 @@ t_client_attr_from_user_property(_Config) ->
),
emqtt:disconnect(Client).

t_sock_closed_reason_normal(_) ->
ProtoVers = [v3, v4, v5],
t_sock_closed_reason_normal(Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
[
?check_trace(
begin
{ok, C} = emqtt:start_link([{proto_ver, Ver}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
?wait_async_action(
emqtt:disconnect(C),
#{?snk_kind := sock_closed_normal},
5_000
)
end,
fun(Trace0) ->
?assertMatch([#{clientid := ClientId}], ?of_kind(sock_closed_normal, Trace0)),
ok
end
)
|| Ver <- ProtoVers
].
?check_trace(
begin
{ok, C} = emqtt:start_link([{clientid, ClientId} | Config]),
{ok, _} = emqtt:connect(C),
?wait_async_action(
emqtt:disconnect(C),
#{?snk_kind := sock_closed_normal},
5_000
)
end,
fun(Trace0) ->
?assertMatch([#{clientid := ClientId}], ?of_kind(sock_closed_normal, Trace0)),
ok
end
).

t_sock_closed_force_closed_by_client(_) ->
ProtoVers = [v3, v4, v5],
t_sock_closed_force_closed_by_client(Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
process_flag(trap_exit, true),
[
?check_trace(
begin
{ok, C} = emqtt:start_link([{proto_ver, Ver}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
?wait_async_action(
exit(C, kill),
#{?snk_kind := sock_closed_with_other_reason},
5_000
)
end,
fun(Trace0) ->
?assertMatch(
[#{clientid := ClientId}], ?of_kind(sock_closed_with_other_reason, Trace0)
),
ok
end
)
|| Ver <- ProtoVers
],
process_flag(trap_exit, false).
?check_trace(
begin
{ok, C} = emqtt:start_link([{clientid, ClientId} | Config]),
{ok, _} = emqtt:connect(C),
true = erlang:unlink(C),
?wait_async_action(
exit(C, kill),
#{?snk_kind := sock_closed_with_other_reason},
5_000
)
end,
fun(Trace0) ->
?assertMatch(
[#{clientid := ClientId}], ?of_kind(sock_closed_with_other_reason, Trace0)
),
ok
end
).

t_clientid_override(_) ->
emqx_logger:set_log_level(debug),
ClientId = <<"original-clientid-0">>,
Username = <<"username1">>,
Override = <<"username">>,
Expand Down
Loading
0