From 742279caa93c52ab69cac79d4387c413385c6a9d Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Mon, 27 Dec 2021 14:24:52 +0100 Subject: [PATCH] [Messenger] fix Redis support on 32b arch --- .appveyor.yml | 4 + .github/workflows/integration-tests.yml | 4 - phpunit.xml.dist | 1 + .../Transport/RedisExt/ConnectionTest.php | 23 +++++- .../Transport/RedisExt/Connection.php | 82 ++++++++++++------- 5 files changed, 78 insertions(+), 36 deletions(-) diff --git a/.appveyor.yml b/.appveyor.yml index cbb0098bcffbe..889aafe26929b 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -21,6 +21,8 @@ install: - cd ext - appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_apcu-5.1.18-7.1-ts-vc14-x86.zip - 7z x php_apcu-5.1.18-7.1-ts-vc14-x86.zip -y >nul + - appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_redis-5.1.1-7.1-ts-vc14-x86.zip + - 7z x php_redis-5.1.1-7.1-ts-vc14-x86.zip -y >nul - cd .. - copy /Y php.ini-development php.ini-min - echo memory_limit=-1 >> php.ini-min @@ -36,6 +38,7 @@ install: - echo opcache.enable_cli=1 >> php.ini-max - echo extension=php_openssl.dll >> php.ini-max - echo extension=php_apcu.dll >> php.ini-max + - echo extension=php_redis.dll >> php.ini-max - echo apc.enable_cli=1 >> php.ini-max - echo extension=php_intl.dll >> php.ini-max - echo extension=php_mbstring.dll >> php.ini-max @@ -54,6 +57,7 @@ install: - SET COMPOSER_ROOT_VERSION=%SYMFONY_VERSION%.x-dev - php composer.phar update --no-progress --ansi - php phpunit install + - choco install memurai-developer test_script: - SET X=0 diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 9832c8a9d09a2..72002fa8998ef 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -99,15 +99,11 @@ jobs: - name: Run tests run: ./phpunit --group integration -v env: - REDIS_HOST: localhost REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005' REDIS_SENTINEL_HOSTS: 'localhost:26379' REDIS_SENTINEL_SERVICE: redis_sentinel MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages - MEMCACHED_HOST: localhost - LDAP_HOST: localhost - LDAP_PORT: 3389 #- name: Run HTTP push tests # if: matrix.php == '8.0' diff --git a/phpunit.xml.dist b/phpunit.xml.dist index c4e152a05938f..0c4dd3ee87287 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -18,6 +18,7 @@ + diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php index 5484664e2c4ec..70b260ad63f80 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php @@ -220,6 +220,7 @@ public function testGetAfterReject() { $redis = new \Redis(); $connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis); + $connection->cleanup(); $connection->add('1', []); $connection->add('2', []); @@ -230,7 +231,7 @@ public function testGetAfterReject() $connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget'); $this->assertNotNull($connection->get()); - $redis->del('messenger-rejectthenget'); + $connection->cleanup(); } public function testGetNonBlocking() @@ -238,12 +239,30 @@ public function testGetNonBlocking() $redis = new \Redis(); $connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis); + $connection->cleanup(); $this->assertNull($connection->get()); // no message, should return null immediately $connection->add('1', []); $this->assertNotEmpty($message = $connection->get()); $connection->reject($message['id']); - $redis->del('messenger-getnonblocking'); + + $connection->cleanup(); + } + + public function testGetDelayed() + { + $redis = new \Redis(); + + $connection = Connection::fromDsn('redis://localhost/messenger-delayed', [], $redis); + $connection->cleanup(); + + $connection->add('1', [], 100); + $this->assertNull($connection->get()); + usleep(300000); + $this->assertNotEmpty($message = $connection->get()); + $connection->reject($message['id']); + + $connection->cleanup(); } public function testJsonError() diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php index 4e372eecd72f8..29eb6cb12e0d9 100644 --- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -141,33 +141,29 @@ public function get(): ?array if ($this->autoSetup) { $this->setup(); } + $now = microtime(); + $now = substr($now, 11).substr($now, 2, 3); - try { - $queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds()); - } catch (\RedisException $e) { - throw new TransportException($e->getMessage(), 0, $e); - } + $queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now); - if ($queuedMessageCount) { - for ($i = 0; $i < $queuedMessageCount; ++$i) { - try { - $queuedMessages = $this->connection->zpopmin($this->queue, 1); - } catch (\RedisException $e) { - throw new TransportException($e->getMessage(), 0, $e); - } + while ($queuedMessageCount--) { + if (![$queuedMessage, $expiry] = $this->rawCommand('ZPOPMIN', 1)) { + break; + } + + if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) { + // if a future-placed message is popped because of a race condition with + // another running consumer, the message is readded to the queue - foreach ($queuedMessages as $queuedMessage => $time) { - $queuedMessage = json_decode($queuedMessage, true); - // if a futured placed message is actually popped because of a race condition with - // another running message consumer, the message is readded to the queue by add function - // else its just added stream and will be available for all stream consumers - $this->add( - $queuedMessage['body'], - $queuedMessage['headers'], - $time - $this->getCurrentTimeInMilliseconds() - ); + if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) { + throw new TransportException('Could not add a message to the redis stream.'); } + + break; } + + $queuedMessage = json_decode($queuedMessage, true); + $this->add($queuedMessage['body'], $queuedMessage['headers'], 0); } $messageId = '>'; // will receive new messages @@ -255,7 +251,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void } try { - if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message + if ($delayInMs > 0) { // the delay is <= 0 for queued messages $message = json_encode([ 'body' => $body, 'headers' => $headers, @@ -267,8 +263,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void throw new TransportException(json_last_error_msg()); } - $score = $this->getCurrentTimeInMilliseconds() + $delayInMs; - $added = $this->connection->zadd($this->queue, ['NX'], $score, $message); + $now = explode(' ', microtime(), 2); + $now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT); + if (3 < \strlen($now[0])) { + $now[1] += substr($now[0], 0, -3); + $now[0] = substr($now[0], -3); + + if (\is_float($now[1])) { + throw new TransportException("Message delay is too big: {$delayInMs}ms."); + } + } + + $added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message); } else { $message = json_encode([ 'body' => $body, @@ -316,14 +322,30 @@ public function setup(): void $this->autoSetup = false; } - private function getCurrentTimeInMilliseconds(): int - { - return (int) (microtime(true) * 1000); - } - public function cleanup(): void { $this->connection->del($this->stream); $this->connection->del($this->queue); } + + /** + * @return mixed + */ + private function rawCommand(string $command, ...$arguments) + { + try { + $result = $this->connection->rawCommand($command, $this->queue, ...$arguments); + } catch (\RedisException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + + if (false === $result) { + if ($error = $this->connection->getLastError() ?: null) { + $this->connection->clearLastError(); + } + throw new TransportException($error ?? sprintf('Could not run "%s" on Redis queue.', $command)); + } + + return $result; + } }