8000 Changes ReceiverInterface::handle() to get() to give more control to … · symfony/symfony@d5fd552 · GitHub
[go: up one dir, main page]

Skip to content

Commit d5fd552

Browse files
committed
Changes ReceiverInterface::handle() to get() to give more control to Worker
1 parent 76260e7 commit d5fd552

File tree

6 files changed

+43
-47
lines changed

6 files changed

+43
-47
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ CHANGELOG
1010
the transport. See `ConsumeMessagesCommand`.
1111
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
1212
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
13-
`ack()` and `reject()`.
13+
`ack()` and `reject()` and `receive()` was changed to `get()`.
1414
* [BC BREAK] Error handling was moved from the receivers into
1515
`Worker`. Implementations of `ReceiverInterface::handle()`
1616
should now allow all exceptions to be thrown, except for transport

src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -595,11 +595,9 @@ public function __invoke(DummyMessage $message): void
595595

596596
class DummyReceiver implements ReceiverInterface
597597
{
598-
public function receive(callable $handler): void
598+
public function get(string $queue = null): ?Envelope
599599
{
600-
for ($i = 0; $i < 3; ++$i) {
601-
$handler(new Envelope(new DummyMessage("Dummy $i")));
602-
}
600+
return new Envelope(new DummyMessage('Dummy'));
603601
}
604602

605603
public function stop(): void

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -41,38 +41,33 @@ public function __construct(Connection $connection, SerializerInterface $seriali
4141
/**
4242
* {@inheritdoc}
4343
*/
44-
public function receive(callable $handler): void
44+
public function get(string $queue = null): ?Envelope
4545
{
46-
while (!$this->shouldStop) {
47-
try {
48-
$amqpEnvelope = $this->connection->get();
49-
} catch (\AMQPException $exception) {
50-
throw new TransportException($exception->getMessage(), 0, $exception);
51-
}
52-
53-
if (null === $amqpEnvelope) {
54-
$handler(null);
55-
56-
usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000);
57-
58-
continue;
59-
}
60-
61-
try {
62-
$envelope = $this->serializer->decode([
63-
'body' => $amqpEnvelope->getBody(),
64-
'headers' => $amqpEnvelope->getHeaders(),
65-
]);
66-
} catch (MessageDecodingFailedException $exception) {
67-
// invalid message of some type
68-
$this->rejectAmqpEnvelope($amqpEnvelope);
69-
70-
throw $exception;
71-
}
72-
73-
$envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
74-
$handler($envelope);
46+
try {
47+
$amqpEnvelope = $this->connection->get($queue);
48+
} catch (\AMQPException $exception) {
49+
throw new TransportException($exception->getMessage(), 0, $exception);
50+
}
51+
52+
if (null === $amqpEnvelope) {
53+
return null;
7554
}
55+
56+
try {
57+
$envelope = $this->serializer->decode([
58+
'body' => $amqpEnvelope->getBody(),
59+
'headers' => $amqpEnvelope->getHeaders(),
60+
]);
61+
} catch (MessageDecodingFailedException $exception) {
62+
// invalid message of some type
63+
$this->rejectAmqpEnvelope($amqpEnvelope);
64+
65+
throw $exception;
66+
}
67+
68+
$envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
69+
70+
return $envelope;
7671
}
7772

7873
public function ack(Envelope $envelope): void

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public function __construct(Connection $connection, SerializerInterface $seriali
3737
/**
3838
* {@inheritdoc}
3939
*/
40-
public function receive(callable $handler): void
40+
public function get(string $queue = null): ?Envelope
4141
{
42-
($this->receiver ?? $this->getReceiver())->receive($handler);
42+
return ($this->receiver ?? $this->getReceiver())->get($queue);
4343
}
4444

4545
/**

src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,13 @@ interface ReceiverInterface
2525
/**
2626
* Receive some messages to the given handler.
2727
*
28-
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
29-
* Note that this envelope can be `null` if the timeout to receive something has expired.
30-
*
3128
* If the received message cannot be decoded, the message should not
3229
* be retried again (e.g. if there's a queue, it should be removed)
3330
* and a MessageDecodingFailedException should be thrown.
3431
*
3532
* @throws TransportException If there is an issue communicating with the transport
3633
*/
37-
public function receive(callable $handler): void;
34+
public function get(string $queue = null): ?Envelope;
3835

3936
/**
4037
* Stop receiving some messages.

src/Symfony/Component/Messenger/Worker.php

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,25 @@ public function __construct(ReceiverInterface $receiver, MessageBusInterface $bu
6262
*/
6363
public function run()
6464
{
65+
$shouldStop = false;
6566
if (\function_exists('pcntl_signal')) {
66-
pcntl_signal(SIGTERM, function () {
67-
$this->receiver->stop();
67+
pcntl_signal(SIGTERM, function () use ($shouldStop) {
68+
$shouldStop = true;
6869
});
6970
}
7071

71-
$this->receiver->receive(function (?Envelope $envelope) {
72+
while (false === $shouldStop) {
73+
$envelope = $this->receiver->get();
74+
7275
if (null === $envelope) {
7376
if (\function_exists('pcntl_signal_dispatch')) {
7477
pcntl_signal_dispatch();
7578
}
7679

77-
return;
80+
// TODO - configurable (on transport? on Worker?)
81+
usleep(200000);
82+
83+
continue;
7884
}
7985

8086
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
@@ -124,7 +130,7 @@ public function run()
124130
pcntl_signal_dispatch();
125131
}
126132

127-
return;
133+
continue;
128134
}
129135

130136
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
@@ -138,7 +144,7 @@ public function run()
138144
if (\function_exists('pcntl_signal_dispatch')) {
139145
pcntl_signal_dispatch();
140146
}
141-
});
147+
}
142148
}
143149

144150
private function dispatchEvent(Event $event)

0 commit comments

Comments
 (0)
0