8000 Deduplicate Middleware · symfony/symfony@aa7000a · GitHub
[go: up one dir, main page]

Skip to content

Commit aa7000a

Browse files
Deduplicate Middleware
1 parent ecb9728 commit aa7000a

File tree

8 files changed

+225
-17
lines changed
  • Stamp
  • Tests/Middleware
  • 8 files changed

    +225
    -17
    lines changed

    src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

    +8
    Original file line numberDiff line numberDiff line change
    @@ -120,6 +120,7 @@
    120120
    use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
    121121
    use Symfony\Component\Messenger\MessageBus;
    122122
    use Symfony\Component\Messenger\MessageBusInterface;
    123+
    use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
    123124
    use Symfony\Component\Messenger\Middleware\RouterContextMiddleware;
    124125
    use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
    125126
    use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface;
    @@ -2266,6 +2267,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
    22662267
    ['id' => 'handle_message'],
    22672268
    ],
    22682269
    ];
    2270+
    2271+
    if (class_exists(DeduplicateMiddleware::class) && class_exists(LockFactory::class)) {
    2272+
    $defaultMiddleware['before'][] = ['id' => 'deduplicate_middleware'];
    2273+
    } else {
    2274+
    $container->removeDefinition('messenger.middleware.deduplicate_middleware');
    2275+
    }
    2276+
    22692277
    foreach ($config['buses'] as $busId => $bus) {
    22702278
    $middleware = $bus['middleware'];
    22712279

    src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

    +6
    Original file line numberDiff line numberDiff line change
    @@ -25,6 +25,7 @@
    2525
    use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
    2626
    use Symfony\Component\Messenger\Handler\RedispatchMessageHandler;
    2727
    use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
    28+
    use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
    2829
    use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
    2930
    use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
    3031
    use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
    @@ -86,6 +87,11 @@
    8687
    ->tag('monolog.logger', ['channel' => 'messenger'])
    8788
    ->call('setLogger', [service('logger')->ignoreOnInvalid()])
    8889

    90+
    ->set('messenger.middleware.deduplicate_middleware', DeduplicateMiddleware::class)
    91+
    ->args([
    92+
    service('lock.factory'),
    93+
    ])
    94+
    8995
    ->set('messenger.middleware.add_bus_name_stamp_middleware', AddBusNameStampMiddleware::class)
    9096
    ->abstract()
    9197

    src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php

    +47-17
    Original file line numberDiff line numberDiff line change
    @@ -62,6 +62,7 @@
    6262
    use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory;
    6363
    use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory;
    6464
    use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
    65+
    use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
    6566
    use Symfony\Component\Messenger\Transport\TransportFactory;
    6667
    use Symfony\Component\Notifier\ChatterInterface;
    6768
    use Symfony\Component\Notifier\TexterInterface;
    @@ -1061,25 +1062,54 @@ public function testMessengerWithMultipleBuses()
    10611062

    10621063
    $this->assertTrue($container->has('messenger.bus.commands'));
    10631064
    $this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
    1064-
    $this->assertEquals([
    1065-
    ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
    1066-
    ['id' => 'reject_redelivered_message_middleware'],
    1067-
    ['id' => 'dispatch_after_current_bus'],
    1068-
    ['id' => 'failed_message_processing_middleware'],
    1069-
    ['id' => 'send_message', 'arguments' => [true]],
    1070-
    ['id' => 'handle_message', 'arguments' => [false]],
    1071-
    ], $container->getParameter('messenger.bus.commands.middleware'));
    1065+
    1066+
    if (class_exists(DeduplicateMiddleware::class)) {
    1067+
    $this->assertEquals([
    1068+
    ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
    1069+
    ['id' => 'reject_redelivered_message_middleware'],
    1070+
    ['id' => 'dispatch_after_current_bus'],
    1071+
    ['id' => 'failed_message_processing_middleware'],
    1072+
    ['id' => 'deduplicate_middleware'],
    1073+
    ['id' => 'send_message', 'arguments' => [true]],
    1074+
    ['id' => 'handle_message', 'arguments' => [false]],
    1075+
    ], $container->getParameter('messenger.bus.commands.middleware'));
    1076+
    } else {
    1077+
    $this->assertEquals([
    1078+
    ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
    1079+
    ['id' => 'reject_redelivered_message_middleware'],
    1080+
    ['id' => 'dispatch_after_current_bus'],
    1081+
    ['id' => 'failed_message_processing_middleware'],
    1082+
    ['id' => 'send_message', 'arguments' => [true]],
    1083+
    ['id' => 'handle_message', 'arguments' => [false]],
    1084+
    ], $container->getParameter('messenger.bus.commands.middleware'));
    1085+
    }
    1086+
    10721087
    $this->assertTrue($container->has('messenger.bus.events'));
    10731088
    $this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
    1074-
    $this->assertEquals([
    1075-
    ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
    1076-
    ['id' => 'reject_redelivered_message_middleware'],
    1077-
    ['id' => 'dispatch_after_current_bus'],
    1078-
    ['id' => 'failed_message_processing_middleware'],
    1079-
    ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
    1080-
    ['id' => 'send_message', 'arguments' => [true]],
    1081-
    ['id' => 'handle_message', 'arguments' => [false]],
    1082-
    ], $container->getParameter('messenger.bus.events.middleware'));
    1089+
    1090+
    if (class_exists(DeduplicateMiddleware::class)) {
    1091+
    $this->assertEquals([
    1092+
    ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
    1093+
    ['id' => 'reject_redelivered_message_middleware'],
    1094+
    ['id' => 'dispatch_after_current_bus'],
    1095+
    ['id' => 'failed_message_processing_middleware'],
    1096+
    ['id' => 'deduplicate_middleware'],
    1097+
    ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
    1098+
    ['id' => 'send_message', 'arguments' => [true]],
    1099+
    ['id' => 'handle_message', 'arguments' => [false]],
    1100+
    ], $container->getParameter('messenger.bus.events.middleware'));
    1101+
    } else {
    1102+
    $this->assertEquals([
    1103+
    ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
    1104+
    ['id' => 'reject_redelivered_message_middleware'],
    1105+
    ['id' => 'dispatch_after_current_bus'],
    1106+
    ['id' => 'failed_message_processing_middleware'],
    1107+
    ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
    1108+
    ['id' => 'send_message', 'arguments' => [true]],
    1109+
    ['id' => 'handle_message', 'arguments' => [false]],
    1110+
    ], $container->getParameter('messenger.bus.events.middleware'));
    1111+
    }
    1112< F987 code class="diff-text syntax-highlighted-line addition">+
    10831113
    $this->assertTrue($container->has('messenger.bus.queries'));
    10841114
    $this->assertSame([], $container->getDefinition('messenger.bus.queries')->getArgument(0));
    10851115
    $this->assertEquals([

    src/Symfony/Component/Messenger/CHANGELOG.md

    +1
    Original file line numberDiff line numberDiff line change
    @@ -4,6 +4,7 @@ CHANGELOG
    44
    7.2
    55
    ---
    66

    7+
    * Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`
    78
    * Add `$previous` to the exception output at the `messenger:failed:show` command
    89
    * `WrappedExceptionsInterface` now extends PHP's `Throwable` interface
    910
    * Add `#[AsMessage]` attribute with `$transport` parameter for message routing
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,51 @@
    1+
    <?php
    2+
    3+
    /*
    4+
    * This file is part of the Symfony package.
    5+
    *
    6+
    * (c) Fabien Potencier <fabien@symfony.com>
    7+
    *
    8+
    * For the full copyright and license information, please view the LICENSE
    9+
    * file that was distributed with this source code.
    10+
    */
    11+
    12+
    namespace Symfony\Component\Messenger\Middleware;
    13+
    14+
    use Symfony\Component\Lock\LockFactory;
    15+
    use Symfony\Component\Messenger\Envelope;
    16+
    use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
    17+
    use Symfony\Component\Messenger\Stamp\ReceivedStamp;
    18+
    19+
    final class DeduplicateMiddleware implements MiddlewareInterface
    20+
    {
    21+
    public function __construct(private LockFactory $lockFactory)
    22+
    {
    23+
    }
    24+
    25+
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    26+
    {
    27+
    if (!$stamp = $envelope->last(DeduplicateStamp::class)) {
    28+
    return $stack->next()->handle($envelope, $stack);
    29+
    }
    30+
    31+
    if (!$envelope->last(ReceivedStamp::class)) {
    32+
    $lock = $this->lockFactory->createLockFromKey($stamp->getKey(), $stamp->getTtl(), autoRelease: false);
    33+
    34+
    if (!$lock->acquire()) {
    35+
    return $envelope;
    36+
    }
    37+
    } elseif ($stamp->onlyDeduplicateInQueue()) {
    38+
    $this->lockFactory->createLockFromKey($stamp->getKey())->release();
    39+
    }
    40+
    41+
    try {
    42+
    $envelope = $stack->next()->handle($envelope, $stack);
    43+
    } finally {
    44+
    if ($envelope->last(ReceivedStamp::class) && !$stamp->onlyDeduplicateInQueue()) {
    45+
    $this->lockFactory->createLockFromKey($stamp->getKey())->release();
    46+
    }
    47+
    }
    48+
    49+
    return $envelope;
    50+
    }
    51+
    }
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,42 @@
    1+
    <?php
    2+
    3+
    /*
    4+
    * This file is part of the Symfony package.
    5+
    *
    6+
    * (c) Fabien Potencier <fabien@symfony.com>
    7+
    *
    8+
    * For the full copyright and license information, please view the LICENSE
    9+
    * file that was distributed with this source code.
    10+
    */
    11+
    12+
    namespace Symfony\Component\Messenger\Stamp;
    13+
    14+
    use Symfony\Component\Lock\Key;
    15+
    16+
    final class DeduplicateStamp implements StampInterface
    17+
    {
    18+
    private Key $key;
    19+
    20+
    public function __construct(
    21+
    string $key,
    22+
    private ?float $ttl = 300.0,
    23+
    private bool $onlyDeduplicateInQueue = false,
    24+
    ) {
    25+
    $this->key = new Key($key);
    26+
    }
    27+
    28+
    public function onlyDeduplicateInQueue(): bool
    29+
    {
    30+
    return $this->onlyDeduplicateInQueue;
    31+
    }
    32+
    33+
    public function getKey(): Key
    34+
    {
    35+
    return $this->key;
    36+
    }
    37+
    38+
    public function getTtl(): ?float
    39+
    {
    40+
    return $this->ttl;
    41+
    }
    42+
    }
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,68 @@
    1+
    <?php
    2+
    3+
    /*
    4+
    * This file is part of the Symfony package.
    5+
    *
    6+
    * (c) Fabien Potencier <fabien@symfony.com>
    7+
    *
    8+
    * For the full copyright and license information, please view the LICENSE
    9+
    * file that was distributed with this source code.
    10+
    */
    11+
    12+
    namespace Symfony\Component\Messenger\Tests\Middleware;
    13+
    14+
    use Symfony\Component\Lock\LockFactory;
    15+
    use Symfony\Component\Lock\Store\FlockStore;
    16+
    use Symfony\Component\Lock\Store\SemaphoreStore;
    17+
    use Symfony\Component\Messenger\Envelope;
    18+
    use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
    19+
    use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
    20+
    use Symfony\Component\Messenger\Stamp\ReceivedStamp;
    21+
    use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
    22+
    use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
    23+
    24+
    final class DeduplicateMiddlewareTest extends MiddlewareTestCase
    25+
    {
    26+
    public function testDeduplicateMiddlewareIgnoreIfMessageIsNotLockable()
    27+
    {
    28+
    $message = new DummyMessage('Hello');
    29+
    $envelope = new Envelope($message);
    30+
    31+
    $lockFactory = $this->createMock(LockFactory::class);
    32+
    $lockFactory->expects($this->never())->method('createLock');
    33+
    34+
    $decorator = new DeduplicateMiddleware($lockFactory);
    35+
    36+
    $decorator->handle($envelope, $this->getStackMock(true));
    37+
    }
    38+
    39+
    public function testDeduplicateMiddlewareIfMessageHasKey()
    40+
    {
    41+
    $message = new DummyMessage('Hello');
    42+
    $envelope = new Envelope($message, [new DeduplicateStamp('id')]);
    43+
    44+
    if (SemaphoreStore::isSupported()) {
    45+
    $store = new SemaphoreStore();
    46+
    } else {
    47+
    $store = new FlockStore();
    48+
    }
    49+
    50+
    $decorator = new DeduplicateMiddleware(new LockFactory($store));
    51+
    52+
    $envelope = $decorator->handle($envelope, $this->getStackMock(true));
    53+
    $this->assertNotNull($envelope->last(DeduplicateStamp::class));
    54+
    55+
    $message2 = new DummyMessage('Hello');
    56+
    $envelope2 = new Envelope($message2, [new DeduplicateStamp('id')]);
    57+
    58+
    $decorator->handle($envelope2, $this->getStackMock(false));
    59+
    60+
    // Simulate receiving the first message
    61+
    $envelope = $envelope->with(new ReceivedStamp('transport'));
    62+
    $decorator->handle($envelope, $this->getStackMock(true));
    63+
    64+
    $message3 = new DummyMessage('Hello');
    65+
    $envelope3 = new Envelope($message3, [new DeduplicateStamp('id')]);
    66+
    $decorator->handle($envelope3, $this->getStackMock(true));
    67+
    }
    68+
    }

    src/Symfony/Component/Messenger/composer.json

    +2
    Original file line numberDiff line numberDiff line change
    @@ -29,6 +29,7 @@
    2929
    "symfony/http-kernel": "^6.4|^7.0",
    3030
    "symfony/process": "^6.4|^7.0",
    3131
    "symfony/property-access": "^6.4|^7.0",
    32+
    "symfony/lock": "^6.4|^7.0",
    3233
    "symfony/rate-limiter": "^6.4|^7.0",
    3334
    "symfony/routing": "^6.4|^7.0",
    3435
    "symfony/serializer": "^6.4|^7.0",
    @@ -42,6 +43,7 @@
    4243
    "symfony/event-dispatcher-contracts": "<2.5",
    4344
    "symfony/framework-bundle": "<6.4",
    4445
    "symfony/http-kernel": "<6.4",
    46+
    "symfony/lock": "<6.4",
    4547
    "symfony/serializer": "<6.4"
    4648
    },
    4749
    "autoload": {

    0 commit comments

    Comments
     (0)
    0