8000 [Messenger] fix Redis support on 32b arch · symfony/symfony@ab6cf95 · GitHub
[go: up one dir, main page]

Skip to content

Commit ab6cf95

Browse files
[Messenger] fix Redis support on 32b arch
1 parent 6d688f6 commit ab6cf95

File tree

5 files changed

+86
-37
lines changed

5 files changed

+86
-37
lines changed

.appveyor.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ install:
2121
- cd ext
2222
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_apcu-5.1.18-7.1-ts-vc14-x86.zip
2323
- 7z x php_apcu-5.1.18-7.1-ts-vc14-x86.zip -y >nul
24+
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_redis-5.1.1-7.1-ts-vc14-x86.zip
25+
- 7z x php_redis-5.1.1-7.1-ts-vc14-x86.zip -y >nul
2426
- cd ..
2527
- copy /Y php.ini-development php.ini-min
2628
- echo memory_limit=-1 >> php.ini-min
@@ -36,6 +38,7 @@ install:
3638
- echo opcache.enable_cli=1 >> php.ini-max
3739
- echo extension=php_openssl.dll >> php.ini-max
3840
- echo extension=php_apcu.dll >> php.ini-max
41+
- echo extension=php_redis.dll >> php.ini-max
3942
- echo apc.enable_cli=1 >> php.ini-max
4043
- echo extension=php_intl.dll >> php.ini-max
4144
- echo extension=php_mbstring.dll >> php.ini-max
@@ -54,6 +57,7 @@ install:
5457
- SET COMPOSER_ROOT_VERSION=%SYMFONY_VERSION%.x-dev
5558
- php composer.phar update --no-progress --ansi
5659
- php phpunit install
60+
- choco install memurai-developer
5761

5862
test_script:
5963
- SET X=0

.github/workflows/integration-tests.yml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,10 @@ jobs:
9999
- name: Run tests
100100
run: ./phpunit --group integration -v
101101
env:
102-
REDIS_HOST: localhost
103102
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
104103
REDIS_SENTINEL_HOSTS: 'localhost:26379'
105104
REDIS_SENTINEL_SERVICE: redis_sentinel
106105
MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages
107-
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages
108-
MEMCACHED_HOST: localhost
109-
LDAP_HOST: localhost
110-
LDAP_PORT: 3389
111106

112107
#- name: Run HTTP push tests
113108
# if: matrix.php == '8.0'

phpunit.xml.dist

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
<env name="LDAP_HOST" value="localhost" />
1919
<env name="LDAP_PORT" value="3389" />
2020
<env name="REDIS_HOST" value="localhost" />
21+
<env name="MESSENGER_AMQP_DSN" value="amqp://localhost/%2f/messages" />
22+
<env name="MESSENGER_REDIS_DSN" value="redis://localhost/messages" />
2123
<env name="MEMCACHED_HOST" value="localhost" />
2224
<env name="MONGODB_HOST" value="localhost" />
2325
<env name="ZOOKEEPER_HOST" value="localhost" />

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ public function testGetAfterReject()
220220
{
221221
$redis = new \Redis();
222222
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
223+
$connection->cleanup();
223224

224225
$connection->add('1', []);
225226
$connection->add('2', []);
@@ -230,20 +231,38 @@ public function testGetAfterReject()
230231
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
231232
$this->assertNotNull($connection->get());
232233

233-
$redis->del('messenger-rejectthenget');
234+
$connection->cleanup();
234235
}
235236

236237
public function testGetNonBlocking()
237238
{
238239
$redis = new \Redis();
239240

240241
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
242+
$connection->cleanup();
241243

242244
$this->assertNull($connection->get()); // no message, should return null immediately
243245
$connection->add('1', []);
244246
$this->assertNotEmpty($message = $connection->get());
245247
$connection->reject($message['id']);
246-
$redis->del('messenger-getnonblocking');
248+
249+
$connection->cleanup();
250+
}
251+
252+
public function testGetDelayed()
253+
{
254+
$redis = new \Redis();
255+
256+
$connection = Connection::fromDsn('redis://localhost/messenger-delayed', [], $redis);
257+
$connection->cleanup();
258+
259+
$connection->add('1', [], 100);
260+
$this->assertNull($connection->get());
261+
usleep(300000);
262+
$this->assertNotEmpty($message = $connection->get());
263+
$connection->reject($message['id']);
264+
265+
$connection->cleanup();
247266
}
248267

249268
public function testJsonError()

src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Lines changed: 59 additions & 30 deletions
10000
Original file line numberDiff line numberDiff line change
@@ -141,32 +141,35 @@ public function get(): ?array
141141
if ($this->autoSetup) {
142142
$this->setup();
143143
}
144+
$now = microtime();
145+
$now = substr($now, 11).substr($now, 2, 3);
144146

145-
try {
146-
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
147-
} catch (\RedisException $e) {
148-
throw new TransportException($e->getMessage(), 0, $e);
149-
}
147+
if ($queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now)) {
148+
$queuedMessages = $this->rawCommand('ZPOPMIN', $queuedMessageCount);
150149

151-
if ($queuedMessageCount) {
152-
for ($i = 0; $i < $queuedMessageCount; ++$i) {
153-
try {
154-
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
155-
} catch (\RedisException $e) {
156-
throw new TransportException($e->getMessage(), 0, $e);
157-
}
150+
$i = \count($queuedMessages);
151+
152+
while (2 <= $i) {
153+
$expiry = $queuedMessages[--$i];
154+
$queuedMessage = $queuedMessages[--$i];
155+
156+
if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
157+
if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
158+
throw new TransportException('Could not add a message to the redis stream.');
159+
}
158160

159-
foreach ($queuedMessages as $queuedMessage => $time) {
160-
$queuedMessage = json_decode($queuedMessage, true);
161-
// if a futured placed message is actually popped because of a race condition with
162-
// another running message consumer, the message is readded to the queue by add function
163-
// else its just added stream and will be available for all stream consumers
164-
$this->add(
165-
$queuedMessage['body'],
166-
$queuedMessage['headers'],
167-
$time - $this->getCurrentTimeInMilliseconds()
168-
);
161+
continue;
169162
}
163+
164+
$queuedMessage = json_decode($queuedMessage, true);
165+
// if a futured placed message is actually popped because of a race condition with
166+
// another running message consumer, the message is readded to the queue by add function
167+
// else its just added stream and will be available for all stream consumers
168+
$this->add(
169+
$queuedMessage['body'],
170+
$queuedMessage['headers'],
171+
0
172+
);
170173
}
171174
}
172175

@@ -255,7 +258,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255258
}
256259

257260
try {
258-
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
261+
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
259262
$message = json_encode([
260263
'body' => $body,
261264
'headers' => $headers,
@@ -267,8 +270,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267270
throw new TransportException(json_last_error_msg());
268271
}
269272

270-
$score = $this->getCurrentTimeInMilliseconds() + $delayInMs;
271-
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
273+
$now = explode(' ', microtime(), 2);
274+
$now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
275+
if (3 < \strlen($now[0])) {
276+
$now[1] += substr($now[0], 0, -3);
277+
$now[0] = substr($now[0], -3);
278+
279+
if (\is_float($now[1])) {
280+
throw new TransportException("Message delay is too big: {$delayInMs}ms.");
281+
}
282+
}
283+
284+
$added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
272285
} else {
273286
$message = json_encode([
274287
'body' => $body,
@@ -316,14 +329,30 @@ public function setup(): void
316329
$this->autoSetup = false;
317330
}
318331

319-
private function getCurrentTimeInMilliseconds(): int
320-
{
321-
return (int) (microtime(true) * 1000);
322-
}
323-
324332
public function cleanup(): void
325333
{
326334
$this->connection->del($this->stream);
327335
$this->connection->del($this->queue);
328336
}
337+
338+
/**
339+
* @return mixed
340+
*/
341+
private function rawCommand(string $command, ...$arguments)
342+
{
343+
try {
344+
$result = $this->connection->rawCommand($command, $this->queue, ...$arguments);
345+
} catch (\RedisException $e) {
346+
throw new TransportException($e->getMessage(), 0, $e);
347+
}
348+
349+
if (false === $result) {
350+
if ($error = $this->connection->getLastError() ?: null) {
351+
$this->connection->clearLastError();
352+
}
353+
throw new TransportException($error ?? sprintf('Could not run "%s" on Redis queue.', $command));
354+
}
355+
356+
return $result;
357+
}
329358
}

0 commit comments

Comments
 (0)
0