8000 Introduce LockMiddleware · symfony/symfony@3f68223 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3f68223

Browse files
Introduce LockMiddleware
First feedbacks Feedbacks Fix Feedbacks Fix tests Fix tests Fix Feedback Fix typo Poc with stamp only CPP Rework Simplify check Review Add mode Add param Another implementation Simplify and rename
1 parent ecb9728 commit 3f68223

File tree

8 files changed

+225
-17
lines changed

8 files changed

+225
-17
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 8 additions & 0 deletions
use Symfony\Component\Messenger\MessageBusInterface;
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
121121
use Symfony\Component\Messenger\MessageBus;
122122
123+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
123124
use Symfony\Component\Messenger\Middleware\RouterContextMiddleware;
124125
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
125126
use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface;
@@ -2266,6 +2267,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
22662267
['id' => 'handle_message'],
22672268
],
22682269
];
2270+
2271+
if (class_exists(DeduplicateMiddleware::class) && class_exists(LockFactory::class)) {
2272+
$defaultMiddleware['before'][] = ['id' => 'deduplicate_middleware'];
2273+
} else {
2274+
$container->removeDefinition('messenger.middleware.deduplicate_middleware');
2275+
}
2276+
22692277
foreach ($config['buses'] as $busId => $bus) {
22702278
$middleware = $bus['middleware'];
22712279

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
2626
use Symfony\Component\Messenger\Handler\RedispatchMessageHandler;
2727
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
28+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
2829
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
2930
use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
3031
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
@@ -86,6 +87,11 @@
8687
->tag('monolog.logger', ['channel' => 'messenger'])
8788
->call('setLogger', [service('logger')->ignoreOnInvalid()])
8889

90+
->set('messenger.middleware.deduplicate_middleware', DeduplicateMiddleware::class)
91+
->args([
92+
service('lock.factory'),
93+
])
94+
8995
->set('messenger.middleware.add_bus_name_stamp_middleware', AddBusNameStampMiddleware::class)
9096
->abstract()
9197

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory;
6363
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory;
6464
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
65+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
6566
use Symfony\Component\Messenger\Transport\TransportFactory;
6667
use Symfony\Component\Notifier\ChatterInterface;
6768
use Symfony\Component\Notifier\TexterInterface;
@@ -1061,25 +1062,54 @@ public function testMessengerWithMultipleBuses()
10611062

10621063
$this->assertTrue($container->has('messenger.bus.commands'));
10631064
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
1064-
$this->assertEquals([
1065-
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1066-
['id' => 'reject_redelivered_message_middleware'],
1067-
['id' => 'dispatch_after_current_bus'],
1068-
['id' => 'failed_message_processing_middleware'],
1069-
['id' => 'send_message', 'arguments' => [true]],
1070-
['id' => 'handle_message', 'arguments' => [false]],
1071-
], $container->getParameter('messenger.bus.commands.middleware'));
1065+
1066+
if (class_exists(DeduplicateMiddleware::class)) {
1067+
$this->assertEquals([
1068+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1069+
['id' => 'reject_redelivered_message_middleware'],
1070+
['id' => 'dispatch_after_current_bus'],
1071+
['id' => 'failed_message_processing_middleware'],
1072+
['id' => 'deduplicate_middleware'],
1073+
['id' => 'send_message', 'arguments' => [true]],
1074+
['id' => 'handle_message', 'arguments' => [false]],
1075+
], $container->getParameter('messenger.bus.commands.middleware'));
1076+
} else {
1077+
$this->assertEquals([
1078+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1079+
['id' => 'reject_redelivered_message_middleware'],
1080+
['id' => 'dispatch_after_current_bus'],
1081+
['id' => 'failed_message_processing_middleware'],
1082+
['id' => 'send_message', 'arguments' => [true]],
1083+
['id' => 'handle_message', 'arguments' => [false]],
1084+
], $container->getParameter('messenger.bus.commands.middleware'));
1085+
}
1086+
10721087
$this->assertTrue($container->has('messenger.bus.events'));
10731088
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
1074-
$this->assertEquals([
1075-
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1076-
['id' => 'reject_redelivered_message_middleware'],
1077-
['id' => 'dispatch_after_current_bus'],
1078-
['id' => 'failed_message_processing_middleware'],
1079-
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1080-
['id' => 'send_message', 'arguments' => [true]],
1081-
['id' => 'handle_message', 'arguments' => [false]],
1082-
], $container->getParameter('messenger.bus.events.middleware'));
1089+
1090+
if (class_exists(DeduplicateMiddleware::class)) {
1091+
$this->assertEquals([
1092+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1093+
['id' => 'reject_redelivered_message_middleware'],
1094+
['id' => 'dispatch_after_current_bus'],
1095+
['id' => 'failed_message_processing_middleware'],
1096+
['id' => 'deduplicate_middleware'],
1097+
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1098+
['id' => 'send_message', 'arguments' => [true]],
1099+
['id' => 'handle_message', 'arguments' => [false]],
1100+
], $container->getParameter('messenger.bus.events.middleware'));
1101+
} else {
1102+
$this->assertEquals([
1103+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1104+
['id' => 'reject_redelivered_message_middleware'],
1105+
['id' => 'dispatch_after_current_bus'],
1106+
['id' => 'failed_message_processing_middleware'],
1107+
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1108+
['id' => 'send_message', 'arguments' => [true]],
1109+
['id' => 'handle_message', 'arguments' => [false]],
1110+
], $container->getParameter('messenger.bus.events.middleware'));
1111+
}
1112+
10831113
$this->assertTrue($container->has('messenger.bus.queries'));
10841114
$this->assertSame([], $container->getDefinition('messenger.bus.queries')->getArgument(0));
10851115
$this->assertEquals([

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.2
55
---
66

7+
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`
78
* Add `$previous` to the exception output at the `messenger:failed:show` command
89
* `WrappedExceptionsInterface` now extends PHP's `Throwable` interface
910
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Lock\LockFactory;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
17+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
18+
19+
final class DeduplicateMiddleware implements MiddlewareInterface
20+
{
21+
public function __construct(private LockFactory $lockFactory)
22+
{
23+
}
24+
25+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
26+
{
27+
if (!$stamp = $envelope->last(DeduplicateStamp::class)) {
28+
return $stack->next()->handle($envelope, $stack);
29+
}
30+
31+
if (!$envelope->last(ReceivedStamp::class)) {
32+
$lock = $this->lockFactory->createLockFromKey($stamp->getKey(), $stamp->getTtl(), autoRelease: false);
33+
34+
if (!$lock->acquire()) {
35+
return $envelope;
36+
}
37+
} elseif ($stamp->onlyDeduplicateInQueue()) {
38+
$this->lockFactory->createLockFromKey($stamp->getKey())->release();
39+
}
40+
41+
try {
42+
$envelope = $stack->next()->handle($envelope, $stack);
43+
} finally {
44+
if ($envelope->last(ReceivedStamp::class) && !$stamp->onlyDeduplicateInQueue()) {
45+
$this->lockFactory->createLockFromKey($stamp->getKey())->release();
46+
}
47+
}
48+
49+
return $envelope;
50+
}
51+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Symfony\Component\Lock\Key;
15+
16+
final class DeduplicateStamp implements StampInterface
17+
{
18+
private Key $key;
19+
20+
public function __construct(
21+
string $key,
22+
private ?float $ttl = 300.0,
23+
private bool $onlyDeduplicateInQueue = false,
24+
) {
25+
$this->key = new Key($key);
26+
}
27+
28+
public function onlyDeduplicateInQueue(): bool
29+
{
30+
return $this->onlyDeduplicateInQueue;
31+
}
32+
33+
public function getKey(): Key
34+
{
35+
return $this->key;
36+
}
37+
38+
public function getTtl(): ?float
39+
{
40+
return $this->ttl;
41+
}
42+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use Symfony\Component\Lock\LockFactory;
15+
use Symfony\Component\Lock\Store\FlockStore;
16+
use Symfony\Component\Lock\Store\SemaphoreStore;
17+
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
19+
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
20+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
21+
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
22+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
23+
24+
final class DeduplicateMiddlewareTest extends MiddlewareTestCase
25+
{
26+
public function testDeduplicateMiddlewareIgnoreIfMessageIsNotLockable()
27+
{
28+
$message = new DummyMessage('Hello');
29+
$envelope = new Envelope($message);
30+
31+
$lockFactory = $this->createMock(LockFactory::class);
32+
$lockFactory->expects($this->never())->method('createLock');
33+
34+
$decorator = new DeduplicateMiddleware($lockFactory);
35+
36+
$decorator->handle($envelope, $this->getStackMock(true));
37+
}
38+
39+
public function testDeduplicateMiddlewareIfMessageHasKey()
40+
{
41+
$message = new DummyMessage('Hello');
42+
$envelope = new Envelope($message, [new DeduplicateStamp('id')]);
43+
44+
if (SemaphoreStore::isSupported()) {
45+
$store = new SemaphoreStore();
46+
} else {
47+
$store = new FlockStore();
48+
}
49+
50+
$decorator = new DeduplicateMiddleware(new LockFactory($store));
51+
52+
$envelope = $decorator->handle($envelope, $this->getStackMock(true));
53+
$this->assertNotNull($envelope->last(DeduplicateStamp::class));
54+
55+
$message2 = new DummyMessage('Hello');
56+
$envelope2 = new Envelope($message2, [new DeduplicateStamp('id')]);
57+
58+
$decorator->handle($envelope2, $this->getStackMock(false));
59+
60+
// Simulate receiving the first message
61+
$envelope = $envelope->with(new ReceivedStamp('transport'));
62+
$decorator->handle($envelope, $this->getStackMock(true));
63+
64+
$message3 = new DummyMessage('Hello');
65+
$envelope3 = new Envelope($message3, [new DeduplicateStamp('id')]);
66+
$decorator->handle($envelope3, $this->getStackMock(true));
67+
}
68+
}

src/Symfony/Component/Messenger/composer.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"symfony/http-kernel": "^6.4|^7.0",
3030
"symfony/process": "^6.4|^7.0",
3131
"symfony/property-access": "^6.4|^7.0",
32+
"symfony/lock": "^6.4|^7.0",
3233
"symfony/rate-limiter": "^6.4|^7.0",
3334
"symfony/routing": "^6.4|^7.0",
3435
"symfony/serializer": "^6.4|^7.0",
@@ -42,6 +43,7 @@
4243
"symfony/event-dispatcher-contracts": "<2.5",
4344
"symfony/framework-bundle": "<6.4",
4445
"symfony/http-kernel": "<6.4",
46+
"symfony/lock": "<6.4",
4547
"symfony/serializer": "<6.4"
4648
},
4749
"autoload": {

0 commit comments

Comments
 (0)
0