8000 [Messenger] Blocking consumer by melihovv · Pull Request #47226 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Blocking consumer #47226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from

Conversation

melihovv
Copy link
@melihovv melihovv commented Aug 8, 2022
Q A
Branch? 7.3
Bug fix? no
New feature? yes
Deprecations? no
Tickets Fix #30259
License MIT
Doc PR It will be created later if this PR will be approved and merged

Introduced new mode for messenger:consume command. This mode is useful for RabbitMq, because more efficient method of fetching messages is used (consume instead of get).

https://www.rabbitmq.com/consumers.html#fetching

Fetching messages one by one is highly discouraged as it is very inefficient compared to regular long-lived consumers. As with any polling-based algorithm, it will be extremely wasteful in systems where message publishing is sporadic and queues can stay empty for prolonged periods of time.

I know that current architecture of messenger component is not very suited for blocking connection, --time-limit/--memory-limit options and messenger:stop-workers will not work if there are no messages in queue. But I think that is ok, these options/command will take effect as soon as any message will be received.

My work is based on https://github.com/Cafeine42/amqp-messenger, in which a lot of code is duplicated and very difficult to maintain. I hope this PR will be merged and this functionality will be maintained by community and symfony team.

@carsonbot
Copy link

Hey!

I see that this is your first PR. That is great! Welcome!

Symfony has a contribution guide which I suggest you to read.

In short:

  • Always add tests
  • Keep backward compatibility (see https://symfony.com/bc).
  • Bug fixes must be submitted against the lowest maintained branch where they apply (see https://symfony.com/releases)
  • Features and deprecations must be submitted against the 6.2 branch.

Review the GitHub status checks of your pull request and try to solve the reported issues. If some tests are failing, try to see if they are failing because of this change.

When two Symfony core team members approve this change, it will be merged and you will become an official Symfony contributor!
If this PR is merged in a lower version branch, it will be merged up to all maintained branches within a few days.

I am going to sit back now and wait for the reviews.

Cheers!

Carsonbot

@mrjingles
Copy link
mrjingles commented Aug 10, 2022

Hey @melihovv, thanks for your contribution. That symfony-messenger is only able to use basic.get() and not basic.consume() is very annoying. So this would be a great improvement. But one question - why is it not an option for you to have pullFromQueue() method, to keep up with the --queues parameter?

From the docs (https://www.rabbitmq.com/consumers.html#fetching):

Fetching messages one by one is highly discouraged as it is very inefficient compared to regular long-lived consumers. As with any polling-based algorithm, it will be extremely wasteful in systems where message publishing is sporadic and queues can stay empty for prolonged periods of time.

@melihovv
Copy link
Author

Hi, @mrjingles! I'm glad that not only me is interested in this functionality :)

But one question - why is it not an option for you to have pullFromQueue() method, to keep up with the --queues parameter?

You are right, there is no problem to add pullFromQueue() method, I will do it.

@melihovv
Copy link
Author

Don't know why tests are failed on low-deps, but pass on high-deps. Help needed.

@melihovv melihovv force-pushed the messenger_consume_6_2 branch from cb7d6e9 to 77efbe4 Compare August 30, 2022 05:52
@melihovv melihovv force-pushed the messenger_consume_6_2 branch from 67c1f24 to fab295b Compare September 7, 2022 15:08
@carsonbot carsonbot changed the title blocking messenger consumer [Messenger] blocking messenger consumer Sep 7, 2022
@nicolas-grekas nicolas-grekas modified the milestones: 6.2, 6.3 Nov 5, 2022
@joaojacome
Copy link

Is this being considered for Symfony 6.3?

@melihovv
Copy link
Author

Is this being considered for Symfony 6.3?

yes

@Gyvastis
Copy link

Awesome! Glad to see this is being fixed

@nicolas-grekas nicolas-grekas modified the milestones: 6.3, 6.4 May 23, 2023
@edumarques
Copy link

Hey guys, I saw @nicolas-grekas changed the milestone to 6.4, not including this feature in 6.3. Is it definitive, or any chance to be postponed again? I'm asking because my team is also in need of it. Thanks!

@jdreesen
Copy link
Contributor

Now that Symfony 6.3 has already been released and this is a new feature, it will have to wait for 6.4, I think.

melihovv added 4 commits July 5, 2023 00:36
; Conflicts:
;	src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php
;	src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php
;	src/Symfony/Component/Messenger/Worker.php
@melihovv melihovv force-pushed the messenger_consume_6_2 branch from fab295b to e579ac6 Compare July 4, 2023 21:37
@nicolas-grekas nicolas-grekas modified the milestones: 6.4, 7.1 Nov 15, 2023
@siemendev
Copy link

Hi everyone!
I'm working on a project that has insanely high throughput in its rabbitmq queues and we recently migrated all queue workers to the symfony messenger.
While it's definably our fault not having done our due diligence before this migration, now we find ourselves in the situation where we need to use consumers instead of basic.get and realise that messenger is not capable of that.

This PR looks very promising, but seeing that the code review has not yet been reviewed and the whole topic bounces from release to release does not look like this is getting merged anyway soon. Is there any way for me to help with this issue or are we waiting for the core maintainers team to take a look into this?

I need to decide in the near future if we should implement our own amqp consumer or wait for this to be merged and would love to get a little heartbeat from the contributors in here to get a feel for when (or even if) it gets merged. Please do not get offended by this, I know it sucks when people are just trying to pressure issues, I really don't aim for pressuring this - just a friendly ask 😄

Let me know if there's a way for me to support you with this!

@michaelklishin
Copy link

Note that basic.get been recommended against for more than a decade. RabbitMQ docs these days are very explicit.

@dada-amater
Copy link

Please merge this PR!

@xabbuh xabbuh modified the milestones: 7.1, 7.2 May 15, 2024
@githoober
Copy link

What prevents this PR from merging? Is there anything we can do to make it happen?

@edumarques
Copy link

Honestly, the fact that this merge is being successively postponed for more than 2 years without any given reason shows a real negative point on community orientation...

@chalasr
Copy link
Member
chalasr commented Oct 16, 2024

We are sorry for the lack of feedback on this, we do our best to move contributions forward according to each one’s availability and familiarity/interest in the proposed changes.

Personally this PR got totally out of my radar until now. I’d be happy to get this reviewed and ultimately merged hopefully in 7.3 whose development phase starts in December.

Thanks for reviving the topic.

@siemendev
Copy link

A little update on my side: I've manually patched our project for a PoC with this branch to test everything and I feel like it gets very complicated to keep track on how many messages are actually in "buffer" (i don't know the actual term, but when the amqp sends it to the consumer and the consumer buffers them until processing the message). I could not find where the messages are actually stored when buffered and it is very intransparent what happens when the consumer needs to be stopped. If theres still messages in buffer and we stop the messenger, those messages might get lost (the messenger runs for as long as it needs to until the messsages are processed, but eventually some outside orchestrator (in our case k8s) will kill the process since it's way longer than usual (this is highly dependend on how many messages it's configured to buffer). I think these are not no-go criteria, but it will complicate the correct configuration of amqp consumers (at least a little) to prevent message loss.

I still think this is a good contribution. Maybe some more people can talk about their experiences with this?

@jorissteyn
Copy link
jorissteyn commented Oct 17, 2024

This feature would be a really great option to have. It will make Symfony Messenger better suited for larger deployments where reliability is important. Having a blocking consumer (using the consume primitive instead of get) is the only way to use the RabbitMQ "Single Active Consumer" feature, required for automatic fail-over.

Because consumers won't respond to kill signals while being blocked (i.e. while consuming an empty queue), applications need to implement some kind of "kill command" in order to properly stop consumers (for example during a deployment). That's really easy to implement, but ideally Symfony would provide in such a mechanism too. Even without that mechanism, I hope we can support blocking consumers in future versions of messenger.

@githoober
Copy link

@chalasr Any news on when this PR can be reviewed?

@Wirone
Copy link
Contributor
Wirone commented Dec 13, 2024

In our company (large online service with huge AMQP traffic) we're in the process of migrating workers to Messenger. We do it with full awareness of current implementation, because right now we still consume messages with legacy workers, but they now create new, Messenger-ready messages and proxy them to new handlers, so we're technically ready (well, almost) for using messenger:consume. However, the fact that Messenger is not counted as a consumer makes observability harder, and polling for message all the time may be inefficient in our case. So we're waiting for this to be merged and released. I just wanted to provide this feedback and say THANK YOU to the original author and all the people that invested their time in this feature 🙏 ❤️ 🍻! Looking forward to this.

@githoober
Copy link

@melihovv Are you still interested in maintaining this PR? It looks like it now requires some changes to be merged from the upstream master to resolve conflicts.

@melihovv
Copy link
Author

I don't have much free time currently to work on it, maybe later
If anybody wants to continue my pr, feel free to do it 👍

@jorissteyn
Copy link
jorissteyn commented Dec 19, 2024

I've got a few insights to share. I believe the goals of this PR are very valid, but the approach is wrong. Let me explain.

The functionality we're looking for is not about blocking vs non-blocking but about push (consume) vs pull (get). The php-amqp extension only allowing blocking connections to the server when consuming instead of pulling is an undesirable property of that extension and I don't think Messenger should try to support that as a goal per se.

The problem with the blocking consume of php-amqp is, as others already mentioned, our rate limit functionality which won't work when there are no messages to consume. Even more important is how it interferes with signal handlers. When registering signal handlers (like messenger:consume does), the process will stop responding to signals. This means it's no longer possible to gracefully stop a consumer using SIGINT (or any other signal) and processes can only be stopped forcefully by external processes (like after a systemd stop timeout) using SIGTERM, possibly losing data when the consumer was actually processing a message.

The only solution I see is allowing people with advanced use-cases (performance, redundancy / single-active-consumer, observability) to use php-amqplib. That library supports push on a non-blocking connection. It also has proven to be really solid and actively developed. As a bonus, it will allow early PHP upgrades without having to wait for someone to make php-amqp compatible.

In order to use php-amqplib in Symfony Messenger, we will still need to support transports that allow pushing messages to the application instead of the application pulling messages one by one. And that's what this PR is doing. But I don't think we should allow users to activate a blocking connection as an option. We should allow users to switch to a different transport.

In the coming weeks, I'll experiment with a php-amqplib transport in combination with the changes in this PR, and I’d love to hear your feedback and suggestions to further improve this.

@nesl247
Copy link
nesl247 commented Feb 24, 2025

@jorissteyn have you had any luck with your experiment?

@OskarStark OskarStark changed the title [Messenger] blocking messenger consumer [Messenger] Blocking consumer Feb 25, 2025
@jwage
Copy link
Contributor
jwage commented Apr 12, 2025

For anyone who is interested, I am working on a messenger transport that uses php-amqplib/php-amqplib instead of php-amqp. The main difference is that it uses basic_consume() instead of basic_get() and does not use polling. The queue wait time with this approach is much lower and without the polling, idle consumers don't cause load on the rabbitmq server. It is still very early and I need some help from others to help test it in other projects and give feedback. It is working in my project but need help testing other use cases. Thanks!

https://github.com/jwage/phpamqplib-messenger/

@jorissteyn
Copy link

@jwage Awesome work! Following it now. Happy to do some early testing in real-world scenarios once it's ready.

@jorissteyn Have you had any luck with your experiment?

@nesl247 I was planning to try out the Messenger-Enqueue-Transport with the amqp_lib transport, but I haven’t experimented with it yet.

I believe what @jwage is trying is the best approach. I thought required too much effort to get all the necessary details right. But don’t take that as discouragement. Looking at it now, it actually seems like an interesting and fun challenge, and definitely doable. Go for it!

Whichever way we fix it, I think it should eventually become the preferred transport for RabbitMQ. The reason consume blocks the process is due to a design flaw in the rabbitmq-c library, see this and this issue. It doesn’t look like a fix for that is coming any time soon.

The php-amqplib library, on the other hand, properly implements asynchronous consumers. Since both libraries are actively maintained by VMware, it makes sense to prefer php-amqplib over the extension especially for advanced use cases. It also makes upgrading PHP versions easier.

@jorissteyn
Copy link

Oh and regarding the original goal of this PR: these changes could make Messenger support synchronous/blocking consumers but I suggest not merging this, unless other use-cases justify the effort.

@chalasr
Copy link
Member
chalasr commented Apr 13, 2025

I was planning to try out the Messenger-Enqueue-Transport with the amqp_lib transport, but I haven’t experimented with it yet.

Given that sroze's messenger-enqueue-bridge is not actively maintained anymore and that enqueue packages suffers from important maintenance issues too + Symfony will probably never consider adding it as a dependency for many different reasons, I think it's not worth spending much effort on them.

Whichever way we fix it, I think it should eventually become the preferred transport for RabbitMQ. The reason consume blocks the process is due to a design flaw in the rabbitmq-c library, see alanxz/rabbitmq-c#138 and alanxz/rabbitmq-c#370. It doesn’t look like a fix for that is coming any time soon.

I tend to agree, it's probably too soon to discuss though as there are many important milestones to reach before this can happen.

I'm closing this PR as it is stale for 3 years now, thanks everyone for the great discussion.
Let's focus on improving @jwage's experiment so it eventually makes it into Symfony as the implementation looks better than what's proposed here on many aspects.

@chalasr chalasr closed this Apr 13, 2025
@jwage
Copy link
Contributor
jwage commented Apr 13, 2025

I think it's worth mentioning that I did test this PR in my app but you can’t stop the worker safely with pcntl signals. So it’s a no go just for that alone imo. That is what led me to using phpamqplib.

Maybe one day someone will get the appetite and ability to fix the consume() method in the c extension but until then I don't think you can use consume() in the php-amqp extension for anything real.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Messenger] Consumers not showing in RabbitMQ admin?!
0