8000 Refractor using redis streams · symfony/symfony@580fdb9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 580fdb9

Browse files
Refractor using redis streams
1 parent 5851bf0 commit 580fdb9

15 files changed

+292
-446
lines changed

.travis.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
- MESSENGER_REDIS_DSN=redis://localhost/messages
2223

2324
matrix:
2425
include:
@@ -53,8 +54,8 @@ before_install:
5354
5455
- |
5556
# Start Redis cluster
56-
docker pull grokzen/redis-cluster:4.0.8
57-
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
57+
docker pull grokzen/redis-cluster:5.0.4
58+
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
5859
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
5960
6061
- |
@@ -145,7 +146,6 @@ before_install:
145146
echo session.gc_probability = 0 >> $INI
146147
echo opcache.enable_cli = 1 >> $INI
147148
echo apc.enable_cli = 1 >> $INI
148-
echo extension = redis.so >> $INI
149149
echo extension = memcached.so >> $INI
150150
done
151151
@@ -159,6 +159,7 @@ before_install:
159159
tfold ext.libsodium tpecl libsodium sodium.so $INI
160160
fi
161161
162+
tfold ext.redis tpecl redis-4.2.0 redis.so $INI
162163
tfold ext.apcu tpecl apcu-5.1.16 apcu.so $INI
163164
tfold ext.mongodb tpecl mongodb-1.6.0alpha1 mongodb.so $INI
164165
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,48 @@
1919
*/
2020
class ConnectionTest extends TestCase
2121
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
public function testItCannotBeConstructedWithAWrongDsn()
22+
public function testFromInvalidDns()
2723
{
24+
$this->expectException(\InvalidArgumentException::class);
25+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
26+
2827
Connection::fromDsn('redis://');
2928
}
3029

31-
public function testItGetsParametersFromTheDsn()
30+
public function testFromDns()
3231
{
3332
$this->assertEquals(
34-
new Connection('queue', array(
33+
new Connection(['stream' => 'queue'], [
3534
'host' => 'localhost',
3635
'port' => 6379,
37-
)),
36+
]),
3837
Connection::fromDsn('redis://localhost/queue')
3938
);
4039
}
4140

42-
public function testOverrideOptionsViaQueryParameters()
41+
public function testFromDnsWithOptions()
4342
{
4443
$this->assertEquals(
45-
new Connection('queue', array(
46-
'host' => '127.0.0.1',
44+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
45+
'host' => 'localhost',
46+
'port' => 6379,
47+
], [
48+
'blocking_timeout' => 30,
49+
]),
50+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
51+
);
52+
}
53+
54+
public function testFromDnsWithQueryOptions()
55+
{
56+
$this->assertEquals(
57+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
58+
'host' => 'localhost',
4759
'port' => 6379,
48-
), array(
49-
'processing_ttl' => '8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
60+
], [
61+
'blocking_timeout' => 30,
62+
]),
63+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
5264
);
5365
}
5466
}

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php

Lines changed: 21 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -12,135 +12,49 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15-
use Symfony\Component\Messenger\Envelope;
1615
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1716
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18-
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
19-
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
20-
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21-
use Symfony\Component\Process\PhpProcess;
22-
use Symfony\Component\Process\Process;
23-
use Symfony\Component\Serializer as SerializerComponent;
24-
use Symfony\Component\Serializer\Encoder\JsonEncoder;
25-
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
2617

2718
/**
2819
* @requires extension redis
2920
*/
3021
class RedisExtIntegrationTest extends TestCase
3122
{
23+
private $connection;
24+
3225
protected function setUp()
3326
{
34-
parent::setUp();
35-
3627
if (!getenv('MESSENGER_REDIS_DSN')) {
3728
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
3829
}
39-
}
40-
41-
public function testItSendsAndReceivesMessages()
42-
{
43-
$serializer = new Serializer(
44-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
45-
);
46-
47-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
48-
49-
$sender = new RedisSender($connection, $serializer);
50-
$receiver = new RedisReceiver($connection, $serializer);
5130

52-
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
53-
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
54-
55-
$receivedMessages = 0;
56-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
57-
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
58-
59-
if (2 === ++$receivedMessages) {
60-
$receiver->stop();
61-
}
62-
});
31+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
32+
$this->clearRedis();
33+
$this->connection->setup();
6334
}
6435

65-
public function testItReceivesSignals()
36+
public function testConnectionSendAndGet()
6637
{
67-
$serializer = new Serializer(
68-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
69-
);
70-
71-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
72-
73-
$sender = new RedisSender($connection, $serializer);
74-
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
75-
76-
$amqpReadTimeout = 30;
77-
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
78-
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
79-
'COMPONENT_ROOT' => __DIR__.'/../../../',
80-
'DSN' => $dsn,
81-
));
82-
83-
$process->start();
84-
85-
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
86-
87-
$signalTime = microtime(true);
88-
$timedOutTime = time() + 10;
89-
90-
$process->signal(15);
91-
92-
while ($process->isRunning() && time() < $timedOutTime) {
93-
usleep(100 * 1000); // 100ms
94-
}
95-
96-
$this->assertFalse($process->isRunning());
97-
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
98-
$this->assertSame($expectedOutput.<<<'TXT'
99-
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
100-
with items: [
101-
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
102-
]
103-
Done.
104-
105-
TXT
106-
, $process->getOutput());
38+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
39+
$encoded = $this->connection->get();
40+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
41+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
10742
}
10843

109-
/**
110-
* @runInSeparateProcess
111-
*/
112-
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
44+
public function testGetTheFirstAvailableMessage()
11345
{
114-
$serializer = new Serializer(
115-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
116-
);
117-
118-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
119-
120-
$receiver = new RedisReceiver($connection, $serializer);
121-
122-
$receivedMessages = 0;
123-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
124-
$this->assertNull($envelope);
125-
126-
if (2 === ++$receivedMessages) {
127-
$receiver->stop();
128-
}
129-
});
46+
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
47+
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
48+
$encoded = $this->connection->get();
49+
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
50+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
51+
$encoded = $this->connection->get();
52+
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
53+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
13054
}
13155

132-
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
56+
private function clearRedis()
13357
{
134-
$timedOutTime = time() + $timeoutInSeconds;
135-
136-
while (time() < $timedOutTime) {
137-
if (0 === strpos($process->getOutput(), $output)) {
138-
return;
139-
}
140-
141-
usleep(100 * 1000); // 100ms
142-
}
143-
144-
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
58+
// TODO the redis stream should be cleared before running the test
14559
}
14660
}

0 commit comments

Comments
 (0)
0