8000 [Messenger] allow processing messages in batches · symfony/symfony@f9874c5 · GitHub
[go: up one dir, main page]

Skip to content

Commit f9874c5

Browse files
[Messenger] allow processing messages in batches
1 parent b2fc70b commit f9874c5

14 files changed

+758
-75
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public function testItReceivesSignals()
215215
with stamps: [
216216
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
217217
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
218-
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
218+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
219+
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
219220
]
220221
Done.
221222

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `AsMessageHandler` attribute for declaring message handlers on PHP 8.
8+
* Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait
89
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
910
* Add support for resetting container services after each messenger message.
1011
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
class Acknowledger
20+
{
21+
private $handlerClass;
22+
private $ack;
23+
private $error = null;
24+
private $result = null;
25+
26+
/**
27+
* @param null|\Closure(\Throwable|null, mixed):void $ack
28+
*/
29+
public function __construct(string $handlerClass, \Closure $ack = null)
30+
{
31+
$this->handlerClass = $handlerClass;
32+
$this->ack = $ack ?? static function () {};
33+
}
34+
35+
/**
36+
* @param mixed $result
37+
*/
38+
public function ack($result = null): void
39+
{
40+
$this->doAck(null, $result);
41+
}
42+
43+
public function nack(\Throwable $error): void
44+
{
45+
$this->doAck($error);
46+
}
47+
48+
public function getError(): ?\Throwable
49+
{
50+
return $this->error;
51+
}
52+
53+
/**
54+
* @return mixed
55+
*/
56+
public function getResult()
57+
{
58+
return $this->result;
59+
}
60+
61+
public function isAcknowledged(): bool
62+
{
63+
return null === $this->ack;
64+
}
65+
66+
public function __destruct()
67+
{
68+
if ($this->ack instanceof \Closure) {
69+
throw new LogicException(sprintf('The acknowledger was not called by batch handler "%s".', $this->handlerClass));
70+
}
71+
}
72+
73+
private function doAck(\Throwable $e = null, $result = null): void
74+
{
75+
if (!$ack = $this->ack) {
76+
throw new LogicException(sprintf('The acknowledger cannot be called twice by batch handler "%s".', $this->handlerClass));
77+
}
78+
$this->ack = null;
79+
$this->error = $e;
80+
$this->result = $result;
81+
$ack($e, $result);
82+
}
83+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/**
20+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return mixed The number of pending messages in the batch if $ack is not null,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, Acknowledger $ack = null): mixed;
27+
28+
/**
29+
* Flushes any pending buffers.
30+
*
31+
* @param bool $force Whether flushing is required; it can be skipped if not
32+
*/
33+
public function flush(bool $force): void;
34+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5 2851 +
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
trait BatchHandlerTrait
20+
{
21+
private $jobs = [];
22+
23+
/**
24+
* {@inheritdoc}
25+
*/
26+
public function flush(bool $force): void
27+
{
28+
if ($jobs = $this->jobs) {
29+
$this->jobs = [];
30+
$this->process($jobs);
31+
}
32+
}
33+
34+
/**
35+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
36+
* The message should be handled synchronously when null.
37+
*
38+
* @return mixed The number of pending messages in the batch if $ack is not null,
39+
* the result from handling the message otherwise
40+
*/
41+
private function handle(object $message, ?Acknowledger $ack)
42+
{
43+
if (null === $ack) {
44+
$ack = new Acknowledger(get_debug_type($this));
45+
$this->jobs[] = [$message, $ack];
46+
$this->flush(true);
47+
48+
return $ack->getResult();
49+
}
50+
51+
$this->jobs[] = [$message, $ack];
52+
if (!$this->shouldFlush()) {
53+
return \count($this->jobs);
54+
}
55+
56+
$this->flush(true);
57+
58+
return 0;
59+
}
60+
61+
private function shouldFlush(): bool
62+
{
63+
return 10 <= \count($this->jobs);
64+
}
65+
66+
/**
67+
* Completes the jobs in the list.
68+
*
69+
* @list<array{0: object, 1: Acknowledger}> $jobs A list of pairs of messages and their corresponding acknowledgers
70+
*/
71+
private function process(array $jobs): void
72+
{
73+
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
74+
}
75+
}

src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,34 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
2527
{
28+
if (!$handler instanceof \Closure) {
29+
$handler = \Closure::fromCallable($handler);
30+
}
31+
2632
$this->handler = $handler;
2733
$this->options = $options;
34+
35+
$r = new \ReflectionFunction($handler);
36+
37+
if (str_contains($r->name, '{closure}')) {
38+
$this->name = 'Closure';
39+
} elseif (!$handler = $r->getClosureThis()) {
40+
$class = $r->getClosureScopeClass();
41+
42+
$this->name = ($class ? $class->name.'::' : '').$r->name;
43+
} else {
44+
if ($handler instanceof BatchHandlerInterface) {
45+
$this->batchHandler = $handler;
46+
}
47+
48+
$this->name = \get_class($handler).'::'.$r->name;
49+
}
2850
}
2951

3052
public function getHandler(): callable
@@ -34,7 +56,7 @@ public function getHandler(): callable
3456

3557
public function getName(): string
3658
{
37-
$name = $this->callableName($this->handler);
59+
$name = $this->name;
3860
$alias = $this->options['alias'] ?? null;
3961

4062
if (null !== $alias) {
@@ -44,37 +66,13 @@ public function getName(): string
4466
return $name;
4567
}
4668

47-
public function getOption(string $option)
69+
public function getBatchHandler(): ?BatchHandlerInterface
4870
{
49-
return $this->options[$option] ?? null;
71+
return $this->batchHandler;
5072
}
5173

52-
private function callableName(callable $handler): string
74+
public function getOption(string $option)
5375
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
60-
}
61-
62-
if (\is_string($handler)) {
63-
return $handler;
64-
}
65-
66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
74-
75-
return $r->name;
76-
}
77-
78-
return \get_class($handler).'::__invoke';
76+
return $this->options[$option] ?? null;
7977
}
8078
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
18+
use Symfony\Component\Messenger\Exception\LogicException;
1819
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
20+
use Symfony\Component\Messenger\Handler\Acknowledger;
1921
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2022
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
23+
use Symfony\Component\Messenger\Stamp\AckStamp;
24+
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
2125
use Symfony\Component\Messenger\Stamp\HandledStamp;
26+
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
2227

2328
/**
2429
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -60,14 +65,58 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6065

6166
try {
6267
$handler = $handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
68+
$batchHandler = $handlerDescriptor->getBatchHandler();
69+
70+
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
71+
/** @var AckStamp $ackStamp */
72+
$ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
73+
if (null !== $e) {
74+
$e = new HandlerFailedException($envelope, [$e]);
75+
} else {
76+
$envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
77+
}
78+
79+
$ackStamp->ack($envelope, $e);
80+
});
81+
82+
$result = $handler($message, $ack);
83+
84+
if (!\is_int($result) || 0 > $result) {
85+
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
86+
}
87+
88+
if (!$ack->isAcknowledged()) {
89+
$envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor));
90+
} elseif ($ack->getError()) {
91+
throw $ack->getError();
92+
} else {
93+
$result = $ack->getResult();
94+
}
95+
} else {
96+
$result = $handler($message);
97+
}
98+
99+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $result);
64100
$envelope = $envelope->with($handledStamp);
65101
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
66102
} catch (\Throwable $e) {
67103
$exceptions[] = $e;
68104
}
69105
}
70106

107+
if ($flushStamp = $envelope->last(FlushBatchHandlersStamp::class)) {
108+
/** @var FlushBatchHandlersStamp $flushStamp */
109+
/** @var NoAutoAckStamp $stamp */
110+
foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) {
111+
try {
112+
$handler = $stamp->getHandlerDescriptor()->getBatchHandler();
113+
$handler->flush($flushStamp->force());
114+
} catch (\Throwable $e) {
115+
$exceptions[] = $e;
116+
}
117+
}
118+
}
119+
71120
if (null === $handler) {
72121
if (!$this->allowNoHandlers) {
73122
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
@@ -85,11 +134,13 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
85134

86135
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
87136
{
88-
$some = array_filter($envelope
89-
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
90-
return $stamp->getHandlerName() === $handlerDescriptor->getName();
91-
});
137+
/** @var HandledStamp $stamp */
138+
foreach ($envelope->all(HandledStamp::class) as $stamp) {
139+
if ($stamp->getHandlerName() === $handlerDescriptor->getName()) {
140+
return true;
141+
}
142+
}
92143

93-
return \count($some) > 0;
144+
return false;
94145
}
95146
}

0 commit comments

Comments
 (0)
0