8000 Refractor using redis streams · symfony/symfony@a4cceee · GitHub
[go: up one dir, main page]

Skip to content

Commit a4cceee

Browse files
Refractor using redis streams
1 parent 81d4455 commit a4cceee

File tree

16 files changed

+377
-447
lines changed

16 files changed

+377
-447
lines changed

.travis.yml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
2223

2324
matrix:
2425
include:
@@ -55,8 +56,8 @@ before_install:
5556
5657
- |
5758
# Start Redis cluster
58-
docker pull grokzen/redis-cluster:4.0.8
59-
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
59+
docker pull grokzen/redis-cluster:5.0.4
60+
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
6061
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
6162
6263
- |
@@ -116,6 +117,7 @@ before_install:
116117
local ext_name=$1
117118
local ext_so=$2
118119
local INI=$3
120+
local input=${4:-yes}
119121
local ext_dir=$(php -r "echo ini_get('extension_dir');")
120122
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name
121123
@@ -124,7 +126,7 @@ before_install:
124126
else
125127
rm ~/.pearrc /tmp/pear 2>/dev/null || true
126128
mkdir -p $ext_cache
127-
echo yes | pecl install -f $ext_name &&
129+
echo $input | pecl install -f $ext_name &&
128130
cp $ext_dir/$ext_so $ext_cache
129131
fi
130132
}
@@ -147,7 +149,6 @@ before_install:
147149
echo session.gc_probability = 0 >> $INI
148150
echo opcache.enable_cli = 1 >> $INI
149151
echo apc.enable_cli = 1 >> $INI
150-
echo extension = redis.so >> $INI
151152
echo extension = memcached.so >> $INI
152153
done
153154
@@ -166,7 +167,11 @@ before_install:
166167
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
167168
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
168169
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
170+
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
169171
done
172+
- |
173+
# List all php extensions with versions
174+
- php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;'
170175

171176
- |
172177
# Load fixtures

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 4 additions & 0 deletions
F438
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
<tag name="messenger.transport_factory" />
6767
</service>
6868

69+
<service id="messenger.transport.redis.factory" class="Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory">
70+
<tag name="messenger.transport_factory" />
71+
</service>
72+
6973
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
7074
<tag name="messenger.transport_factory" />
7175
</service>

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,43 +12,104 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Exception\LogicException;
1516
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
1617

1718
/**
1819
* @requires extension redis
1920
*/
2021
class ConnectionTest extends TestCase
2122
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
public function testItCannotBeConstructedWithAWrongDsn()
23+
public function testFromInvalidDsn()
2724
{
25+
$this->expectException(\InvalidArgumentException::class);
26+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
27+
2828
Connection::fromDsn('redis://');
2929
}
3030

31-
public function testItGetsParametersFromTheDsn()
31+
public function testFromDsn()
3232
{
3333
$this->assertEquals(
34-
new Connection('queue', array(
34+
new Connection(['stream' => 'queue'], [
3535
'host' => 'localhost',
3636
'port' => 6379,
37-
)),
37+
]),
3838
Connection::fromDsn('redis://localhost/queue')
3939
);
4040
}
4141

42-
public function testOverrideOptionsViaQueryParameters()
42+
public function testFromDsnWithOptions()
4343
{
4444
$this->assertEquals(
45-
new Connection('queue', array(
46-
'host' => '127.0.0.1',
45+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
46+
'host' => 'localhost',
4747
'port' => 6379,
48-
), array(
49-
'processing_ttl' => '8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
48+
], [
49+
'blocking_timeout' => 30,
50+
]),
51+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
5252
);
5353
}
54+
55+
public function testFromDsnWithQueryOptions()
56+
{
57+
$this->assertEquals(
58+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
59+
'host' => 'localhost',
60+
'port' => 6379,
61+
], [
62+
'blocking_timeout' => 30,
63+
]),
64+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
65+
);
66+
}
67+
68+
public function testKeepGettingPendingMessages()
69+
{
70+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
71+
72+
$redis->expects($this->exactly(3))->method('xreadgroup')
73+
->with('symfony', 'consumer', ['queue' => 0], 1, null)
74+
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
75+
76+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
77+
$this->assertNotNull($connection->get());
78+
$this->assertNotNull($connection->get());
79+
$this->assertNotNull($connection->get());
80+
}
81+
82+
public function testFirstGetPendingMessagesThenNewMessages()
83+
{
84+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
85+
86+
$count = 0;
87+
88+
$redis->expects($this->exactly(2))->method('xreadgroup')
89+
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
90+
++$count;
91+
92+
if (1 === $count) {
93+
return '0' === $arr_streams['queue'];
94+
}
95+
96+
return '>' === $arr_streams['queue'];
97+
}), 1, null)
98+
->willReturn(['queue' => []]);
99+
100+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
101+
$connection->get();
102+
}
103+
104+
public function testUnexpectedRedisError()
105+
{
106+
$this->expectException(LogicException::class);
107+
$this->expectExceptionMessage('Redis error happens');
108+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
109+
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
110+
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
111+
112+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
113+
$connection->get();
114+
}
54115
}

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php

Lines changed: 26 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -12,135 +12,54 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15-
use Symfony\Component\Messenger\Envelope;
1615
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1716
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18-
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
19-
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
20-
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21-
use Symfony\Component\Process\PhpProcess;
22-
use Symfony\Component\Process\Process;
23-
use Symfony\Component\Serializer as SerializerComponent;
24-
use Symfony\Component\Serializer\Encoder\JsonEncoder;
25-
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
2617

2718
/**
2819
* @requires extension redis
2920
*/
3021
class RedisExtIntegrationTest extends TestCase
3122
{
23+
private $redis;
24+
private $connection;
25+
3226
protected function setUp()
3327
{
34-
parent::setUp();
35-
3628
if (!getenv('MESSENGER_REDIS_DSN')) {
3729
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
3830
}
39-
}
40-
41-
public function testItSendsAndReceivesMessages()
42-
{
43-
$serializer = new Serializer(
44-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
45-
);
46-
47-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
48-
49-
$sender = new RedisSender($connection, $serializer);
50-
$receiver = new RedisReceiver($connection, $serializer);
5131

52-
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
53-
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
54-
55-
$receivedMessages = 0;
56-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
57-
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
58-
59-
if (2 === ++$receivedMessages) {
60-
$receiver->stop();
61-
}
62-
});
32+
$this->redis = new \Redis();
33+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
34+
$this->clearRedis();
35+
$this->connection->setup();
6336
}
6437

65-
public function testItReceivesSignals()
38+
public function testConnectionSendAndGet()
6639
{
67-
$serializer = new Serializer(
68-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
69-
);
70-
71-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
72-
73-
$sender = new RedisSender($connection, $serializer);
74-
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
75-
76-
$amqpReadTimeout = 30;
77-
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
78-
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
79-
'COMPONENT_ROOT' => __DIR__.'/../../../',
80-
'DSN' => $dsn,
81-
));
82-
83-
$process->start();
84-
85-
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
86-
87-
$signalTime = microtime(true);
88-
$timedOutTime = time() + 10;
89-
90-
$process->signal(15);
91-
92-
while ($process->isRunning() && time() < $timedOutTime) {
93-
usleep(100 * 1000); // 100ms
94-
}
95-
96-
$this->assertFalse($process->isRunning());
97-
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
98-
$this->assertSame($expectedOutput.<<<'TXT'
99-
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
100-
with items: [
101-
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
102-
]
103-
Done.
104-
105-
TXT
106-
, $process->getOutput());
40+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
41+
$encoded = $this->connection->get();
42+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
43+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
10744
}
10845

109-
/**
110-
* @runInSeparateProcess
111-
*/
112-
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
46+
public function testGetTheFirstAvailableMessage()
11347
{
114-
$serializer = new Serializer(
115-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
116-
);
117-
118-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
119-
120-
$receiver = new RedisReceiver($connection, $serializer);
121-
122-
$receivedMessages = 0;
123-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
124-
$this->assertNull($envelope);
125-
126-
if (2 === ++$receivedMessages) {
127-
$receiver->stop();
128-
}
129-
});
48+
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
49+
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
50+
$encoded = $this->connection->get();
51+
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
52+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
53+
$encoded = $this->connection->get();
54+
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
55+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
13056
}
13157

132-
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
58+
private function clearRedis()
13359
{
134-
$timedOutTime = time() + $timeoutInSeconds;
135-
136-
while (time() < $timedOutTime) {
137-
if (0 === strpos($process->getOutput(), $output)) {
138-
return;
139-
}
140-
141-
usleep(100 * 1000); // 100ms
142-
}
143-
144-
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
60+
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
61+
$pathParts = explode('/', $parsedUrl['path'] ?? '');
62+
$stream = $pathParts[1] ?? 'symfony';
63+
$this->redis->del($stream);
14564
}
14665
}

0 commit comments

Comments
 (0)
0