diff --git a/src/EventLoop/DefaultFiberFactory.php b/src/EventLoop/DefaultFiberFactory.php new file mode 100644 index 0000000..9dafbb5 --- /dev/null +++ b/src/EventLoop/DefaultFiberFactory.php @@ -0,0 +1,20 @@ + */ private array $signals = []; - public function __construct() + public function __construct(?FiberFactory $fiberFactory = null) { - parent::__construct(); + parent::__construct($fiberFactory); $this->handle = new \EvLoop(); diff --git a/src/EventLoop/Driver/EventDriver.php b/src/EventLoop/Driver/EventDriver.php index 869c3a9..7e8475a 100644 --- a/src/EventLoop/Driver/EventDriver.php +++ b/src/EventLoop/Driver/EventDriver.php @@ -6,6 +6,7 @@ namespace Revolt\EventLoop\Driver; +use Revolt\EventLoop\FiberFactory; use Revolt\EventLoop\Internal\AbstractDriver; use Revolt\EventLoop\Internal\DriverCallback; use Revolt\EventLoop\Internal\SignalCallback; @@ -34,9 +35,9 @@ public static function isSupported(): bool /** @var array */ private array $signals = []; - public function __construct() + public function __construct(?FiberFactory $fiberFactory = null) { - parent::__construct(); + parent::__construct($fiberFactory); /** @psalm-suppress TooFewArguments https://github.com/JetBrains/phpstorm-stubs/pull/763 */ $this->handle = new \EventBase(); diff --git a/src/EventLoop/Driver/StreamSelectDriver.php b/src/EventLoop/Driver/StreamSelectDriver.php index 4f1b78f..3b909e7 100644 --- a/src/EventLoop/Driver/StreamSelectDriver.php +++ b/src/EventLoop/Driver/StreamSelectDriver.php @@ -6,6 +6,7 @@ namespace Revolt\EventLoop\Driver; +use Revolt\EventLoop\FiberFactory; use Revolt\EventLoop\Internal\AbstractDriver; use Revolt\EventLoop\Internal\DriverCallback; use Revolt\EventLoop\Internal\SignalCallback; @@ -43,9 +44,9 @@ final class StreamSelectDriver extends AbstractDriver private bool $streamSelectIgnoreResult = false; - public function __construct() + public function __construct(?FiberFactory $fiberFactory = null) { - parent::__construct(); + parent::__construct($fiberFactory); $this->signalQueue = new \SplQueue(); $this->timerQueue = new TimerQueue(); diff --git a/src/EventLoop/Driver/UvDriver.php b/src/EventLoop/Driver/UvDriver.php index f46d909..5b915ff 100644 --- a/src/EventLoop/Driver/UvDriver.php +++ b/src/EventLoop/Driver/UvDriver.php @@ -4,6 +4,7 @@ namespace Revolt\EventLoop\Driver; +use Revolt\EventLoop\FiberFactory; use Revolt\EventLoop\Internal\AbstractDriver; use Revolt\EventLoop\Internal\DriverCallback; use Revolt\EventLoop\Internal\SignalCallback; @@ -31,9 +32,9 @@ public static function isSupported(): bool private readonly \Closure $timerCallback; private readonly \Closure $signalCallback; - public function __construct() + public function __construct(?FiberFactory $fiberFactory = null) { - parent::__construct(); + parent::__construct($fiberFactory); $this->handle = \uv_loop_new(); diff --git a/src/EventLoop/FiberFactory.php b/src/EventLoop/FiberFactory.php new file mode 100644 index 0000000..c4aae40 --- /dev/null +++ b/src/EventLoop/FiberFactory.php @@ -0,0 +1,17 @@ + */ private array $callbacks = []; @@ -58,13 +61,15 @@ abstract class AbstractDriver implements Driver /** @var \SplQueue */ private readonly \SplQueue $callbackQueue; + private readonly FiberFactory $fiberFactory; + private bool $idle = false; private bool $stopped = false; /** @var \WeakMap> */ private \WeakMap $suspensions; - public function __construct() + public function __construct(?FiberFactory $fiberFactory = null) { if (\PHP_VERSION_ID < 80117 || \PHP_VERSION_ID >= 80200 && \PHP_VERSION_ID < 80204) { // PHP GC is broken on early 8.1 and 8.2 versions, see https://github.com/php/php-src/issues/10496 @@ -73,6 +78,7 @@ public function __construct() throw new \Error('Your version of PHP is affected by serious garbage collector bugs related to fibers. Please upgrade to a newer version of PHP, i.e. >= 8.1.17 or => 8.2.4'); } } + $this->fiberFactory = $fiberFactory ?? new DefaultFiberFactory(); $this->suspensions = new \WeakMap(); @@ -82,7 +88,7 @@ public function __construct() $this->createLoopFiber(); $this->createCallbackFiber(); - $this->createErrorCallback(); + $this->createErrorFiber(); /** @psalm-suppress InvalidArgument */ $this->interruptCallback = $this->setInterrupt(...); @@ -402,10 +408,15 @@ final protected function error(\Closure $closure, \Throwable $exception): void return; } - $fiber = new \Fiber($this->errorCallback); - + if ($this->errorFiber->isTerminated()) { + $this->createErrorFiber(); + } /** @noinspection PhpUnhandledExceptionInspection */ - $fiber->start($this->errorHandler, $exception); + $yielded = $this->errorFiber->isStarted() ? $this->errorFiber->resume($exception) : $this->errorFiber->start($exception); + + if ($yielded !== $this->internalSuspensionMarker) { + $this->createErrorFiber(); + } } /** @@ -531,7 +542,7 @@ private function invokeInterrupt(): void private function createLoopFiber(): void { - $this->fiber = new \Fiber(function (): void { + $this->fiber = $this->fiberFactory->create(function (): void { $this->stopped = false; // Invoke microtasks if we have some @@ -558,7 +569,7 @@ private function createLoopFiber(): void private function createCallbackFiber(): void { - $this->callbackFiber = new \Fiber(function (): void { + $this->callbackFiber = $this->fiberFactory->create(function (): void { do { $this->invokeMicrotasks(); @@ -623,16 +634,22 @@ private function createCallbackFiber(): void }); } - private function createErrorCallback(): void + private function createErrorFiber(): void { - $this->errorCallback = function (\Closure $errorHandler, \Throwable $exception): void { - try { - $errorHandler($exception); - } catch (\Throwable $exception) { - $this->interrupt = static fn () => $exception instanceof UncaughtThrowable - ? throw $exception - : throw UncaughtThrowable::throwingErrorHandler($errorHandler, $exception); - } - }; + $this->errorFiber = new \Fiber(function (\Throwable $exception): void { + do { + try { + \assert($this->errorHandler !== null); + ($this->errorHandler)($exception); + } catch (\Throwable $exception) { + $errorHandler = $this->errorHandler; + \assert($errorHandler !== null); + $this->interrupt = static fn () => $exception instanceof UncaughtThrowable + ? throw $exception + : throw UncaughtThrowable::throwingErrorHandler($errorHandler, $exception); + } + $exception = Fiber::suspend($this->internalSuspensionMarker); + } while (true); + }); } } diff --git a/src/EventLoop/TracingFiberFactory.php b/src/EventLoop/TracingFiberFactory.php new file mode 100644 index 0000000..6da816b --- /dev/null +++ b/src/EventLoop/TracingFiberFactory.php @@ -0,0 +1,64 @@ + + */ +final class TracingFiberFactory implements FiberFactory, Countable, IteratorAggregate +{ + /** + * @var \WeakMap<\Fiber, null> + */ + private readonly \WeakMap $map; + + public function __construct( + private readonly FiberFactory $fiberFactory = new DefaultFiberFactory() + ) { + /** @var \WeakMap<\Fiber, null> */ + $this->map = new \WeakMap(); + } + + /** + * Creates a new fiber instance. + * + * @param callable $callable The callable to invoke when starting the fiber. + * + * @return \Fiber + */ + public function create(callable $callback): \Fiber + { + $f = $this->fiberFactory->create($callback); + /** @psalm-suppress InaccessibleProperty */ + $this->map[$f] = null; + return $f; + } + + /** + * Returns the number of running fibers. + * + * @return int + */ + public function count(): int + { + return $this->map->count(); + } + + /** + * Iterate over all fibers currently in scope. + * + * @return Traversable<\Fiber, null> + */ + public function getIterator(): Traversable + { + return $this->map->getIterator(); + } +} diff --git a/test/Driver/TracingFiberFactoryTest.php b/test/Driver/TracingFiberFactoryTest.php new file mode 100644 index 0000000..5ef1dd5 --- /dev/null +++ b/test/Driver/TracingFiberFactoryTest.php @@ -0,0 +1,36 @@ +count()); + $this->start(static function (Driver $loop): void { + $loop->queue(static function () use ($loop) { + $suspension = $loop->getSuspension(); + $loop->delay(1, $suspension->resume(...)); + $suspension->suspend(); + }); + $loop->delay(0.5, function () { + self::assertEquals(3, self::$factory->count()); + }); + }); + self::assertEquals(2, self::$factory->count()); + } +}