8000 feat(socket): implement `esockd_socket` connection module by keynslug · Pull Request #15451 · emqx/emqx · GitHub
[go: up one dir, main page]

Skip to content

Conversation

@keynslug
Copy link
Contributor
@keynslug keynslug commented Jun 30, 2025

Part of EMQX-14333.

Release version: 6.0.0

Summary

This PR introduces tcp_backend listener option, exclusively to TCP listeners, and employs emqx/esockd#204 with a separate connection module named emqx_socket_connection when tcp_backend is socket. This shows consistently better wide-fanout performance, especially when combined with async shard dispatch introduced in #15421, see report.

This new connection module is identical to emqx_connection module in most of aspects, but made separate primarily because of:

  • Socket-based connections use substantially different connection loop, and integrating it into emqx_connection might have made the latter too bloated, harder to test and reason about.
  • Combined module would have larger footprint, which would lead to a bunch of "micro-deoptimizations" affecting every listener, admittedly pretty minor ones.

Obvious downside is the need to keep changes to connection modules in sync. This requires additional development processes, or perhaps some CI magic, otherwise code will eventually diverge. Ideas are welcome.

See individual commits for details.

Notes:

  • Commit ed96d99 is part of iovec() experiments that I decided to include in this PR, because it shows small consistent improvements in average message processing latency (~2% less) and VM reductions (~3.5% less).
  • While TLS connections are currently not supported under esockd_socket listeners, there's ongoing work in Erlang/OTP to support familiar SSL API with socket sockets.

PR Checklist

  • For internal contributor: there is a jira ticket to track this change
  • The changes are covered with new or existing tests
  • Change log for changes visible by users has been added to changes/ee/(feat|perf|fix|breaking)-<PR-id>.en.md files
  • Schema changes are backward compatible

@keynslug keynslug force-pushed the perf/EMQX-14333/fanout-part-iv branch 5 times, most recently from 5483eef to 23a9313 Compare July 2, 2025 18:47
@keynslug keynslug force-pushed the perf/EMQX-14333/fanout-part-iv branch 3 times, most recently from d8f2480 to c25571c Compare July 9, 2025 16:20
@codecov-commenter
Copy link
codecov-commenter commented Jul 9, 2025

Codecov Report

Attention: Patch coverage is 76.65370% with 120 lines in your changes missing coverage. Please review.

Project coverage is 84.47%. Comparing base (22d202c) to head (80429fa).
Report is 2 commits behind head on release-60.

Files with missing lines Patch % Lines
apps/emqx/src/emqx_socket_connection.erl 72.40% 117 Missing ⚠️
apps/emqx/src/emqx_congestion.erl 96.87% 1 Missing ⚠️
apps/emqx/src/emqx_listeners.erl 90.90% 1 Missing ⚠️
apps/emqx/src/emqx_schema.erl 50.00% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff               @@
##           release-60   #15451      +/-   ##
==============================================
- Coverage       84.51%   84.47%   -0.04%     
==============================================
  Files            1113     1114       +1     
  Lines           78238    78682     +444     
==============================================
+ Hits            66120    66466     +346     
- Misses          12118    12216      +98     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@keynslug keynslug force-pushed the perf/EMQX-14333/fanout-part-iv branch from c25571c to 5392456 Compare July 10, 2025 17:38
@keynslug keynslug marked this pull request as ready for review July 10, 2025 18:42
@keynslug keynslug requested a review from a team as a code owner July 10, 2025 18:42
- `gen_tcp`: Standard backend, in use since EMQX 5.0 release.
- `socket`: Experimental backend, looking to improve message latency and compute resource usage.
Note that some `tcp_options` settings will have no effect when using this backend, e.g.: `high_watermark` and `send_timeout_close`."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not suggested for IPv6 as otp doc said it is not fully tested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it says "experimental". Note that the same doc also does not say the same about {inet_backend, socket} in gen_tcp, which suggests that relevant API surface is well-tested.

Copy link
Contributor
@thalesmg thalesmg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🌶️
(still reading)

-endif.

-elvis([{elvis_style, used_ignored_variable, disable}]).
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_connection]}}]).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale directive? Or perhaps:

Suggested change
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_connection]}}]).
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_socket_connection]}}]).

?

Comment on lines 550 to 551
handle_msg({'$socket', _Socket, abort, {_Handle, Reason}}, State) ->
handle_info({sock_error, Reason}, State);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: could this be a stale message if _Handle in #congested.handle is different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, IIUC any abort basically means someone closed the socket / there was a socket error. Maybe I'm missing something?

Copy link
Contributor
@thalesmg thalesmg Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was asking if it is possible that a stale abort arrives here, but is no longer relevant, so it wouldn't be necessary to log it? e.g. the handle changed, and the Handle in this message has been abandoned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sock_error later forces the connection process to close and abandon the socket. But in theory we may get 2 aborts per each outstanding select (one for recv, one for send). So, spot on, I'll double-check.

thalesmg
thalesmg previously approved these changes Jul 15, 2025
Copy link
Contributor
@thalesmg thalesmg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔥 :shipit:

emqx_congestion:cancel_alarms(?MODULE, State),
emqx_channel:terminate(Reason, Channel1),
close_socket_ok(State),
?TRACE("SOCKET", "emqx_connection_terminated", #{reason => Reason})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
?TRACE("SOCKET", "emqx_connection_terminated", #{reason => Reason})
?TRACE("SOCKET", "emqx_socket_connection_terminated", #{reason => Reason})

?

?TRACE("SOCKET", "emqx_connection_terminated", #{reason => Reason})
catch
E:C:S ->
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe add the connection module as metadata, to differentiate between this module and emqx_connection?

Though, seeing how many tracepoints and logs would need to change, maybe it could be added to the process logger metadata. 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, did the latter.


-compile({inline, [request_more_data/4]}).
request_more_data(Socket, More, Acc, State) ->
%% TODO: `{otp, select_read}`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: should this be addressed here? Or maybe in a follow up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about a followup. It's not entirely clear this will prove useful, and at very least need experimentation.

%%--------------------------------------------------------------------
%% Send data

-spec send(non_neg_integer(), iodata(), state()) -> {ok, state()}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: typespec does not cover {ok, {sock_error, _}, state()}.

{ok, queue_send(Handle, Rest, State)};
{select, _Info} ->
%% Totally congested, keep the deadline.
NState = State#state{sockstate = SS#congested{sendq = IoData}},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: don't we need to update/set #congested.handle to the freshly created handle here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good find! That's most likely a bug.

@keynslug keynslug force-pushed the perf/EMQX-14333/fanout-part-iv branch 3 times, most recently from 8cefbac to b072f42 Compare July 18, 2025 16:14
qzhuyan
qzhuyan previously approved these changes Jul 22, 2025

-type parse_result() ::
{more, parse_state()}
%% Need more bytes out of stream, `0` means it's unclear how much more.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is counter-intuitive, maybe error prone.

More = 0, is nomore to me.

I prefer type unknown | non_neg_integer()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I agree about explicitness / error-proneness, unknown and 0 have no practical difference for callers, at least for now: i.e. there's no such state as "we don't need to read byte stream anymore". And using 0 avoids an extra mapping step in emqx_socket_connection.

I prefer type unknown | non_neg_integer()

Perhaps you meant unknown | pos_integer()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 566e73a.

ok = emqtt:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
{ok, _} = emqtt:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
{ok, _} = emqtt:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
ok = emqtt:publish(C2, <<"TopicA/B">>, <<"qos 0">>, 0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, honestly I think old code is more readable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! To me, it was always an additional cognitive load to understand how nth(3, ?TOPICS) and nth(4, ?TOPICS) relate to nth(6, ?WILD_TOPICS) in one testcase, or why nth(7, ?WILD_TOPICS) and nth(1, ?WILD_TOPICS) were specifically chosen for "overlapping subscriptions" testcase, etc.

_Different ->
%% TODO
%% Again, we're not strictly required to drop live connections in this case.
{error, not_supported}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add some description to the config schema desc?
e.g. when choose to use socket (instead of gen_tcp), the socket options cannot be updated from Dashboard or API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of TCP options can be updated I think, it's tcp_backend that cannot be changed without running restart_listener/4. I'll clarify the latter, unless I'm missing something.

%% This module interacts with the transport layer of MQTT
%% Transport: esockd_socket.
%%
%% NOTE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add same note to emqx_connection too ?

-compile({inline, [request_more_data/4]}).
request_more_data(Socket, More, Acc, State) ->
%% TODO: `{otp, select_read}`.
case sock_async_recv(Socket, More) of
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this return {complete, ComplteInfo}?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only when running on Windows I believe. What do you suggest to do about it?

Copy link
Member
@zmstone zmstone Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe do not allow this backend on Window when checking schema.

{ok, Data} ->
NState = start_idle_timer(State),
handle_recv({recv_more, Data}, Parent, NState);
{select, _SelectInfo} ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so there is no need to kee SelectInfo for the next call ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, socket keeps no special context in recv's select_info()s, it's just {select_info, recv, Handle}. Supposedly, one can just socket:recv(Socket, 0, ...) again once requested number of bytes has arrived, signaled through {$socket, Socket, select, ...}.

Connection loop needs to disambiguate recv and send "selects" though, and for that we only keep send handles (in #congested{} structure) but not recv handles.

@keynslug keynslug requested a review from zmstone July 23, 2025 10:53
zmstone
zmstone previously approved these changes Jul 23, 2025
keynslug and others added 24 commits July 23, 2025 16:19
Behavior is slightly different from `emqx_connection`: there's no
equivalent to `high_watermark`, so connection is never blocked on
send.
…rror

Effectively mirroring `emqx_socket_connection` behavior.
Co-authored-by: zmstone <zmstone@gmail.com>
@keynslug keynslug force-pushed the perf/EMQX-14333/fanout-part-iv branch from b2eca06 to 80429fa Compare July 23, 2025 14:20
@keynslug keynslug merged commit c9cd26d into emqx:release-60 Jul 24, 2025
436 of 438 checks passed
@keynslug keynslug deleted the perf/EMQX-14333/fanout-part-iv branch July 24, 2025 08:06
@emqxqa
Copy link
emqxqa commented Jul 28, 2025

TestExecution

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants

0