|
| 1 | +<?php |
| 2 | + |
| 3 | +/* |
| 4 | + * This file is part of the Symfony package. |
| 5 | + * |
| 6 | + * (c) Fabien Potencier <fabien@symfony.com> |
| 7 | + * |
| 8 | + * For the full copyright and license information, please view the LICENSE |
| 9 | + * file that was distributed with this source code. |
| 10 | + */ |
| 11 | + |
| 12 | +namespace Symfony\Component\Messenger\Failure; |
| 13 | + |
| 14 | +use Doctrine\DBAL\Connection; |
| 15 | +use Doctrine\DBAL\Exception\TableNotFoundException; |
| 16 | +use Doctrine\DBAL\Schema\Schema; |
| 17 | +use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer; |
| 18 | +use Doctrine\DBAL\Types\Type; |
| 19 | +use Symfony\Component\Messenger\Envelope; |
| 20 | + |
| 21 | +class DbalFailedMessageStorage implements FailedMessageStorageInterface |
| 22 | +{ |
| 23 | + private $driverConnection; |
| 24 | + private $options; |
| 25 | + |
| 26 | + public function __construct(Connection $driverConnection, array $options) |
| 27 | + { |
| 28 | + $this->driverConnection = $driverConnection; |
| 29 | + $this->options = array_merge([ |
| 30 | + 'auto_setup' => true, |
| 31 | + 'table_name' => 'failed_messages', |
| 32 | + ], $options); |
| 33 | + } |
| 34 | + |
| 35 | + public function add(Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt): FailedMessage |
| 36 | + { |
| 37 | + $queryBuilder = $this->driverConnection->createQueryBuilder() |
| 38 | + ->insert($this->options['table_name']) |
| 39 | + ->values([ |
| 40 | + 'envelope' => ':envelope', |
| 41 | + 'exception' => ':exception', |
| 42 | + 'transport_name' => ':transport_name', |
| 43 | + 'failed_at' => ':failed_at', |
| 44 | + ]) |
| 45 | + ; |
| 46 | + |
| 47 | + $data = [ |
| 48 | + 'envelope' => serialize($envelope), |
| 49 | + 'exception' => serialize($exception), |
| 50 | + 'transport_name' => $transportName, |
| 51 | + 'failed_at' => $failedAt->format('Y-m-d H:i:s'), |
| 52 | + ]; |
| 53 | + |
| 54 | + $this->executeQuery($queryBuilder->getSQL(), $data); |
| 55 | + |
| 56 | + $data['id'] = $this->driverConnection->lastInsertId(); |
| 57 | + |
| 58 | + return $this->createFailedMessage($data); |
| 59 | + } |
| 60 | + |
| 61 | + public function all() |
| 62 | + { |
| 63 | + $query = $this->driverConnection->createQueryBuilder() |
| 64 | + ->select('m.*') |
| 65 | + ->from($this->options['table_name'], 'm') |
| 66 | + ->orderBy('failed_at', 'ASC') |
| 67 | + ; |
| 68 | + |
| 69 | + $rows = $this->executeQuery($query->getSQL())->fetchAll(); |
| 70 | + |
| 71 | + return array_map(function ($row) { |
| 72 | + return $this->createFailedMessage($row); |
| 73 | + }, $rows); |
| 74 | + } |
| 75 | + |
| 76 | + public function get($id): FailedMessage |
| 77 | + { |
| 78 | + $query = $this->driverConnection->createQueryBuilder() |
| 79 | + ->select('m.*') |
| 80 | + ->from($this->options['table_name'], 'm') |
| 81 | + ->where('m.id = :id') |
| 82 | + ; |
| 83 | + |
| 84 | + $row = $this->executeQuery($query->getSQL(), ['id' => $id])->fetch(); |
| 85 | + |
| 86 | + return $this->createFailedMessage($row); |
| 87 | + } |
| 88 | + |
| 89 | + public function remove(FailedMessage $failedMessage): void |
| 90 | + { |
| 91 | + $query = $this->driverConnection->createQueryBuilder() |
| 92 | + ->delete($this->options['table_name']) |
| 93 | + ->where('m.id = :id') |
| 94 | + ; |
| 95 | + |
| 96 | + $this->executeQuery($query->getSQL(), ['id' => $failedMessage->getId()]); |
| 97 | + } |
| 98 | + |
| 99 | + public function removeAll(): void |
| 100 | + { |
| 101 | + $query = $this->driverConnection->createQueryBuilder() |
| 102 | + ->delete($this->options['table_name']) |
| 103 | + ; |
| 104 | + |
| 105 | + $this->executeQuery($query->getSQL()); |
| 106 | + } |
| 107 | + |
| 108 | + private function executeQuery(string $sql, array $parameters = []) |
| 109 | + { |
| 110 | + $stmt = null; |
| 111 | + |
| 112 | + try { |
| 113 | + $stmt = $this->driverConnection->prepare($sql); |
| 114 | + $stmt->execute($parameters); |
| 115 | + } catch (TableNotFoundException $e) { |
| 116 | + // create table |
| 117 | + if (!$this->driverConnection->isTransactionActive() && $this->options['auto_setup']) { |
| 118 | + $this->setup(); |
| 119 | + } |
| 120 | + |
| 121 | + // statement not prepared ? SQLite throw on exception on prepare if the table does not exist |
| 122 | + if (null === $stmt) { |
| 123 | + $stmt = $this->driverConnection->prepare($sql); |
| 124 | + } |
| 125 | + |
| 126 | + $stmt->execute($parameters); |
| 127 | + } |
| 128 | + |
| 129 | + return $stmt; |
| 130 | + } |
| 131 | + |
| 132 | + public function setup(): void |
| 133 | + { |
| 134 | + $synchronizer = new SingleDatabaseSynchronizer($this->driverConnection); |
| 135 | + $synchronizer->updateSchema($this->getSchema(), true); |
| 136 | + } |
| 137 | + |
| 138 | + private function getSchema(): Schema |
| 139 | + { |
| 140 | + $schema = new Schema(); |
| 141 | + $table = $schema->createTable($this->options['table_name']); |
| 142 | + $table->addColumn('id', Type::BIGINT) |
| 143 | + ->setAutoincrement(true) |
| 144 | + ->setNotnull(true); |
| 145 | + $table->addColumn('envelope', Type::TEXT) |
| 146 | + ->setNotnull(true); |
| 147 | + $table->addColumn('exception', Type::TEXT) |
| 148 | + ->setNotnull(true); |
| 149 | + $table->addColumn('transport_name', Type::STRING) |
| 150 | + ->setLength(255) |
| 151 | + ->setNotnull(true); |
| 152 | + $table->addColumn('failed_at', Type::DATETIME_IMMUTABLE) |
| 153 | + ->setNotnull(true); |
| 154 | + $table->setPrimaryKey(['id']); |
| 155 | + |
| 156 | + return $schema; |
| 157 | + } |
| 158 | + |
| 159 | + private function createFailedMessage(array $rowData): FailedMessage |
| 160 | + { |
| 161 | + return new FailedMessage( |
| 162 | + $rowData['id'], |
| 163 | + unserialize($rowData['envelope']), |
| 164 | + unserialize($rowData['exception']), |
| 165 | + $rowData['transport_name'], |
| 166 | + \DateTimeImmutable::createFromFormat('Y-m-d H:i:s', $rowData['failed_at']) |
| 167 | + ); |
| 168 | + } |
| 169 | +} |
0 commit comments