@@ -96,6 +96,18 @@ public function ack(Envelope $envelope): void
9696 $ stamp ->getAmqpEnvelope (),
9797 $ stamp ->getQueueName ()
9898 );
99+ } catch (\AMQPConnectionException ) {
100+ try {
101+ $ stamp = $ this ->findAmqpStamp ($ envelope );
102+
103+ $ this ->connection ->queue ($ stamp ->getQueueName ())->getConnection ()->reconnect ();
104+ $ this ->connection ->ack (
105+ $ stamp ->getAmqpEnvelope (),
106+ $ stamp ->getQueueName ()
107+ );
108+ } catch (\AMQPException $ exception ) {
109+ throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
110+ }
99111 } catch (\AMQPException $ exception ) {
100112 throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
101113 }
@@ -124,6 +136,13 @@ private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueNa
124136 {
125137 try {
126138 $ this ->connection ->nack ($ amqpEnvelope , $ queueName , \AMQP_NOPARAM );
139+ } catch (\AMQPConnectionException ) {
140+ try {
141+ $ this ->connection ->queue ($ queueName )->getConnection ()->reconnect ();
142+ $ this ->connection ->nack ($ amqpEnvelope , $ queueName , \AMQP_NOPARAM );
143+ } catch (\AMQPException $ exception ) {
144+ throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
145+ }
127146 } catch (\AMQPException $ exception ) {
128147 throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
129148 }
0 commit comments