@@ -96,6 +96,18 @@ public function ack(Envelope $envelope): void
96
96
$ stamp ->getAmqpEnvelope (),
97
97
$ stamp ->getQueueName ()
98
98
);
99
+ } catch (\AMQPConnectionException ) {
100
+ try {
101
+ $ stamp = $ this ->findAmqpStamp ($ envelope );
102
+
103
+ $ this ->connection ->queue ($ stamp ->getQueueName ())->getConnection ()->reconnect ();
104
+ $ this ->connection ->ack (
105
+ $ stamp ->getAmqpEnvelope (),
106
+ $ stamp ->getQueueName ()
107
+ );
108
+ } catch (\AMQPException $ exception ) {
109
+ throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
110
+ }
99
111
} catch (\AMQPException $ exception ) {
100
112
throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
101
113
}
@@ -124,6 +136,13 @@ private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueNa
124
136
{
125
137
try {
126
138
$ this ->connection ->nack ($ amqpEnvelope , $ queueName , \AMQP_NOPARAM );
139
+ } catch (\AMQPConnectionException ) {
140
+ try {
141
+ $ this ->connection ->queue ($ queueName )->getConnection ()->reconnect ();
142
+ $ this ->connection ->nack ($ amqpEnvelope , $ queueName , \AMQP_NOPARAM );
143
+ } catch (\AMQPException $ exception ) {
144
+ throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
145
+ }
127
146
} catch (\AMQPException $ exception ) {
128
147
throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
129
148
}
0 commit comments