-
Notifications
You must be signed in to change notification settings - Fork 140
[sender] refactor for a simpler multi-thread behavior #209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I very much like this! I like how simpler it looks once is_closed becomes a terminal state: once the instance gets there, there's no going back.
I also very much like how the mutex usage is kept off the main path of the code.
| class FlushQueue < Queue | ||
| end | ||
| class CloseQueue < Queue | ||
| end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these aren't used elsewhere, I suggest putting them inside the Sender class itself.
Also (very minor) if you want a single-line definition, you can use:
FlushQueue = Class.new(Queue) # OR
class FlushQueue < Queue; end| blocking_queue = FlushQueue.new | ||
| channel << blocking_queue | ||
| blocking_queue.pop # wait for the bg thread to finish its work | ||
| blocking_queue.close if CLOSEABLE_QUEUES |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest, I'm growing increasingly unconvinced this whole business with the CLOSEABLE_QUEUES
8000
is worth it. It effectively creates two code paths for different Ruby versions, but it's not like we're going to drop support for the old Rubies soon (#close is a Ruby 2.3 feature, and we're still fighting to drop 2.0).
Would it be simpler to just remove this entirely? It doesn't even seem that it would particularly improve performance either.
| @sender_thread = Thread.new(&method(:send_loop)) | ||
| end | ||
|
|
||
| def rendez_vous |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The forwarder will probably need to be updated to not use #rendez_vous anymore, right? (To be honest, I don't quite understand the use-case of having both #sync_with_outbound_io and #flush at the top-level).
| # Initialize and get the thread's sync queue | ||
| queue = (Thread.current[:statsd_sync_queue] ||= Queue.new) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We lost this caching-the-queue behavior in the refactoring, which doesn't seem like an issue, but just doublechecking by asking if this is ok from a performance pov (I actually am not sure how expensive it is to create queues, probably not a lot)
| # Compatibility with `Sender` | ||
| def start() | ||
| end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this change, neither Sender nor SingleThreadedSender use #start, so perhaps it would make sense to just remove them?
| channel << blocking_queue | ||
| blocking_queue.pop # wait for the bg thread to finish its work | ||
| blocking_queue.close if CLOSEABLE_QUEUES | ||
| sender_thread.join(3) # wait for completion, timeout after 3 seconds | ||
| # TODO(remy): should I close `channel` here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I suggested above, I think it'd be simpler to just not use close; if we do, calling close here may be problematic if two stops get called concurrently, but the background thread is taking long to finish. E.g. something like T1: acquire mutex -> tell background thread to stop -> timeout join -> call close -> release mutex; T2: acquire mutex -> does not see previous @is_closed -> tries to write to channel -> channel has been closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, should the stop behavior be a bit more flexible? E.g. configurable timeout, or optionally run a block to decide what to do.
DRAFT
Idea behind this PR is to improved the
Senderobjects lifecycles (especially themessage_queueand thesender_thread) in order to have a simpler implementation not having to always check for their existence.On top of that, this PR is synchronizing the
#stop/close mechanism for it to be blocking. The worse scenario with multiple calls to close/add in parallel would now be that metrics submitted after a close call would not be flushed.