8000 [Messenger] fix Redis support on 32b arch by nicolas-grekas · Pull Request #44807 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] fix Redis support on 32b arch #44807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ install:
- cd ext
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_apcu-5.1.18-7.1-ts-vc14-x86.zip
- 7z x php_apcu-5.1.18-7.1-ts-vc14-x86.zip -y >nul
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_redis-5.1.1-7.1-ts-vc14-x86.zip
- 7z x php_redis-5.1.1-7.1-ts-vc14-x86.zip -y >nul
- cd ..
- copy /Y php.ini-development php.ini-min
- echo memory_limit=-1 >> php.ini-min
Expand All @@ -36,6 +38,7 @@ install:
- echo opcache.enable_cli=1 >> php.ini-max
- echo extension=php_openssl.dll >> php.ini-max
- echo extension=php_apcu.dll >> php.ini-max
- echo extension=php_redis.dll >> php.ini-max
- echo apc.enable_cli=1 >> php.ini-max
- echo extension=php_intl.dll >> php.ini-max
- echo extension=php_mbstring.dll >> php.ini-max
Expand All @@ -54,6 +57,7 @@ install:
- SET COMPOSER_ROOT_VERSION=%SYMFONY_VERSION%.x-dev
- php composer.phar update --no-progress --ansi
- php phpunit install
- choco install memurai-developer

test_script:
- SET X=0
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,11 @@ jobs:
- name: Run tests
run: ./phpunit --group integration -v
env:
REDIS_HOST: localhost
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
REDIS_SENTINEL_HOSTS: 'localhost:26379'
REDIS_SENTINEL_SERVICE: redis_sentinel
MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages
MEMCACHED_HOST: localhost
LDAP_HOST: localhost
LDAP_PORT: 3389

#- name: Run HTTP push tests
# if: matrix.php == '8.0'
Expand Down
1 change: 1 addition & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<env name="LDAP_HOST" value="localhost" />
<env name="LDAP_PORT" value="3389" />
<env name="REDIS_HOST" value="localhost" />
<env name="MESSENGER_REDIS_DSN" value="redis://localhost/messages" />
<env name="MEMCACHED_HOST" value="localhost" />
<env name="MONGODB_HOST" value="localhost" />
<env name="ZOOKEEPER_HOST" value="localhost" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public function testGetAfterReject()
{
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
$connection->cleanup();

$connection->add('1', []);
$connection->add('2', []);
Expand All @@ -230,20 +231,38 @@ public function testGetAfterReject()
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
$this->assertNotNull($connection->get());

$redis->del('messenger-rejectthenget');
$connection->cleanup();
}

public function testGetNonBlocking()
{
$redis = new \Redis();

$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
$connection->cleanup();

$this->assertNull($connection->get()); // no message, should return null immediately
$connection->add('1', []);
$this->assertNotEmpty($message = $connection->get());
$connection->reject($message['id']);
$redis->del('messenger-getnonblocking');

$connection->cleanup();
}

public function testGetDelayed()
{
$redis = new \Redis();

$connection = Connection::fromDsn('redis://localhost/messenger-delayed', [], $redis);
$connection->cleanup();

$connection->add('1', [], 100);
$this->assertNull($connection->get());
usleep(300000);
$this->assertNotEmpty($message = $connection->get());
$connection->reject($message['id']);

$connection->cleanup();
}

public function testJsonError()
Expand Down
82 changes: 52 additions & 30 deletions src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,33 +141,29 @@ public function get(): ?array
if ($this->autoSetup) {
$this->setup();
}
$now = microtime();
$now = substr($now, 11).substr($now, 2, 3);

try {
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
$queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now);

if ($queuedMessageCount) {
for ($i = 0; $i < $queuedMessageCount; ++$i) {
try {
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
while ($queuedMessageCount--) {
if (![$queuedMessage, $expiry] = $this->rawCommand('ZPOPMIN', 1)) {
break;
}

if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
// if a future-placed message is popped because of a race condition with
// another running consumer, the message is readded to the queue

foreach ($queuedMessages as $queuedMessage => $time) {
$queuedMessage = json_decode($queuedMessage, true);
// if a futured placed message is actually popped because of a race condition with
// another running message consumer, the message is readded to the queue by add function
// else its just added stream and will be available for all stream consumers
$this->add(
$queuedMessage['body'],
$queuedMessage['headers'],
$time - $this->getCurrentTimeInMilliseconds()
);
if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
throw new TransportException('Could not add a message to the redis stream.');
}

break;
}

$queuedMessage = json_decode($queuedMessage, true);
$this->add($queuedMessage['body'], $queuedMessage['headers'], 0);
}

$messageId = '>'; // will receive new messages
Expand Down Expand Up @@ -255,7 +251,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
}

6D40 try {
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
$message = json_encode([
'body' => $body,
'headers' => $headers,
Expand All @@ -267,8 +263,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
throw new TransportException(json_last_error_msg());
}

$score = $this->getCurrentTimeInMilliseconds() + $delayInMs;
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
$now = explode(' ', microtime(), 2);
$now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
if (3 < \strlen($now[0])) {
$now[1] += substr($now[0], 0, -3);
$now[0] = substr($now[0], -3);

if (\is_float($now[1])) {
throw new TransportException("Message delay is too big: {$delayInMs}ms.");
}
}

$added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
} else {
$message = json_encode([
'body' => $body,
Expand Down Expand Up @@ -316,14 +322,30 @@ public function setup(): void
$this->autoSetup = false;
}

private function getCurrentTimeInMilliseconds(): int
{
return (int) (microtime(true) * 1000);
}

public function cleanup(): void
{
$this->connection->del($this->stream);
$this->connection->del($this->queue);
}

/**
* @return mixed
*/
private function rawCommand(string $command, ...$arguments)
{
try {
$result = $this->connection->rawCommand($command, $this->queue, ...$arguments);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

if (false === $result) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not run "%s" on Redis queue.', $command));
}

return $result;
}
}
0