8000 improve amqp connection issues · symfony/symfony@c4a8995 · GitHub
[go: up one dir, main page]

Skip to content

Commit c4a8995

Browse files
improve amqp connection issues
1 parent a8a9e0b commit c4a8995

File tree

1 file changed

+19
-0
lines changed

1 file changed

+19
-0
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Copy file name to clipboard
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@ public function ack(Envelope $envelope): void
9696
$stamp->getAmqpEnvelope(),
9797
$stamp->getQueueName()
9898
);
99+
} catch (\AMQPConnectionException) {
100+
try {
101+
$stamp = $this->findAmqpStamp($envelope);
102+
103+
$this->connection->queue($stamp->getQueueName())->getConnection()->reconnect();
104+
$this->connection->ack(
105+
$stamp->getAmqpEnvelope(),
106+
$stamp->getQueueName()
107+
);
108+
} catch (\AMQPException $exception) {
109+
throw new TransportException($exception->getMessage(), 0, $exception);
110+
}
99111
} catch (\AMQPException $exception) {
100112
throw new TransportException($exception->getMessage(), 0, $exception);
101113
}
@@ -124,6 +136,13 @@ private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueNa
124136
{
125137
try {
126138
$this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM);
139+
} catch (\AMQPConnectionException) {
140+
try {
141+
$this->connection->queue($queueName)->getConnection()->reconnect();
142+
$this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM);
143+
} catch (\AMQPException $exception) {
144+
throw new TransportException($exception->getMessage(), 0, $exception);
145+
}
127146
} catch (\AMQPException $exception) {
128147
throw new TransportException($exception->getMessage(), 0, $exception);
129148
}

0 commit comments

Comments
 (0)
0