[Messenger] Consumers not showing in RabbitMQ admin?! · Issue #30259 · symfony/symfony · GitHub

Closed
tinogo opened this issue Feb 15, 2019 · 24 comments

@tinogo
tinogo commented Feb 15, 2019

Symfony version(s) affected: 4.2.0 - 4.2.3

Description
When running the messenger:consume-messages command, the registered consumer(s) do not show up in the RabbitMQ admin area, although the messages get consumed.

This is the current state:
[Screenshot: Actual]

And this is how it should look:
[Screenshot: Expectation]

Is this an actual bug, is it by design or am I doing something wrong?

@Guikingone
Contributor

This can happen if you use Docker: sometimes the channel is created but never receives any message.

This is due to a configuration issue:

  • Sometimes, Messenger can accept the service name or ask for the local IP (which can change depending on Docker-Machine usage, etc).

Plus, when you run messenger:consume-messages, you're actually asking Messenger to consume the messages, i.e. to empty the queue, so it's normal that RabbitMQ doesn't display the messages (maybe consider persisting every message?).

The question is: what do you see when you look at the logs? As far as I can tell for now, it could come from a configuration issue when you start RabbitMQ.

@tinogo
Author
tinogo commented Feb 19, 2019

Hi @Guikingone,

thank you for your reply.
We've debugged this a little further now, and the culprit lies in Symfony\Component\Messenger\Transport\AmqpExt\Connection::ack() and Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::receive() (line 62).

So the Messenger library doesn't actually consume messages the "RabbitMQ way". The Connection should therefore provide a consume method that calls \AMQPQueue::consume(). That way we would be able to see the subscribed message consumers (as mocked up in my second screenshot).

@Guikingone
Contributor

Ok, I understand the main problem.

Maybe we can improve this feature by calling the correct method. @sroze, do you know if this is something we can improve?

I can take a look if you want.

@tinogo
Author
tinogo commented Feb 21, 2019

Hi @makasim,

thank you for the - this gave me good insight into why the current approach was chosen.
And after reading this comment #26632 (comment), I think now is the time to re-evaluate this topic and hopefully add the approach using the consume method. 😄

@meshenka
meshenka commented Apr 6, 2019

Thanks for the insight, I was wondering why my consumer does not appear in RabbitMQ.

@Napche
Napche commented Apr 10, 2019

My solution was to add my own RabbitMQTransportFactory and RabbitMQTransport.
It would have been cleaner if the properties in AmqpTransport had been protected instead of private ;-)

<?php

namespace App\Queue;

use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;

class RabbitMQTransportFactory extends AmqpTransportFactory
{
    /**
     * @var MonitorMessageHandler
     */
    private $handler;

    /**
     * @var Serializer
     */
    private $serializer;

    /**
     * RabbitMQTransportFactory constructor.
     *
     * @param MonitorMessageHandler $handler
     */
    public function __construct(MonitorMessageHandler $handler, Serializer $serializer)
    {
        $this->handler = $handler;
        $this->serializer = $serializer;
    }

    public function createTransport(string $dsn, array $options): TransportInterface
    {
        return new RabbitMQTransport(Connection::fromDsn($dsn, $options, true), $this->handler, $this->serializer);
    }

    public function supports(string $dsn, array $options): bool
    {
        return 0 === \strpos($dsn, 'rabbitmq://');
    }
}

<?php

namespace App\Queue;

use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Envelope;

class RabbitMQTransport extends AmqpTransport
{
    /**
     * @var Connection
     */
    private $connection;

    /**
     * @var MonitorMessageHandler
     */
    private $handler;

    /**
     * @var Serializer
     */
    private $serializer;

    /**
     * @var AmqpSender|null
     */
    private $sender;

    /**
     * RabbitMQTransport constructor.
     *
     * @param Connection            $connection
     * @param MonitorMessageHandler $handler
     * @param Serializer            $serializer
     */
    public function __construct(Connection $connection, MonitorMessageHandler $handler, Serializer $serializer)
    {
        $this->connection = $connection;
        $this->serializer = $serializer;
        $this->handler = $handler;
    }

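    public function receive(callable $handler): void
    {
        // Note: the $handler argument is not used; messages are passed to the
        // injected MonitorMessageHandler in consume() instead.
        $queue = $this->connection->queue();
        // \AMQPQueue::consume() blocks and registers a real consumer, which is
        // what makes it show up in the RabbitMQ admin.
        $queue->consume([$this, 'consume']);
    }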

    public function consume(\AMQPEnvelope $envelope)
    {
        try {
            $decoded = $this->serializer->decode([
                'body' => $envelope->getBody(),
                'headers' => $envelope->getHeaders(),
            ]);
            \call_user_func($this->handler, $decoded->getMessage());

            $this->connection->ack($envelope);
        } catch (RejectMessageExceptionInterface $e) {
            $this->connection->reject($envelope);

            throw $e;
        } catch (\Throwable $e) {
            $this->connection->nack($envelope, AMQP_REQUEUE);

            throw $e;
        }
    }

    /**
     * {@inheritdoc}
     */
    public function send(Envelope $envelope): Envelope
    {
        return ($this->sender ?? $this->getSender())->send($envelope);
    }

    private function getSender()
    {
        return $this->sender = new AmqpSender($this->connection, $this->serializer);
    }
}

@chalasr
Member
chalasr commented May 26, 2019

The transport does not rely on \AMQPQueue::consume() because it is blocking.
Having a blocking receiver makes the --time-limit/--memory-limit options of the messenger:consume command, as well as the messenger:stop-workers command, ineffective, as they all rely on the fact that the receiver returns immediately whether or not it finds a message. The consume worker is responsible for iterating until it receives a message to handle and/or until one of the stop conditions is reached. Thus, the worker's stop logic cannot be reached if it is stuck in a blocking call.

So yes, that's by design.
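
To illustrate the design described above, here is a minimal sketch of a polling worker loop. It is not Messenger's actual code: PollingReceiver and runWorker() are hypothetical names, and the receiver is an in-memory stand-in that needs no broker. The point is only that a get() which returns immediately lets the loop re-check its stop condition on every iteration, even when the queue is empty.

```php
<?php

/**
 * Hypothetical in-memory stand-in for a non-blocking receiver: get()
 * returns immediately with a message, or null when nothing is queued.
 */
final class PollingReceiver
{
    /** @var string[] */
    private $messages;

    public function __construct(array $messages)
    {
        $this->messages = $messages;
    }

    public function get(): ?string
    {
        // array_shift() returns null on an empty array, so this never blocks.
        return array_shift($this->messages);
    }
}

/**
 * Simplified worker loop: because get() never blocks, the time limit is
 * re-checked on every iteration, even while the queue is empty.
 *
 * @return string[] the messages handled before the time limit was reached
 */
function runWorker(PollingReceiver $receiver, float $timeLimitSeconds): array
{
    $handled = [];
    $deadline = microtime(true) + $timeLimitSeconds;

    while (microtime(true) < $deadline) {
        $message = $receiver->get();

        if (null === $message) {
            usleep(10000); // idle briefly, then poll again
            continue;
        }

        $handled[] = $message;
    }

    return $handled;
}

$handled = runWorker(new PollingReceiver(['first', 'second']), 0.2);
echo implode(', ', $handled), PHP_EOL;
```

With a blocking receiver in place of get(), the while condition would not be evaluated again until the next message arrived, which is the situation the comment above describes.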

@chalasr chalasr closed this as completed May 26, 2019
OskarStark added a commit to symfony/symfony-docs that referenced this issue Aug 14, 2019
…up in the (rabbitmq) admin panel (smoench)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] Information about why consumers do not show up in the (rabbitmq) admin panel

I wondered why the consumers were not shown up in the rabbitmq admin panel. I found following explanation symfony/symfony#30259 (comment) from @chalasr which should be documented.

Commits
-------

27339f2 [Messenger] Information about why consumers do not show up in the (rabbimq) admin panel
@garex
garex commented Oct 30, 2020

@chalasr Then it's not a true consumer if it's not visible in the RabbitMQ admin panel.

You should design another command that does what it should do: report that it's a consumer and be visible in the admin panel, but without those useless --time-limit/--memory-limit options.

@chalasr
Member
chalasr commented Oct 30, 2020

You should design another command that does what it should do

@garex Those "useless" options are the best we could provide with all concerns taken into account around the design of the component (which was experimental for 3 minor releases before being considered stable and proven useful).

Please keep in mind that nothing is owed here. Anyone can contribute and propose changes, you included.
Our process for feature requests starts with opening a ticket describing the suggested feature and waiting for feedback from the community. If you want something to change, I suggest you follow that process.

@garex
garex commented Oct 30, 2020

@chalasr do you have any links explaining why you chose only the read-once model instead of read-always?

@garex
garex commented Oct 30, 2020

https://www.cloudamqp.com/blog/2020-07-14-rabbitmq-basic-consume-vs-rabbitmq-basic-get.html

https://www.rabbitmq.com/consumers.html#fetching

Fetching messages one by one is highly discouraged as it is very inefficient compared to regular long-lived consumers. As with any polling-based algorithm, it will be extremely wasteful in systems where message publishing is sporadic and queues can stay empty for prolonged periods of time.
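
For illustration, the two approaches look roughly like this with the php-amqp extension. This is only a sketch under assumptions, not Messenger's code: the function names pollOnce() and subscribe() are hypothetical, and both expect an \AMQPQueue that is already declared on a connected channel.

```php
<?php

/**
 * Polling, i.e. basic.get: returns immediately whether or not a message
 * is waiting. No consumer is registered on the broker.
 */
function pollOnce(\AMQPQueue $queue): ?string
{
    $envelope = $queue->get(AMQP_NOPARAM); // acknowledged explicitly below

    if (!$envelope instanceof \AMQPEnvelope) {
        return null; // the queue was empty
    }

    $queue->ack($envelope->getDeliveryTag());

    return $envelope->getBody();
}

/**
 * Subscription, i.e. basic.consume: registers a consumer and blocks while
 * the broker pushes messages to the callback.
 */
function subscribe(\AMQPQueue $queue, callable $handle): void
{
    $queue->consume(function (\AMQPEnvelope $envelope, \AMQPQueue $queue) use ($handle) {
        $handle($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());

        return true; // returning false would end the consume loop
    });
}
```

Calling pollOnce() in a loop is the polling pattern the quoted documentation discourages; subscribe() is the long-lived consumer it recommends, at the cost of blocking the calling script.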

@garex
garex commented Oct 30, 2020

Chapter 5. Don’t get messages; consume them

https://livebook.manning.com/book/rabbitmq-in-depth/chapter-5/11

@Cafeine42

Hello @chalasr

The problem with consuming messages using the get method is that, if the consumer loses the connection, the message will remain permanently unacked.
With the consume method, the message returns to the queue if the connection is lost, after a timeout.

There is a possibility of data loss, and I don't think the --time-limit/--memory-limit features are a good enough reason to allow that.
The documentation should include a disclaimer about this.

@jderusse
Member

The problem with consuming messages using the get method is that, if the consumer loses the connection, the message will remain permanently unacked.

Could you elaborate?

From the documentation (https://github.com/php-amqp/php-amqp/blob/df1241852b359cf12c346beaa68de202257efdf1/stubs/AMQPQueue.php#L155-L177), the message won't be acked unless we call the get method with the AMQP_NOPARAM flag (which is not the case in the current implementation).

@Cafeine42

With the get method, it isn't: it stays unacked indefinitely, and that's the problem.

With consume, it returns to the ready state after the connection is lost.

https://www.rabbitmq.com/confirms.html#automatic-requeueing

@Cafeine42

The redeliver_timeout option that has been added for doctrine and redis covers this issue.
If the consumer no longer gives any sign of life, the message will be reprocessed after the timeout.

This is the behavior that is missing with get and RabbitMQ. The consume method guarantees exactly this behavior.

@Cafeine42

I put together a quick and dirty example here: https://github.com/Cafeine42/symfony/commit/e80ed72816b34712e026340cece3fb33477b709c

  • A new command to execute in pull mode.
  • A BlockingWorker that extends the Worker (with some changes so it can use private methods).
  • AmqpReceiver implements a BlockingReceiverInterface instead of ReceiverInterface to handle the pull.
    (The AMQPQueue::consume documentation says: "If the callback is omitted or null is passed, then the messages delivered to this client will be made available to the first real callback registered. That allows one to have a single callback consuming from multiple queues.")

The idea is that all timeout and memory checks would, at best, only be performed upon receipt of a message.

But the resilience of RabbitMQ would be preserved.
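
A minimal sketch of that trade-off, under stated assumptions: fakeBlockingConsume() and makeLimitedCallback() are hypothetical names, the consume loop is simulated in memory rather than taken from the linked commit, and a message-count limit stands in for the time limit to keep the example deterministic. It relies on the php-amqp convention that a consume callback returning false ends the consume loop.

```php
<?php

/**
 * Hypothetical stand-in for a blocking consume(): it hands every message
 * to $callback and only returns once the callback returns false (or, in
 * this simulation, once the in-memory messages run out).
 */
function fakeBlockingConsume(array $messages, callable $callback): void
{
    foreach ($messages as $message) {
        if (false === $callback($message)) {
            return;
        }
    }
}

/**
 * Builds a consume callback that handles a message and then checks the
 * stop conditions. The checks run once per delivery, so an idle queue
 * would never trigger them.
 */
function makeLimitedCallback(int $messageLimit, int $memoryLimitBytes, array &$handled): callable
{
    return function (string $message) use ($messageLimit, $memoryLimitBytes, &$handled): bool {
        $handled[] = $message;

        if (count($handled) >= $messageLimit) {
            return false; // stop: message limit reached
        }

        if (memory_get_usage(true) > $memoryLimitBytes) {
            return false; // stop: memory limit exceeded
        }

        return true; // keep consuming
    };
}

$handled = [];
fakeBlockingConsume(
    ['a', 'b', 'c', 'd'],
    makeLimitedCallback(3, 512 * 1024 * 1024, $handled)
);
echo implode(', ', $handled), PHP_EOL;
```

The fourth message is never handled because the callback stops the loop after the third. A time-limit check would be placed in the same spot, with the same caveat: it only fires when a message arrives.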

@gquemener
Contributor

Thanks @Cafeine42,

Your implementation makes sense to me 🤔
Any chance to have you submit a PR on the topic?

@garex
garex commented Mar 31, 2021 via email

@gquemener
Contributor

AFAIU, the problem is that consume() would block the current thread until a message is consumed, thus preventing checks like the "memory" limit (memory usage should stay stable as long as no message is consumed, shouldn't it?) or the "time" limit from being performed on a low-throughput queue.

In regards to heartbeat, from amqp-ext documentation:

heartbeats are limited to blocking calls only, so if there are no any operations on a connection or no active consumer set, connection may be closed by the broker as dead.

This means that, when using this PHP extension with a heartbeat configured on the AMQP connection, heartbeat frames will only be sent (if necessary) when calling methods like get and consume. This is hardly a good strategy, as heartbeat frames would be tightly coupled to the consumer's processing duration or the throughput of the queue (which would most likely result in undesired connection closing). A more reasonable strategy would be to not use heartbeats with amqp-ext, or to use a non-blocking IO strategy to send the frames periodically.

Is it correct?

If so, I'm wondering whether one should prefer a non-blocking IO solution (like this) to consume messages through the basic.consume strategy (and thus limit symfony/amqp-messenger to basic.get, as it is today). I'm not sure I have seen a notice about this limitation in the documentation. If not, I can help to highlight it.

Disclaimer: I am no expert in the AMQP protocol, nor in non-blocking IO solutions, so feel free to adjust my statements and take part in the conversation if I'm talking nonsense here! Plus, the issue is closed, so I am not sure it will ever reach any audience.

@Cafeine42

Hi @gquemener

I have created a bundle to implement this solution and I use it in production. I was planning on pushing a PR but got new priorities. It is still on my TODO list because I am very interested in this functionality being maintained over the long term, but ... no ETA.

@garex The heartbeat configuration should be accessible; in any case, I saw a heartbeat parameter somewhere.

The bundle https://github.com/Cafeine42/amqp-messenger

@B-Galati
Contributor
B-Galati commented Sep 1, 2021

I just encountered the issue where my messages are blocked with status "unacked".

@B-Galati
Contributor
B-Galati commented Sep 1, 2021

Would it be possible to add an option to the AMQP transport to use consume mode instead?

--time-limit and --memory-limit could then behave differently for this transport?

@B-Galati
Contributor
B-Galati commented Sep 1, 2021

Or perhaps a non-blocking driver could be used instead?
Like https://github.com/php-amqplib/php-amqplib?
Or https://github.com/jakubkulhan/bunny?
