8000 Automatically send unsubscribe/punsubscribe events for PubSub channels · SimonFrings/reactphp-redis@5e4a75c · GitHub
[go: up one dir, main page]

Skip to content

Commit 5e4a75c

Browse files
committed
Automatically send unsubscribe/punsubscribe events for PubSub channels
1 parent 46e4cd2 commit 5e4a75c

File tree

4 files changed

+80
-1
lines changed

4 files changed

+80
-1
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,13 @@ outstanding commands and will return to the initial "idle" state. This
233233
means that you can keep sending additional commands at a later time which
234234
will again try to open the underlying connection.
235235

236+
If the underlying database connection drops while using PubSub channels
237+
(see `SUBSCRIBE` and `PSUBSCRIBE` commands), it will automatically send the
238+
appropriate `unsubscribe` and `punsubscribe` events for all currently active
239+
channel and pattern subscriptions. This allows you to react to these
240+
events and restore your subscriptions by creating a new underlying
241+
connection with the above commands.
242+
236243
Note that creating the underlying connection will be deferred until the
237244
first request is invoked. Accordingly, any eventual connection issues
238245
will be detected once this instance is first used. You can use the

examples/subscribe.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,26 @@
1212
$client = $factory->createLazyClient('localhost');
1313
$client->subscribe($channel)->then(function () {
1414
echo 'Now subscribed to channel ' . PHP_EOL;
15+
}, function (Exception $e) {
16+
echo 'Unable to subscribe: ' . $e->getMessage() . PHP_EOL;
1517
});
1618

1719
$client->on('message', function ($channel, $message) {
1820
echo 'Message on ' . $channel . ': ' . $message . PHP_EOL;
1921
});
2022

23+
// automatically re-subscribe to channel on connection issues
24+
$client->on('unsubscribe', function ($channel) use ($client, $loop) {
25+
echo 'Unsubscribed from ' . $channel . PHP_EOL;
26+
27+
$loop->addPeriodicTimer(2.0, function ($timer) use ($client, $channel, $loop){
28+
$client->subscribe($channel)->then(function () use ($timer, $loop) {
29+
echo 'Now subscribed again' . PHP_EOL;
30+
$loop->cancelTimer($timer);
31+
}, function (Exception $e) {
32+
echo 'Unable to subscribe again: ' . $e->getMessage() . PHP_EOL;
33+
});
34+
});
35+
});
36+
2137
$loop->run();

src/LazyClient.php

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,34 @@ private function client()
3535
$pending =& $this->promise;
3636
return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending) {
3737
// connection completed => remember only until closed
38-
$client->on('close', function () use (&$pending) {
38+
$subscribed = array();
39+
$psubscribed = array();
40+
$client->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed) {
3941
$pending = null;
42+
43+
// foward unsubscribe/punsubscribe events when underlying connection closes
44+
$n = count($subscribed);
45+
foreach ($subscribed as $channel => $_) {
46+
$self->emit('unsubscribe', array($channel, --$n));
47+
}
48+
$n = count($psubscribed);
49+
foreach ($psubscribed as $pattern => $_) {
50+
$self->emit('punsubscribe', array($pattern, --$n));
51+
}
52+
});
53+
54+
// keep track of all channels and patterns this connection is subscribed to
55+
$client->on('subscribe', function ($channel) use (&$subscribed) {
56+
$subscribed[$channel] = true;
57+
});
58+
$client->on('psubscribe', function ($pattern) use (&$psubscribed) {
59+
$psubscribed[$pattern] = true;
60+
});
61+
$client->on('unsubscribe', function ($channel) use (&$subscribed) {
62+
unset($subscribed[$channel]);
63+
});
64+
$client->on('punsubscribe', function ($pattern) use (&$psubscribed) {
65+
unset($psubscribed[$pattern]);
4066
});
4167

4268
Util::forwardEvents(

tests/LazyClientTest.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,4 +272,34 @@ public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubCh
272272
$this->client->on('message', $this->expectCallableOnce());
273273
$client->emit('message', array('foo', 'bar'));
274274
}
275+
276+
public function testEmitsUnsubscribeAndPunsubscribeEventsWhenUnderlyingClientClosesWhileUsingPubSubChannel()
277+
{
278+
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock();
279+
$client->expects($this->exactly(6))->method('__call')->willReturn(\React\Promise\resolve());
280+
281+
$this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client));
282+
283+
$this->client->subscribe('foo');
284+
$client->emit('subscribe', array('foo', 1));
285+
286+
$this->client->subscribe('bar');
287+
$client->emit('subscribe', array('bar', 2));
288+
289+
$this->client->unsubscribe('bar');
290+
$client->emit('unsubscribe', array('bar', 1));
291+
292+
$this->client->psubscribe('foo*');
293+
$client->emit('psubscribe', array('foo*', 1));
294+
295+
$this->client->psubscribe('bar*');
296+
$client->emit('psubscribe', array('bar*', 2));
297+
298+
$this->client->punsubscribe('bar*');
299+
$client->emit('punsubscribe', array('bar*', 1));
300+
301+
$this->client->on('unsubscribe', $this->expectCallableOnce());
302+
$this->client->on('punsubscribe', $this->expectCallableOnce());
303+
$client->emit('close');
304+
}
275305
}

0 commit comments

Comments
 (0)
0