From a0fd22be2d732aaf7e75f84df6bc0d7fb163e8a6 Mon Sep 17 00:00:00 2001 From: Ryan Weaver Date: Fri, 29 Mar 2019 10:36:41 -0400 Subject: [PATCH] [WIP] Message failure handling --- .../Failure/DbalFailedMessageStorage.php | 170 ++++++++++++++++++ .../Messenger/Failure/FailedMessage.php | 62 +++++++ .../Failure/FailedMessageStorageInterface.php | 33 ++++ 3 files changed, 265 insertions(+) create mode 100644 src/Symfony/Component/Messenger/Failure/DbalFailedMessageStorage.php create mode 100644 src/Symfony/Component/Messenger/Failure/FailedMessage.php create mode 100644 src/Symfony/Component/Messenger/Failure/FailedMessageStorageInterface.php diff --git a/src/Symfony/Component/Messenger/Failure/DbalFailedMessageStorage.php b/src/Symfony/Component/Messenger/Failure/DbalFailedMessageStorage.php new file mode 100644 index 0000000000000..3b3597fbf2f7d --- /dev/null +++ b/src/Symfony/Component/Messenger/Failure/DbalFailedMessageStorage.php @@ -0,0 +1,170 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Failure; + +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Driver\Statement; +use Doctrine\DBAL\Exception\TableNotFoundException; +use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer; +use Doctrine\DBAL\Types\Type; +use Symfony\Component\Messenger\Envelope; + +class DbalFailedMessageStorage implements FailedMessageStorageInterface +{ + private $driverConnection; + private $options; + + public function __construct(Connection $driverConnection, array $options) + { + $this->driverConnection = $driverConnection; + $this->options = array_merge([ + 'auto_setup' => true, + 'table_name' => 'failed_messages', + ], $options); + } + + public function add(Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt): FailedMessage + { + $queryBuilder = $this->driverConnection->createQueryBuilder() + ->insert($this->options['table_name']) + ->values([ + 'envelope' => ':envelope', + 'exception' => ':exception', + 'transport_name' => ':transport_name', + 'failed_at' => ':failed_at', + ]) + ; + + $data = [ + 'envelope' => serialize($envelope), + 'exception' => serialize($exception), + 'transport_name' => $transportName, + 'failed_at' => $failedAt->format('Y-m-d H:i:s'), + ]; + + $this->executeQuery($queryBuilder->getSQL(), $data); + + $data['id'] = $this->driverConnection->lastInsertId(); + + return $this->createFailedMessage($data); + } + + public function all(): array + { + $query = $this->driverConnection->createQueryBuilder() + ->select('m.*') + ->from($this->options['table_name'], 'm') + ->orderBy('failed_at', 'ASC') + ; + + $rows = $this->executeQuery($query->getSQL())->fetchAll(); + + return array_map(function ($row) { + return $this->createFailedMessage($row); + }, $rows); + } + + public function get($id): FailedMessage + { + $query = $this->driverConnection->createQueryBuilder() + ->select('m.*') + ->from($this->options['table_name'], 'm') + ->where('m.id = :id') + ; + + $row = $this->executeQuery($query->getSQL(), ['id' => $id])->fetch(); + + return $this->createFailedMessage($row); + } + + public function remove(FailedMessage $failedMessage): void + { + $query = $this->driverConnection->createQueryBuilder() + ->delete($this->options['table_name']) + ->where('m.id = :id') + ; + + $this->executeQuery($query->getSQL(), ['id' => $failedMessage->getId()]); + } + + public function removeAll(): void + { + $query = $this->driverConnection->createQueryBuilder() + ->delete($this->options['table_name']) + ; + + $this->executeQuery($query->getSQL()); + } + + private function executeQuery(string $sql, array $parameters = []): Statement + { + $stmt = null; + + try { + $stmt = $this->driverConnection->prepare($sql); + $stmt->execute($parameters); + } catch (TableNotFoundException $e) { + // create table + if (!$this->driverConnection->isTransactionActive() && $this->options['auto_setup']) { + $this->setup(); + } + + // statement not prepared ? SQLite throw on exception on prepare if the table does not exist + if (null === $stmt) { + $stmt = $this->driverConnection->prepare($sql); + } + + $stmt->execute($parameters); + } + + return $stmt; + } + + public function setup(): void + { + $synchronizer = new SingleDatabaseSynchronizer($this->driverConnection); + $synchronizer->updateSchema($this->getSchema(), true); + } + + private function getSchema(): Schema + { + $schema = new Schema(); + $table = $schema->createTable($this->options['table_name']); + $table->addColumn('id', Type::BIGINT) + ->setAutoincrement(true) + ->setNotnull(true); + $table->addColumn('envelope', Type::TEXT) + ->setNotnull(true); + $table->addColumn('exception', Type::TEXT) + ->setNotnull(true); + $table->addColumn('transport_name', Type::STRING) + ->setLength(255) + ->setNotnull(true); + $table->addColumn('failed_at', Type::DATETIME_IMMUTABLE) + ->setNotnull(true); + $table->setPrimaryKey(['id']); + + return $schema; + } + + private function createFailedMessage(array $rowData): FailedMessage + { + return new FailedMessage( + $rowData['id'], + unserialize($rowData['envelope']), + unserialize($rowData['exception']), + $rowData['transport_name'], + \DateTimeImmutable::createFromFormat('Y-m-d H:i:s', $rowData['failed_at']) + ); + } +} diff --git a/src/Symfony/Component/Messenger/Failure/FailedMessage.php b/src/Symfony/Component/Messenger/Failure/FailedMessage.php new file mode 100644 index 0000000000000..2bcda61736836 --- /dev/null +++ b/src/Symfony/Component/Messenger/Failure/FailedMessage.php @@ -0,0 +1,62 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Failure; + +use Symfony\Component\Messenger\Envelope; + +class FailedMessage +{ + private $id; + private $envelope; + private $exception; + private $transportName; + private $failedAt; + + public function __construct($id, Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt) + { + $this->id = $id; + $this->envelope = $envelope; + $this->exception = $exception; + $this->transportName = $transportName; + $this->failedAt = $failedAt; + } + + /** + * Some unique identifier for this failed message within the storage. + * + * @return mixed + */ + public function getId() + { + return $this->id; + } + + public function getEnvelope(): Envelope + { + return $this->envelope; + } + + public function getException(): \Throwable + { + return $this->exception; + } + + public function getFailedAt(): \DateTimeInterface + { + return $this->failedAt; + } + + public function getTransportName(): string + { + return $this->transportName; + } +} diff --git a/src/Symfony/Component/Messenger/Failure/FailedMessageStorageInterface.php b/src/Symfony/Component/Messenger/Failure/FailedMessageStorageInterface.php new file mode 100644 index 0000000000000..1d59b37c8093c --- /dev/null +++ b/src/Symfony/Component/Messenger/Failure/FailedMessageStorageInterface.php @@ -0,0 +1,33 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Failure; + +use Symfony\Component\Messenger\Envelope; + +/** + * @author Ryan Weaver + */ +interface FailedMessageStorageInterface +{ + public function add(Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt): FailedMessage; + + /** + * @return FailedMessage[] + */ + public function all(): array; + + public function get($id): FailedMessage; + + public function remove(FailedMessage $failedMessage): void; + + public function removeAll(): void; +}