E5DD feat(socket): implement `esockd_socket` connection module by keynslug · Pull Request #15451 · emqx/emqx · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0630af6
feat(parser): tell how much more bytes are expected when incomplete
keynslug Jun 30, 2025
7cf8259
chore: update `esockd` to 5.15.0 with `esockd_socket` backend
keynslug Jul 23, 2025
be4b994
feat(socket): implement `esockd_socket` connection module
keynslug Jun 30, 2025
28bc810
test(emqx): make peeking into connection state module-aware
keynslug Jun 30, 2025
3a6b020
test(emqx): adapt PP2 mocks to `esockd_socket` listeners
keynslug Jul 1, 2025
aad4e5a
fix(sessds): make `get_session_state/1` connmod-aware
keynslug Jul 1, 2025
e6edd25
fix(eviction): make aware of new connection module
keynslug Jul 1, 2025
0761af5
test(emqx): cleanup `emqx_client_SUITE` and unbreak 2 testcases
keynslug Jul 2, 2025
1f9dc79
feat(congestion): make module generic and connmod-aware
keynslug Jul 2, 2025
280ac5a
feat(socket): use async send APIs and respect `send_timeout` option
keynslug Jul 2, 2025
a43ee50
chore(conn): make `esockd_transport` connection emit `send_timeout` e…
keynslug Jul 2, 2025
63f64ec
test(conn): verify congestion and send timeout logic
keynslug Jul 2, 2025
e1312b6
feat(emqx): support `esockd_socket` backend through listener config
keynslug Jun 30, 2025
960f9fb
FBC8 test(conn): verify connmods work well with socket close and keepalive
keynslug Jul 10, 2025
f69c877
perf(frame): optimize away and inline few serialization routines
keynslug Jul 10, 2025
13a081b
chore(socket): fix dialyzer complaints
keynslug Jul 10, 2025
3a3ce16
chore: add changelog entry
keynslug Jul 10, 2025
242148f
chore(socket): drop unnecessary directive
keynslug Jul 16, 2025
6dcae1c
fix(socket): avoid closing already closed socket
keynslug Jul 17, 2025
3a81374
fix(socket): anticipate > 1 socket `abort`s on socket close
keynslug Jul 18, 2025
3430943
chore(socket): correct typespec
keynslug Jul 18, 2025
b3d1f5e
fix(socket): preserve select handle + handle decongestion correctly
keynslug Jul 18, 2025
a724a65
test(client): add congested-then-decongested client testcase
keynslug Jul 18, 2025
8bb14d9
chore(socket): annotate log events with connmod for observability
keynslug Jul 18, 2025
7b8dcbc
chore(frame): avoid using 0 to signal "some more" bytes expected
keynslug Jul 22, 2025
e66c98c
chore(socket): drop dead copy-paste artifacts
keynslug Jul 22, 2025
9a26fb3
chore(conn): mention related `emqx_socket_connection` in header
keynslug Jul 22, 2025
8cf1ee0
chore(schema): mention `tcp_backend` change requires listener restart
keynslug Jul 22, 2025
b255ddd
fix(schema): do not announce `socket` TCP backend under Windows
keynslug Jul 22, 2025
367fd4b
chore(socket): address dialyzer concerns
keynslug Jul 22, 2025
80429fa
test(telemetry): fix testcase timeout due to dirty server state
keynslug Jul 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test(emqx): make peeking into connection state module-aware
  • Loading branch information
keynslug committed Jul 23, 2025
commit 28bc810e4355d92b5ea78654ac88490037c21b21
36 changes: 36 additions & 0 deletions apps/emqx/test/emqx_cth_broker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-module(emqx_cth_broker).

-compile(export_all).
-compile(nowarn_export_all).

-spec connection_info(_Info, pid() | emqx_types:clientid()) -> _Value.
connection_info(Info, Client) when is_pid(Client) ->
connection_info(Info, emqtt_info(clientid, Client));
connection_info(Info, ClientId) ->
[ChanPid] = emqx_cm:lookup_channels(ClientId),
ConnMod = emqx_cm:do_get_chann_conn_mod(ClientId, ChanPid),
get_connection_info(Info, ConnMod, sys:get_state(ChanPid)).

-spec connection_state(pid() | emqx_types:clientid()) -> _Value.
connection_state(Client) when is_pid(Client) ->
connection_state(emqtt_info(clientid, Client));
connection_state(ClientId) ->
[ChanPid] = emqx_cm:lookup_channels(ClientId),
ConnMod = emqx_cm:do_get_chann_conn_mod(ClientId, ChanPid),
ConnMod:get_state(ChanPid).

get_connection_info(connmod, ConnMod, _State) ->
ConnMod;
get_connection_info(Info, emqx_connection, State) ->
emqx_connection:info(Info, State);
get_connection_info(Info, emqx_socket_connection, State) ->
emqx_socket_connection:info(Info, State);
get_connection_info(Info, emqx_ws_connection, {_WSState, ConnState, _}) ->
emqx_ws_connection:info(Info, ConnState).

emqtt_info(Key, Client) ->
proplists:get_value(Key, emqtt:info(Client), undefined).
4 changes: 1 addition & 3 deletions apps/emqx/test/emqx_listeners_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ t_tcp_frame_parsing_conn(_Config) ->
with_listener(tcp, ?FUNCTION_NAME, Conf, fun() ->
Client = emqtt_connect_tcp({127, 0, 0, 1}, Port),
pong = emqtt:ping(Client),
ClientId = proplists:get_value(clientid, emqtt:info(Client)),
[CPid] = emqx_cm:lookup_channels(ClientId),
CState = emqx_connection:get_state(CPid),
CState = emqx_cth_broker:connection_state(Client),
?assertMatch(#{listener := {tcp, ?FUNCTION_NAME}}, CState),
emqx_listeners:is_packet_parser_available(mqtt) andalso
?assertMatch(#{parser := {frame, _Options}}, CState)
Expand Down
9 changes: 2 additions & 7 deletions apps/emqx/test/emqx_listeners_update_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ test_change_parse_unit(ConfPath, ClientOpts) ->
?assertMatch({ok, _}, emqx:update_config(ConfPath, {update, ListenerRawConf1})),
Client1 = emqtt_connect(ClientOpts),
pong = emqtt:ping(Client1),
CState1 = get_conn_state(Client1),
CState1 = emqx_cth_broker:connection_state(Client1),
emqx_listeners:is_packet_parser_available(mqtt) andalso
?assertMatch(
#{parser := {frame, _Options}},
Expand All @@ -245,7 +245,7 @@ test_change_parse_unit(ConfPath, ClientOpts) ->
?assertMatch({ok, _}, emqx:update_config(ConfPath, {update, ListenerRawConf0})),
Client2 = emqtt_connect(ClientOpts),
pong = emqtt:ping(Client2),
CState2 = get_conn_state(Client2),
CState2 = emqx_cth_broker:connection_state(Client2),
emqx_listeners:is_packet_parser_available(mqtt) andalso
?assertMatch(
#{parser := Parser} when Parser =/= map_get(parser, CState1),
Expand Down Expand Up @@ -424,8 +424,3 @@ emqtt_connect(Opts) ->
{error, Reason} ->
error(Reason, [Opts])
end.

get_conn_state(Client) ->
ClientId = proplists:get_value(clientid, emqtt:info(Client)),
[CPid | _] = emqx_cm:lookup_channels(ClientId),
emqx_connection:get_state(CPid).
18 changes: 3 additions & 15 deletions apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,6 @@ end_per_testcase(TestCase, Config) ->
client_info(Key, Client) ->
proplists:get_value(Key, emqtt:info(Client), undefined).

connection_info(Info, ClientPid, Config) when is_list(Config) ->
connection_info(Info, ClientPid, ?config(conn_type, Config));
connection_info(Info, ClientPid, tcp) ->
emqx_connection:info(Info, sys:get_state(ClientPid));
connection_info(Info, ClientPid, quic) ->
emqx_connection:info(Info, sys:get_state(ClientPid));
connection_info(Info, ClientPid, ws) ->
{_WSState, ConnState, _} = sys:get_state(ClientPid),
emqx_ws_connection:info(Info, ConnState).

receive_messages(Count) ->
receive_messages(Count, []).

Expand Down Expand Up @@ -283,8 +273,7 @@ t_connect_will_message(Config) ->
| Config
]),
{ok, _} = emqtt:ConnFun(Client1),
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)),
WillMsg = connection_info({channel, will_msg}, ClientPid, Config),
WillMsg = emqx_cth_broker:connection_info({channel, will_msg}, Client1),
%% [MQTT-3.1.2-7]
?assertNotEqual(undefined, WillMsg),

Expand Down Expand Up @@ -468,13 +457,12 @@ t_connect_emit_stats_timeout(Config) ->
{ok, _} = emqtt:ConnFun(Client),
%% Poke the connection to ensure stats timer is armed.
pong = emqtt:ping(Client),
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assertMatch(
TRef when is_reference(TRef),
connection_info(stats_timer, ClientPid, Config)
emqx_cth_broker:connection_info(stats_timer, Client)
),
?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0),
?assertEqual(undefined, connection_info(stats_timer, ClientPid, Config)),
?assertEqual(undefined, emqx_cth_broker:connection_info(stats_timer, Client)),
ok = emqtt:disconnect(Client).

%% [MQTT-3.1.2-22]
Expand Down
3 changes: 1 addition & 2 deletions apps/emqx/test/emqx_persistent_session_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,12 @@ t_choose_impl(Config) ->
| Config
]),
{ok, _} = emqtt:ConnFun(Client),
[ChanPid] = emqx_cm:lookup_channels(ClientId),
?assertEqual(
case ?config(persistence, Config) of
false -> emqx_session_mem;
ds -> emqx_persistent_session_ds
end,
emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid))
emqx_cth_broker:connection_info({channel, {session, impl}}, ClientId)
),
ok = emqtt:disconnect(Client).

Expand Down
16 changes: 8 additions & 8 deletions apps/emqx/test/emqx_shared_sub_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -792,28 +792,28 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
[Pid1, Pid2] = emqx_shared_sub:subscribers(Group, Topic),
?assert(is_process_alive(Pid1)),
?assert(is_process_alive(Pid2)),
?retry(100, 10, ?assertEqual(disconnected, get_channel_info(conn_state, Pid1))),
?retry(100, 10, ?assertEqual(disconnected, get_channel_info(conn_state, Pid2))),
?retry(100, 10, ?assertEqual(disconnected, get_channel_info(conn_state, ClientId1))),
?retry(100, 10, ?assertEqual(disconnected, get_channel_info(conn_state, ClientId2))),

{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
?retry(
100,
10,
begin
Msgs1 = emqx_mqueue:to_list(get_mqueue(Pid1)),
Msgs2 = emqx_mqueue:to_list(get_mqueue(Pid2)),
Msgs1 = emqx_mqueue:to_list(get_mqueue(ClientId1)),
Msgs2 = emqx_mqueue:to_list(get_mqueue(ClientId2)),
%% assert the message is in mqueue (because socket is closed)
?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2)
end
),
emqtt:stop(ConnPub),
ok.

get_mqueue(ConnPid) ->
get_channel_info({session, mqueue}, ConnPid).
get_mqueue(Client) ->
get_channel_info({session, mqueue}, Client).

get_channel_info(Info, ConnPid) ->
emqx_connection:info({channel, Info}, sys:get_state(ConnPid)).
get_channel_info(Info, Client) ->
emqx_cth_broker:connection_info({channel, Info}, Client).

%% No ack, QoS 2 subscriptions,
%% client1 receives one message, send pubrec, then suspend
Expand Down
7 changes: 5 additions & 2 deletions apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1090,10 +1090,13 @@ t_keepalive(Config) ->
{ok, _} = emqtt:connect(C1),
[Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
%% will reset to max keepalive if keepalive > max keepalive
#{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid),
?assertMatch(
#{conninfo := #{keepalive := InitKeepalive}},
emqx_cm:get_chan_info(list_to_binary(ClientId))
),
?assertMatch(
#{max_idle_millisecond := 65536500},
emqx_connection:info({channel, keepalive}, sys:get_state(Pid))
emqx_cth_broker:connection_info({channel, keepalive}, list_to_binary(ClientId))
),

?assertMatch(
Expand Down
Add this suggestion to a batch that can be applied as a single commit. This suggestion is invalid because no changes were made to the code. Suggestions cannot be applied while the pull request is closed. Suggestions cannot be applied while viewing a subset of changes. Only one suggestion per line can be applied in a batch. Add this suggestion to a batch that can be applied as a single commit. Applying suggestions on deleted lines is not supported. You must change the existing code in this line in order to create a valid suggestion. Outdated suggestions cannot be applied. This suggestion has been applied or marked resolved. Suggestions cannot be applied from pending reviews. Suggestions cannot be applied on multi-line comments. Suggestions cannot be applied while the pull request is queued to merge. Suggestion cannot be applied right now. Please check back later.
0