diff --git a/.appveyor.yml b/.appveyor.yml
index cbb0098bcffbe..889aafe26929b 100644
--- a/.appveyor.yml
+++ b/.appveyor.yml
@@ -21,6 +21,8 @@ install:
- cd ext
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_apcu-5.1.18-7.1-ts-vc14-x86.zip
- 7z x php_apcu-5.1.18-7.1-ts-vc14-x86.zip -y >nul
+ - appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_redis-5.1.1-7.1-ts-vc14-x86.zip
+ - 7z x php_redis-5.1.1-7.1-ts-vc14-x86.zip -y >nul
- cd ..
- copy /Y php.ini-development php.ini-min
- echo memory_limit=-1 >> php.ini-min
@@ -36,6 +38,7 @@ install:
- echo opcache.enable_cli=1 >> php.ini-max
- echo extension=php_openssl.dll >> php.ini-max
- echo extension=php_apcu.dll >> php.ini-max
+ - echo extension=php_redis.dll >> php.ini-max
- echo apc.enable_cli=1 >> php.ini-max
- echo extension=php_intl.dll >> php.ini-max
- echo extension=php_mbstring.dll >> php.ini-max
@@ -54,6 +57,7 @@ install:
- SET COMPOSER_ROOT_VERSION=%SYMFONY_VERSION%.x-dev
- php composer.phar update --no-progress --ansi
- php phpunit install
+ - choco install memurai-developer
test_script:
- SET X=0
diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
index 9832c8a9d09a2..72002fa8998ef 100644
--- a/.github/workflows/integration-tests.yml
+++ b/.github/workflows/integration-tests.yml
@@ -99,15 +99,11 @@ jobs:
- name: Run tests
run: ./phpunit --group integration -v
env:
- REDIS_HOST: localhost
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
REDIS_SENTINEL_HOSTS: 'localhost:26379'
REDIS_SENTINEL_SERVICE: redis_sentinel
MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages
- MEMCACHED_HOST: localhost
- LDAP_HOST: localhost
- LDAP_PORT: 3389
#- name: Run HTTP push tests
# if: matrix.php == '8.0'
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index c4e152a05938f..0c4dd3ee87287 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -18,6 +18,7 @@
+
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
index 5484664e2c4ec..70b260ad63f80 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
@@ -220,6 +220,7 @@ public function testGetAfterReject()
{
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
+ $connection->cleanup();
$connection->add('1', []);
$connection->add('2', []);
@@ -230,7 +231,7 @@ public function testGetAfterReject()
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
$this->assertNotNull($connection->get());
- $redis->del('messenger-rejectthenget');
+ $connection->cleanup();
}
public function testGetNonBlocking()
@@ -238,12 +239,30 @@ public function testGetNonBlocking()
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
+ $connection->cleanup();
$this->assertNull($connection->get()); // no message, should return null immediately
$connection->add('1', []);
$this->assertNotEmpty($message = $connection->get());
$connection->reject($message['id']);
- $redis->del('messenger-getnonblocking');
+
+ $connection->cleanup();
+ }
+
+ public function testGetDelayed()
+ {
+ $redis = new \Redis();
+
+ $connection = Connection::fromDsn('redis://localhost/messenger-delayed', [], $redis);
+ $connection->cleanup();
+
+ $connection->add('1', [], 100);
+ $this->assertNull($connection->get());
+ usleep(300000);
+ $this->assertNotEmpty($message = $connection->get());
+ $connection->reject($message['id']);
+
+ $connection->cleanup();
}
public function testJsonError()
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
index 4e372eecd72f8..29eb6cb12e0d9 100644
--- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
@@ -141,33 +141,29 @@ public function get(): ?array
if ($this->autoSetup) {
$this->setup();
}
+ $now = microtime();
+ $now = substr($now, 11).substr($now, 2, 3);
- try {
- $queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
- } catch (\RedisException $e) {
- throw new TransportException($e->getMessage(), 0, $e);
- }
+ $queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now);
- if ($queuedMessageCount) {
- for ($i = 0; $i < $queuedMessageCount; ++$i) {
- try {
- $queuedMessages = $this->connection->zpopmin($this->queue, 1);
- } catch (\RedisException $e) {
- throw new TransportException($e->getMessage(), 0, $e);
- }
+ while ($queuedMessageCount--) {
+ if (![$queuedMessage, $expiry] = $this->rawCommand('ZPOPMIN', 1)) {
+ break;
+ }
+
+ if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
+ // if a future-placed message is popped because of a race condition with
+ // another running consumer, the message is readded to the queue
- foreach ($queuedMessages as $queuedMessage => $time) {
- $queuedMessage = json_decode($queuedMessage, true);
- // if a futured placed message is actually popped because of a race condition with
- // another running message consumer, the message is readded to the queue by add function
- // else its just added stream and will be available for all stream consumers
- $this->add(
- $queuedMessage['body'],
- $queuedMessage['headers'],
- $time - $this->getCurrentTimeInMilliseconds()
- );
+ if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
+ throw new TransportException('Could not add a message to the redis stream.');
}
+
+ break;
}
+
+ $queuedMessage = json_decode($queuedMessage, true);
+ $this->add($queuedMessage['body'], $queuedMessage['headers'], 0);
}
$messageId = '>'; // will receive new messages
@@ -255,7 +251,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
}
try {
- if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
+ if ($delayInMs > 0) { // the delay is <= 0 for queued messages
$message = json_encode([
'body' => $body,
'headers' => $headers,
@@ -267,8 +263,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
throw new TransportException(json_last_error_msg());
}
- $score = $this->getCurrentTimeInMilliseconds() + $delayInMs;
- $added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
+ $now = explode(' ', microtime(), 2);
+ $now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
+ if (3 < \strlen($now[0])) {
+ $now[1] += substr($now[0], 0, -3);
+ $now[0] = substr($now[0], -3);
+
+ if (\is_float($now[1])) {
+ throw new TransportException("Message delay is too big: {$delayInMs}ms.");
+ }
+ }
+
+ $added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
} else {
$message = json_encode([
'body' => $body,
@@ -316,14 +322,30 @@ public function setup(): void
$this->autoSetup = false;
}
- private function getCurrentTimeInMilliseconds(): int
- {
- return (int) (microtime(true) * 1000);
- }
-
public function cleanup(): void
{
$this->connection->del($this->stream);
$this->connection->del($this->queue);
}
+
+ /**
+ * @return mixed
+ */
+ private function rawCommand(string $command, ...$arguments)
+ {
+ try {
+ $result = $this->connection->rawCommand($command, $this->queue, ...$arguments);
+ } catch (\RedisException $e) {
+ throw new TransportException($e->getMessage(), 0, $e);
+ }
+
+ if (false === $result) {
+ if ($error = $this->connection->getLastError() ?: null) {
+ $this->connection->clearLastError();
+ }
+ throw new TransportException($error ?? sprintf('Could not run "%s" on Redis queue.', $command));
+ }
+
+ return $result;
+ }
}