8000 [Messenger][RateLimiter] fix additional message handled when using a … · symfony/symfony@0ccdc52 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0ccdc52

Browse files
committed
[Messenger][RateLimiter] fix additional message handled when using a rate limiter
1 parent cf4ef92 commit 0ccdc52

File tree

2 files changed

+13
-8
lines changed

2 files changed

+13
-8
lines changed

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Psr\EventDispatcher\EventDispatcherInterface;
1616
use Psr\Log\LoggerInterface;
17+
use Symfony\Bridge\PhpUnit\ClockMock;
1718
use Symfony\Component\Clock\MockClock;
1819
use Symfony\Component\EventDispatcher\EventDispatcher;
1920
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
@@ -47,8 +48,8 @@
4748
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
4849
use Symfony\Component\Messenger\Worker;
4950
use Symfony\Component\RateLimiter\RateLimiterFactory;
51+
use Symfony\Component\RateLimiter\Reservation;
5052
use Symfony\Component\RateLimiter\Storage\InMemoryStorage;
51-
use Symfony\Contracts\Service\ResetInterface;
5253

5354
/**
5455
* @group time-sensitive
@@ -438,21 +439,21 @@ public function testWorkerRateLimitMessages()
438439
$envelope = [
439440
new Envelope(new DummyMessage('message1')),
440441
new Envelope(new DummyMessage('message2')),
442+
new Envelope(new DummyMessage('message3')),
443+
new Envelope(new DummyMessage('message4')),
441444
];
442445
$receiver = new DummyReceiver([$envelope]);
443446

444447
$bus = $this->createMock(MessageBusInterface::class);
445448
$bus->method('dispatch')->willReturnArgument(0);
446449

447450
$eventDispatcher = new EventDispatcher();
448-
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2));
451+
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(4));
449452

450453
$rateLimitCount = 0;
451-
$listener = function (WorkerRateLimitedEvent $event) use (&$rateLimitCount) {
454+
$eventDispatcher->addListener(WorkerRateLimitedEvent::class, static function () use (&$rateLimitCount) {
452455
++$rateLimitCount;
453-
$event->getLimiter()->reset(); // Reset limiter to continue test
454-
};
455-
$eventDispatcher->addListener(WorkerRateLimitedEvent::class, $listener);
456+
});
456457

457458
$rateLimitFactory = new RateLimiterFactory([
458459
'id' => 'bus',
@@ -461,11 +462,14 @@ public function testWorkerRateLimitMessages()
461462
'interval' => '1 minute',
462463
], new InMemoryStorage());
463464

465+
ClockMock::register(Reservation::class);
466+
ClockMock::register(InMemoryStorage::class);
467+
464468
$worker = new Worker(['bus' => $receiver], $bus, $eventDispatcher, null, ['bus' => $rateLimitFactory], new MockClock());
465469
$worker->run();
466470

467-
$this->assertCount(2, $receiver->getAcknowledgedEnvelopes());
468-
$this->assertEquals(1, $rateLimitCount);
471+
$this->assertSame(4, $receiver->getAcknowledgeCount());
472+
$this->assertSame(3, $rateLimitCount);
469473
}
470474

471475
public function testWorkerShouldLogOnStop()

src/Symfony/Component/Messenger/Worker.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ private function rateLimit(string $transportName): void
242242

243243
$this->eventDispatcher?->dispatch(new WorkerRateLimitedEvent($rateLimiter, $transportName));
244244
$rateLimiter->reserve()->wait();
245+
$rateLimiter->consume();
245246
}
246247

247248
private function flush(bool $force): bool

0 commit comments

Comments
 (0)
0