10000 [Messenger] [Redis] Fixed problem where worker stops handling message… · symfony/symfony@ce103f1 · GitHub
[go: up one dir, main page]

Skip to content

Commit ce103f1

Browse files
jvmanjinicolas-grekas
authored andcommitted
[Messenger] [Redis] Fixed problem where worker stops handling messages on first empty message
1 parent 003ccbd commit ce103f1

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
1616
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
17+
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisReceiver;
18+
use Symfony\Component\Messenger\Envelope;
1719
use Symfony\Component\Messenger\Exception\TransportException;
20+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
1821

1922
/**
2023
* @requires extension redis
24+
*
2125
* @group time-sensitive
2226
* @group integration
2327
*/
@@ -318,6 +322,30 @@ public function testGetAfterReject()
318322
$redis->del('messenger-rejectthenget');
319323
}
320324

325+
public function testItProperlyHandlesEmptyMessages()
326+
{
327+
$redisReceiver = new RedisReceiver($this->connection, new Serializer());
328+
329+
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
330+
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
331+
332+
$redisReceiver->get();
333+
$this->redis->xtrim('messages', 1);
334+
335+
// The consumer died during handling a message while performing xtrim in parallel process
336+
$this->redis = new \Redis();
337+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['delete_after_ack' => true], $this->redis);
338+
$redisReceiver = new RedisReceiver($this->connection, new Serializer());
339+
340+
/** @var Envelope[] $envelope */
341+
$envelope = $redisReceiver->get();
342+
$this->assertCount(1, $envelope);
343+
344+
$message = $envelope[0]->getMessage();
345+
$this->assertInstanceOf(DummyMessage::class, $message);
346+
$this->assertEquals('Hi2', $message->getMessage());
347+
}
348+
321349
private function getConnectionGroup(Connection $connection): string
322350
{
323351
$property = (new \ReflectionClass(Connection::class))->getProperty('group');

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Exception\TransportException;
1718
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -44,6 +45,18 @@ public function get(): iterable
4445
return [];
4546
}
4647

48+
if (null === $message['data']) {
49+
try {
50+
$this->connection->reject($message['id']);
51+
} catch (TransportException $e) {
52+
if ($e->getPrevious()) {
53+
throw $e;
54+
}
55+
}
56+
57+
return $this->get();
58+
}
59+
4760
$redisEnvelope = json_decode($message['data']['message'] ?? '', true);
4861

4962
if (null === $redisEnvelope) {

0 commit comments

Comments
 (0)
0