8000 Add a Memcached and Semaphore store · symfony/symfony@c48619b · GitHub
[go: up one dir, main page]

Skip to content

Commit c48619b

Browse files
committed
Add a Memcached and Semaphore store
1 parent d335d06 commit c48619b

12 files changed

+400
-7
lines changed

src/Symfony/Component/Lock/Store/CombinedStore.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class CombinedStore extends AbstractStore implements LoggerAwareInterface
3636
private $quorum;
3737

3838
/**
39-
* @param StoreInterface[] $stores The decorated stores
39+
* @param StoreInterface[] $stores The list of synchronized stores
4040
* @param QuorumInterface $quorum
4141
*
4242
* @throws InvalidArgumentException
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Lock\Store;
13+
14+
use Symfony\Component\Lock\Exception\InvalidArgumentException;
15+
use Symfony\Component\Lock\Exception\LockConflictedException;
16+
use Symfony\Component\Lock\Key;
17+
18+
/**
19+
* MemcachedStore is a StoreInterface implementation using Memcached as store engine.
20+
*
21+
* @author Jérémy Derussé <jeremy@derusse.com>
22+
*/
23+
class MemcachedStore extends AbstractStore
24+
{
25+
private $memcached;
26+
private $defaultTtl;
27+
/** @var bool */
28+
private $useExtendedReturn;
29+
30+
/**
31+
* @param \Memcached $memcached
32+
* @param int $defaultTtl the default expiration delay of locks in seconds
33+
*/
34+
public function __construct(\Memcached $memcached, $defaultTtl = 300)
35+
{
36+
if ($defaultTtl < 1) {
37+
throw new InvalidArgumentException(sprintf('%s() expects a strictly positive TTL. Got %d.', __METHOD__, $defaultTtl));
38+
}
39+
40+
$this->memcached = $memcached;
41+
$this->defaultTtl = $defaultTtl;
42+
}
43+
44+
/**
45+
* {@inheritdoc}
46+
*/
47+
public function save(Key $key)
48+
{
49+
$token = $this->getToken($key);
50+
51+
if ($this->memcached->add((string) $key, $token, (int) ceil($this->defaultTtl))) {
52+
return;
53+
}
54+
55+
// the lock is already acquire. It could be us. Let's try to put off.
56+
$this->putOffExpiration($key, $this->defaultTtl);
57+
}
58+
59+
public function waitAndSave(Key $key)
60+
{
61+
throw new InvalidArgumentException(
62+
sprintf('The store "%s" does not supports blocking locks', get_class($this))
63+
);
64+
}
65+
66+
/**
67+
* {@inheritdoc}
68+
*/
69+
public function putOffExpiration(Key $key, $ttl)
70+
{
71+
if ($ttl < 1) {
72+
throw new InvalidArgumentException(sprintf('%s() expects a TTL greater or equals to 1. Got %s.', __METHOD__, $ttl));
73+
}
74+
75+
// Interface define a float value but Store required an integer.
76+
$ttl = (int) ceil($ttl);
77+
78+
$token = $this->getToken($key);
79+
80+
list($value, $cas) = $this->getValueAndCas($key);
81+
82+
// Could happens when we ask a putOff after a timeout but in luck nobody steal the lock
83+
if (\Memcached::RES_NOTFOUND === $this->memcached->getResultCode()) {
84+
if ($this->memcached->add((string) $key, $token, $ttl)) {
85+
return;
86+
}
87+
88+
// no luck, with concurrency, someone else acquire the lock
89+
throw new LockConflictedException();
90+
}
91+
92+
// Someone else steal the lock
93+
if ($value !== $token) {
94+
throw new LockConflictedException();
95+
}
96+
97+
if (!$this->memcached->cas($cas, (string) $key, $token, $ttl)) {
98+
throw new LockConflictedException();
99+
}
100+
}
101+
102+
/**
103+
* {@inheritdoc}
104+
*/
105+
public function delete(Key $key)
106+
{
107+
$token = $this->getToken($key);
108+
109+
list($value, $cas) = $this->getValueAndCas($key);
110+
111+
if ($value !== $token) {
112+
// we are not the owner of the lock. Nothing to do.
113+
return;
114+
}
115+
116+
// To avoid concurrency in deletion, the trick is to extends the TTL then deleting the key
117+
if (!$this->memcached->cas($cas, (string) $key, $token, 2)) {
118+
// Someone steal our lock. It does not belongs to us anymore. Nothing to do.
119+
return;
120+
}
121+
122+
// Now, we are the owner of the lock for 2 more seconds, we can delete it.
123+
$this->memcached->delete((string) $key);
124+
}
125+
126+
/**
127+
* {@inheritdoc}
128+
*/
129+
public function exists(Key $key)
130+
{
131+
return $this->memcached->get((string) $key) === $this->getToken($key);
132+
}
133+
134+
/**
135+
* Retrieve an unique token for the given key.
136+
*
137+
* @param Key $key
138+
*
139+
* @return string
140+
*/
141+
private function getToken(Key $key)
142+
{
143+
if (!$key->hasState(__CLASS__)) {
144+
$token = base64_encode(random_bytes(32));
145+
$key->setState(__CLASS__, $token);
146+
}
147+
148+
return $key->getState(__CLASS__);
149+
}
150+
151+
private function getValueAndCas(Key $key)
152+
{
153+
if (null === $this->useExtendedReturn) {
154+
$this->useExtendedReturn = version_compare(phpversion('memcached'), '2.9.9', '>');
155+
}
156+
157+
if ($this->useExtendedReturn) {
158+
$extendedReturn = $this->memcached->get((string) $key, null, \Memcached::GET_EXTENDED);
159+
if ($extendedReturn === \Memcached::GET_ERROR_RETURN_VALUE) {
160+
return [$extendedReturn, 0.0];
161+
}
162+
163+
return [$extendedReturn['value'], $extendedReturn['cas']];
164+
}
165+
166+
$cas = 0.0;
167+
$value = $this->memcached->get((string) $key, null, $cas);
168+
169+
return [$value, $cas];
170+
}
171+
}

src/Symfony/Component/Lock/Store/RedisStore.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,18 @@ public function __construct($redisClient, $defaultTtl = 300.0)
3535
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given', __METHOD__, is_object($redisClient) ? get_class($redisClient) : gettype($redisClient)));
3636
}
3737

38+
if ($defaultTtl <= 0) {
39+
throw new InvalidArgumentException(sprintf('%s() expects a strictly positive TTL. Got %d.', __METHOD__, $defaultTtl));
40+
}
41+
3842
$this->redis = $redisClient;
3943
$this->defaultTtl = $defaultTtl;
4044
}
4145

4246
/**
4347
* {@inheritdoc}
4448
*/
45-
public function save(Key $key, $blocking = false)
49+
public function save(Key $key)
4650
{
4751
$script = '
4852
if redis.call("GET", KEYS[1]) == ARGV[1] then
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Lock\Store;
13+
14+
use Symfony\Component\Lock\Exception\InvalidArgumentException;
15+
use Symfony\Component\Lock\Exception\LockConflictedException;
16+
use Symfony\Component\Lock\Exception\NotSupportedException;
17+
use Symfony\Component\Lock\Key;
18+
use Symfony\Component\Lock\StoreInterface;
19+
20+
/**
21+
* SemaphoreStore is a StoreInterface implementation using Semaphore as store engine.
22+
*
23+
* @author Jérémy Derussé <jeremy@derusse.com>
24+
*/
25+
class SemaphoreStore extends AbstractStore implements StoreInterface
26+
{
27+
/**
28+
* {@inheritdoc}
29+
*/
30+
public function save(Key $key)
31+
{
32+
$this->lock($key, false);
33+
}
34+
35+
/**
36+
* {@inheritdoc}
37+
*/
38+
public function waitAndSave(Key $key)
39+
{
40+
$this->lock($key, true);
41+
}
42+
43+
private function lock(Key $key, $blocking)
44+
{
45+
if ($key->hasState(__CLASS__)) {
46+
return;
47+
}
48+
49+
$resource = sem_get(crc32($key));
50+
51+
if (PHP_VERSION_ID < 50601) {
52+
if (!$blocking) {
53+
throw new NotSupportedException(sprintf('The store "%s" does not supports non blocking locks', get_class($this)));
54+
}
55+
56+
$acquired = sem_acquire($resource);
57+
} else {
58+
$acquired = sem_acquire($resource, !$blocking);
59+
}
60+
61+
62+
if (!$acquired) {
63+
throw new LockConflictedException();
64+
}
65+
66+
$key->setState(__CLASS__, $resource);
67+
}
68+
69+
/**
70+
* {@inheritdoc}
71+
*/
72+
public function delete(Key $key)
73+
{
74+
// The lock is maybe not acquired.
75+
if (!$key->hasState(__CLASS__)) {
76+
return;
77+
}
78+
79+
$resource = $key->getState(__CLASS__);
80+
81+
sem_release($resource);
82+
83+
$key->removeState(__CLASS__);
84+
}
85+
86+
/**
87+
* {@inheritdoc}
88+
*/
89+
public function putOffExpiration(Key $key, $ttl)
90+
{
91+
// do nothing, the flock locks forever.
92+
}
93+
94+
/**
95+
* {@inheritdoc}
96+
*/
97+
public function exists(Key $key)
98+
{
99+
return $key->hasState(__CLASS__);
100+
}
101+
}

src/Symfony/Component/Lock/Tests/Store/AbstractRedisStoreTest.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,23 @@ abstract class AbstractRedisStoreTest extends AbstractStoreTest
2020
{
2121
use ExpirableStoreTestTrait;
2222

23+
/**
24+
* {@inheritdoc}
25+
*/
26+
protected function getClockDelay() {
27+
return 2000000;
28+
}
29+
2330
/**
2431
* Return a RedisConnection.
2532
*
2633
* @return \Redis|\RedisArray|\RedisCluster|\Predis\Client
2734
*/
2835
abstract protected function getRedisConnection();
2936

37+
/**
38+
* {@inheritdoc}
39+
*/
3040
public function getStore()
3141
{
3242
return new RedisStore($this->getRedisConnection());

src/Symfony/Component/Lock/Tests/Store/BlockingStoreTestTrait.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ abstract protected function getStore();
3434
public function testBlockingLocks()
3535
{
3636
// Amount a microsecond used to order async actions
37-
$clockDelay = 20000;
37+
$clockDelay = 30000;
3838

3939
if (PHP_VERSION_ID < 50600 || defined('HHVM_VERSION_ID')) {
4040
$this->markTestSkipped('The PHP engine does not keep resource in child forks');
@@ -49,6 +49,7 @@ public function testBlockingLocks()
4949
if ($childPID1 = pcntl_fork()) {
5050
if ($childPID2 = pcntl_fork()) {
5151
if ($childPID3 = pcntl_fork()) {
52+
// This is the parent, wait for the end of child process to assert their results
5253
pcntl_waitpid($childPID1, $status1);
5354
pcntl_waitpid($childPID2, $status2);
5455
pcntl_waitpid($childPID3, $status3);

src/Symfony/Component/Lock/Tests/Store/CombinedStoreTest.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ class CombinedStoreTest extends AbstractStoreTest
2626
{
2727
use ExpirableStoreTestTrait;
2828

29+
/**
30+
* {@inheritdoc}
31+
*/
32+
protected function getClockDelay() {
33+
return 2000000;
34+
}
35+
36+
/**
37+
* {@inheritdoc}
38+
*/
2939
public function getStore()
3040
{
3141
$redis = new \Predis\Client('tcp://'.getenv('REDIS_HOST').':6379');

src/Symfony/Component/Lock/Tests/Store/ExpirableStoreTestTrait.php

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919
*/
2020
trait ExpirableStoreTestTrait
2121
{
22+
/**
23+
* Amount a microsecond used to order async actions
24+
*
25+
* @return int
26+
*/
27+
abstract protected function getClockDelay();
28+
2229
/**
2330
* @see AbstractStoreTest::getStore()
2431
*/
@@ -31,10 +38,8 @@ abstract protected function getStore();
3138
*/
3239
public function testExpiration()
3340
{
34-
// Amount a microsecond used to order async actions
35-
$clockDelay = 20000;
36-
3741
$key = new Key(uniqid(__METHOD__, true));
42+
$clockDelay = $this->getClockDelay();
3843

3944
/** @var StoreInterface $store */
4045
$store = $this->getStore();
@@ -55,7 +60,7 @@ public function testExpiration()
5560
public function testRefreshLock()
5661
{
5762
// Amount a microsecond used to order async actions
58-
$clockDelay = 20000;
63+
$clockDelay = $this->getClockDelay();
5964

6065
// Amount a microsecond used to order async actions
6166
$key = new Key(uniqid(__METHOD__, true));

0 commit comments

Comments
 (0)
0