8000 [Messenger] Add a redis transport by soyuka · Pull Request #28681 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Add a redis transport #28681

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;

/**
* @requires extension redis
*/
class ConnectionTest extends TestCase
{
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
*/
public function testItCannotBeConstructedWithAWrongDsn()
{
Connection::fromDsn('redis://');
}

public function testItGetsParametersFromTheDsn()
{
$this->assertEquals(
new Connection('queue', array(
'host' => 'localhost',
'port' => 6379,
)),
Connection::fromDsn('redis://localhost/queue')
);
}

public function testOverrideOptionsViaQueryParameters()
{
$this->assertEquals(
new Connection('queue', array(
'host' => '127.0.0.1',
'port' => 6379,
), array(
'processing_ttl' => '8000',
)),
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

$componentRoot = $_SERVER['COMPONENT_ROOT'];

if (!is_file($autoload = $componentRoot.'/vendor/autoload.php')) {
$autoload = $componentRoot.'/../../../../vendor/autoload.php';
}

if (!file_exists($autoload)) {
exit('You should run "composer install --dev" in the component before running this script.');
}

require_once $autoload;

use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('DSN'));
$receiver = new RedisReceiver($connection, $serializer);

$worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($envelope)
{
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));

sleep(30);
echo "Done.\n";
}
});

echo "Receiving messages...\n";
$worker->run();
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Process\PhpProcess;
use Symfony\Component\Process\Process;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

/**
* @requires extension redis
*/
class RedisExtIntegrationTest extends TestCase
{
protected function setUp()
{
parent::setUp();

if (!getenv('MESSENGER_REDIS_DSN')) {
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
}
}

public function testItSendsAndReceivesMessages()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));

$sender = new RedisSender($connection, $serializer);
$receiver = new RedisReceiver($connection, $serializer);

$sender->send($first = Envelope::wrap(new DummyMessage('First')));
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));

$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}

public function testItReceivesSignals()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));

$sender = new RedisSender($connection, $serializer);
$sender->send(Envelope::wrap(new DummyMessage('Hello')));

$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
'COMPONENT_ROOT' => __DIR__.'/../../../',
'DSN' => $dsn,
));

$process->start();

$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");

$signalTime = microtime(true);
$timedOutTime = time() + 10;

$process->signal(15);

while ($process->isRunning() && time() < $timedOutTime) {
usleep(100 * 1000); // 100ms
}

$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertSame($expectedOutput.<<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with items: [
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
]
Done.

TXT
, $process->getOutput());
}

/**
* @runInSeparateProcess
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));

$receiver = new RedisReceiver($connection, $serializer);

$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}

private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;

while (time() < $timedOutTime) {
if (0 === strpos($process->getOutput(), $output)) {
return;
}

usleep(100 * 1000); // 100ms
}

throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

/**
* @requires extension redis
*/
class RedisReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);

$connection->expects($this->once())->method('ack')->with($encoded);

$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
}

public function testItSendNoMessageToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn(null);

$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertNull($envelope);
$receiver->stop();
});
}

/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\InterruptException
*/
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);
$connection->expects($this->once())->method('requeue')->with($encoded);

$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function () {
throw new InterruptException('Well...');
});
}

/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\WillNeverWorkException
*/
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);
$connection->expects($this->once())->method('reject')->with($encoded);

$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
});
}
}

class InterruptException extends \Exception
{
}

class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
{
}
Loading
0