8000 Reconnection guidelines - graceful dropped connection handling on consumer · Issue #444 · php-amqplib/php-amqplib · GitHub
[go: up one dir, main page]

Skip to content

Reconnection guidelines - graceful dropped connection handling on consumer #444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
aaronpeterson opened this issue Nov 1, 2016 · 3 comments

Comments

@aaronpeterson
Copy link

Hoping to clarify best practices in reconnecting a consumer after a dropped connection, service outage, etc. Following this basic pattern:

class MyConsumer {

    protected $amqpChannel;
    protected $amqpConnection;

    public function connect()
    {
        // ...set $this->amqpConnection, etc
    }

    public function consume() {
        $this->amqpChannel->basic_consume('my-queue', 'my-tag', false, false, false, false, function ($msg) {
               $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        });
        
        while (count($this->amqpChannel->callbacks)) {
            try {
                $this->amqpChannel->wait();
            } catch (\Exception $e) {
                // Suppose $e is \ErrorException with "errno=32" or "Broken pipe" in message
                $this->amqpConnection->reconnect();

                 // through all of this, $this->amqpConnection->isConnected() remains true
                 // Errors with PhpAmqpLib\Exception\AMQPProtocolConnectionException: CHANNEL_ERROR - expected 'channel.open'
                $this->consume();
            }
        }
    }
}

I've tried a number of things and can't seem to get my AMQPChannel::wait() to resume. I'd be happy to add it to the examples if anyone can clarify.

@RobbieLePommie
Copy link
RobbieLePommie commented Nov 17, 2016

It's not a good example to keep calling a function from within itself. You'll end up with a stack overflow error.

If you write a full example then there are actually many things to take into consideration:

  • handling specific exceptions (e.g. a timeout may be a valid, and handleable exception)
  • graceful retries: depending on the error you may want to try to reconnect immediately, after 5 seconds, after 30 seconds or just give up completely.
  • different responses depending if you can't connect in the first place, or if the queue dies while you're looping etc.

The following is a heavily edited (simplified) version of what I'm using, but should give you an idea.

Essentially there's an outer loop that continually tried to get a connection until something in the code says "Connection Not Longer Required". If the connection dies then trap the SPECIFIC exception and handle it.

try {

    $connectionRequired = true;

    $connectionAttempts = 0;

    // This outer loop enabled the script to try to reconnect in case of failure
    while ($connectionRequired && $connectionAttempts < 30) {
        $connectionAttempts++;
        $foundQueue = false;
        try {
            // Attempt Connection . "\n";
            $connection = new AMQPStreamConnection(RABBITMQ_SERVER, RABBITMQ_PORT, RABBITMQ_USER, RABBITMQ_PASS);
            $foundQueue = false;

            try {
                $channel = $connection->channel();
                $channel->basic_qos(null, MAX_MESSAGES_TO_HANDLE, null);
                $channel->queue_declare('task_queue', false, false, false, false); 

                $callback = function($msg){
                    // Do whatever is needed
                };

                $channel->basic_consume('task_queue', '', false, false, false, false, $callback);

                // We're good!
                $foundQueue = true;

                // Reset the connection attempt counter
                $connectionAttempts = 0;

            } catch(exception $e) {
                // Failed to get channel
                // Best practice is to catch the specific exceptions and handle accordingly.
                // Either handle the message (and exit) or retry

                if (YouWantToRetry) {
                    sleep(5);  // Time should greacefully decrade based on "connectionAttempts"
                } elseif (YouCanHandleTheErrorAndWantToExitGraceully, e.g. $connectionAttempts > threshold OR youKnowTheException) {
                    $connectionRequired = false;
                } elseif (YouCannotHandleTheErrorAndWantToGetOutOfHere) {
                    throw ($e);
                }
            }
        } catch(exception $e) {
            // Failed to get connection. 
            // Best practice is to catch the specific exceptions and handle accordingly.
            // Either handle the message (and exit) or retry

            if (YouWantToRetry) {
                sleep(5);  // Time should greacefully decrade based on "connectionAttempts"
            } elseif (YouCanHandleTheErrorAndWantToExitGraceully, e.g. $connectionAttempts > threshold OR youKnowTheException) {
                $connectionRequired = false;
            } elseif (YouCannotHandleTheErrorAndWantToGetOutOfHere) {
                throw ($e);
            }

        }

        if ($foundQueue) {
            try {
                while(count($channel->callbacks)) {
                    $channel->wait(null, true, null);
                }

            } catch(exception $e) {

                // Consider this carefully! 
                // Best practice is to catch the specific exceptions and handle accordingly.

                if (YouWantToRetry) {
                    sleep(5);
                    $foundQueue = false;
                } elseif (YouCanHandleTheErrorAndWantToExitGraceully) {
                    $foundQueue = false;
                    $connectionRequired = false;
                } elseif (YouCannotHandleTheErrorAndWantToGetOutOfHere) {
                    // Error, so throw out of here
                    throw $e;
                }
            }
        }
    }

    // You'll end here on a graceful exit

} catch (Exception $e) {
    // You'll end up here if something's gone wrong

}

@lukebakken
Copy link
Collaborator

As of 2.7.0 heartbeat checks and reconnections are supported by the Stream and Socket interfaces.

@Whoweez
Copy link
Whoweez commented Aug 24, 2018

@RobbieLePommie Hey man, thanks so much for this :) This really helped a lot!

Maxim-Mazurok added a commit to Maxim-Mazurok/php-amqplib that referenced this issue Jul 6, 2019
lukebakken added a commit that referenced this issue Jul 8, 2019
kratkyzobak pushed a commit to kratkyzobak/php-amqplib that referenced this issue Feb 9, 2024
kratkyzobak pushed a commit to kratkyzobak/php-amqplib that referenced this issue Feb 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants
0