8000 [Messenger] allow processing messages in batches · symfony/symfony@ce13bc6 · GitHub
[go: up one dir, main page]

Skip to content

Commit ce13bc6

Browse files
[Messenger] allow processing messages in batches
1 parent b2fc70b commit ce13bc6

14 files changed

+765
-75
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public function testItReceivesSignals()
215215
with stamps: [
216216
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
217217
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
218-
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
218+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
219+
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
219220
]
220221
Done.
221222

src/Symfony/Component/Messenger/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `AsMessageHandler` attribute for declaring message handlers on PHP 8.
8+
* Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait
89
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
910
* Add support for resetting container services after each messenger message.
1011
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
class Acknowledger
20+
{
21+
private $handlerClass;
22+
private $ack;
23+
private $error = null;
24+
private $result = null;
25+
26+
/**
27+
* @param null|\Closure(\Throwable|null, mixed):void $ack
28+
*/
29+
public function __construct(string $handlerClass, \Closure $ack = null)
30+
{
31+
$this->handlerClass = $handlerClass;
32+
$this->ack = $ack ?? static function () {};
33+
}
34+
35+
/**
36+
* @param mixed $result
37+
*/
38+
public function ack($result = null): void
39+
{
40+
$this->doAck(null, $result);
41+
}
42+
43+
public function nack(\Throwable $error): void
44+
{
45+
$this->doAck($error);
46+
}
47+
48+
public function getError(): ?\Throwable
49+
{
50+
return $this->error;
51+
}
52+
53+
/**
54+
* @return mixed
55+
*/
56+
public function getResult()
57+
{
58+
return $this->result;
59+
}
60+
61+
public function isAcknowledged(): bool
62+
{
63+
return null === $this->ack;
64+
}
65+
66+
public function __destruct()
67+
{
68+
if ($this->ack instanceof \Closure) {
69+
throw new LogicException(sprintf('The acknowledger was not called by batch handler "%s".', $this->handlerClass));
70+
}
71+
}
72+
73+
private function doAck(\Throwable $e = null, $result = null): void
74+
{
75+
if (!$ack = $this->ack) {
76+
throw new LogicException(sprintf('The acknowledger cannot be called twice by batch handler "%s".', $this->handlerClass));
77+
}
78+
$this->ack = null;
79+
$this->error = $e;
80+
$this->result = $result;
81+
$ack($e, $result);
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/**
20+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return mixed The number of pending messages in the batch if $ack is not null,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, Acknowledger $ack = null): mixed;
27+
28+
/**
29+
* Flushes any pending buffers.
30+
*
31+
* @param bool $force Whether flushing is required; it can be skipped if not
32+
*/
33+
public function flush(bool $force): void;
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
trait BatchHandlerTrait
20+
{
21+
private $jobs = [];
22+
23+
/**
24+
* {@inheritdoc}
25+
*/
26+
public function flush(bool $force): void
27+
{
28+
if ($jobs = $this->jobs) {
29+
$this->jobs = [];
30+
$this->process($jobs);
31+
}
32+ F438
}
33+
34+
/**
35+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
36+
* The message should be handled synchronously when null.
37+
*
38+
* @return mixed The number of pending messages in the batch if $ack is not null,
39+
* the result from handling the message otherwise
40+
*/
41+
private function handle(object $message, ?Acknowledger $ack)
42+
{
43+
if (null === $ack) {
44+
$ack = new Acknowledger(get_debug_type($this));
45+
$this->jobs[] = [$this->schedule($message), $ack];
46+
$this->flush(true);
47+
48+
return $ack->getResult();
49+
}
50+
51+
$this->jobs[] = [$this->schedule($message), $ack];
52+
if (!$this->shouldFlush()) {
53+
return \count($this->jobs);
54+
}
55+
56+
$this->flush(true);
57+
58+
return 0;
59+
}
60+
61+
private function shouldFlush(): bool
62+
{
63+
return 10 <= \count($this->jobs);
64+
}
65+
66+
/**
67+
* Schedules a message for processing.
68+
*
69+
* @return mixed A value to pass to process() for batch handling
70+
*/
71+
private function schedule(object $message)
72+
{
73+
return $message;
74+
}
75+
76+
/**
77+
* Completes the jobs in the list.
78+
*
79+
* @list<array{0: mixed, 1: Acknowledger}> $jobs A list of pairs of values as returned by schedule()
80+
* and their corresponding acknowledgers
81+
*/
82+
private function process(array $jobs): void
83+
{
84+
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
85+
}
86+
}

src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

+27-29
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,34 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
2527
{
28+
if (!$handler instanceof \Closure) {
29+
$handler = \Closure::fromCallable($handler);
30+
}
31+
2632
$this->handler = $handler;
2733
$this->options = $options;
34+
35+
$r = new \ReflectionFunction($handler);
36+
37+
if (str_contains($r->name, '{closure}')) {
38+
$this->name = 'Closure';
39+
} elseif (!$handler = $r->getClosureThis()) {
40+
$class = $r->getClosureScopeClass();
41+
42+
$this->name = ($class ? $class->name.'::' : '').$r->name;
43+
} else {
44+
if ($handler instanceof BatchHandlerInterface) {
45+
$this->batchHandler = $handler;
46+
}
47+
48+
$this->name = \get_class($handler).'::'.$r->name;
49+
}
2850
}
2951

3052
public function getHandler(): callable
@@ -34,7 +56,7 @@ public function getHandler(): callable
3456

3557
public function getName(): string
3658
{
37-
$name = $this->callableName($this->handler);
59+
$name = $this->name;
3860
$alias = $this->options['alias'] ?? null;
3961

4062
if (null !== $alias) {
@@ -44,37 +66,13 @@ public function getName(): string
4466
return $name;
4567
}
4668

47-
public function getOption(string $option)
69+
public function getBatchHandler(): ?BatchHandlerInterface
4870
{
49-
return $this->options[$option] ?? null;
71+
return $this->batchHandler;
5072
}
5173

52-
private function callableName(callable $handler): string
74+
public function getOption(string $option)
5375
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
60-
}
61-
62-
if (\is_string($handler)) {
63-
return $handler;
64-
}
65-
66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
74-
75-
return $r->name;
76-
}
77-
78-
return \get_class($handler).'::__invoke';
76+
return $this->options[$option] ?? null;
7977
}
8078
}

0 commit comments

Comments
 (0)
0