8000 [WIP] Message failure handling · symfony/symfony@9590c86 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9590c86

Browse files
committed
[WIP] Message failure handling
1 parent 162d5a8 commit 9590c86

File tree

3 files changed

+264
-0
lines changed

3 files changed

+264
-0
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Failure;
13+
14+
use Doctrine\DBAL\Connection;
15+
use Doctrine\DBAL\Exception\TableNotFoundException;
16+
use Doctrine\DBAL\Schema\Schema;
17+
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
18+
use Doctrine\DBAL\Types\Type;
19+
use Symfony\Component\Messenger\Envelope;
20+
21+
class DbalFailedMessageStorage implements FailedMessageStorageInterface
22+
{
23+
private $driverConnection;
24+
private $options;
25+
26+
public function __construct(Connection $driverConnection, array $options)
27+
{
28+
$this->driverConnection = $driverConnection;
29+
$this->options = array_merge([
30+
'auto_setup' => true,
31+
'table_name' => 'failed_messages',
32+
], $options);
33+
}
34+
35+
public function add(Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt): FailedMessage
36+
{
37+
$queryBuilder = $this->driverConnection->createQueryBuilder()
38+
->insert($this->options['table_name'])
39+
->values([
40+
'envelope' => ':envelope',
41+
'exception' => ':exception',
42+
'transport_name' => ':transport_name',
43+
'failed_at' => ':failed_at',
44+
])
45+
;
46+
47+
$data = [
48+
'envelope' => serialize($envelope),
49+
'exception' => serialize($exception),
50+
'transport_name' => $transportName,
51+
'failed_at' => $failedAt->format('Y-m-d H:i:s'),
52+
];
53+
54+
$this->executeQuery($queryBuilder->getSQL(), $data);
55+
56+
$data['id'] = $this->driverConnection->lastInsertId();
57+
58+
return $this->createFailedMessage($data);
59+
}
60+
61+
public function all()
62+
{
63+
$query = $this->driverConnection->createQueryBuilder()
64+
->select('m.*')
65+
->from($this->options['table_name'], 'm')
66+
->orderBy('failed_at', 'ASC')
67+
;
68+
69+
$rows = $this->executeQuery($query->getSQL())->fetchAll();
70+
71+
return array_map(function ($row) {
72+
return $this->createFailedMessage($row);
73+
}, $rows);
74+
}
75+
76+
public function get($id): FailedMessage
77+
{
78+
$query = $this->driverConnection->createQueryBuilder()
79+
->select('m.*')
80+
->from($this->options['table_name'], 'm')
81+
->where('m.id = :id')
82+
;
83+
84+
$row = $this->executeQuery($query->getSQL(), ['id' => $id])->fetch();
85+
86+
return $this->createFailedMessage($row);
87+
}
88+
89+
public function remove(FailedMessage $failedMessage): void
90+
{
91+
$query = $this->driverConnection->createQueryBuilder()
92+
->delete($this->options['table_name'])
93+
->where('m.id = :id')
94+
;
95+
96+
$this->executeQuery($query->getSQL(), ['id' => $failedMessage->getId()]);
97+
}
98+
99+
public function removeAll(): void
100+
{
101+
$query = $this->driverConnection->createQueryBuilder()
102+
->delete($this->options['table_name'])
103+
;
104+
105+
$this->executeQuery($query->getSQL());
106+
}
107+
108+
private function executeQuery(string $sql, array $parameters = [])
109+
{
110+
$stmt = null;
111+
112+
try {
113+
$stmt = $this->driverConnection->prepare($sql);
114+
$stmt->execute($parameters);
115+
} catch (TableNotFoundException $e) {
116+
// create table
117+
if (!$this->driverConnection->isTransactionActive() && $this->options['auto_setup']) {
118+
$this->setup();
119+
}
120+
121+
// statement not prepared ? SQLite throw on exception on prepare if the table does not exist
122+
if (null === $stmt) {
123+
$stmt = $this->driverConnection->prepare($sql);
124+
}
125+
126+
$stmt->execute($parameters);
127+
}
128+
129+
return $stmt;
130+
}
131+
132+
public function setup(): void
133+
{
134+
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
135+
$synchronizer->updateSchema($this->getSchema(), true);
136+
}
137+
138+
private function getSchema(): Schema
139+
{
140+
$schema = new Schema();
141+
$table = $schema->createTable($this->options['table_name']);
142+
$table->addColumn('id', Type::BIGINT)
143+
->setAutoincrement(true)
144+
->setNotnull(true);
145+
$table->addColumn('envelope', Type::TEXT)
146+
->setNotnull(true);
147+
$table->addColumn('exception', Type::TEXT)
148+
->setNotnull(true);
149+
$table->addColumn('transport_name', Type::STRING)
150+
->setLength(255)
151+
->setNotnull(true);
152+
$table->addColumn('failed_at', Type::DATETIME_IMMUTABLE)
153+
->setNotnull(true);
154+
$table->setPrimaryKey(['id']);
155+
156+
return $schema;
157+
}
158+
159+
private function createFailedMessage(array $rowData): FailedMessage
160+
{
161+
return new FailedMessage(
162+
$rowData['id'],
163+
unserialize($rowData['envelope']),
164+
unserialize($rowData['exception']),
165+
$rowData['transport_name'],
166+
\DateTimeImmutable::createFromFormat('Y-m-d H:i:s', $rowData['failed_at'])
167+
);
168+
}
169+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Failure;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
class FailedMessage
17+
{
18+
private $id;
19+
private $envelope;
20+
private $exception;
21+
private $transportName;
22+
private $failedAt;
23+
24+
public function __construct($id, Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt)
25+
{
26+
$this->id = $id;
27+
$this->envelope = $envelope;
28+
$this->exception = $exception;
29+
$this->transportName = $transportName;
30+
$this->failedAt = $failedAt;
31+
}
32+
33+
/**
34+
* Some unique identifier for this failed message within the storage.
35+
*
36+
* @return mixed
37+
*/
38+
public function getId()
39+
{
40+
return $this->id;
41+
}
42+
43+
public function getEnvelope(): Envelope
44+
{
45+
return $this->envelope;
46+
}
47+
48+
public function getException(): \Throwable
49+
{
50+
return $this->exception;
51+
}
52+
53+
public function getFailedAt(): \DateTimeInterface
54+
{
55+
return $this->failedAt;
56+
}
57+
58+
public function getTransportName(): string
59+
{
60+
return $this->transportName;
61+
}
62+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Failure;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* @author Ryan Weaver <ryan@symfonycasts.com>
18+
*/
19+
interface FailedMessageStorageInterface
20+
{
21+
public function add(Envelope $envelope, \Throwable $exception, string $transportName, \DateTimeInterface $failedAt): FailedMessage;
22+
23+
/**
24+
* @return FailedMessage[]
25+
*/
26+
public function all();
27+
28+
public function get($id): FailedMessage;
29+
30+
public function remove(FailedMessage $failedMessage): void;
31+
32+
public function removeAll(): void;
33+
}

0 commit comments

Comments
 (0)
0