-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Blocking consumer #47226
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Hey! I see that this is your first PR. That is great! Welcome! Symfony has a contribution guide which I suggest you to read. In short:
Review the GitHub status checks of your pull request and try to solve the reported issues. If some tests are failing, try to see if they are failing because of this change. When two Symfony core team members approve this change, it will be merged and you will become an official Symfony contributor! I am going to sit back now and wait for the reviews. Cheers! Carsonbot |
Hey @melihovv, thanks for your contribution. That symfony-messenger is only able to use From the docs (https://www.rabbitmq.com/consumers.html#fetching):
|
Hi, @mrjingles! I'm glad that not only me is interested in this functionality :)
You are right, there is no problem to add pullFromQueue() method, I will do it. |
cb7d6e9
to
77efbe4
Compare
67c1f24
to
fab295b
Compare
Is this being considered for Symfony 6.3? |
yes |
Awesome! Glad to see this is being fixed |
Hey guys, I saw @nicolas-grekas changed the milestone to 6.4, not including this feature in 6.3. Is it definitive, or any chance to be postponed again? I'm asking because my team is also in need of it. Thanks! |
Now that Symfony 6.3 has already been released and this is a new feature, it will have to wait for 6.4, I think. |
; Conflicts: ; src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php ; src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php ; src/Symfony/Component/Messenger/Worker.php
fab295b
to
e579ac6
Compare
Hi everyone! This PR looks very promising, but seeing that the code review has not yet been reviewed and the whole topic bounces from release to release does not look like this is getting merged anyway soon. Is there any way for me to help with this issue or are we waiting for the core maintainers team to take a look into this? I need to decide in the near future if we should implement our own amqp consumer or wait for this to be merged and would love to get a little heartbeat from the contributors in here to get a feel for when (or even if) it gets merged. Please do not get offended by this, I know it sucks when people are just trying to pressure issues, I really don't aim for pressuring this - just a friendly ask 😄 Let me know if there's a way for me to support you with this! |
Note that |
Please merge this PR! |
What prevents this PR from merging? Is there anything we can do to make it happen? |
Honestly, the fact that this merge is being successively postponed for more than 2 years without any given reason shows a real negative point on community orientation... |
We are sorry for the lack of feedback on this, we do our best to move contributions forward according to each one’s availability and familiarity/interest in the proposed changes. Personally this PR got totally out of my radar until now. I’d be happy to get this reviewed and ultimately merged hopefully in 7.3 whose development phase starts in December. Thanks for reviving the topic. |
A little update on my side: I've manually patched our project for a PoC with this branch to test everything and I feel like it gets very complicated to keep track on how many messages are actually in "buffer" (i don't know the actual term, but when the amqp sends it to the consumer and the consumer buffers them until processing the message). I could not find where the messages are actually stored when buffered and it is very intransparent what happens when the consumer needs to be stopped. If theres still messages in buffer and we stop the messenger, those messages might get lost (the messenger runs for as long as it needs to until the messsages are processed, but eventually some outside orchestrator (in our case k8s) will kill the process since it's way longer than usual (this is highly dependend on how many messages it's configured to buffer). I think these are not no-go criteria, but it will complicate the correct configuration of amqp consumers (at least a little) to prevent message loss. I still think this is a good contribution. Maybe some more people can talk about their experiences with this? |
This feature would be a really great option to have. It will make Symfony Messenger better suited for larger deployments where reliability is important. Having a blocking consumer (using the Because consumers won't respond to kill signals while being blocked (i.e. while consuming an empty queue), applications need to implement some kind of "kill command" in order to properly stop consumers (for example during a deployment). That's really easy to implement, but ideally Symfony would provide in such a mechanism too. Even without that mechanism, I hope we can support blocking consumers in future versions of messenger. |
@chalasr Any news on when this PR can be reviewed? |
In our company (large online service with huge AMQP traffic) we're in the process of migrating workers to Messenger. We do it with full awareness of current implementation, because right now we still consume messages with legacy workers, but they now create new, Messenger-ready messages and proxy them to new handlers, so we're technically ready (well, almost) for using |
@melihovv Are you still interested in maintaining this PR? It looks like it now requires some changes to be merged from the upstream master to resolve conflicts. |
I don't have much free time currently to work on it, maybe later |
I've got a few insights to share. I believe the goals of this PR are very valid, but the approach is wrong. Let me explain. The functionality we're looking for is not about blocking vs non-blocking but about push (consume) vs pull (get). The The problem with the blocking The only solution I see is allowing people with advanced use-cases (performance, redundancy / single-active-consumer, observability) to use In order to use In the coming weeks, I'll experiment with a |
@jorissteyn have you had any luck with your experiment? |
For anyone who is interested, I am working on a messenger transport that uses |
@jwage Awesome work! Following it now. Happy to do some early testing in real-world scenarios once it's ready.
@nesl247 I was planning to try out the Messenger-Enqueue-Transport with the amqp_lib transport, but I haven’t experimented with it yet. I believe what @jwage is trying is the best approach. I thought required too much effort to get all the necessary details right. But don’t take that as discouragement. Looking at it now, it actually seems like an interesting and fun challenge, and definitely doable. Go for it! Whichever way we fix it, I think it should eventually become the preferred transport for RabbitMQ. The reason consume blocks the process is due to a design flaw in the rabbitmq-c library, see this and this issue. It doesn’t look like a fix for that is coming any time soon. The php-amqplib library, on the other hand, properly implements asynchronous consumers. Since both libraries are actively maintained by VMware, it makes sense to prefer php-amqplib over the extension especially for advanced use cases. It also makes upgrading PHP versions easier. |
Oh and regarding the original goal of this PR: these changes could make Messenger support synchronous/blocking consumers but I suggest not merging this, unless other use-cases justify the effort. |
Given that sroze's messenger-enqueue-bridge is not actively maintained anymore and that enqueue packages suffers from important maintenance issues too + Symfony will probably never consider adding it as a dependency for many different reasons, I think it's not worth spending much effort on them.
I tend to agree, it's probably too soon to discuss though as there are many important milestones to reach before this can happen. I'm closing this PR as it is stale for 3 years now, thanks everyone for the great discussion. |
I think it's worth mentioning that I did test this PR in my app but you can’t stop the worker safely with pcntl signals. So it’s a no go just for that alone imo. That is what led me to using phpamqplib. Maybe one day someone will get the appetite and ability to fix the consume() method in the c extension but until then I don't think you can use consume() in the php-amqp extension for anything real. |
Introduced new mode for
messenger:consume
command. This mode is useful for RabbitMq, because more efficient method of fetching messages is used (consume
instead ofget
).https://www.rabbitmq.com/consumers.html#fetching
I know that current architecture of messenger component is not very suited for blocking connection,
--time-limit/--memory-limit
options andmessenger:stop-workers
will not work if there are no messages in queue. But I think that is ok, these options/command will take effect as soon as any message will be received.My work is based on https://github.com/Cafeine42/amqp-messenger, in which a lot of code is duplicated and very difficult to maintain. I hope this PR will be merged and this functionality will be maintained by community and symfony team.