8000 [Cache] fix eventual consistency when using RedisTagAwareAdapter with… · symfony/symfony@8fcc2fe · GitHub
[go: up one dir, main page]

Skip to content

Commit 8fcc2fe

Browse files
[Cache] fix eventual consistency when using RedisTagAwareAdapter with a cluster
1 parent 8391e0b commit 8fcc2fe

File tree

3 files changed

+83
-86
lines changed

3 files changed

+83
-86
lines changed

src/Symfony/Component/Cache/Adapter/RedisTagAwareAdapter.php

Lines changed: 70 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,13 @@
2323
use Symfony\Component\Cache\Traits\RedisTrait;
2424

2525
/**
26-
* Stores tag id <> cache id relationship as a Redis Set, lookup on invalidation using RENAME+SMEMBERS.
26+
* Stores tag id <> cache id relationship as a Redis Set.
2727
*
2828
* Set (tag relation info) is stored without expiry (non-volatile), while cache always gets an expiry (volatile) even
2929
* if not set by caller. Thus if you configure redis with the right eviction policy you can be safe this tag <> cache
3030
* relationship survives eviction (cache cleanup when Redis runs out of memory).
3131
*
32-
* Requirements:
33-
* - Client: PHP Redis or Predis
34-
* Note: Due to lack of RENAME support it is NOT recommended to use Cluster on Predis, instead use phpredis.
35-
* - Server: Redis 2.8+
36-
* Configured with any `volatile-*` eviction policy, OR `noeviction` if it will NEVER fill up memory
32+
* Redis server 2.8+ with any `volatile-*` eviction policy, OR `noeviction` if you're sure memory will NEVER fill up
3733
*
3834
* Design limitations:
3935
* - Max 4 billion cache keys per cache tag as limited by Redis Set datatype.
@@ -49,11 +45,6 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
4945
{
5046
use RedisTrait;
5147

52-
/**
53-
* Limits for how many keys are deleted in batch.
54-
*/
55-
private const BULK_DELETE_LIMIT = 10000;
56-
5748
/**
5849
* On cache items without a lifetime set, we set it to 100 days. This is to make sure cache items are
5950
* preferred to be evicted over tag Sets, if eviction policy is configured according to requirements.
@@ -96,7 +87,7 @@ protected function doSave(array $values, int $lifetime, array $addTagData = [],
9687
{
9788
$eviction = $this->getRedisEvictionPolicy();
9889
if ('noeviction' !== $eviction && 0 !== strpos($eviction, 'volatile-')) {
99-
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
90+
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
10091
}
10192

10293
// serialize values
@@ -159,15 +150,9 @@ protected function doDeleteYieldTags(array $ids): iterable
159150
return v:sub(14, 13 + v:byte(13) + v:byte(12) * 256 + v:byte(11) * 65536)
160151
EOLUA;
161152

162-
if ($this->redis instanceof \Predis\ClientInterface) {
163-
$evalArgs = [$lua, 1, &$id];
164-
} else {
165-
$evalArgs = [$lua, [&$id], 1];
166-
}
167-
168-
$results = $this->pipeline(function () use ($ids, &$id, $evalArgs) {
153+
$results = $this->pipeline(function () use ($ids, $lua) {
169154
foreach ($ids as $id) {
170-
yield 'eval' => $evalArgs;
155+
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id] : [$lua, [$id], 1];
171156
}
172157
});
173158

@@ -185,12 +170,15 @@ protected function doDeleteYieldTags(array $ids): iterable
185170
*/
186171
protected function doDeleteTagRelations(array $tagData): bool
187172
{
188-
$this->pipeline(static function () use ($tagData) {
173+
$results = $this->pipeline(static function () use ($tagData) {
189174
foreach ($tagData as $tagId => $idList) {
190175
array_unshift($idList, $tagId);
191176
yield 'sRem' => $idList;
192177
}
193-
})->rewind();
178+
});
179+
foreach ($results as $result) {
180+
// no-op
181+
}
194182

195183
return true;
196184
}
@@ -200,77 +188,81 @@ protected function doDeleteTagRelations(array $tagData): bool
200188
*/
201189
protected function doInvalidate(array $tagIds): bool
202190
{
203-
if (!$this->redis instanceof \Predis\ClientInterface || !$this->redis->getConnection() instanceof PredisCluster) {
204-
$movedTagSetIds = $this->renameKeys($this->redis, $tagIds);
205-
} else {
206-
$clusterConnection = $this->redis->getConnection();
207-
$tagIdsByConnection = new \SplObjectStorage();
208-
$movedTagSetIds = [];
191+
// This script scans the set of items linked to tag: it empties the set
192+
// and removes the linked items. When the set is still not empty after
193+
// the scan, it means we're in cluster mode and that the linked items
194+
// are on other nodes: we move the links to a temporary set and we
195+
// gargage collect that set from the client side.
209196

210-
foreach ($tagIds as $id) {
211-
$connection = $clusterConnection->getConnectionByKey($id);
212-
$slot = $tagIdsByConnection[$connection] ?? $tagIdsByConnection[$connection] = new \ArrayObject();
213-
$slot[] = $id;
214-
}
197+
$lua = <<<'EOLUA'
198+
local cursor = '0'
199+
local id = KEYS[1]
200+
repeat
201+
local result = redis.call('SSCAN', id, cursor, 'COUNT', 5000);
202+
cursor = result[1];
203+
local rems = {}
204+
205+
for _, v in ipairs(result[2]) do
206+
local ok, _ = pcall(redis.call, 'DEL', ARGV[1]..v)
207+
if ok then
208+
table.insert(rems, v)
209+
end
210+
end
211+
if 0 < #rems then
212+
redis.call('SREM', id, unpack(rems))
213+
end
214+
until '0' == cursor;
215+
216+
redis.call('SUNIONSTORE', '{'..id..'}'..id, id)
217+
redis.call('DEL', id)
218+
219+
return redis.call('SSCAN', '{'..id..'}'..id, '0', 'COUNT', 5000)
220+
EOLUA;
215221

216-
foreach ($tagIdsByConnection as $connection) {
217-
$slot = $tagIdsByConnection[$connection];
218-
$movedTagSetIds = array_merge($movedTagSetIds, $this->renameKeys(new $this->redis($connection, $this->redis->getOptions()), $slot->getArrayCopy()));
222+
$results = $this->pipeline(function () use ($tagIds, $lua) {
223+
if ($this->redis instanceof \Predis\ClientInterface) {
224+
$prefix = $this->redis->getOptions()->prefix ? $this->redis->getOptions()->prefix->getPrefix() : '';
225+
} elseif (\is_array($prefix = $this->redis->getOption(\Redis::OPT_PREFIX) ?? '')) {
226+
$prefix = current($prefix);
219227
}
220-
}
221228

222-
// No Sets found
223-
if (!$movedTagSetIds) {
224-
return false;
225-
}
226-
227-
// Now safely take the time to read the keys in each set and collect ids we need to delete
228-
$tagIdSets = $this->pipeline(static function () use ($movedTagSetIds) {
229-
foreach ($movedTagSetIds as $movedTagId) {
230-
yield 'sMembers' => [$movedTagId];
229+
foreach ($tagIds as $id) {
230+
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id, $prefix] : [$lua, [$id, $prefix], 1];
231231
}
232232
});
233233

234-
// Return combination of the temporary Tag Set ids and their values (cache ids)
235-
$ids = array_merge($movedTagSetIds, ...iterator_to_array($tagIdSets, false));
234+
$lua = <<<'EOLUA'
235+
local id = KEYS[1]
236+
local cursor = table.remove(ARGV)
237+
redis.call('SREM', '{'..id..'}'..id, unpack(ARGV))
236238
237-
// Delete cache in chunks to avoid overloading the connection
238-
foreach (array_chunk(array_unique($ids), self::BULK_DELETE_LIMIT) as $chunkIds) {
239-
$this->doDelete($chunkIds);
240-
}
239+
return redis.call('SSCAN', '{'..id..'}'..id, cursor, 'COUNT', 5000)
240+
EOLUA;
241241

242-
return true;
243-
}
242+
foreach ($results as $id => [$cursor, $ids]) {
243+
while ($ids || '0' !== $cursor) {
244+
$this->doDelete($ids);
244245

245-
/**
246-
* Renames several keys in order to be able to operate on them without risk of race conditions.
247-
*
248-
* Filters out keys that do not exist before returning new keys.
249-
*
250-
* @see https://redis.io/commands/rename
251-
* @see https://redis.io/topics/cluster-spec#keys-hash-tags
252-
*
253-
* @return array Filtered list of the valid moved keys (only those that existed)
254-
*/
255-
private function renameKeys($redis, array $ids): array
256-
{
257-
$newIds = [];
258-
$uniqueToken = bin2hex(random_bytes(10));
246+
$evalArgs = [$id, $cursor];
247+
array_splice($evalArgs, 1, 0, $ids);
259248

260-
$results = $this->pipeline(static function () use ($ids, $uniqueToken) {
261-
foreach ($ids as $id) {
262-
yield 'rename' => [$id, '{'.$id.'}'.$uniqueToken];
263-
}
264-
}, $redis);
249+
if ($this->redis instanceof \Predis\ClientInterface) {
250+
array_unshift($evalArgs, $lua, 1);
251+
} else {
252+
$evalArgs = [$lua, $evalArgs, 1];
253+
}
265254

266-
foreach ($results as $id => $result) {
267-
if (true === $result || ($result instanceof Status && Status::get('OK') === $result)) {
268-
// Only take into account if ok (key existed), will be false on phpredis if it did not exist
269-
$newIds[] = '{'.$id.'}'.$uniqueToken;
255+
$results = $this->pipeline(function () use ($evalArgs) {
256+
yield 'eval' => $evalArgs;
257+
});
258+
259+
foreach ($results as [$cursor, $ids]) {
260+
// no-op
261+
}
270262
}
271263
}
272264

273-
return $newIds;
265+
return true;
274266
}
275267

276268
private function getRedisEvictionPolicy(): string

src/Symfony/Component/Cache/Tests/LockRegistryTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Cache\LockRegistry;
1616

17+
/**
18+
* @runInSeparateProcess
19+
*/
1720
class LockRegistryTest extends TestCase
1821
{
1922
public function testFiles()

src/Symfony/Component/Cache/Traits/RedisTrait.php

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,6 @@ protected function doHave($id)
363363
protected function doClear($namespace)
364364
{
365365
$cleared = true;
366-
if ($this->redis instanceof \Predis\ClientInterface) {
367-
$evalArgs = [0, $namespace];
368-
} else {
369-
$evalArgs = [[$namespace], 0];
370-
}
371-
372366
$hosts = $this->getHosts();
373367
$host = reset($hosts);
374368
if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
@@ -385,17 +379,20 @@ protected function doClear($namespace)
385379
$info = $host->info('Server');
386380
$info = $info['Server'] ?? $info;
387381

382+
$pattern = $namespace.'*';
383+
388384
if (!version_compare($info['redis_version'], '2.8', '>=')) {
389385
// As documented in Redis documentation (http://redis.io/commands/keys) using KEYS
390386
// can hang your server when it is executed against large databases (millions of items).
391387
// Whenever you hit this scale, you should really consider upgrading to Redis 2.8 or above.
392-
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]..'*') for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $evalArgs[0], $evalArgs[1]) && $cleared;
388+
$args = $this->redis instanceof \Predis\ClientInterface ? [0, $pattern] : [[$pattern], 0];
389+
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]) for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $args[0], $args[1]) && $cleared;
393390
continue;
394391
}
395392

396393
$cursor = null;
397394
do {
398-
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $namespace.'*', 'COUNT', 1000) : $host->scan($cursor, $namespace.'*', 1000);
395+
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $pattern, 'COUNT', 1000) : $host->scan($cursor, $pattern, 1000);
399396
if (isset($keys[1]) && \is_array($keys[1])) {
400397
$cursor = $keys[0];
401398
$keys = $keys[1];
@@ -507,6 +504,11 @@ private function pipeline(\Closure $generator, $redis = null): \Generator
507504
$results = $redis->exec();
508505
}
509506

507+
if (!$redis instanceof \Predis\ClientInterface && 'eval' === $command && $redis->getLastError()) {
508+
$e = new \RedisException($redis->getLastError());
509+
$results = array_map(function ($v) use ($e) { return false === $v ? $e : $v; }, $results);
510+
}
511+
510512
foreach ($ids as $k => $id) {
511513
yield $id => $results[$k];
512514
}

0 commit comments

Comments
 (0)
0