8000 Changes ReceiverInterface::handle() to get() to give more control to … · symfony/symfony@2d1f17b · GitHub
[go: up one dir, main page]

Skip to content

Commit 2d1f17b

Browse files
committed
Changes ReceiverInterface::handle() to get() to give more control to Worker
1 parent 76260e7 commit 2d1f17b

25 files changed

+608
-651
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ CHANGELOG
1010
the transport. See `ConsumeMessagesCommand`.
1111
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
1212
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
13-
`ack()` and `reject()`.
13+
`ack()` and `reject()`. Also `receive()` was changed to `get()`
14+
and `stop()` was removed.
1415
* [BC BREAK] Error handling was moved from the receivers into
1516
`Worker`. Implementations of `ReceiverInterface::handle()`
1617
should now allow all exceptions to be thrown, except for transport

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
use Symfony\Component\Console\Style\SymfonyStyle;
2323
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
2424
use Symfony\Component\Messenger\RoutableMessageBus;
25-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
26-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
27-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
2825
use Symfony\Component\Messenger\Worker;
26+
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
27+
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
28+
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
2929

3030
/**
3131
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -152,20 +152,21 @@ protected function execute(InputInterface $input, OutputInterface $output): void
152152
$bus = new RoutableMessageBus($this->busLocator);
153153
}
154154

155+
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
155156
$stopsWhen = [];
156157
if ($limit = $input->getOption('limit')) {
157158
$stopsWhen[] = "processed {$limit} messages";
158-
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
159+
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
159160
}
160161

161162
if ($memoryLimit = $input->getOption('memory-limit')) {
162163
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
163-
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
164+
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
164165
}
165166

166167
if ($timeLimit = $input->getOption('time-limit')) {
167168
$stopsWhen[] = "been running for {$timeLimit}s";
168-
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
169+
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
169170
}
170171

171172
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
@@ -183,7 +184,6 @@ protected function execute(InputInterface $input, OutputInterface $output): void
183184
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
184185
}
185186

186-
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
187187
$worker->run();
188188
}
189189

src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -595,11 +595,9 @@ public function __invoke(DummyMessage $message): void
595595

596596
class DummyReceiver implements ReceiverInterface
597597
{
598-
public function receive(callable $handler): void
598+
public function get(string $queue = null): ?Envelope
599599
{
600-
for ($i = 0; $i < 3; ++$i) {
601-
$handler(new Envelope(new DummyMessage("Dummy $i")));
602-
}
600+
return new Envelope(new DummyMessage('Dummy'));
603601
}
604602

605603
public function stop(): void

src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php

Lines changed: 0 additions & 48 deletions
This file was deleted.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Tests\Fixtures;
4+
5+
use Symfony\Component\Messenger\WorkerInterface;
6+
7+
class DummyWorker implements WorkerInterface
8+
{
9+
private $isStopped = false;
10+
private $envelopesToReceive;
11+
private $envelopesHandled = 0;
12+
13+
public function __construct(array $envelopesToReceive)
14+
{
15+
$this->envelopesToReceive = $envelopesToReceive;
16+
}
17+
18+
public function run(callable $onHandledCallback = null): void
19+
{
20+
foreach ($this->envelopesToReceive as $envelope) {
21+
if (true === $this->isStopped) {
22+
break;
23+
}
24+
25+
if ($onHandledCallback) {
26+
$onHandledCallback($envelope);
27+
++$this->envelopesHandled;
28+
}
29+
}
30+
}
31+
32+
public function stop(): void
33+
{
34+
$this->isStopped = true;
35+
}
36+
37+
public function isStopped(): bool
38+
{
39+
return $this->isStopped;
40+
}
41+
42+
public function countEnvelopesHandled()
43+
{
44+
return $this->envelopesHandled;
45+
}
46+
}

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php

Lines changed: 30 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,14 @@ public function testItSendsAndReceivesMessages()
5757
$sender->send($first = new Envelope(new DummyMessage('First')));
5858
$sender->send($second = new Envelope(new DummyMessage('Second')));
5959

60-
$receivedMessages = 0;
61-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
62-
$expectedEnvelope = 0 === $receivedMessages ? $first : $second;
63-
$this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
64-
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
65-
66-
if (2 === ++$receivedMessages) {
67-
$receiver->stop();
68-
}
69-
});
60+
$envelope = $receiver->get();
61+
$this->assertEquals($first->getMessage(), $envelope->getMessage());
62+
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
63+
64+
$envelope = $receiver->get();
65+
$this->assertEquals($envelope->getMessage(), $envelope->getMessage());
66+
67+
$this->assertNull($receiver->get());
7068
}
7169

7270
public function testRetryAndDelay()
@@ -82,50 +80,32 @@ public function testRetryAndDelay()
8280

8381
$sender->send($first = new Envelope(new DummyMessage('First')));
8482

85-
$receivedMessages = 0;
86-
$startTime = time();
87-
$receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
88-
if (null === $envelope) {
89-
// if we have been processing for 4 seconds + have received 2 messages
90-
// then it's safe to say no other messages will be received
91-
if (time() > $startTime + 4 && 2 === $receivedMessages) {
92-
$receiver->stop();
93-
}
94-
95-
return;
96-
}
97-
98-
++$receivedMessages;
99-
100-
// retry the first time
101-
if (1 === $receivedMessages) {
102-
// imitate what Worker does
103-
$envelope = $envelope
104-
->with(new DelayStamp(2000))
105-
->with(new RedeliveryStamp(1, 'not_important'));
106-
$sender->send($envelope);
107-
$receiver->ack($envelope);
83+
$envelope = $receiver->get();
84+
$newEnvelope = $envelope
85+
->with(new DelayStamp(2000))
86+
->with(new RedeliveryStamp(1, 'not_important'));
87+
$sender->send($newEnvelope);
88+
$receiver->ack($envelope);
10889

109-
return;
110-
}
111-
112-
if (2 === $receivedMessages) {
113-
// should have a 2 second delay
114-
$this->assertGreaterThanOrEqual($startTime + 2, time());
115-
// but only a 2 second delay
116-
$this->assertLessThan($startTime + 4, time());
90+
$envelope = null;
91+
$startTime = time();
92+
// wait for next message, but only for max 3 seconds
93+
while (null === $envelope && $startTime + 3 > time()) {
94+
$envelope = $receiver->get();
95+
}
11796

118-
/** @var RedeliveryStamp|null $retryStamp */
119-
// verify the stamp still exists from the last send
120-
$retryStamp = $envelope->last(RedeliveryStamp::class);
121-
$this->assertNotNull($retryStamp);
122-
$this->assertSame(1, $retryStamp->getRetryCount());
97+
// should have a 2 second delay
98+
$this->assertGreaterThanOrEqual($startTime + 2, time());
99+
// but only a 2 second delay
100+
$this->assertLessThan($startTime + 4, time());
123101

124-
$receiver->ack($envelope);
102+
/** @var RedeliveryStamp|null $retryStamp */
103+
// verify the stamp still exists from the last send
104+
$retryStamp = $envelope->last(RedeliveryStamp::class);
105+
$this->assertNotNull($retryStamp);
106+
$this->assertSame(1, $retryStamp->getRetryCount());
125107

126-
return;
127-
}
128-
});
108+
$receiver->ack($envelope);
129109
}
130110

131111
public function testItReceivesSignals()
@@ -175,29 +155,6 @@ public function testItReceivesSignals()
175155
, $process->getOutput());
176156
}
177157

178-
/**
179-
* @runInSeparateProcess
180-
*/
181-
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
182-
{
183-
$serializer = $this->createSerializer();
184-
185-
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
186-
$connection->setup();
187-
$connection->queue()->purge();
188-
189-
$receiver = new AmqpReceiver($connection, $serializer);
190-
191-
$receivedMessages = 0;
192-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
193-
$this->assertNull($envelope);
194-
195-
if (2 === ++$receivedMessages) {
196-
$receiver->stop();
197-
}
198-
});
199-
}
200-
201158
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
202159
{
203160
$timedOutTime = time() + $timeoutInSeconds;

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
1718
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
1819
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
1920
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2022
use Symfony\Component\Serializer as SerializerComponent;
2123
use Symfony\Component\Serializer\Encoder\JsonEncoder;
2224
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
@@ -26,7 +28,7 @@
2628
*/
2729
class AmqpReceiverTest extends TestCase
2830
{
29-
public function testItSendTheDecodedMessageToTheHandler()
31+
public function testItReturnsTheDecodedMessageToTheHandler()
3032
{
3133
$serializer = new Serializer(
3234
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
@@ -37,52 +39,38 @@ public function testItSendTheDecodedMessageToTheHandler()
3739
$connection->method('get')->willReturn($amqpEnvelope);
3840

3941
$receiver = new AmqpReceiver($connection, $serializer);
40-
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
41-
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
42-
$receiver->stop();
43-
});
42+
$actualEnvelope = $receiver->get();
43+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
4444
}
4545

4646
/**
4747
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
4848
*/
4949
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
5050
{
51-
$serializer = new Serializer(
52-
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
53-
);
54-
51+
$serializer = $this->createMock(SerializerInterface::class);
5552
$amqpEnvelope = $this->createAMQPEnvelope();
5653
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
5754
$connection->method('get')->willReturn($amqpEnvelope);
5855
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
5956

6057
$receiver = new AmqpReceiver($connection, $serializer);
61-
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
62-
$receiver->ack($envelope);
63-
$receiver->stop();
64-
});
58+
$receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
6559
}
6660

6761
/**
6862
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
6963
*/
7064
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
7165
{
72-
$serializer = new Serializer(
73-
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
74-
);
75-
66+
$serializer = $this->createMock(SerializerInterface::class);
7667
$amqpEnvelope = $this->createAMQPEnvelope();
7768
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
7869
$connection->method('get')->willReturn($amqpEnvelope);
7970
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
8071

8172
$receiver = new AmqpReceiver($connection, $serializer);
82-
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
83-
$receiver->reject($envelope);
84-
$receiver->stop();
85-
});
73+
$receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
8674
}
8775

8876
private function createAMQPEnvelope()

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,8 @@ public function testReceivesMessages()
4747
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
4848
$connection->method('get')->willReturn($amqpEnvelope);
4949

50-
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
51-
$this->assertSame($decodedMessage, $envelope->getMessage());
52-
53-
$transport->stop();
54-
});
50+
$envelope = $transport->get();
51+
$this->assertSame($decodedMessage, $envelope->getMessage());
5552
}
5653

5754
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)

0 commit comments

Comments
 (0)
0