8000 Stamp for FIFO configuration · symfony/symfony@dc34525 · GitHub
[go: up one dir, main page]

Skip to content

Commit dc34525

Browse files
committed
Stamp for FIFO configuration
1 parent 5626dc0 commit dc34525

File tree

4 files changed

+69
-5
lines changed

4 files changed

+69
-5
lines changed

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
1617
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
1718
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2021

2122
class AmazonSqsSenderTest extends TestCase
2223
{
23-
public function testSend()
24+
public function testSend(): void
2425
{
2526
$envelope = new Envelope(new DummyMessage('Oy'));
2627
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
@@ -36,4 +37,24 @@ public function testSend()
3637
$sender = new AmazonSqsSender($connection, $serializer);
3738
$sender->send($envelope);
3839
}
40+
41+
public function testSendWithAmazonSqsFifoStamp(): void
42+
{
43+
$envelope = (new Envelope(new DummyMessage('Oy')))
44+
->with($stamp = new AmazonSqsFifoStamp('testGroup', 'testDeduplicationId'));
45+
46+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
47+
48+
$connection = $this->getMockBuilder(Connection::class)
49+
->disableOriginalConstructor()
50+
->getMock();
51+
$connection->expects($this->once())->method('send')
52+
->with($encoded['body'], $encoded['headers'], $stamp->getAttributes());
53+
54+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
55+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
56+
57+
$sender = new AmazonSqsSender($connection, $serializer);
58+
$sender->send($envelope);
59+
}
3960
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
13+
14+
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
15+
16+
final class AmazonSqsFifoStamp implements NonSendableStampInterface
17+
{
18+
private $messageGroupId;
19+
20+
private $messageDeduplicationId;
21+
22+
public function __construct(string $messageGroupId, string $messageDeduplicationId)
23+
{
24+
$this->messageGroupId = $messageGroupId;
25+
$this->messageDeduplicationId = $messageDeduplicationId;
26+
}
27+
28+
public function getAttributes(): array
29+
{
30+
return [
31+
'messageGroupId' => $this->messageGroupId,
32+
'messageDeduplicationId' => $this->messageDeduplicationId,
33+
];
34+
}
35+
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,16 @@ public function send(Envelope $envelope): Envelope
4343
$delayStamp = $envelope->last(DelayStamp::class);
4444
$delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0;
4545

46+
$attributes = [];
47+
48+
/** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */
49+
$amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class);
50+
if (null !== $amazonSqsFifoStamp) {
51+
$attributes = $amazonSqsFifoStamp->getAttributes();
52+
}
53+
4654
try {
47-
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
55+
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $attributes, $delay);
4856
} catch (HttpExceptionInterface $e) {
4957
throw new TransportException($e->getMessage(), 0, $e);
5058
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public function getMessageCount(): int
240240
return 0;
241241
}
242242

243-
public function send(string $body, array $headers, int $delay = 0): void
243+
public function send(string $body, array $headers, array $attributes, int $delay = 0): void
244244
{
245245
if ($this->configuration['auto_setup']) {
246246
$this->setup();
@@ -255,8 +255,8 @@ public function send(string $body, array $headers, int $delay = 0): void
255255
];
256256

257257
if ($this->isFifoQueue($this->configuration['queue_name'])) {
258-
$parameters['MessageGroupId'] = __METHOD__;
259-
$parameters['MessageDeduplicationId'] = md5($messageBody);
258+
$parameters['MessageGroupId'] = $attributes['messageGroupId'] ?? __METHOD__;
259+
$parameters['MessageDeduplicationId'] = $attributes['messageDeduplicationId'] ?? sha1($messageBody);
260260
}
261261

262262
$this->call($this->getQueueUrl(), $parameters);

0 commit comments

Comments
 (0)
0