8000 [Messenger] Add BeanstalkdPriorityStamp to Beanstalkd bridge · symfony/symfony@c758b07 · GitHub
[go: up one dir, main page]

Skip to content
8000

Commit c758b07

Browse files
committed
[Messenger] Add BeanstalkdPriorityStamp to Beanstalkd bridge
1 parent 0a9eb28 commit c758b07

File tree

6 files changed

+86
-10
lines changed

6 files changed

+86
-10
lines changed

src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Beanstalkd that the job is still being processed, in order to avoid timeouts
8+
* Add `BeanstalkdPriorityStamp` option to allow setting the message priority
89

910
5.2.0
1011
-----

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php

+18-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdSender;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
@@ -27,7 +28,7 @@ public function testSend()
2728
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
2829

2930
$connection = $this->createMock(Connection::class);
30-
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0);
31+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0, null);
3132

3233
$serializer = $this->createMock(SerializerInterface::class);
3334
$serializer->method('encode')->with($envelope)->willReturn($encoded);
@@ -42,7 +43,22 @@ public function testSendWithDelay()
4243
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
4344

4445
$connection = $this->createMock(Connection::class);
45-
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500);
46+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500, null);
47+
48+
$serializer = $this->createMock(SerializerInterface::class);
49+
$serializer->method('encode')->with($envelope)->willReturn($encoded);
50+
51+
$sender = new BeanstalkdSender($connection, $serializer);
52+
$sender->send($envelope);
53+
}
54+
55+
public function testSendWithPriority()
56+
{
57+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdPriorityStamp(2));
58+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
59+
60+
$connection = $this->createMock(Connection::class);
61+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0, 2);
4662

4763
$serializer = $this->createMock(SerializerInterface::class);
4864
$serializer->method('encode')->with($envelope)->willReturn($encoded);

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

+35
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,41 @@ public function testSend()
297297
$this->assertSame($id, (int) $returnedId);
298298
}
299299

300+
public function testSendWithPriority()
301+
{
302+
$tube = 'xyz';
303+
304+
$body = 'foo';
305+
$headers = ['test' => 'bar'];
306+
$delay = 1000;
307+
$priority = 2;
308+
$expectedDelay = $delay / 1000;
309+
310+
$id = 110;
311+
312+
$client = $this->createMock(PheanstalkInterface::class);
313+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
314+
$client->expects($this->once())->method('put')->with(
315+
$this->callback(function (string $data) use ($body, $headers): bool {
316+
$expectedMessage = json_encode([
317+
'body' => $body,
318+
'headers' => $headers,
319+
]);
320+
321+
return $expectedMessage === $data;
322+
}),
323+
$priority,
324+
$expectedDelay,
325+
90
326+
)->willReturn(new Job($id, 'foobar'));
327+
328+
$connection = new Connection(['tube_name' => $tube], $client);
329+
330+
$returnedId = $connection->send($body, $headers, $delay, $priority);
331+
332+
$this->assertSame($id, (int) $returnedId);
333+
}
334+
300335
public function testSendWhenABeanstalkdExceptionOccurs()
301336
{
302337
$tube = 'xyz';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
13+
14+
use Symfony\Component\Messenger\Stamp\StampInterface;
15+
16+
final readonly class BeanstalkdPriorityStamp implements StampInterface
17+
{
18+
public function __construct(
19+
public int $priority,
20+
) {
21+
}
22+
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php

+6-5
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ public function send(Envelope $envelope): Envelope
3535
{
3636
$encodedMessage = $this->serializer->encode($envelope);
3737

38-
/** @var DelayStamp|null $delayStamp */
39-
$delayStamp = $envelope->last(DelayStamp::class);
40-
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
41-
42-
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
38+
$this->connection->send(
39+
$encodedMessage['body'],
40+
$encodedMessage['headers'] ?? [],
41+
$envelope->last(DelayStamp::class)?->getDelay() ?? 0,
42+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
43+
);
4344

4445
return $envelope;
4546
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

+4-3
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,12 @@ public function getTube(): string
105105
}
106106

107107
/**
108-
* @param int $delay The delay in milliseconds
108+
* @param int $delay The delay in milliseconds
109+
* @param ?int $priority The priority at which the message will be reserved
109110
*
110111
* @return string The inserted id
111112
*/
112-
public function send(string $body, array $headers, int $delay = 0): string
113+
public function send(string $body, array $headers, int $delay = 0, ?int $priority = null): string
113114
{
114115
$message = json_encode([
115116
'body' => $body,
@@ -123,7 +124,7 @@ public function send(string $body, array $headers, int $delay = 0): string
123124
try {
124125
$job = $this->client->useTube($this->tube)->put(
125126
$message,
126-
PheanstalkInterface::DEFAULT_PRIORITY,
127+
$priority ?? PheanstalkInterface::DEFAULT_PRIORITY,
127128
(int) ($delay / 1000),
128129
$this->ttr
129130
);

0 commit comments

Comments
 (0)
0