8000 [Messenger] Add `bury_on_reject` option to Beanstalkd bridge · symfony/symfony@9f4b8a4 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9f4b8a4

Browse files
HypeMCfabpot
authored andcommitted
[Messenger] Add bury_on_reject option to Beanstalkd bridge
1 parent d4566b2 commit 9f4b8a4

File tree

10 files changed

+254
-14
lines changed

10 files changed

+254
-14
lines changed

src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `BeanstalkdPriorityStamp` option to allow setting the message priority
8+
* Add `bury_on_reject` option to bury failed messages instead of deleting them
89

910
7.2
1011
---

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

+33-1
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1819
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1920
use Symfony\Component\Messenger\Envelope;
2021
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
22+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
2123
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2224
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2325
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -81,12 +83,42 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
8183
$beanstalkdEnvelope = $this->createBeanstalkdEnvelope();
8284
$connection = $this->createMock(Connection::class);
8385
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
84-
$connection->expects($this->once())->method('reject');
86+
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
87+
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'], 2);
8588

8689
$receiver = new BeanstalkdReceiver($connection, $serializer);
8790
$receiver->get();
8891
}
8992

93+
/**
94+
* @dataProvider provideRejectCases
95+
*/
96+
public function testReject(array $stamps, ?int $priority, bool $forceDelete)
97+
{
98+
$serializer = $this->createSerializer();
99+
100+
$id = 'some id';
101+
102+
$connection = $this->createMock(Connection::class);
103+
$connection->expects($this->once())->method('reject')->with($id, $priority, $forceDelete);
104+
105+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdReceivedStamp($id, 'foo bar'));
106+
foreach ($stamps as $stamp) {
107+
$envelope = $envelope->with($stamp);
108+
}
109+
110+
$receiver = new BeanstalkdReceiver($connection, $serializer);
111+
$receiver->reject($envelope);
112+
}
113+
114+
public static function provideRejectCases(): iterable
115+
{
116+
yield 'No stamp' => [[], null, false];
117+
yield 'With sent for retry true' => [[new SentForRetryStamp(true)], null, true];
118+
yield 'With sent for retry true and priority' => [[new BeanstalkdPriorityStamp(2), new SentForRetryStamp(true)], 2, true];
119+
yield 'With sent for retry false' => [[new SentForRetryStamp(false)], null, false];
120+
}
121+
90122
public function testKeepalive()
91123
{
92124
$serializer = $this->createSerializer();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

+90-7
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function testFromDsn()
4747
$this->assertSame('default', $configuration['tube_name']);
4848
$this->assertSame(0, $configuration['timeout']);
4949
$this->assertSame(90, $configuration['ttr']);
50+
$this->assertFalse($configuration['bury_on_reject']);
5051

5152
$this->assertEquals(
5253
$connection = new Connection([], Pheanstalk::create('foobar', 15555)),
@@ -58,22 +59,32 @@ public function testFromDsn()
5859
$this->assertSame('default', $configuration['tube_name']);
5960
$this->assertSame(0, $configuration['timeout']);
6061
$this->assertSame(90, $configuration['ttr']);
62+
$this->assertFalse($configuration['bury_on_reject']);
6163
$this->assertSame('default', $connection->getTube());
6264
}
6365

6466
public function testFromDsnWithOptions()
6567
{
6668
$this->assertEquals(
67-
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000]),
68-
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
69+
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000, 'bury_on_reject' => true]),
70+
$connectionWithQuery = Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true')
6971
);
7072

71-
$configuration = $connection->getConfiguration();
73+
$configuration = $connectionWithOptions->getConfiguration();
7274

7375
$this->assertSame('foo', $configuration['tube_name']);
7476
$this->assertSame(10, $configuration['timeout']);
7577
$this->assertSame(5000, $configuration['ttr']);
76-
$this->assertSame('foo', $connection->getTube());
78+
$this->assertTrue($configuration['bury_on_reject']);
79+
$this->assertSame('foo', $connectionWithOptions->getTube());
80+
81+
$configuration = $connectionWithQuery->getConfiguration();
82+
83+
$this->assertSame('foo', $configuration['tube_name']);
84+
$this->assertSame(10, $configuration['timeout']);
85+
$this->assertSame(5000, $configuration['ttr']);
86+
$this->assertTrue($configuration['bury_on_reject']);
87+
$this->assertSame('foo', $connectionWithOptions->getTube());
7788
}
7889

7990
public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
@@ -82,18 +93,20 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
8293
'tube_name' => 'bar',
8394
'timeout' => 20,
8495
'ttr' => 6000,
96+
'bury_on_reject' => false,
8597
];
8698

8799
$this->assertEquals(
88100
$connection = new Connection($options, Pheanstalk::create('localhost', 11333)),
89-
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000', $options)
101+
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true', $options)
90102
);
91103

92104
$configuration = $connection->getConfiguration();
93105

94106
$this->assertSame($options['tube_name'], $configuration['tube_name']);
95107
$this->assertSame($options['timeout'], $configuration['timeout']);
96108
$this->assertSame($options['ttr'], $configuration['ttr']);
109+
$this->assertSame($options['bury_on_reject'], $configuration['bury_on_reject']);
97110
$this->assertSame($options['tube_name'], $connection->getTube());
98111
}
99112

@@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs()
199212
$connection->ack((string) $id);
200213
}
201214

202-
public function testReject()
215+
/**
216+
* @testWith [false, false]
217+
* [false, true]
218+
* [true, true]
219+
*/
220+
public function testReject(bool $buryOnReject, bool $forceDelete)
203221
{
204222
$id = 123456;
205223

@@ -209,11 +227,42 @@ public function testReject()
209227
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
210228
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
211229

212-
$connection = new Connection(['tube_name' => $tube], $client);
230+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client);
231+
232+
$connection->reject((string) $id, null, $forceDelete);
233+
}
234+
235+
public function testRejectWithBury()
236+
{
237+
$id = 123456;
238+
239+
$tube = 'baz';
240+
241+
$client = $this->createMock(PheanstalkInterface::class);
242+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024);
244+
245+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
213246

214247
$connection->reject((string) $id);
215248
}
216249

250+
public function testRejectWithBuryAndPriority()
251+
{
252+
$id = 123456;
253+
$priority = 2;
254+
255+
$tube = 'baz';
256+
257+
$client = $this->createMock(PheanstalkInterface::class);
258+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
259+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority);
260+
261+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
262+
263+
$connection->reject((string) $id, $priority);
264+
}
265+
217266
public function testRejectWhenABeanstalkdExceptionOccurs()
218267
{
219268
$id = 123456;
@@ -263,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
263312
$connection->getMessageCount();
264313
}
265314

315+
public function testMessagePriority()
316+
{
317+
$id = 123456;
318+
$priority = 51;
319+
320+
$tube = 'baz';
321+
322+
$response = new ArrayResponse('OK', ['pri' => $priority]);
323+
324+
$client = $this->createMock(PheanstalkInterface::class);
325+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response);
326+
327+
$connection = new Connection(['tube_name' => $tube], $client);
328+
329+
$this->assertSame($priority, $connection->getMessagePriority((string) $id));
330+
}
331+
332+
public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
333+
{
334+
$id = 123456;
335+
336+
$tube = 'baz1234';
337+
338+
$exception = new ClientException('foobar error');
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
342+
343+
$connection = new Connection(['tube_name' => $tube], $client);
344+
345+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
346+
$connection->getMessagePriority((string) $id);
347+
}
348+
266349
public function testSend()
267350
{
268351
$tube = 'xyz';

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

+10-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
1718
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1819
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@@ -48,7 +49,10 @@ public function get(): iterable
4849
'headers' => $beanstalkdEnvelope['headers'],
4950
]);
5051
} catch (MessageDecodingFailedException $exception) {
51-
$this->connection->reject($beanstalkdEnvelope['id']);
52+
$this->connection->reject(
53+
$beanstalkdEnvelope['id'],
54+
$this->connection->getMessagePriority($beanstalkdEnvelope['id']),
55+
);
5256

5357
throw $exception;
5458
}
@@ -68,7 +72,11 @@ public function ack(Envelope $envelope): void
6872

6973
public function reject(Envelope $envelope): void
7074
{
71-
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
75+
$this->connection->reject(
76+
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
77+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
78+
$envelope->last(SentForRetryStamp::class)?->isSent ?? false,
79+
);
7280
}
7381

7482
public function keepalive(Envelope $envelope, ?int $seconds = null): void

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

+30-3
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ class Connection
3232
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
3333
'timeout' => 0,
3434
'ttr' => 90,
35+
'bury_on_reject' => false,
3536
];
3637

3738
private string $tube;
3839
private int $timeout;
3940
private int $ttr;
41+
private bool $buryOnReject;
4042

4143
/**
4244
* Constructor.
@@ -46,6 +48,7 @@ class Connection
4648
* * tube_name: name of the tube
4749
* * timeout: message reservation timeout (in seconds)
4850
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
51+
* * bury_on_reject: bury rejected messages instead of deleting them
4952
*/
5053
public function __construct(
5154
private array $configuration,
@@ -55,6 +58,7 @@ public function __construct(
5558
$this->tube = $this->configuration['tube_name'];
5659
$this->timeout = $this->configuration['timeout'];
5760
$this->ttr = $this->configuration['ttr'];
61+
$this->buryOnReject = $this->configuration['bury_on_reject'];
5862
}
5963

6064
public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = []): self
@@ -74,7 +78,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
7478
}
7579

7680
$configuration = [];
77-
$configuration += $options + $query + self::DEFAULT_OPTIONS;
81+
foreach (self::DEFAULT_OPTIONS as $k => $v) {
82+
$value = $options[$k] ?? $query[$k] ?? $v;
83+
84+
$configuration[$k] = match (\gettype($v)) {
85+
'integer' => filter_var($value, \FILTER_VALIDATE_INT),
86+
'boolean' => filter_var($value, \FILTER_VALIDATE_BOOL),
87+
default => $value,
88+
};
89+
}
7890

7991
// check for extra keys in options
8092
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
@@ -172,10 +184,14 @@ public function ack(string $id): void
172184
}
173185
}
174186

175-
public function reject(string $id): void
187+
public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void
176188
{
177189
try {
178-
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
190+
if (!$forceDelete && $this->buryOnReject) {
191+
$this->client->useTube($this->tube)->bury(new JobId((int) $id), $priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
192+
} else {
193+
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
194+
}
179195
} catch (Exception $exception) {
180196
throw new TransportException($exception->getMessage(), 0, $exception);
181197
}
@@ -201,4 +217,15 @@ public function getMessageCount(): int
201217

202218
return (int) $tubeStats['current-jobs-ready'];
203219
}
220+
221+
public function getMessagePriority(string $id): int
222+
{
223+
try {
224+
$jobStats = $this->client->statsJob(new JobId((int) $id));
225+
} catch (Exception $exception) {
226+
throw new TransportException($exception->getMessage(), 0, $exception);
227+
}
228+
229+
return (int) $jobStats['pri'];
230+
}
204231
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php": ">=8.2",
1616
"pda/pheanstalk": "^4.0",
17-
"symfony/messenger": "^7.2"
17+
"symfony/messenger": "^7.3"
1818
},
1919
"require-dev": {
2020
"symfony/property-access": "^6.4|^7.0",

src/Symfony/Component/Messenger/CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
7+
* Add `SentForRetryStamp` that identifies whether a failed message was sent for retry
8+
49
7.2
510
---
611

src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2626
use Symfony\Component\Messenger\Stamp\DelayStamp;
2727
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
28+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
2829
use Symfony\Component\Messenger\Stamp\StampInterface;
2930
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
3031
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
@@ -82,6 +83,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void
8283
} else {
8384
$this->logger?->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
8485
}
86+
87+
$event->addStamps(new SentForRetryStamp($shouldRetry));
8588
}
8689

8790
/**

0 commit comments

Comments
 (0)
0