Skip to content

Commit

Permalink
🚧 A+
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Apr 21, 2021
1 parent 98c7e6e commit b4916fc
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 5 deletions.
File renamed without changes.
File renamed without changes.
29 changes: 27 additions & 2 deletions src/Adapter/Swoole/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use M6Web\Tornado\Deferred;
use M6Web\Tornado\Promise;
use Swoole\Coroutine;
use Swoole\Coroutine\WaitGroup;
use Swoole\Event;
use RuntimeException;
use function extension_loaded;
Expand Down Expand Up @@ -61,7 +62,31 @@ public function async(\Generator $generator): Promise
*/
public function promiseAll(Promise ...$promises): Promise
{

return new YieldPromise(function($resolve, $reject) use ($promises) {
$ticks = count($promises);
$firstError = null;
$result = [];
$key = 0;
foreach ($promises as $promise) {
YieldPromise::wrap($promise)
->then(static function ($value) use ($key, &$result, &$ticks, $resolve) {
$result[$key] = $value;
$ticks--;
if($ticks === 0) {
ksort($result);
$resolve($result);
}
return $value;
}, function ($error) use (&$firstError, &$ticks, $reject) {
$ticks--;
if ($firstError === null) {
$firstError = $error;
$reject($firstError);
}
});
$key++;
}
});
}

/**
Expand Down Expand Up @@ -124,7 +149,7 @@ public function delay(int $milliseconds): Promise
{
$promise = new YieldPromise();
Coroutine::create(function() use($milliseconds, $promise) {
Coroutine::sleep($milliseconds / 1000 /* ms -> s */);
Coroutine::sleep($milliseconds / 1000);
$promise->resolve(null);
});

Expand Down
68 changes: 65 additions & 3 deletions src/Adapter/Swoole/Internal/YieldPromise.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,62 @@

final class YieldPromise implements Promise, Deferred
{
private $cids = [];
private $isSettled = false;
private $cids;
private $executor;
private $isSettled;
private $value;
private $exception;

public function __construct(?callable $executor = null)
{
$this->cids = [];
$this->isSettled = false;
$this->executor = function() use ($executor) {
Coroutine::create(function () use ($executor) {
if ($executor) {
$resolve = function ($value) {
$this->resolve($value);
};
$reject = function ($exception) {
$this->reject($exception);
};

try {
$executor($resolve, $reject);
} catch (\Throwable $exception) {
$reject($exception);
}
}
});
};
}

public function then(?callable $onFulfilled = null, ?callable $onRejected = null): Promise
{
if($this->isSettled === false) {
return new self(function () use ($onFulfilled, $onRejected) {
try {
($this->executor)($onFulfilled, $onRejected);
} catch (\Throwable $exception) {
$onRejected($exception);
}
});
}

return new self(function (callable $resolve, callable $reject) use ($onFulfilled, $onRejected) {
if($this->exception) {
if($onRejected !== null) {
$onRejected($this->exception);
}
$reject($this->exception);
} else {
if($onFulfilled !== null) {
$onFulfilled($this->value);
}
$resolve($this->value);
}
});
}

public function yield(): void
{
Expand All @@ -25,6 +78,7 @@ public function yield(): void
public function value()
{
assert($this->isSettled, new \Error('Promise is not resolved.'));

return $this->value;
}

Expand All @@ -43,6 +97,7 @@ public function getPromise(): Promise
public function resolve($value): void
{
assert(false === $this->isSettled, new \Error('Promise is already resolved.'));

$this->isSettled = true;
$this->value = $value;
foreach ($this->cids as $cid => $dummy) {
Expand All @@ -53,6 +108,13 @@ public function resolve($value): void

public function reject(\Throwable $throwable): void
{

assert(false === $this->isSettled, new \Error('Promise is already resolved.'));

$this->isSettled = true;
$this->exception = $throwable;
foreach ($this->cids as $cid => $dummy) {
Coroutine::resume($cid);
}
$this->cids = [];
}
}

0 comments on commit b4916fc

Please sign in to comment.