8000 bug #48837 [Messenger] [Redis] Fixed problem where worker stops handl… · symfony/symfony@ba49bda · GitHub
[go: up one dir, main page]

Skip to content

Commit ba49bda

Browse files
bug #48837 [Messenger] [Redis] Fixed problem where worker stops handling messages on first empty message (jvmanji)
This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] [Redis] Fixed problem where worker stops handling messages on first empty message | Q | A | ------------- | --- | Branch? |5.4 <!-- see below --> | Bug fix? | yes | New feature? | no <!-- please update src/**/CHANGELOG.md files --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tickets | Fix #48166 <!-- prefix each issue number with "Fix #", no need to create an issue if none exists, explain below instead --> | License | MIT | Doc PR | n/a <!-- required for new features --> Fixed problem where worker stops handling messages on first empty message. Commits ------- ce103f1 [Messenger] [Redis] Fixed problem where worker stops handling messages on first empty message
2 parents 0c5246f + ce103f1 commit ba49bda

File tree

2 files changed

+40
-0
lines changed

2 files changed

+40
-0
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
1616
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
17+
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisReceiver;
18+
use Symfony\Component\Messenger\Envelope;
1719
use Symfony\Component\Messenger\Exception\TransportException;
20+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
1821

1922
/**
2023
* @requires extension redis
@@ -319,6 +322,30 @@ public function testGetAfterReject()
319322
$redis->del('messenger-rejectthenget');
320323
}
321324

325+
public function testItProperlyHandlesEmptyMessages()
326+
{
327+
$redisReceiver = new RedisReceiver($this->connection, new Serializer());
328+
329+
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
330+
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
331+
332+
$redisReceiver->get();
333+
$this->redis->xtrim('messages', 1);
334+
335+
// The consumer died during handling a message while performing xtrim in parallel process
336+
$this->redis = new \Redis();
337+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['delete_after_ack' => true], $this->redis);
338+
$redisReceiver = new RedisReceiver($this->connection, new Serializer());
339+
340+
/** @var Envelope[] $envelope */
341+
$envelope = $redisReceiver->get();
342+
$this->assertCount(1, $envelope);
343+
344+
$message = $envelope[0]->getMessage();
345+
$this->assertInstanceOf(DummyMessage::class, $message);
346+
$this->assertEquals('Hi2', $message->getMessage());
347+
}
348+
322349
private function getConnectionGroup(Connection $connection): string
323350
{
324351
$property = (new \ReflectionClass(Connection::class))->getProperty('group');

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Exception\TransportException;
1718
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -44,6 +45,18 @@ public function get(): iterable
4445
return [];
4546
}
4647

48+
if (null === $message['data']) {
49+
try {
50+
$this->connection->reject($message['id']);
51+
} catch (TransportException $e) {
52+
if ($e->getPrevious()) {
53+
throw $e;
54+
}
55+
}
56+
57+
return $this->get();
58+
}
59+
4760
$redisEnvelope = json_decode($message['data']['message'] ?? '', true);
4861

4962
if (null === $redisEnvelope) {

0 commit comments

Comments
 (0)
0