8000 [WIP] Message failure handling · symfony/symfony@fb2d733 · GitHub
[go: up one dir, main page]

Skip to content

Commit fb2d733

Browse files
committed
[WIP] Message failure handling
1 parent 162d5a8 commit fb2d733

File tree

3 files changed

+246
-0
lines changed

3 files changed

+246
-0
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Failure;
4+
5+
use Doctrine\DBAL\Connection;
6+
use Doctrine\DBAL\Exception\TableNotFoundException;
7+
use Doctrine\DBAL\Schema\Schema;
8+
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
9+
use Doctrine\DBAL\Types\Type;
10+
use Symfony\Component\Messenger\Envelope;
11+
12+
class DbalFailedMessageStorage implements FailedMessageStorageInterface
13+
{
14+
private $driverConnection;
15+
private $options;
16+
17+
public function __construct(Connection $driverConnection, array $options)
18+
{
19+
$this->driverConnection = $driverConnection;
20+
$this->options = array_merge([
21+
'auto_setup' => true,
22+
'table_name' => 'failed_messages',
23+
], $options);
24+
}
25+
26+
public function add(Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt): FailedMessage
27+
{
28+
$queryBuilder = $this->driverConnection->createQueryBuilder()
29+
->insert($this->options['table_name'])
30+
->values([
31+
'envelope' => ':envelope',
32+
'exception' => ':exception',
33+
'transport_name' => ':transport_name',
34+
'failed_at' => ':failed_at'
35+
])
36+
;
37+
38+
$data = [
39+
'envelope' => serialize($envelope),
40+
'exception' => serialize($exception),
41+
'transport_name' => $transportName,
42+
'failed_at' => $failedAt->format('Y-m-d H:i:s')
43+
];
44+
45+
$this->executeQuery($queryBuilder->getSQL(), $data);
46+
47+
$data['id'] = $this->driverConnection->lastInsertId();
48+
49+
return $this->createFailedMessage($data);
50+
}
51+
52+
public function all()
53+
{
54+
$query = $this->driverConnection->createQueryBuilder()
55+
->select('m.*')
56+
->from($this->options['table_name'], 'm')
57+
->orderBy('failed_at', 'ASC')
58+
;
59+
60+
$rows = $this->executeQuery($query->getSQL())->fetchAll();
61+
62+
return array_map(function($row) {
63+
return $this->createFailedMessage($row);
64+
}, $rows);
65+
}
66+
67+
public function get($id): FailedMessage
68+
{
69+
$query = $this->driverConnection->createQueryBuilder()
70+
->select('m.*')
71+
->from($this->options['table_name'], 'm')
72+
->where('m.id = :id')
73+
;
74+
75+
$row = $this->executeQuery($query->getSQL(), ['id' => $id])->fetch();
76+
77+
return $this->createFailedMessage($row);
78+
}
79+
80+
public function remove(FailedMessage $failedMessage): void
81+
{
82+
$query = $this->driverConnection->createQueryBuilder()
83+
->delete($this->options['table_name'])
84+
->where('m.id = :id')
85+
;
86+
87+
$this->executeQuery($query->getSQL(), ['id' => $failedMessage->getId()]);
88+
}
89+
90+
public function removeAll(): void
91+
{
92+
$query = $this->driverConnection->createQueryBuilder()
93+
->delete($this->options['table_name'])
94+
;
95+
96+
$this->executeQuery($query->getSQL());
97+
}
98+
99+
private function executeQuery(string $sql, array $parameters = [])
100+
{
101+
$stmt = null;
102+
103+
try {
104+
$stmt = $this->driverConnection->prepare($sql);
105+
$stmt->execute($parameters);
106+
} catch (TableNotFoundException $e) {
107+
// create table
108+
if (!$this->driverConnection->isTransactionActive() && $this->options['auto_setup']) {
109+
$this->setup();
110+
}
111+
112+
// statement not prepared ? SQLite throw on exception on prepare if the table does not exist
113+
if (null === $stmt) {
114+
$stmt = $this->driverConnection->prepare($sql);
115+
}
116+
117+
$stmt->execute($parameters);
118+
}
119+
120+
return $stmt;
121+
}
122+
123+
public function setup(): void
124+
{
125+
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
126+
$synchronizer->updateSchema($this->getSchema(), true);
127+
}
128+
129+
private function getSchema(): Schema
130+
{
131+
$schema = new Schema();
132+
$table = $schema->createTable($this->options['table_name']);
133+
$table->addColumn('id', Type::BIGINT)
134+
->setAutoincrement(true)
135+
->setNotnull(true);
136+
$table->addColumn('envelope', Type::TEXT)
137+
->setNotnull(true);
138+
$table->addColumn('exception', Type::TEXT)
139+
->setNotnull(true);
140+
$table->addColumn('transport_name', Type::STRING)
141+
->setLength(255)
142+
->setNotnull(true);
143+
$table->addColumn('failed_at', Type::DATETIME_IMMUTABLE)
144+
->setNotnull(true);
145+
$table->setPrimaryKey(['id']);
146+
147+
return $schema;
148+
}
149+
150+
private function createFailedMessage(array $rowData): FailedMessage
151+
{
152+
return new FailedMessage(
153+
$rowData['id'],
154+
unserialize($rowData['envelope']),
155+
unserialize($rowData['exception']),
156+
$rowData['transport_name'],
157+
\DateTimeImmutable::createFromFormat('Y-m-d H:i:s', $rowData['failed_at'])
158+
);
159+
}
160+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Failure;
4+
5+
use Symfony\Component\Messenger\Envelope;
6+
7+
class FailedMessage
8+
{
9+
private $id;
10+
private $envelope;
11+
private $exception;
12+
private $transportName;
13+
private $failedAt;
14+
15+
public function __construct($id, Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt)
16+
{
17+
$this->id = $id;
18+
$this->envelope = $envelope;
19+
$this->exception = $exception;
20+
$this->transportName = $transportName;
21+
$this->failedAt = $failedAt;
22+
}
23+
24+
/**
25+
* Some unique identifier for this failed message within the storage.
26+
*
27+
* @return mixed
28+
*/
29+
public function getId()
30+
{
31+
return $this->id;
32+
}
33+
34+
public function getEnvelope(): Envelope
35+
{
36+
return $this->envelope;
37+
}
38+
39+
public function getException(): \Throwable
40+
{
41+
return $this->exception;
42+
}
43+
44+
public function getFailedAt(): \DateTimeInterface
45+
{
46+
return $this->failedAt;
47+
}
48+
49+
public function getTransportName(): string
50+
{
51+
return $this->transportName;
52+
}
53+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Failure;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* @author Ryan Weaver <ryan@symfonycasts.com>
18+
*/
19+
interface FailedMessageStorageInterface
20+
{
21+
public function add(Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt): FailedMessage;
22+
23+
/**
24+
* @return FailedMessage[]
25+
*/
26+
public function all();
27+
28+
public function get($id): FailedMessage;
29+
30+
public function remove(FailedMessage $failedMessage): void;
31+
32+
public function removeAll(): void;
33+
}

0 commit comments

Comments
 (0)
0