|
44 | 44 | use Symfony\Component\Messenger\Stamp\SentStamp; |
45 | 45 | use Symfony\Component\Messenger\Stamp\StampInterface; |
46 | 46 | use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; |
| 47 | +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageInterface; |
47 | 48 | use Symfony\Component\Messenger\Tests\Fixtures\DummyReceiver; |
48 | 49 | use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver; |
49 | 50 | use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; |
@@ -574,6 +575,52 @@ public function testFlushBatchOnStop() |
574 | 575 | $this->assertSame($expectedMessages, $handler->processedMessages); |
575 | 576 | } |
576 | 577 |
|
| 578 | + public function testFlushMultipleBatchOnStop() |
| 579 | + { |
| 580 | + $expectedMessages = [ |
| 581 | + new DummyMessage('Hey'), |
| 582 | + ]; |
| 583 | + |
| 584 | + $secondHandlerExpectedMessages = [ |
| 585 | + new SecondHandlerDummyMessage('Ho'), |
| 586 | + ]; |
| 587 | + |
| 588 | + $receiver = new DummyReceiver([ |
| 589 | + [new Envelope($expectedMessages[0])], |
| 590 | + ]); |
| 591 | + |
| 592 | + $secondHandlerReceiver = new DummyReceiver([ |
| 593 | + [new Envelope($secondHandlerExpectedMessages[0])], |
| 594 | + ]); |
| 595 | + |
| 596 | + $handler = new DummyBatchHandler(); |
| 597 | + $secondHandler = new SecondDummyBatchHandler(); |
| 598 | + |
| 599 | + $middleware = new HandleMessageMiddleware(new HandlersLocator([ |
| 600 | + DummyMessage::class => [new HandlerDescriptor($handler)], |
| 601 | + SecondHandlerDummyMessage::class => [new HandlerDescriptor($secondHandler)], |
| 602 | + ])); |
| 603 | + |
| 604 | + $bus = new MessageBus([$middleware]); |
| 605 | + |
| 606 | + $dispatcher = new EventDispatcher(); |
| 607 | + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver, $secondHandlerReceiver) { |
| 608 | + static $i = 0; |
| 609 | + if (1 < ++$i) { |
| 610 | + $event->getWorker()->stop(); |
| 611 | + } |
| 612 | + |
| 613 | + $this->assertSame(0, $receiver->getAcknowledgeCount()); |
| 614 | + $this->assertSame(0, $secondHandlerReceiver->getAcknowledgeCount()); |
| 615 | + }); |
| 616 | + |
| 617 | + $worker = new Worker(['first' => $receiver, 'second' => $secondHandlerReceiver], $bus, $dispatcher, clock: new MockClock()); |
| 618 | + $worker->run(); |
| 619 | + |
| 620 | + $this->assertSame($expectedMessages, $handler->processedMessages); |
| 621 | + $this->assertSame($secondHandlerExpectedMessages, $secondHandler->processedMessages); |
| 622 | + } |
| 623 | + |
577 | 624 | public function testFlushRemovesNoAutoAckStampOnException() |
578 | 625 | { |
579 | 626 | $envelope = new Envelope(new DummyMessage('Test')); |
@@ -651,3 +698,44 @@ private function process(array $jobs): void |
651 | 698 | } |
652 | 699 | } |
653 | 700 | } |
| 701 | + |
| 702 | +class SecondDummyBatchHandler implements BatchHandlerInterface |
| 703 | +{ |
| 704 | + use BatchHandlerTrait; |
| 705 | + |
| 706 | + public array $processedMessages; |
| 707 | + |
| 708 | + public function __invoke(SecondHandlerDummyMessage $message, ?Acknowledger $ack = null) |
| 709 | + { |
| 710 | + return $this->handle($message, $ack); |
| 711 | + } |
| 712 | + |
| 713 | + private function shouldFlush(): bool |
| 714 | + { |
| 715 | + return 5 <= \count($this->jobs); |
| 716 | + } |
| 717 | + |
| 718 | + private function process(array $jobs): void |
| 719 | + { |
| 720 | + $this->processedMessages = array_column($jobs, 0); |
| 721 | + |
| 722 | + foreach ($jobs as [$job, $ack]) { |
| 723 | + $ack->ack($job); |
| 724 | + } |
| 725 | + } |
| 726 | +} |
| 727 | + |
| 728 | +class SecondHandlerDummyMessage implements DummyMessageInterface |
| 729 | +{ |
| 730 | + private string $message; |
| 731 | + |
| 732 | + public function __construct(string $message) |
| 733 | + { |
| 734 | + $this->message = $message; |
| 735 | + } |
| 736 | + |
| 737 | + public function getMessage(): string |
| 738 | + { |
| 739 | + return $this->message; |
| 740 | + } |
| 741 | +} |
0 commit comments