8000 Allow suspensions within EventLoop::queue · revoltphp/event-loop@45ced59 · GitHub
[go: up one dir, main page]

Skip to content

Commit 45ced59

Browse files
committed
Allow suspensions within EventLoop::queue
This ensures it's safe to call any code within EventLoop::queue callbacks by executing callbacks in an extra fiber instead of directly from the event loop fiber. Driver::interrupt has been added to allow the Suspension class to still suspend the event loop fiber itself to continue with {main}.
1 parent 5e0910c commit 45ced59

File tree

5 files changed

+124
-51
lines changed

5 files changed

+124
-51
lines changed

src/EventLoop/Driver.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,18 @@ public function run(): void;
2929
*/
3030
public function stop(): void;
3131

32+
/**
33+
* Interrupts the event loop and continues with {main}.
34+
*
35+
* The driver MUST check for a set interrupt after invoking an event callback or microtask. If an interrupt exists,
36+
* it must be reset, and the driver must suspend with the given callback, i.e. call \Fiber::suspend($callback);
37+
*
38+
* @param callable $callback Callback to run on {main} before continuing.
39+
*
40+
* @internal This API is only supposed to be called by the Suspension API.
41+
*/
42+
public function interrupt(callable $callback): void;
43+
3244
/**
3345
* @return bool True if the event loop is running, false if it is stopped.
3446
*/

src/EventLoop/Driver/TracingDriver.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ public function stop(): void
3636
$this->driver->stop();
3737
}
3838

39+
public function interrupt(callable $callback): void
40+
{
41+
$this->driver->interrupt($callback);
42+
}
43+
3944
public function isRunning(): bool
4045
{
4146
return $this->driver->isRunning();

src/EventLoop/Internal/AbstractDriver.php

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ abstract class AbstractDriver implements Driver
2222
/** @var string Next callback identifier. */
2323
private string $nextId = "a";
2424

25-
private \Fiber $fiber;
25+
private \Fiber $callbackFiber;
26+
private \Fiber $queueFiber;
2627

2728
/** @var Callback[] */
2829
private array $callbacks = [];
@@ -42,11 +43,18 @@ abstract class AbstractDriver implements Driver
4243
/** @var callable(\Throwable):void|null */
4344
private $errorHandler;
4445

46+
/** @var callable|null */
47+
private $interrupt;
48+
4549
private bool $running = false;
4650

51+
private \stdClass $internalSuspensionMarker;
52+
4753
public function __construct()
4854
{
49-
$this->fiber = $this->createFiber();
55+
$this->internalSuspensionMarker = new \stdClass();
56+
$this->createCallbackFiber();
57+
$this->createQueueFiber();
5058
}
5159

5260
/**
@@ -98,6 +106,11 @@ public function stop(): void
98106
$this->running = false;
99107
}
100108

109+
public function interrupt(callable $callback): void
110+
{
111+
$this->interrupt = $callback;
112+
}
113+
101114
/**
102115
* @return bool True if the event loop is running, false if it is stopped.
103116
*/
@@ -560,22 +573,28 @@ abstract protected function deactivate(Callback $callback): void;
560573

561574
protected function invokeCallback(Callback $callback): void
562575
{
563-
if ($this->fiber->isRunning()) {
564-
$this->fiber = $this->createFiber();
576+
if ($this->callbackFiber->isRunning()) {
577+
$this->createCallbackFiber();
565578
}
566579

567580
try {
568-
$yielded = $this->fiber->resume($callback);
569-
570-
if ($yielded !== $callback) {
581+
$yielded = $this->callbackFiber->resume($callback);
582+
if ($yielded !== $this->internalSuspensionMarker) {
571583
// Callback suspended.
572-
$this->fiber = $this->createFiber();
584+
$this->createCallbackFiber();
573585
}
574586
} catch (\Throwable $exception) {
575-
$this->fiber = $this->createFiber();
587+
$this->createCallbackFiber();
576588
$this->error($exception);
577589
}
578590

591+
if ($this->interrupt) {
592+
$interrupt = $this->interrupt;
593+
$this->interrupt = null;
594+
595+
\Fiber::suspend($interrupt);
596+
}
597+
579598
if ($this->microQueue) {
580599
$this->invokeMicrotasks();
581600
}
@@ -656,22 +675,39 @@ private function tick(): void
656675
private function invokeMicrotasks(): void
657676
{
658677
while ($this->microQueue) {
659-
foreach ($this->microQueue as $id => [$callable, $args]) {
678+
foreach ($this->microQueue as $id => $queueEntry) {
679+
if ($this->queueFiber->isRunning()) {
680+
$this->createQueueFiber();
681+
}
682+
660683
try {
661684
unset($this->microQueue[$id]);
662-
$callable(...$args);
685+
686+
$yielded = $this->queueFiber->resume($queueEntry);
687+
if ($yielded !== $this->internalSuspensionMarker) {
688+
$this->createQueueFiber();
689+
}
663690
} catch (\Throwable $exception) {
691+
$this->createQueueFiber();
664692
$this->error($exception);
665693
}
694+
695+
if ($this->interrupt) {
696+
$interrupt = $this->interrupt;
697+
$this->interrupt = null;
698+
699+
\Fiber::suspend($interrupt);
700+
}
666701
}
667702
}
668703
}
669704

670-
private function createFiber(): \Fiber
705+
private function createCallbackFiber(): void
671706
{
672-
$fiber = new \Fiber(static function (): void {
673-
$callback = null;
674-
while ($callback = \Fiber::suspend($callback)) {
707+
$suspensionMarker = $this->internalSuspensionMarker;
708+
709+
$this->callbackFiber = new \Fiber(static function () use ($suspensionMarker): void {
710+
while ($callback = \Fiber::suspend($suspensionMarker)) {
675711
$result = match (true) {
676712
$callback instanceof StreamCallback => ($callback->callback)($callback->id, $callback->stream),
677713
$callback instanceof SignalCallback => ($callback->callback)($callback->id, $callback->signal),
@@ -681,10 +717,26 @@ private function createFiber(): \Fiber
681717
if ($result !== null) {
682718
throw InvalidCallbackError::nonNullReturn($callback->id, $callback->callback);
683719
}
720+
721+
unset($callback);
722+
}
723+
});
724+
725+
$this->callbackFiber->start();
726+
}
727+
728+
private function createQueueFiber(): void
729+
{
730+
$suspensionMarker = $this->internalSuspensionMarker;
731+
732+
$this->queueFiber = new \Fiber(static function () use ($suspensionMarker): void {
733+
while ([$callback, $args] = \Fiber::suspend($suspensionMarker)) {
734+
$callback(...$args);
735+
736+
unset($callback, $args);
684737
}
685738
});
686739

687-
$fiber->start();
688-
return $fiber;
740+
$this->queueFiber->start();
689741
}
690742
}

src/EventLoop/Suspension.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public function throw(\Throwable $throwable): void
6060
$this->driver->queue([$this->fiber, 'throw'], $throwable);
6161
} else {
6262
// Suspend event loop fiber to {main}.
63-
$this->driver->queue([\Fiber::class, 'suspend'], static fn () => throw $throwable);
63+
$this->driver->interrupt(static fn () => throw $throwable);
6464
}
6565
}
6666

@@ -76,7 +76,7 @@ public function resume(mixed $value): void
7676
$this->driver->queue([$this->fiber, 'resume'], $value);
7777
} else {
7878
// Suspend event loop fiber to {main}.
79-
$this->driver->queue([\Fiber::class, 'suspend'], static fn () => $value);
79+
$this->driver->interrupt(static fn () => $value);
8080
}
8181
}
8282

test/EventLoopTest.php

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,8 @@ public function testRepeatWithNegativeInterval(): void
2424

2525
public function testOnReadable(): void
2626
{
27-
if (!\class_exists(\Fiber::class, false)) {
28-
self::markTestSkipped("Fibers required for this test");
29-
}
30-
3127
$ends = \stream_socket_pair(
32-
\stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX,
28+
\DIRECTORY_SEPARATOR === "\\" ? STREAM_PF_INET : STREAM_PF_UNIX,
3329
STREAM_SOCK_STREAM,
3430
STREAM_IPPROTO_IP
3531
);
@@ -51,12 +47,8 @@ public function testOnReadable(): void
5147
self::assertSame(1, $count);
5248
}
5349

54-
public function testOnWritable()
50+
public function testOnWritable(): void
5551
{
56-
if (!\class_exists(\Fiber::class, false)) {
57-
self::markTestSkipped("Fibers required for this test");
58-
}
59-
6052
$count = 0;
6153
$suspension = EventLoop::createSuspension();
6254

@@ -95,12 +87,25 @@ public function testRun(): void
9587
self::assertTrue($invoked);
9688
}
9789

98-
public function testRunInFiber(): void
90+
public function testFiberReuse(): void
9991
{
100-
if (!\class_exists(\Fiber::class, false)) {
101-
self::markTestSkipped("Fibers required for this test");
102-
}
92+
EventLoop::defer(function () use (&$fiber1): void {
93+
$fiber1 = \Fiber::getCurrent();
94+
});
95+
96+
EventLoop::defer(function () use (&$fiber2): void {
97+
$fiber2 = \Fiber::getCurrent();
98+
});
99+
100+
EventLoop::run();
103101

102+
self::assertNotNull($fiber1);
103+
self::assertNotNull($fiber2);
104+
self::assertSame($fiber1, $fiber2);
105+
}
106+
107+
public function testRunInFiber(): void
108+
{
104109
launch(fn () => EventLoop::run());
105110

106111
$this->expectException(\Error::class);
@@ -111,10 +116,6 @@ public function testRunInFiber(): void
111116

112117
public function testRunAfterSuspension(): void
113118
{
114-
if (!\class_exists(\Fiber::class, false)) {
115-
self::markTestSkipped("Fibers required for this test");
116-
}
117-
118119
$suspension = EventLoop::createSuspension();
119120

120121
EventLoop::defer(fn () => $suspension->resume('test'));
@@ -131,12 +132,8 @@ public function testRunAfterSuspension(): void
131132
self::assertTrue($invoked);
132133
}
133134

134-
public function testSuspensionAfter(): void
135+
public function testSuspensionAfterRun(): void
135136
{
136-
if (!\class_exists(\Fiber::class, false)) {
137-
self::markTestSkipped("Fibers required for this test");
138-
}
139-
140137
$invoked = false;
141138
EventLoop::defer(function () use (&$invoked): void {
142139
$invoked = true;
@@ -153,13 +150,10 @@ public function testSuspensionAfter(): void
153150
self::assertSame($suspension->suspend(), 'test');
154151
}
155152

156-
public function testSuspensionWithinFiberWithinRun(): void
153+
public function testSuspensionWithinFiber(): void
157154
{
158-
if (!\class_exists(\Fiber::class, false)) {
159-
self::markTestSkipped("Fibers required for this test");
160-
}
161-
162155
$invoked = false;
156+
163157
launch(function () use (&$invoked): void {
164158
$suspension = EventLoop::createSuspension();
165159

@@ -177,10 +171,6 @@ public function testSuspensionWithinFiberWithinRun(): void
177171

178172
public function testSuspensionWithinCallback(): void
179173
{
180-
if (!\class_exists(\Fiber::class, false)) {
181-
self::markTestSkipped("Fibers required for this test");
182-
}
183-
184174
$send = 42;
185175

186176
EventLoop::defer(static function () use (&$received, $send): void {
@@ -189,6 +179,20 @@ public function testSuspensionWithinCallback(): void
189179
$received = $suspension->suspend();
190180
});
191181

182+
EventLoop::run();
183+
184+
self::assertSame($send, $received);
185+
}
186+
187+
public function testSuspensionWithinQueue(): void
188+
{
189+
$send = 42;
190+
191+
EventLoop::queue(static function () use (&$received, $send): void {
192+
$suspension = EventLoop::createSuspension();
193+
EventLoop::defer(static fn () => $suspension->resume($send));
194+
$received = $suspension->suspend();
195+
});
192196

193197
EventLoop::run();
194198

0 commit comments

Comments
 (0)
0