From c4180fb0ef743572f83856c888a14e16fd681d3b Mon Sep 17 00:00:00 2001 From: soyuka Date: Fri, 28 Sep 2018 17:26:27 +0200 Subject: [PATCH] Implement redis transport --- .../Transport/RedisExt/ConnectionTest.php | 54 ++++++ .../RedisExt/Fixtures/long_receiver.php | 43 +++++ .../RedisExt/RedisExtIntegrationTest.php | 146 ++++++++++++++++ .../Transport/RedisExt/RedisReceiverTest.php | 118 +++++++++++++ .../Transport/RedisExt/RedisSenderTest.php | 40 +++++ .../RedisExt/RedisTransportFactoryTest.php | 43 +++++ .../Transport/RedisExt/RedisTransportTest.php | 61 +++++++ .../Transport/RedisExt/Connection.php | 156 ++++++++++++++++++ .../RejectMessageExceptionInterface.php | 25 +++ .../Transport/RedisExt/RedisReceiver.php | 71 ++++++++ .../Transport/RedisExt/RedisSender.php | 39 +++++ .../Transport/RedisExt/RedisTransport.php | 68 ++++++++ .../RedisExt/RedisTransportFactory.php | 40 +++++ 13 files changed, 904 insertions(+) create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php create mode 100644 src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php create mode 100644 src/Symfony/Component/Messenger/Transport/RedisExt/Exception/RejectMessageExceptionInterface.php create mode 100644 src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php create mode 100644 src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php create mode 100644 src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php create mode 100644 src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php new file mode 100644 index 0000000000000..c585bf20811fa --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php @@ -0,0 +1,54 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; + +/** + * @requires extension redis + */ +class ConnectionTest extends TestCase +{ + /** + * @expectedException \InvalidArgumentException + * @expectedExceptionMessage The given Redis DSN "redis://" is invalid. + */ + public function testItCannotBeConstructedWithAWrongDsn() + { + Connection::fromDsn('redis://'); + } + + public function testItGetsParametersFromTheDsn() + { + $this->assertEquals( + new Connection('queue', array( + 'host' => 'localhost', + 'port' => 6379, + )), + Connection::fromDsn('redis://localhost/queue') + ); + } + + public function testOverrideOptionsViaQueryParameters() + { + $this->assertEquals( + new Connection('queue', array( + 'host' => '127.0.0.1', + 'port' => 6379, + ), array( + 'processing_ttl' => '8000', + )), + Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000') + ); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php new file mode 100644 index 0000000000000..4d78e9478e062 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php @@ -0,0 +1,43 @@ + new JsonEncoder())) +); + +$connection = Connection::fromDsn(getenv('DSN')); +$receiver = new RedisReceiver($connection, $serializer); + +$worker = new Worker($receiver, new class() implements MessageBusInterface { + public function dispatch($envelope) + { + echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n"; + echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT)); + + sleep(30); + echo "Done.\n"; + } +}); + +echo "Receiving messages...\n"; +$worker->run(); diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php new file mode 100644 index 0000000000000..ee43f30a3ced9 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php @@ -0,0 +1,146 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver; +use Symfony\Component\Messenger\Transport\RedisExt\RedisSender; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Process\PhpProcess; +use Symfony\Component\Process\Process; +use Symfony\Component\Serializer as SerializerComponent; +use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +/** + * @requires extension redis + */ +class RedisExtIntegrationTest extends TestCase +{ + protected function setUp() + { + parent::setUp(); + + if (!getenv('MESSENGER_REDIS_DSN')) { + $this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.'); + } + } + + public function testItSendsAndReceivesMessages() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN')); + + $sender = new RedisSender($connection, $serializer); + $receiver = new RedisReceiver($connection, $serializer); + + $sender->send($first = Envelope::wrap(new DummyMessage('First'))); + $sender->send($second = Envelope::wrap(new DummyMessage('Second'))); + + $receivedMessages = 0; + $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) { + $this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope); + + if (2 === ++$receivedMessages) { + $receiver->stop(); + } + }); + } + + public function testItReceivesSignals() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN')); + + $sender = new RedisSender($connection, $serializer); + $sender->send(Envelope::wrap(new DummyMessage('Hello'))); + + $amqpReadTimeout = 30; + $dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout; + $process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array( + 'COMPONENT_ROOT' => __DIR__.'/../../../', + 'DSN' => $dsn, + )); + + $process->start(); + + $this->waitForOutput($process, $expectedOutput = "Receiving messages...\n"); + + $signalTime = microtime(true); + $timedOutTime = time() + 10; + + $process->signal(15); + + while ($process->isRunning() && time() < $timedOutTime) { + usleep(100 * 1000); // 100ms + } + + $this->assertFalse($process->isRunning()); + $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime); + $this->assertSame($expectedOutput.<<<'TXT' +Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage +with items: [ + "Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage" +] +Done. + +TXT + , $process->getOutput()); + } + + /** + * @runInSeparateProcess + */ + public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1')); + + $receiver = new RedisReceiver($connection, $serializer); + + $receivedMessages = 0; + $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) { + $this->assertNull($envelope); + + if (2 === ++$receivedMessages) { + $receiver->stop(); + } + }); + } + + private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10) + { + $timedOutTime = time() + $timeoutInSeconds; + + while (time() < $timedOutTime) { + if (0 === strpos($process->getOutput(), $output)) { + return; + } + + usleep(100 * 1000); // 100ms + } + + throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.'); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php new file mode 100644 index 0000000000000..476d27c845932 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php @@ -0,0 +1,118 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface; +use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Serializer as SerializerComponent; +use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +/** + * @requires extension redis + */ +class RedisReceiverTest extends TestCase +{ + public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = Envelope::wrap(new DummyMessage('Hi')); + $encoded = $serializer->encode($envelope); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('waitAndGet')->willReturn($encoded); + + $connection->expects($this->once())->method('ack')->with($encoded); + + $receiver = new RedisReceiver($connection, $serializer); + $receiver->receive(function (?Envelope $envelope) use ($receiver) { + $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage()); + $receiver->stop(); + }); + } + + public function testItSendNoMessageToTheHandler() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('waitAndGet')->willReturn(null); + + $receiver = new RedisReceiver($connection, $serializer); + $receiver->receive(function (?Envelope $envelope) use ($receiver) { + $this->assertNull($envelope); + $receiver->stop(); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\InterruptException + */ + public function testItNonAcknowledgeTheMessageIfAnExceptionHappened() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = Envelope::wrap(new DummyMessage('Hi')); + $encoded = $serializer->encode($envelope); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('waitAndGet')->willReturn($encoded); + $connection->expects($this->once())->method('requeue')->with($encoded); + + $receiver = new RedisReceiver($connection, $serializer); + $receiver->receive(function () { + throw new InterruptException('Well...'); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\WillNeverWorkException + */ + public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = Envelope::wrap(new DummyMessage('Hi')); + $encoded = $serializer->encode($envelope); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('waitAndGet')->willReturn($encoded); + $connection->expects($this->once())->method('reject')->with($encoded); + + $receiver = new RedisReceiver($connection, $serializer); + $receiver->receive(function () { + throw new WillNeverWorkException('Well...'); + }); + } +} + +class InterruptException extends \Exception +{ +} + +class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php new file mode 100644 index 0000000000000..06d6c929e638e --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php @@ -0,0 +1,40 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\RedisSender; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +/** + * @requires extension redis + */ +class RedisSenderTest extends TestCase +{ + public function testItSendsTheEncodedMessage() + { + $envelope = Envelope::wrap(new DummyMessage('Oy')); + $encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class)); + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('add')->with($encoded); + + $sender = new RedisSender($connection, $serializer); + $sender->send($envelope); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php new file mode 100644 index 0000000000000..b4ac961473edc --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php @@ -0,0 +1,43 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport; +use Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +class RedisTransportFactoryTest extends TestCase +{ + public function testSupportsOnlyRedisTransports() + { + $factory = new RedisTransportFactory( + $this->getMockBuilder(SerializerInterface::class)->getMock() + ); + + $this->assertTrue($factory->supports('redis://localhost', array())); + $this->assertFalse($factory->supports('sqs://localhost', array())); + $this->assertFalse($factory->supports('invalid-dsn', array())); + } + + public function testItCreatesTheTransport() + { + $factory = new RedisTransportFactory( + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock() + ); + + $expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', array('foo' => 'bar'), true), $serializer); + + $this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', array('foo' => 'bar'))); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php new file mode 100644 index 0000000000000..c0a3c1a04c7ba --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php @@ -0,0 +1,61 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @requires extension redis + */ +class RedisTransportTest extends TestCase +{ + public function testItIsATransport() + { + $transport = $this->getTransport(); + + $this->assertInstanceOf(TransportInterface::class, $transport); + } + + public function testReceivesMessages() + { + $transport = $this->getTransport( + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(), + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock() + ); + + $decodedMessage = new DummyMessage('Decoded.'); + $encodedMessage = array('body' => 'body', 'headers' => array('my' => 'header')); + + $serializer->method('decode')->with($encodedMessage)->willReturn(Envelope::wrap($decodedMessage)); + $connection->method('waitAndGet')->willReturn($encodedMessage); + + $transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) { + $this->assertSame($decodedMessage, $envelope->getMessage()); + + $transport->stop(); + }); + } + + private function getTransport(SerializerInterface $serializer = null, Connection $connection = null) + { + $serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock(); + $connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + + return new RedisTransport($connection, $serializer); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php new file mode 100644 index 0000000000000..55601d7c5b9cd --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -0,0 +1,156 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Exception\InvalidArgumentException; + +/** + * @author Antoine Bluchet + */ +class Connection +{ + const PROCESSING_QUEUE_SUFFIX = '_processing'; + const DEFAULT_CONNECTION_CREDENTIALS = array('host' => '127.0.0.1', 'port' => 6379); + const DEFAULT_REDIS_OPTIONS = array('serializer' => \Redis::SERIALIZER_PHP, 'processing_ttl' => 10000, 'blocking_timeout' => 1000); + + /** + * @var \Redis + */ + private $connection; + + /** + * @var string + */ + private $queue; + + public function __construct(string $queue, array $connectionCredentials = self::DEFAULT_CONNECTION_CREDENTIALS, array $redisOptions = self::DEFAULT_REDIS_OPTIONS) + { + $this->connection = new \Redis(); + $this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379); + $this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP); + // We force this because we rely on the fact that redis doesn't timeout with bRPopLPush + $this->connection->setOption(\Redis::OPT_READ_TIMEOUT, -1); + $this->queue = $queue; + $this->processingTtl = $redisOptions['processing_ttl'] ?? self::DEFAULT_REDIS_OPTIONS['processing_ttl']; + $this->blockingTimeout = $redisOptions['blocking_timeout'] ?? self::DEFAULT_REDIS_OPTIONS['blocking_timeout']; + } + + public static function fromDsn(string $dsn, array $redisOptions = self::DEFAULT_REDIS_OPTIONS): self + { + if (false === $parsedUrl = parse_url($dsn)) { + throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn)); + } + + $queue = isset($parsedUrl['path']) ? trim($parsedUrl['path'], '/') : $redisOptions['queue'] ?? 'messages'; + $connectionCredentials = array( + 'host' => $parsedUrl['host'] ?? '127.0.0.1', + 'port' => $parsedUrl['port'] ?? 6379, + ); + + if (isset($parsedUrl['query'])) { + parse_str($parsedUrl['query'], $parsedQuery); + $redisOptions = array_replace_recursive($redisOptions, $parsedQuery); + } + + return new self($queue, $connectionCredentials, $redisOptions); + } + + /** + * Takes last element (tail) of the list and add it to the processing queue (head - blocking) + * Also sets a key with TTL that will be checked by the `doCheck` method. + */ + public function waitAndGet(): ?array + { + $this->doCheck(); + $value = $this->connection->bRPopLPush($this->queue, $this->queue.self::PROCESSING_QUEUE_SUFFIX, $this->blockingTimeout); + + // false in case of timeout + if (false === $value) { + return null; + } + + $key = md5($value['body']); + $this->connection->set($key, 1, array('px' => $this->processingTtl)); + + return $value; + } + + /** + * Acknowledge the message: + * 1. Remove the ttl key + * 2. LREM the message from the processing list. + */ + public function ack($message) + { + $key = md5($message['body']); + $processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX; + $this->connection->multi() + ->lRem($processingQueue, $message) + ->del($key) + ->exec(); + } + + /** + * Reject the message: we acknowledge it, means we remove it form the queues. + * + * @TODO: log something? + */ + public function reject($message) + { + $this->ack($message); + } + + /** + * Requeue - add it back to the queue + * All we have to do is to make our key expire and let the `doCheck` system manage it. + */ + public function requeue($message) + { + $key = md5($message['body']); + $this->connection->expire($key, -1); + } + + /** + * Add item at the tail of list. + */ + public function add($message) + { + $this->connection->lpush($this->queue, $message); + } + + /** + * The check: + * 1. Get the processing queue items + * 2. Check if the TTL is over + * 3. If it is, rpush back the message to the origin queue. + */ + private function doCheck() + { + $processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX; + $pending = $this->connection->lRange($processingQueue, 0, -1); + + foreach ($pending as $temp) { + $key = md5($temp['body']); + + if ($this->connection->ttl($key) > 0) { + continue; + } + + $this->connection + ->multi() + ->del($key) + ->lRem($processingQueue, $temp, 1) + ->rPush($this->queue, $temp) + ->exec(); + } + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Exception/RejectMessageExceptionInterface.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Exception/RejectMessageExceptionInterface.php new file mode 100644 index 0000000000000..944bacaf77274 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Exception/RejectMessageExceptionInterface.php @@ -0,0 +1,25 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt\Exception; + +/** + * If something goes wrong while consuming and handling a message from the Redis broker, there are two choices: rejecting + * or re-queuing the message. + * + * If the exception that is thrown by the bus while dispatching the message implements this interface, the message will + * be rejected. Otherwise, it will be re-queued. + * + * @author Antoine Bluchet + */ +interface RejectMessageExceptionInterface extends \Throwable +{ +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php new file mode 100644 index 0000000000000..365984f4f65a3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php @@ -0,0 +1,71 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Transport\ReceiverInterface; +use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +/** + * @author Antoine Bluchet + */ +class RedisReceiver implements ReceiverInterface +{ + private $connection; + private $serializer; + private $shouldStop = false; + + public function __construct(Connection $connection, SerializerInterface $serializer) + { + $this->connection = $connection; + $this->serializer = $serializer; + } + + /** + * {@inheritdoc} + */ + public function receive(callable $handler): void + { + while (!$this->shouldStop) { + if (null === $message = $this->connection->waitAndGet()) { + $handler(null); + if (\function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + + continue; + } + + try { + $handler($this->serializer->decode($message)); + $this->connection->ack($message); + } catch (RejectMessageExceptionInterface $e) { + $this->connection->reject($message); + + throw $e; + } catch (\Throwable $e) { + $this->connection->requeue($message); + + throw $e; + } finally { + if (\function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + } + } + } + + public function stop(): void + { + $this->shouldStop = true; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php new file mode 100644 index 0000000000000..79c05b70058da --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\SenderInterface; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +/** + * @author Antoine Bluchet + */ +class RedisSender implements SenderInterface +{ + private $connection; + private $serializer; + + public function __construct(Connection $connection, SerializerInterface $serializer) + { + $this->connection = $connection; + $this->serializer = $serializer; + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope) + { + $this->connection->add($this->serializer->encode($envelope)); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php new file mode 100644 index 0000000000000..ce5e0ad873ff5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php @@ -0,0 +1,68 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Antoine Bluchet + */ +class RedisTransport implements TransportInterface +{ + private $connection; + private $serializer; + private $receiver; + private $sender; + + public function __construct(Connection $connection, SerializerInterface $serializer = null) + { + $this->connection = $connection; + $this->serializer = $serializer ?? Serializer::create(); + } + + /** + * {@inheritdoc} + */ + public function receive(callable $handler): void + { + ($this->receiver ?? $this->getReceiver())->receive($handler); + } + + /** + * {@inheritdoc} + */ + public function stop(): void + { + ($this->receiver ?? $this->getReceiver())->stop(); + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): void + { + ($this->sender ?? $this->getSender())->send($envelope); + } + + private function getReceiver() + { + return $this->receiver = new RedisReceiver($this->connection, $this->serializer); + } + + private function getSender() + { + return $this->sender = new RedisSender($this->connection, $this->serializer); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php new file mode 100644 index 0000000000000..84285114fefa4 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php @@ -0,0 +1,40 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportFactoryInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Antoine Bluchet + */ +class RedisTransportFactory implements TransportFactoryInterface +{ + private $serializer; + + public function __construct(SerializerInterface $serializer = null) + { + $this->serializer = $serializer ?? Serializer::create(); + } + + public function createTransport(string $dsn, array $options): TransportInterface + { + return new RedisTransport(Connection::fromDsn($dsn, $options), $this->serializer); + } + + public function supports(string $dsn, array $options): bool + { + return 0 === strpos($dsn, 'redis://'); + } +}