8000 [Messenger] Avoid skipping batch handlers on flush · symfony/symfony@76588d0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 76588d0

Browse files
Erwin Houtsmanicolas-grekas
authored andcommitted
[Messenger] Avoid skipping batch handlers on flush
1 parent 93ec0f3 commit 76588d0

File tree

2 files changed

+88
-1
lines changed

2 files changed

+88
-1
lines changed

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
use Symfony\Component\Messenger\Stamp\SentStamp;
4545
use Symfony\Component\Messenger\Stamp\StampInterface;
4646
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
47+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageInterface;
4748
use Symfony\Component\Messenger\Tests\Fixtures\DummyReceiver;
4849
use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver;
4950
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
@@ -574,6 +575,52 @@ public function testFlushBatchOnStop()
574575
$this->assertSame($expectedMessages, $handler->processedMessages);
575576
}
576577

578+
public function testFlushMultipleBatchOnStop()
579+
{
580+
$expectedMessages = [
581+
new DummyMessage('Hey'),
582+
];
583+
584+
$secondHandlerExpectedMessages = [
585+
new SecondHandlerDummyMessage('Ho'),
586+
];
587+
588+
$receiver = new DummyReceiver([
589+
[new Envelope($expectedMessages[0])],
590+
]);
591+
592+
$secondHandlerReceiver = new DummyReceiver([
593+
[new Envelope($secondHandlerExpectedMessages[0])],
594+
]);
595+
596+
$handler = new DummyBatchHandler();
597+
$secondHandler = new SecondDummyBatchHandler();
598+
599+
$middleware = new HandleMessageMiddleware(new HandlersLocator([
600+
DummyMessage::class => [new HandlerDescriptor($handler)],
601+
SecondHandlerDummyMessage::class => [new HandlerDescriptor($secondHandler)],
602+
]));
603+
604+
$bus = new MessageBus([$middleware]);
605+
606+
$dispatcher = new EventDispatcher();
607+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver, $secondHandlerReceiver) {
608+
static $i = 0;
609+
if (1 < ++$i) {
610+
$event->getWorker()->stop();
611+
}
612+
613+
$this->assertSame(0, $receiver->getAcknowledgeCount());
614+
$this->assertSame(0, $secondHandlerReceiver->getAcknowledgeCount());
615+
});
616+
617+
$worker = new Worker(['first' => $receiver, 'second' => $secondHandlerReceiver], $bus, $dispatcher, clock: new MockClock());
618+
$worker->run();
619+
620+
$this->assertSame($expectedMessages, $handler->processedMessages);
621+
$this->assertSame($secondHandlerExpectedMessages, $secondHandler->processedMessages);
622+
}
623+
577624
public function testFlushRemovesNoAutoAckStampOnException()
578625
{
579626
$envelope = new Envelope(new DummyMessage('Test'));
@@ -651,3 +698,44 @@ private function process(array $jobs): void
651698
}
652699
}
653700
}
701+
702+
class SecondDummyBatchHandler implements BatchHandlerInterface
703+
{
704+
use BatchHandlerTrait;
705+
706+
public array $processedMessages;
707+
708+
public function __invoke(SecondHandlerDummyMessage $message, ?Acknowledger $ack = null)
709+
{
710+
return $this->handle($message, $ack);
711+
}
712+
713+
private function shouldFlush(): bool
714+
{
715+
return 5 <= \count($this->jobs);
716+
}
717+
718+
private function process(array $jobs): void
719+
{
720+
$this->processedMessages = array_column($jobs, 0);
721+
722+
foreach ($jobs as [$job, $ack]) {
723+
$ack->ack($job);
724+
}
725+
}
726+
}
727+
728+
class SecondHandlerDummyMessage implements DummyMessageInterface
729+
{
730+
private string $message;
731+
732+
public function __construct(string $message)
733+
{
734+
$this->message = $message;
735+
}
736+
737+
public function getMessage(): string
738+
{
739+
return $this->message;
740+
}
741+
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,6 @@ private function flush(bool $force): bool
258258
try {
259259
$e = null;
260260
$this->bus->dispatch($envelope->with(new FlushBatchHandlersStamp($force)));
261-
unset($unacks[$batchHandler], $batchHandler);
262261
} catch (\Throwable $e) {
263262
$envelope = $envelope->withoutAll(NoAutoAckStamp::class);
264263
$this->acks[] = [$transportName, $envelope, $e];

0 commit comments

Comments
 (0)
0