10000 [Cache] fix eventual consistency when using RedisTagAwareAdapter with… · symfony/symfony@0f4cb6e · GitHub
[go: up one dir, main page]

Skip to content

Commit 0f4cb6e

Browse files
[Cache] fix eventual consistency when using RedisTagAwareAdapter with a cluster
1 parent 8391e0b commit 0f4cb6e

File tree

2 files changed

+74
-87
lines changed

2 files changed

+74
-87
lines changed

src/Symfony/Component/Cache/Adapter/RedisTagAwareAdapter.php

Lines changed: 64 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,13 @@
2323
use Symfony\Component\Cache\Traits\RedisTrait;
2424

2525
/**
26-
* Stores tag id <> cache id relationship as a Redis Set, lookup on invalidation using RENAME+SMEMBERS.
26+
* Stores tag id <> cache id relationship as a Redis Set.
2727
*
2828
* Set (tag relation info) is stored without expiry (non-volatile), while cache always gets an expiry (volatile) even
2929
* if not set by caller. Thus if you configure redis with the right eviction policy you can be safe this tag <> cache
3030
* relationship survives eviction (cache cleanup when Redis runs out of memory).
3131
*
32-
* Requirements:
33-
* - Client: PHP Redis or Predis
34-
* Note: Due to lack of RENAME support it is NOT recommended to use Cluster on Predis, instead use phpredis.
35-
* - Server: Redis 2.8+
36-
* Configured with any `volatile-*` eviction policy, OR `noeviction` if it will NEVER fill up memory
32+
* Redis server 2.8+ with any `volatile-*` eviction policy, OR `noeviction` if you're sure memory will NEVER fill up
3733
*
3834
* Design limitations:
3935
* - Max 4 billion cache keys per cache tag as limited by Redis Set datatype.
@@ -49,11 +45,6 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
4945
{
5046
use RedisTrait;
5147

52-
/**
53-
* Limits for how many keys are deleted in batch.
54-
*/
55-
private const BULK_DELETE_LIMIT = 10000;
56-
5748
/**
5849
* On cache items without a lifetime set, we set it to 100 days. This is to make sure cache items are
5950
* preferred to be evicted over tag Sets, if eviction policy is configured according to requirements.
@@ -96,7 +87,7 @@ protected function doSave(array $values, int $lifetime, array $addTagData = [],
9687
{
9788
$eviction = $this->getRedisEvictionPolicy();
9889
if ('noeviction' !== $eviction && 0 !== strpos($eviction, 'volatile-')) {
99-
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
90+
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
10091
}
10192

10293
// serialize values
@@ -159,15 +150,9 @@ protected function doDeleteYieldTags(array $ids): iterable
159150
return v:sub(14, 13 + v:byte(13) + v:byte(12) * 256 + v:byte(11) * 65536)
160151
EOLUA;
161152

162-
if ($this->redis instanceof \Predis\ClientInterface) {
163-
$evalArgs = [$lua, 1, &$id];
164-
} else {
165-
$evalArgs = [$lua, [&$id], 1];
166-
}
167-
168-
$results = $this->pipeline(function () use ($ids, &$id, $evalArgs) {
153+
$results = $this->pipeline(function () use ($ids, $lua) {
169154
foreach ($ids as $id) {
170-
yield 'eval' => $evalArgs;
155+
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id] : [$lua, [$id], 1];
171156
}
172157
});
173158

@@ -185,12 +170,15 @@ protected function doDeleteYieldTags(array $ids): iterable
185170
*/
186171
protected function doDeleteTagRelations(array $tagData): bool
187172
{
188-
$this->pipeline(static function () use ($tagData) {
173+
$results = $this->pipeline(static function () use ($tagData) {
189174
foreach ($tagData as $tagId => $idList) {
190175
array_unshift($idList, $tagId);
191176
yield 'sRem' => $idList;
192177
}
193-
})->rewind();
178+
});
179+
foreach ($results as $result) {
180+
// no-op
181+
}
194182

195183
return true;
196184
}
@@ -200,77 +188,74 @@ protected function doDeleteTagRelations(array $tagData): bool
200188
*/
201189
protected function doInvalidate(array $tagIds): bool
202190
{
203-
if (!$this->redis instanceof \Predis\ClientInterface || !$this->redis->getConnection() instanceof PredisCluster) {
204-
$movedTagSetIds = $this->renameKeys($this->redis, $tagIds);
205-
} else {
206-
$clusterConnection = $this->redis->getConnection();
207-
$tagIdsByConnection = new \SplObjectStorage();
208-
$movedTagSetIds = [];
209-
210-
foreach ($tagIds as $id) {
211-
$connection = $clusterConnection->getConnectionByKey($id);
212-
$slot = $tagIdsByConnection[$connection] ?? $tagIdsByConnection[$connection] = new \ArrayObject();
213-
$slot[] = $id;
214-
}
191+
$lua = <<<'EOLUA'
192+
local cursor = '0'
193+
local id = KEYS[1]
194+
repeat
195+
local result = redis.call('SSCAN', id, cursor, 'COUNT', 5000);
196+
cursor = result[1];
197+
local rems = {}
198+
199+
for _, v in ipairs(result[2]) do
200+
10000 local ok, _ = pcall(redis.call, 'DEL', ARGV[1]..v)
201+
if ok then
202+
table.insert(rems, v)
203+
end
204+
end
205+
if 0 < #rems then
206+
redis.call('SREM', id, unpack(rems))
207+
end
208+
until '0' == cursor;
209+
210+
redis.call('SUNIONSTORE', '{'..id..'}'..id, id)
211+
212+
return redis.call('SSCAN', '{'..id..'}'..id, '0', 'COUNT', 5000)
213+
EOLUA;
215214

216-
foreach ($tagIdsByConnection as $connection) {
217-
$slot = $tagIdsByConnection[$connection];
218-
$movedTagSetIds = array_merge($movedTagSetIds, $this->renameKeys(new $this->redis($connection, $this->redis->getOptions()), $slot->getArrayCopy()));
215+
$results = $this->pipeline(function () use ($tagIds, $lua) {
216+
if ($this->redis instanceof \Predis\ClientInterface) {
217+
$prefix = $this->redis->getOptions()->prefix ? $this->redis->getOptions()->prefix->getPrefix() : '';
218+
} elseif (\is_array($prefix = $this->redis->getOption(\Redis::OPT_PREFIX) ?? '')) {
219+
$prefix = current($prefix);
219220
}
220-
}
221-
222-
// No Sets found
223-
if (!$movedTagSetIds) {
224-
return false;
225-
}
226221

227-
// Now safely take the time to read the keys in each set and collect ids we need to delete
228-
$tagIdSets = $this->pipeline(static function () use ($movedTagSetIds) {
229-
foreach ($movedTagSetIds as $movedTagId) {
230-
yield 'sMembers' => [$movedTagId];
222+
foreach ($tagIds as $id) {
223+
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id, $prefix] : [$lua, [$id, $prefix], 1];
231224
}
232225
});
233226

234-
// Return combination of the temporary Tag Set ids and their values (cache ids)
235-
$ids = array_merge($movedTagSetIds, ...iterator_to_array($tagIdSets, false));
227+
$lua = <<<'EOLUA'
228+
local id = KEYS[1]
229+
local cursor = table.remove(ARGV)
230+
redis.call('SREM', '{'..id..'}'..id, unpack(ARGV))
236231
237-
// Delete cache in chunks to avoid overloading the connection
238-
foreach (array_chunk(array_unique($ids), self::BULK_DELETE_LIMIT) as $chunkIds) {
239-
$this->doDelete($chunkIds);
240-
}
232+
return redis.call('SSCAN', '{'..id..'}'..id, cursor, 'COUNT', 5000)
233+
EOLUA;
241234

242-
return true;
243-
}
235+
foreach ($results as $id => [$cursor, $ids]) {
236+
while ($ids || '0' !== $cursor) {
237+
$this->doDelete($ids);
244238

245-
/**
246-
* Renames several keys in order to be able to operate on them without risk of race conditions.
247-
*
248-
* Filters out keys that do not exist before returning new keys.
249-
*
250-
* @see https://redis.io/commands/rename
251-
* @see https://redis.io/topics/cluster-spec#keys-hash-tags
252-
*
253-
* @return array Filtered list of the valid moved keys (only those that existed)
254-
*/
255-
private function renameKeys($redis, array $ids): array
256-
{
257-
$newIds = [];
258-
$uniqueToken = bin2hex(random_bytes(10));
239+
$evalArgs = [$id, $cursor];
240+
array_splice($evalArgs, 1, 0, $ids);
259241

260-
$results = $this->pipeline(static function () use ($ids, $uniqueToken) {
261-
foreach ($ids as $id) {
262-
yield 'rename' => [$id, '{'.$id.'}'.$uniqueToken];
263-
}
264-
}, $redis);
242+
if ($this->redis instanceof \Predis\ClientInterface) {
243+
array_unshift($evalArgs, $lua, 1);
244+
} else {
245+
$evalArgs = [$lua, $evalArgs, 1];
246+
}
265247

266-
foreach ($results as $id => $result) {
267-
if (true === $result || ($result instanceof Status && Status::get('OK') === $result)) {
268-
// Only take into account if ok (key existed), will be false on phpredis if it did not exist
269-
$newIds[] = '{'.$id.'}'.$uniqueToken;
248+
$results = $this->pipeline(function () use ($evalArgs) {
249+
yield 'eval' => $evalArgs;
250+
});
251+
252+
foreach ($results as [$cursor, $ids]) {
253+
// no-op
254+
}
270255
}
271256
}
272257

273-
return $newIds;
258+
return true;
274259
}
275260

276261
private function getRedisEvictionPolicy(): string

src/Symfony/Component/Cache/Traits/RedisTrait.php

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,6 @@ protected function doHave($id)
363363
protected function doClear($namespace)
364364
{
365365
$cleared = true;
366-
if ($this->redis instanceof \Predis\ClientInterface) {
367-
$evalArgs = [0, $namespace];
368-
} else {
369-
$evalArgs = [[$namespace], 0];
370-
}
371-
372366
$hosts = $this->getHosts();
373367
$host = reset($hosts);
374368
if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
@@ -385,17 +379,20 @@ protected function doClear($namespace)
385379
$info = $host->info('Server');
386380
$info = $info['Server'] ?? $info;
387381

382+
$pattern = $namespace.'*';
383+
388384
if (!version_compare($info['redis_version'], '2.8', '>=')) {
389385
// As documented in Redis documentation (http://redis.io/commands/keys) using KEYS
390386
// can hang your server when it is executed against large databases (millions of items).
391387
// Whenever you hit this scale, you should really consider upgrading to Redis 2.8 or above.
392-
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]..'*') for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $evalArgs[0], $evalArgs[1]) && $cleared;
388+
$args = $this->redis instanceof \Predis\ClientInterface ? [0, $pattern] : [[$pattern], 0];
389+
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]) for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $args[0], $args[1]) && $cleared;
393390
continue;
394391
}
395392

396393
$cursor = null;
397394
do {
398-
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $namespace.'*', 'COUNT', 1000) : $host->scan($cursor, $namespace.'*', 1000);
395+
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $pattern, 'COUNT', 1000) : $host->scan($cursor, $pattern, 1000);
399396
if (isset($keys[1]) && \is_array($keys[1])) {
400397
$cursor = $keys[0];
401398
$keys = $keys[1];
@@ -507,6 +504,11 @@ private function pipeline(\Closure $generator, $redis = null): \Generator
507504
$results = $redis->exec();
508505
}
509506

507+
if (!$redis instanceof \Predis\ClientInterface && 'eval' === $command && $redis->getLastError()) {
508+
$e = new \RedisException($redis->getLastError());
509+
$results = array_map(function ($v) use ($e) { return false === $v ? $e : $v; }, $results);
510+
}
511+
510512
foreach ($ids as $k => $id) {
511513
yield $id => $results[$k];
512514
}

0 commit comments

Comments
 (0)
0