-
Notifications
You must be signed in to change notification settings - Fork 10.3k
Deduplicate group messages across connections #61810
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Deduplicate group messages across connections #61810
Conversation
Thanks for your PR, @@datwelk. Someone from the team will get assigned to your PR shortly and we'll get it reviewed. |
@dotnet-policy-service agree company="LIVEOP B.V." |
List<Task>? tasks = new List<Task>(); | ||
|
||
var connections = new ConcurrentDictionary<string, HubConnectionContext>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know too little about the code in order to tell if there's an alternative approch for the fix.
But I'm bothered about these allocation. So I guess they should be cached an re-used. In a quite simple manner that could be done as something like:
public class Foo
{
private static List<Task>? s_tasks;
private static ConcurrentDictionary<string, HubConnectionContext>? s_connections;
public Task SendAsync()
{
List<Task> tasks = Interlocked.Exchange(ref s_tasks, null) ?? [];
ConcurrentDictionary<string, HubConnectionContext> connections = Interlocked.Exchange(ref s_connections, null) ?? [];
try
{
// Use task and connections
return Task.WhenAll(tasks);
}
finally
{
tasks.Clear();
connections.Clear();
Interlocked.CompareExchange(ref s_tasks, tasks, null);
Interlocked.CompareExchange(ref s_connections, connections, null);
}
}
}
29e1aa1
to
c393463
Compare
if (connections == null) | ||
{ | ||
connections = new HashSet<string>(); | ||
} | ||
connections.Add(connection.Key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI this could be written also as 1-liner.
if (connections == null) | |
{ | |
connections = new HashSet<string>(); | |
} | |
connections.Add(connection.Key); | |
(connections ?? []).Add(connection.Key); |
For me it's readability is the same.
In L239 can you also check if the group
has items?
If so, the check for connections == null
can be removed altogehter, and initialize the HashSet just before the foreach-loop in L241, because if the group has items, then Add is called unconditionally.
The HashSet could then be also initialized with the proper capacity, to avoid potential resizing and copying of the internal structures of HashSet.
@@ -226,7 +226,7 @@ public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string me | |||
{ | |||
// Each task represents the list of tasks for each of the writes within a group | |||
List<Task>? tasks = null; | |||
SerializedHubMessage? message = null; | |||
HashSet<string>? connections = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depending on how much we want to put into perf / allocations a few thoughts:
- cache the HashSet? See the previous comment above for an simple approach -- but this is also no silver bullet, as it has (in the long term) an Gen2 -> Gen0 reference
- HashSet is O(1), but still a bit costly due to hash computation, bucket lookup, etc. So for a small amount of connections a simple array with a linear scan (O(n)) could be faster -- the logic could be encapsulated in a new type "FastHashSet" which starts with the array-approach and switches to the HashSet when more items are present
Well, if that's a goal this can be done in a separate PR for sure.
if (tasks == null) | ||
{ | ||
tasks = new List<Task>(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HashSet has a Count
property, so initialize the tasks
list to proper size before the loop, thus avoiding this if
.
{ | ||
foreach (var connection in connectionStore) | ||
{ | ||
if (connections == null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can similar optimizations as above be applied here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And to the ToList() call below. I wonder if we shouldn't be using a pooled Span instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain more details about the design of your app where you're broadcasting a message to multiple groups that a single client might be subscribed to? Might it be better to have these clients subscribe to a separate group meant solely for the previously multi-group broadcasts? Even after this change, that would be more efficient.
This does seem like a nicer behavior than before. I can't think of a reason you'd want a client to receive duplicate messages just because they're in multiple groups the original message was sent to, but I'm not sure it's worth the extra complexity and performance hit if no one has complained since yesterday. However, I'm fine taking it if the perf impact is small.
@BrennanConroy @sebastienros How hard would it be to measure the perf impact of this change?
{ | ||
tasks = new List<Task>(); | ||
} | ||
tasks.Add(SendConnectionAsync(connectionId, methodName, args, cancellationToken)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call to SendConnectionAsync reserializes the method invocation payload for each connection.
aspnetcore/src/SignalR/server/Core/src/DefaultHubLifetimeManager.cs
Lines 194 to 196 in aa2e0dc
// We're sending to a single connection | |
// Write message directly to connection without caching it in memory | |
var message = CreateInvocationMessage(methodName, args); |
Prior to this change, SendToGroupConnections would call CreateSerializedInvocationMessage once, send the same serialized invocation payload to each client. I suspect that will have a far bigger performance impact than the HashSet, although I'd also like to pool that if we move forward with this change.
@@ -238,7 +238,26 @@ public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string me | |||
var group = _groups[groupName]; | |||
if (group != null) | |||
{ | |||
DefaultHubLifetimeManager<THub>.SendToGroupConnections(methodName, args, group, null, null, ref tasks, ref message, cancellationToken); | |||
foreach (var connection in group) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
foreach (var connection in group) | |
foreach (var (connectionId, _) in group) |
{ | ||
foreach (var connection in connectionStore) | ||
{ | ||
if (connections == null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And to the ToList() call below. I wonder if we shouldn't be using a pooled Span instead.
Looks like this PR hasn't been active for some time and the codebase could have been changed in the meantime. |
@halter73 Thank you for your time and feedback! I came across an earlier reference to a similar issue from 2019: #14782. One suggestion there was to add a distinct parameter to the method - e.g., Let's say we have an entity called fruit, and users can subscribe to updates for all fruits, individual fruit types, or both. To keep the client simple, we don’t prevent overlapping subscriptions - e.g., subscribing to both “all fruits” and “oranges”. This means a single orange update may be received multiple times as it's broadcast to the "all fruits" and "oranges" groups. While we could try to handle this on the client side, doing so introduces significant complexity. For instance, if a user subscribes to “all fruits” but disables one type (like kiwi), we’d have to unsubscribe from “all fruits” and then individually subscribe to every other fruit type. Managing this becomes more cumbersome as more fruit types - or more entities in general - are added to the app. If introducing a distinct parameter and modifying the public API is the preferred approach, I’d be happy to proceed with that. Alternatively, I can implement the previously suggested workaround—please let me know your preference. |
Deduplicate SignalR group messages across connections
Description
This change modifies
DefaultHubLifetimeManager
andRedisHubLifetimeManager
to deduplicate SignalR group messages across connections. Previously, when a connection A is part of N-groups, and a SignalR message is sent to those N-groups, connection A will receive those messages N times. This changes instead checks which connections need to be addressed for a list of groups, and uniques them.Fixes #61809