-
Notifications
You must be signed in to change notification settings - Fork 2.4k
feat(socket): implement esockd_socket connection module
#15451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(socket): implement esockd_socket connection module
#15451
Conversation
5483eef to
23a9313
Compare
d8f2480 to
c25571c
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## release-60 #15451 +/- ##
==============================================
- Coverage 84.51% 84.47% -0.04%
==============================================
Files 1113 1114 +1
Lines 78238 78682 +444
==============================================
+ Hits 66120 66466 +346
- Misses 12118 12216 +98 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
c25571c to
5392456
Compare
rel/i18n/emqx_schema.hocon
Outdated
| - `gen_tcp`: Standard backend, in use since EMQX 5.0 release. | ||
| - `socket`: Experimental backend, looking to improve message latency and compute resource usage. | ||
| Note that some `tcp_options` settings will have no effect when using this backend, e.g.: `high_watermark` and `send_timeout_close`.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not suggested for IPv6 as otp doc said it is not fully tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it says "experimental". Note that the same doc also does not say the same about {inet_backend, socket} in gen_tcp, which suggests that relevant API surface is well-tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🌶️
(still reading)
| -endif. | ||
|
|
||
| -elvis([{elvis_style, used_ignored_variable, disable}]). | ||
| -elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_connection]}}]). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stale directive? Or perhaps:
| -elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_connection]}}]). | |
| -elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_socket_connection]}}]). |
?
| handle_msg({'$socket', _Socket, abort, {_Handle, Reason}}, State) -> | ||
| handle_info({sock_error, Reason}, State); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: could this be a stale message if _Handle in #congested.handle is different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, IIUC any abort basically means someone closed the socket / there was a socket error. Maybe I'm missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was asking if it is possible that a stale abort arrives here, but is no longer relevant, so it wouldn't be necessary to log it? e.g. the handle changed, and the Handle in this message has been abandoned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sock_error later forces the connection process to close and abandon the socket. But in theory we may get 2 aborts per each outstanding select (one for recv, one for send). So, spot on, I'll double-check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔥 ![]()
| emqx_congestion:cancel_alarms(?MODULE, State), | ||
| emqx_channel:terminate(Reason, Channel1), | ||
| close_socket_ok(State), | ||
| ?TRACE("SOCKET", "emqx_connection_terminated", #{reason => Reason}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| ?TRACE("SOCKET", "emqx_connection_terminated", #{reason => Reason}) | |
| ?TRACE("SOCKET", "emqx_socket_connection_terminated", #{reason => Reason}) |
?
| ?TRACE("SOCKET", "emqx_connection_terminated", #{reason => Reason}) | ||
| catch | ||
| E:C:S -> | ||
| ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: maybe add the connection module as metadata, to differentiate between this module and emqx_connection?
Though, seeing how many tracepoints and logs would need to change, maybe it could be added to the process logger metadata. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, did the latter.
|
|
||
| -compile({inline, [request_more_data/4]}). | ||
| request_more_data(Socket, More, Acc, State) -> | ||
| %% TODO: `{otp, select_read}`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: should this be addressed here? Or maybe in a follow up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about a followup. It's not entirely clear this will prove useful, and at very least need experimentation.
| %%-------------------------------------------------------------------- | ||
| %% Send data | ||
|
|
||
| -spec send(non_neg_integer(), iodata(), state()) -> {ok, state()}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: typespec does not cover {ok, {sock_error, _}, state()}.
| {ok, queue_send(Handle, Rest, State)}; | ||
| {select, _Info} -> | ||
| %% Totally congested, keep the deadline. | ||
| NState = State#state{sockstate = SS#congested{sendq = IoData}}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: don't we need to update/set #congested.handle to the freshly created handle here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good find! That's most likely a bug.
8cefbac to
b072f42
Compare
apps/emqx/src/emqx_frame.erl
Outdated
|
|
||
| -type parse_result() :: | ||
| {more, parse_state()} | ||
| %% Need more bytes out of stream, `0` means it's unclear how much more. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is counter-intuitive, maybe error prone.
More = 0, is nomore to me.
I prefer type unknown | non_neg_integer()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I agree about explicitness / error-proneness, unknown and 0 have no practical difference for callers, at least for now: i.e. there's no such state as "we don't need to read byte stream anymore". And using 0 avoids an extra mapping step in emqx_socket_connection.
I prefer type
unknown | non_neg_integer()
Perhaps you meant unknown | pos_integer()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 566e73a.
| ok = emqtt:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), | ||
| {ok, _} = emqtt:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1), | ||
| {ok, _} = emqtt:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), | ||
| ok = emqtt:publish(C2, <<"TopicA/B">>, <<"qos 0">>, 0), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, honestly I think old code is more readable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting! To me, it was always an additional cognitive load to understand how nth(3, ?TOPICS) and nth(4, ?TOPICS) relate to nth(6, ?WILD_TOPICS) in one testcase, or why nth(7, ?WILD_TOPICS) and nth(1, ?WILD_TOPICS) were specifically chosen for "overlapping subscriptions" testcase, etc.
| _Different -> | ||
| %% TODO | ||
| %% Again, we're not strictly required to drop live connections in this case. | ||
| {error, not_supported} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add some description to the config schema desc?
e.g. when choose to use socket (instead of gen_tcp), the socket options cannot be updated from Dashboard or API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of TCP options can be updated I think, it's tcp_backend that cannot be changed without running restart_listener/4. I'll clarify the latter, unless I'm missing something.
| %% This module interacts with the transport layer of MQTT | ||
| %% Transport: esockd_socket. | ||
| %% | ||
| %% NOTE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add same note to emqx_connection too ?
| -compile({inline, [request_more_data/4]}). | ||
| request_more_data(Socket, More, Acc, State) -> | ||
| %% TODO: `{otp, select_read}`. | ||
| case sock_async_recv(Socket, More) of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this return {complete, ComplteInfo}?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only when running on Windows I believe. What do you suggest to do about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe do not allow this backend on Window when checking schema.
| {ok, Data} -> | ||
| NState = start_idle_timer(State), | ||
| handle_recv({recv_more, Data}, Parent, NState); | ||
| {select, _SelectInfo} -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so there is no need to kee SelectInfo for the next call ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, socket keeps no special context in recv's select_info()s, it's just {select_info, recv, Handle}. Supposedly, one can just socket:recv(Socket, 0, ...) again once requested number of bytes has arrived, signaled through {$socket, Socket, select, ...}.
Connection loop needs to disambiguate recv and send "selects" though, and for that we only keep send handles (in #congested{} structure) but not recv handles.
This is useful for `socket`-based connections to request subsequent asynchronous socket reads.
Behavior is slightly different from `emqx_connection`: there's no equivalent to `high_watermark`, so connection is never blocked on send.
…rror Effectively mirroring `emqx_socket_connection` behavior.
Co-authored-by: zmstone <zmstone@gmail.com>
b2eca06 to
80429fa
Compare
Part of EMQX-14333.
Release version: 6.0.0
Summary
This PR introduces
tcp_backendlistener option, exclusively to TCP listeners, and employs emqx/esockd#204 with a separate connection module namedemqx_socket_connectionwhentcp_backendissocket. This shows consistently better wide-fanout performance, especially when combined with async shard dispatch introduced in #15421, see report.This new connection module is identical to
emqx_connectionmodule in most of aspects, but made separate primarily because of:emqx_connectionmight have made the latter too bloated, harder to test and reason about.Obvious downside is the need to keep changes to connection modules in sync. This requires additional development processes, or perhaps some CI magic, otherwise code will eventually diverge. Ideas are welcome.
See individual commits for details.
Notes:
iovec()experiments that I decided to include in this PR, because it shows small consistent improvements in average message processing latency (~2% less) and VM reductions (~3.5% less).esockd_socketlisteners, there's ongoing work in Erlang/OTP to support familiar SSL API withsocketsockets.PR Checklist
changes/ee/(feat|perf|fix|breaking)-<PR-id>.en.mdfiles