10000 Allow suspensions within EventLoop::queue by kelunik · Pull Request #3 · revoltphp/event-loop · GitHub
[go: up one dir, main page]

Skip to content

Allow suspensions within EventLoop::queue #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/EventLoop/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ public function run(): void;
*/
public function stop(): void;

/**
* Interrupts the event loop and continues with {main}.
*
* The driver MUST check for a set interrupt after invoking an event callback or microtask. If an interrupt exists,
* it must be reset, and the driver must suspend with the given callback, i.e. call \Fiber::suspend($callback);
*
* @param callable $callback Callback to run on {main} before continuing.
*
* @internal This API is only supposed to be called by the Suspension API.
*/
public function interrupt(callable $callback): void;

/**
* @return bool True if the event loop is running, false if it is stopped.
*/
Expand Down
5 changes: 5 additions & 0 deletions src/EventLoop/Driver/TracingDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public function stop(): void
$this->driver->stop();
}

public function interrupt(callable $callback): void
{
$this->driver->interrupt($callback);
}

public function isRunning(): bool
{
return $this->driver->isRunning();
Expand Down
86 changes: 69 additions & 17 deletions src/EventLoop/Internal/AbstractDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ abstract class AbstractDriver implements Driver
/** @var string Next callback identifier. */
private string $nextId = "a";

private \Fiber $fiber;
private \Fiber $callbackFiber;
private \Fiber $queueFiber;

/** @var Callback[] */
private array $callbacks = [];
Expand All @@ -42,11 +43,18 @@ abstract class AbstractDriver implements Driver
/** @var callable(\Throwable):void|null */
private $errorHandler;

/** @var callable|null */
private $interrupt;

private bool $running = false;

private \stdClass $internalSuspensionMarker;

public function __construct()
{
$this->fiber = $this->createFiber();
$this->internalSuspensionMarker = new \stdClass();
$this->createCallbackFiber();
$this->createQueueFiber();
}

/**
Expand Down Expand Up @@ -98,6 +106,11 @@ public function stop(): void
$this->running = false;
}

public function interrupt(callable $callback): void
{
$this->interrupt = $callback;
}

/**
* @return bool True if the event loop is running, false if it is stopped.
*/
Expand Down Expand Up @@ -560,22 +573,28 @@ abstract protected function deactivate(Callback $callback): void;

protected function invokeCallback(Callback $callback): void
{
if ($this->fiber->isRunning()) {
$this->fiber = $this->createFiber();
if ($this->callbackFiber->isRunning()) {
$this->createCallbackFiber();
}

try {
$yielded = $this->fiber->resume($callback);

if ($yielded !== $callback) {
$yielded = $this->callbackFiber->resume($callback);
if ($yielded !== $this->internalSuspensionMarker) {
// Callback suspended.
$this->fiber = $this->createFiber();
$this->createCallbackFiber();
}
} catch (\Throwable $exception) {
$this->fiber = $this->createFiber();
$this->createCallbackFiber();
$this->error($exception);
}

if ($this->interrupt) {
$interrupt = $this->interrupt;
$this->interrupt = null;

\Fiber::suspend($interrupt);
}

if ($this->microQueue) {
$this->invokeMicrotasks();
}
Expand Down Expand Up @@ -656,22 +675,39 @@ private function tick(): void
private function invokeMicrotasks(): void
{
while ($this->microQueue) {
foreach ($this->microQueue as $id => [$callable, $args]) {
foreach ($this->microQueue as $id => $queueEntry) {
if ($this->queueFiber->isRunning()) {
$this->createQueueFiber();
}

try {
unset($this->microQueue[$id]);
$callable(...$args);

$yielded = $this->queueFiber->resume($queueEntry);
if ($yielded !== $this->internalSuspensionMarker) {
$this->createQueueFiber();
}
} catch (\Throwable $exception) {
$this->createQueueFiber();
$this->error($exception);
}

if ($this->interrupt) {
$interrupt = $this->interrupt;
$this->interrupt = null;

\Fiber::suspend($interrupt);
}
}
}
}

private function createFiber(): \Fiber
private function createCallbackFiber(): void
{
$fiber = new \Fiber(static function (): void {
$callback = null;
while ($callback = \Fiber::suspend($callback)) {
$suspensionMarker = $this->internalSuspensionMarker;

$this->callbackFiber = new \Fiber(static function () use ($suspensionMarker): void {
while ($callback = \Fiber::suspend($suspensionMarker)) {
$result = match (true) {
$callback instanceof StreamCallback => ($callback->callback)($callback->id, $callback->stream),
$callback instanceof SignalCallback => ($callback->callback)($callback->id, $callback->signal),
Expand All @@ -681,10 +717,26 @@ private function createFiber(): \Fiber
if ($result !== null) {
throw InvalidCallbackError::nonNullReturn($callback->id, $callback->callback);
}

unset($callback);
}
});

$this->callbackFiber->start();
}

private function createQueueFiber(): void
{
$suspensionMarker = $this->internalSuspensionMarker;

$this->queueFiber = new \Fiber(static function () use ($suspensionMarker): void {
while ([$callback, $args] = \Fiber::suspend($suspensionMarker)) {
$callback(...$args);

unset($callback, $args);
}
});

$fiber->start();
return $fiber;
$this->queueFiber->start();
}
}
4 changes: 2 additions & 2 deletions src/EventLoop/Suspension.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public function throw(\Throwable $throwable): void
$this->driver->queue([$this->fiber, 'throw'], $throwable);
} else {
// Suspend event loop fiber to {main}.
$this->driver->queue([\Fiber::class, 'suspend'], static fn () => throw $throwable);
$this->driver->interrupt(static fn () => throw $throwable);
}
}

Expand All @@ -76,7 +76,7 @@ public function resume(mixed $value): void
$this->driver->queue([$this->fiber, 'resume'], $value);
} else {
// Suspend event loop fiber to {main}.
$this->driver->queue([\Fiber::class, 'suspend'], static fn () => $value);
$this->driver->interrupt(static fn () => $value);
}
}

Expand Down
68 changes: 36 additions & 32 deletions test/EventLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,8 @@ public function testRepeatWithNegativeInterval(): void

public function testOnReadable(): void
{
if (!\class_exists(\Fiber::class, false)) {
self::markTestSkipped("Fibers required for this test");
}

$ends = \stream_socket_pair(
\stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX,
\DIRECTORY_SEPARATOR === "\\" ? STREAM_PF_INET : STREAM_PF_UNIX,
STREAM_SOCK_STREAM,
STREAM_IPPROTO_IP
);
Expand All @@ -51,12 +47,8 @@ public function testOnReadable(): void
self::assertSame(1, $count);
}

public function testOnWritable()
public function testOnWritable(): void
{
if (!\class_exists(\Fiber::class, false)) {
self::markTestSkipped("Fibers required for this test");
}

$count = 0;
$suspension = EventLoop::createSuspension();

Expand Down Expand Up @@ -95,12 +87,25 @@ public function testRun(): void
self::assertTrue($invoked);
}

public function testRunInFiber(): void
public function testFiberReuse(): void
{
if (!\class_exists(\Fiber::class, false)) {
self::markTestSkipped("Fibers required for this test");
}
EventLoop::defer(function () use (&$fiber1): void {
$fiber1 = \Fiber::getCurrent();
});

EventLoop::defer(function () use (&$fiber2): void {
$fiber2 = \Fiber::getCurrent();
});

EventLoop::run();

self::assertNotNull($fiber1);
self::assertNotNull($fiber2);
self::assertSame($fiber1, $fiber2);
}

public function testRunInFiber(): void
{
launch(fn () => EventLoop::run());

$this->expectException(\Error::class);
Expand All @@ -111,10 +116,6 @@ public function testRunInFiber(): void

public function testRunAfterSuspension(): void
{
if (!\class_exists(\Fiber::class, false)) {
self::markTestSkipped("Fibers required for this test");
}

$suspension = EventLoop::createSuspension();

EventLoop::defer(fn () => $suspension->resume('test'));
Expand All @@ -131,12 +132,8 @@ public function testRunAfterSuspension(): void
self::assertTrue($invoked);
}

public function testSuspensionAfter(): void
public function testSuspensionAfterRun(): void
{
if (!\class_exists(\Fiber::class, false)) {
self::markTestSkipped("Fibers required for this test");
}

$invoked = false;
EventLoop::defer(function () use (&$invoked): void {
$invoked = true;
Expand All @@ -153,13 +150,10 @@ public function testSuspensionAfter(): void
self::assertSame($suspension->suspend(), 'test');
}

public function testSuspensionWithinFiberWithinRun(): void
public function testSuspensionWithinFiber(): void
{
if (!\class_exists(\Fiber::class, false)) {
self::markTestSkipped("Fibers required for this test");
}

$invoked = false;

launch(function () use (&$invoked): void {
$suspension = EventLoop::createSuspension();

Expand All @@ -177,10 +171,6 @@ public function testSuspensionWithinFiberWithinRun(): void

public function testSuspensionWithinCallback(): void
{
if (!\class_exists(\Fiber::class, false)) {
self::markTestSkipped("Fibers required for this test");
}

$send = 42;

EventLoop::defer(static function () use (&$received, $send): void {
Expand All @@ -189,6 +179,20 @@ public function testSuspensionWithinCallback(): void
$received = $suspension->suspend();
});

EventLoop::run();

self::assertSame($send, $received);
}

public function testSuspensionWithinQueue(): void
{
$send = 42;

EventLoop::queue(static function () use (&$received, $send): void {
$suspension = EventLoop::createSuspension();
EventLoop::defer(static fn () => $suspension->resume($send));
$received = $suspension->suspend();
});

EventLoop::run();

Expand Down
0