8000 feature #54141 [Messenger] Introduce `DeduplicateMiddleware` (Vincent… · symfony/symfony@0b9c488 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0b9c488

Browse files
committed
feature #54141 [Messenger] Introduce DeduplicateMiddleware (VincentLanglet)
This PR was merged into the 7.3 branch. Discussion ---------- [Messenger] Introduce `DeduplicateMiddleware` | Q | A | ------------- | --- | Branch? | 7.3 | Bug fix? | no | New feature? | yes | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Issues | Fix #54126 <!-- prefix each issue number with "Fix #", no need to create an issue if none exists, explain below instead --> | License | MIT Commits ------- aa7000a Deduplicate Middleware
2 parents 201747b + aa7000a commit 0b9c488

File tree

8 files changed

+225
-17
lines changed

8 files changed

+225
-17
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
121121
use Symfony\Component\Messenger\MessageBus;
122122
use Symfony\Component\Messenger\MessageBusInterface;
123+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
123124
use Symfony\Component\Messenger\Middleware\RouterContextMiddleware;
124125
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
125126
use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface;
@@ -2272,6 +2273,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
22722273
['id' => 'handle_message'],
22732274
],
22742275
];
2276+
2277+
if (class_exists(DeduplicateMiddleware::class) && class_exists(LockFactory::class)) {
2278+
$defaultMiddleware['before'][] = ['id' => 'deduplicate_middleware'];
2279+
} else {
2280+
$container->removeDefinition('messenger.middleware.deduplicate_middleware');
2281+
}
2282+
22752283< 67E6 /code>
foreach ($config['buses'] as $busId => $bus) {
22762284
$middleware = $bus['middleware'];
22772285

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
2626
use Symfony\Component\Messenger\Handler\RedispatchMessageHandler;
2727
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
28+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
2829
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
2930
use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
3031
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
@@ -86,6 +87,11 @@
8687
->tag('monolog.logger', ['channel' => 'messenger'])
8788
->call('setLogger', [service('logger')->ignoreOnInvalid()])
8889

90+
->set('messenger.middleware.deduplicate_middleware', DeduplicateMiddleware::class)
91+
->args([
92+
service('lock.factory'),
93+
])
94+
8995
->set('messenger.middleware.add_bus_name_stamp_middleware', AddBusNameStampMiddleware::class)
9096
->abstract()
9197

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php

Lines changed: 47 additions & 17 deletions
-
$this->assertEquals([
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory;
6363
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory;
6464
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
65+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
6566
use Symfony\Component\Messenger\Transport\TransportFactory;
6667
use Symfony\Component\Notifier\ChatterInterface;
6768
use Symfony\Component\Notifier\TexterInterface;
@@ -1061,25 +1062,54 @@ public function testMessengerWithMultipleBuses()
10611062

10621063
$this->assertTrue($container->has('messenger.bus.commands'));
10631064
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
1064
1065-
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1066-
['id' => 'reject_redelivered_message_middleware'],
1067-
['id' => 'dispatch_after_current_bus'],
1068-
['id' => 'failed_message_processing_middleware'],
1069-
['id' => 'send_message', 'arguments' => [true]],
1070-
['id' => 'handle_message', 'arguments' => [false]],
1071-
], $container->getParameter('messenger.bus.commands.middleware'));
1065+
1066+
if (class_exists(DeduplicateMiddleware::class)) {
1067+
$this->assertEquals([
1068+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1069+
['id' => 'reject_redelivered_message_middleware'],
1070+
['id' => 'dispatch_after_current_bus'],
1071+
['id' => 'failed_message_processing_middleware'],
1072+
['id' => 'deduplicate_middleware'],
1073+
['id' => 'send_message', 'arguments' => [true]],
1074+
['id' => 'handle_message', 'arguments' => [false]],
1075+
], $container->getParameter('messenger.bus.commands.middleware'));
1076+
} else {
1077+
$this->assertEquals([
1078+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1079+
['id' => 'reject_redelivered_message_middleware'],
1080+
['id' => 'dispatch_after_current_bus'],
1081+
['id' => 'failed_message_processing_middleware'],
1082+
['id' => 'send_message', 'arguments' => [true]],
1083+
['id' => 'handle_message', 'arguments' => [false]],
1084+
], $container->getParameter('messenger.bus.commands.middleware'));
1085+
}
1086+
10721087
$this->assertTrue($container->has('messenger.bus.events'));
10731088
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
1074-
$this->assertEquals([
1075-
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1076-
['id' => 'reject_redelivered_message_middleware'],
1077-
['id' => 'dispatch_after_current_bus'],
1078-
['id' => 'failed_message_processing_middleware'],
1079-
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1080-
['id' => 'send_message', 'arguments' => [true]],
1081-
['id' => 'handle_message', 'arguments' => [false]],
1082-
], $container->getParameter('messenger.bus.events.middleware'));
1089+
1090+
if (class_exists(DeduplicateMiddleware::class)) {
1091+
$this->assertEquals([
1092+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1093+
['id' => 'reject_redelivered_message_middleware'],
1094+
['id' => 'dispatch_after_current_bus'],
1095+
['id' => 'failed_message_processing_middleware'],
1096+
['id' => 'deduplicate_middleware'],
1097+
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1098+
['id' => 'send_message', 'arguments' => [true]],
1099+
['id' => 'handle_message', 'arguments' => [false]],
1100+
], $container->getParameter('messenger.bus.events.middleware'));
1101+
} else {
1102+
$this->assertEquals([
1103+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1104+
['id' => 'reject_redelivered_message_middleware'],
1105+
['id' => 'dispatch_after_current_bus'],
1106+
['id' => 'failed_message_processing_middleware'],
1107+
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1108+
['id' => 'send_message', 'arguments' => [true]],
1109+
['id' => 'handle_message', 'arguments' => [false]],
1110+
], $container->getParameter('messenger.bus.events.middleware'));
1111+
}
1112+
10831113
$this->assertTrue($container->has('messenger.bus.queries'));
10841114
$this->assertSame([], $container->getDefinition('messenger.bus.queries')->getArgument(0));
10851115
$this->assertEquals([

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.2
55
---
66

7+
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`
78
* Add `$previous` to the exception output at the `messenger:failed:show` command
89
* `WrappedExceptionsInterface` now extends PHP's `Throwable` interface
910
* Add ` 10BC0 #[AsMessage]` attribute with `$transport` parameter for message routing
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Lock\LockFactory;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
17+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
18+
19+
final class DeduplicateMiddleware implements MiddlewareInterface
20+
{
21+
public function __construct(private LockFactory $lockFactory)
22+
{
23+
}
24+
25+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
26+
{
27+
if (!$stamp = $envelope->last(DeduplicateStamp::class)) {
28+
return $stack->next()->handle($envelope, $stack);
29+
}
30+
31+
if (!$envelope->last(ReceivedStamp::class)) {
32+
$lock = $this->lockFactory->createLockFromKey($stamp->getKey(), $stamp->getTtl(), autoRelease: false);
33+
34+
if (!$lock->acquire()) {
35+
return $envelope;
36+
}
37+
} elseif ($stamp->onlyDeduplicateInQueue()) {
38+
$this->lockFactory->createLockFromKey($stamp->getKey())->release();
39+
}
40+
41+
try {
42+
$envelope = $stack->next()->handle($envelope, $stack);
43+
} finally {
44+
if ($envelope->last(ReceivedStamp::class) && !$stamp->onlyDeduplicateInQueue()) {
45+
$this->lockFactory->createLockFromKey($stamp->getKey())->release();
46+
}
47+
}
48+
49+
return $envelope;
50+
}
51+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Symfony\Component\Lock\Key;
15+
16+
final class DeduplicateStamp implements StampInterface
17+
{
18+
private Key $key;
19+
20+
public function __construct(
21+
string $key,
22+
private ?float $ttl = 300.0,
23+
private bool $onlyDeduplicateInQueue = false,
24+
) {
25+
$this->key = new Key($key);
26+
}
27+
28+
public function onlyDeduplicateInQueue(): bool
29+
{
30+
return $this->onlyDeduplicateInQueue;
31+
}
32+
33+
public function getKey(): Key
34+
{
35+
return $this->key;
36+
}
37+
38+
public function getTtl(): ?float
39+
{
40+
return $this->ttl;
41+
}
42+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use Symfony\Component\Lock\LockFactory;
15+
use Symfony\Component\Lock\Store\FlockStore;
16+
use Symfony\Component\Lock\Store\SemaphoreStore;
17+
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
19+
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
20+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
21+
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
22+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
23+
24+
final class DeduplicateMiddlewareTest extends MiddlewareTestCase
25+
{
26+
public function testDeduplicateMiddlewareIgnoreIfMessageIsNotLockable()
27+
{
28+
$message = new DummyMessage('Hello');
29+
$envelope = new Envelope($message);
30+
31+
$lockFactory = $this->createMock(LockFactory::class);
32+
$lockFactory->expects($this->never())->method('createLock');
33+
34+
$decorator = new DeduplicateMiddleware($lockFactory);
35+
36+
$decorator->handle($envelope, $this->getStackMock(true));
37+
}
38+
39+
public function testDeduplicateMiddlewareIfMessageHasKey()
40+
{
41+
$message = new DummyMessage('Hello');
42+
$envelope = new Envelope($message, [new DeduplicateStamp('id')]);
43+
44+
if (SemaphoreStore::isSupported()) {
45+
$store = new SemaphoreStore();
46+
} else {
47+
$store = new FlockStore();
48+
}
49+
50+
$decorator = new DeduplicateMiddleware(new LockFactory($store));
51+
52+
$envelope = $decorator->handle($envelope, $this->getStackMock(true));
53+
$this->assertNotNull($envelope->last(DeduplicateStamp::class));
54+
55+
$message2 = new DummyMessage('Hello');
56+
$envelope2 = new Envelope($message2, [new DeduplicateStamp('id')]);
57+
58+
$decorator->handle($envelope2, $this->getStackMock(false));
59+
60+
// Simulate receiving the first message
61+
$envelope = $envelope->with(new ReceivedStamp('transport'));
62+
$decorator->handle($envelope, $this->getStackMock(true));
63+
64+
$message3 = new DummyMessage('Hello');
65+
$envelope3 = new Envelope($message3, [new DeduplicateStamp('id')]);
66+
$decorator->handle($envelope3, $this->getStackMock(true));
67+
}
68+
}

src/Symfony/Component/Messenger/composer.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"symfony/http-kernel": "^6.4|^7.0",
3030
"symfony/process": "^6.4|^7.0",
3131
"symfony/property-access": "^6.4|^7.0",
32+
"symfony/lock": "^6.4|^7.0",
3233
"symfony/rate-limiter": "^6.4|^7.0",
3334
"symfony/routing": "^6.4|^7.0",
3435
"symfony/serializer": "^6.4|^7.0",
@@ -42,6 +43,7 @@
4243
"symfony/event-dispatcher-contracts": "<2.5",
4344
"symfony/framework-bundle": "<6.4",
4445
"symfony/http-kernel": "<6.4",
46+
"symfony/lock": "<6.4",
4547
"symfony/serializer": "<6.4"
4648
},
4749
"autoload": {

0 commit comments

Comments
 (0)
0