8000 [Messenger] Add a way to no ack message automatically · symfony/symfony@934dde6 · GitHub
[go: up one dir, main page]

Skip to content

Commit 934dde6

Browse files
committed
[Messenger] Add a way to no ack message automatically
1 parent 64cbfd2 commit 934dde6

File tree

7 files changed

+133
-2
lines changed

7 files changed

+133
-2
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ CHANGELOG
66

77
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
88
* Add support for resetting container services after each messenger message.
9+
* Add `ConfigurableAutoAckInterface` and `DelayedAckStamp` to not automatically ACK message.
910

1011
5.3
1112
---
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* Marker interface for message handlers to configure if auto ACK is disabled.
18+
*
19+
* @author Grégoire Pineau <lyrixx@lyrixx.info>
20+
*/
21+
interface ConfigurableAutoAckInterface
22+
{
23+
public function isAutoAckDisabled(Envelope $envelope): bool;
24+
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

+6-1
Original file line numberDiff line number 8000 Diff line change
@@ -16,8 +16,10 @@
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
1818
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
19+
use Symfony\Component\Messenger\Handler\ConfigurableAutoAckInterface;
1920
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2021
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
22+
use Symfony\Component\Messenger\Stamp\DelayedAckStamp;
2123
use Symfony\Component\Messenger\Stamp\HandledStamp;
2224

2325
/**
@@ -60,7 +62,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6062

6163
try {
6264
$handler = $handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
65+
if ($handler instanceof ConfigurableAutoAckInterface && $handler->isAutoAckDisabled($envelope)) {
66+
$envelope = $envelope->with(new DelayedAckStamp());
67+
}
68+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message, $envelope));
6469
$envelope = $envelope->with($handledStamp);
6570
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
6671
} catch (\Throwable $e) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* Apply this stamp to delay ACK of your message on a transport.
16+
*/
17+
final class DelayedAckStamp implements StampInterface
18+
{
19+
}

src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php

+60
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\HandlerFailedException;
1616
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
17+
use Symfony\Component\Messenger\Handler\ConfigurableAutoAckInterface;
1718
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
1819
use Symfony\Component\Messenger\Handler\HandlersLocator;
1920
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
2021
use Symfony\Component\Messenger\Middleware\StackMiddleware;
22+
use Symfony\Component\Messenger\Stamp\DelayedAckStamp;
2123
use Symfony\Component\Messenger\Stamp\HandledStamp;
2224
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
2325
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@@ -114,6 +116,45 @@ public function itAddsHandledStampsProvider(): iterable
114116
];
115117
}
116118

119+
/**
120+
* @dataProvider itAddsDelayedAckStampProvider
121+
*/
122+
public function testItAddsDelayedAckStamp($handler, bool $stampIsExpected)
123+
{
124+
$message = new DummyMessage('Hey');
125+
$envelope = new Envelope($message);
126+
127+
$middleware = new HandleMessageMiddleware(new HandlersLocator([
128+
DummyMessage::class => [$handler],
129+
]));
130+
131+
try {
132+
$envelope = $middleware->handle($envelope, $this->getStackMock(true));
133+
} catch (HandlerFailedException $e) {
134+
$envelope = $e->getEnvelope();
135+
}
136+
137+
$this->assertSame($stampIsExpected, null !== $envelope->last(DelayedAckStamp::class));
138+
}
139+
140+
public function itAddsDelayedAckStampProvider(): iterable
141+
{
142+
yield 'It does not add stamp by default' => [
143+
new HandleMessageMiddlewareTestCallable(),
144+
false,
145+
];
146+
147+
yield 'It does not add when object return false' => [
148+
new HandleMessageMiddlewareWithAckConfigurationTestCallable(false),
149+
false,
150+
];
151+
152+
yield 'It adds when object return true' => [
153+
new HandleMessageMiddlewareWithAckConfigurationTestCallable(true),
154+
true,
155+
];
156+
}
157+
117158
public function testThrowsNoHandlerException()
118159
{
119160
$this->expectException(NoHandlerForMessageException::class);
@@ -137,3 +178,22 @@ public function __invoke()
137178
{
138179
}
139180
}
181+
182+
class HandleMessageMiddlewareWithAckConfigurationTestCallable implements ConfigurableAutoAckInterface
183+
{
184+
private $autoAckDisabled;
185+
186+
public function __construct(bool $autoAckDisabled)
187+
{
188+
$this->autoAckDisabled = $autoAckDisabled;
189+
}
190+
191+
public function isAutoAckDisabled(Envelope $envelope): bool
192+
{
193+
return $this->autoAckDisabled;
194+
}
195+
196+
public function __invoke()
197+
{
198+
}
199+
}

src/Symfony/Component/Messenger/Tests/WorkerTest.php

+19
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use Symfony\Component\Messenger\Exception\RuntimeException;
2525
use Symfony\Component\Messenger\MessageBusInterface;
2626
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
27+
use Symfony\Component\Messenger\Stamp\DelayedAckStamp;
2728
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2829
use Symfony\Component\Messenger\Stamp\SentStamp;
2930
use Symfony\Component\Messenger\Stamp\StampInterface;
@@ -307,6 +308,24 @@ public function testWorkerMessageReceivedEventMutability()
307308
$envelope = current($receiver->getAcknowledgedEnvelopes());
308309
$this->assertCount(1, $envelope->all(\get_class($stamp)));
309310
}
311+
312+
public function testWorkerDoesNotCallAckWhenDelayedAckStamp()
313+
{
314+
$envelope = new Envelope(new DummyMessage('Hello'));
315+
$envelope = $envelope->with(new DelayedAckStamp());
316+
$receiver = new DummyReceiver([[$envelope]]);
317+
318+
$bus = $this->createMock(MessageBusInterface::class);
319+
$bus->method('dispatch')->willReturnArgument(0);
320+
321+
$dispatcher = new EventDispatcher();
322+
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
323+
324+
$worker = new Worker([$receiver], $bus, $dispatcher);
325+
$worker->run();
326+
327+
$this->assertSame(0, $receiver->getAcknowledgeCount());
328+
}
310329
}
311330

312331
class DummyReceiver implements ReceiverInterface

src/Symfony/Component/Messenger/Worker.php

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
2525
use Symfony\Component\Messenger\Exception\RuntimeException;
2626
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
27+
use Symfony\Component\Messenger\Stamp\DelayedAckStamp;
2728
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2829
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2930
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
@@ -165,7 +166,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
165166
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
166167
}
167168

168-
$receiver->ack($envelope);
169+
if (null === $envelope->last(DelayedAckStamp::class)) {
170+
$receiver->ack($envelope);
171+
}
169172
}
170173

171174
public function stop(): void

0 commit comments

Comments
 (0)
0