Skip to content

Commit e9de03f

Browse files
authored
Merge pull request #154 from clue-labs/include-timeout
Include timeout logic to avoid dependency on reactphp/promise-timer
2 parents 615ae23 + ab31500 commit e9de03f

File tree

4 files changed

+90
-17
lines changed

4 files changed

+90
-17
lines changed

composer.json

-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
"evenement/evenement": "^3.0 || ^2.0 || ^1.0",
1717
"react/event-loop": "^1.2",
1818
"react/promise": "^3",
19-
"react/promise-timer": "^1.10",
2019
"react/socket": "^1.15"
2120
},
2221
"require-dev": {

src/Io/Factory.php

+47-9
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@
66
use React\EventLoop\Loop;
77
use React\EventLoop\LoopInterface;
88
use React\Promise\Deferred;
9+
use React\Promise\Promise;
910
use React\Promise\PromiseInterface;
10-
use React\Promise\Timer\TimeoutException;
1111
use React\Socket\ConnectionInterface;
1212
use React\Socket\Connector;
1313
use React\Socket\ConnectorInterface;
1414
use function React\Promise\reject;
15-
use function React\Promise\Timer\timeout;
1615

1716
/**
1817
* @internal
@@ -175,14 +174,53 @@ function (\Exception $e) use ($redis, $uri) {
175174
return $deferred->promise();
176175
}
177176

178-
return timeout($deferred->promise(), $timeout, $this->loop)->then(null, function (\Throwable $e) use ($uri) {
179-
if ($e instanceof TimeoutException) {
180-
throw new \RuntimeException(
181-
'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)',
182-
defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110
183-
);
177+
$promise = $deferred->promise();
178+
179+
/** @var Promise<StreamingClient> */
180+
$ret = new Promise(function (callable $resolve, callable $reject) use ($timeout, $promise, $uri): void {
181+
/** @var ?\React\EventLoop\TimerInterface */
182+
$timer = null;
183+
$promise = $promise->then(function (StreamingClient $v) use (&$timer, $resolve): void {
184+
if ($timer) {
185+
$this->loop->cancelTimer($timer);
186+
}
187+
$timer = false;
188+
$resolve($v);
189+
}, function (\Throwable $e) use (&$timer, $reject): void {
190+
if ($timer) {
191+
$this->loop->cancelTimer($timer);
192+
}
193+
$timer = false;
194+
$reject($e);
195+
});
196+
197+
// promise already settled => no need to start timer
198+
if ($timer === false) {
199+
return;
184200
}
185-
throw $e;
201+
202+
// start timeout timer which will cancel the pending promise
203+
$timer = $this->loop->addTimer($timeout, function () use ($timeout, &$promise, $reject, $uri): void {
204+
$reject(new \RuntimeException(
205+
'Connection to ' . $uri . ' timed out after ' . $timeout . ' seconds (ETIMEDOUT)',
206+
\defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110
207+
));
208+
209+
// Cancel pending connection to clean up any underlying resources and references.
210+
// Avoid garbage references in call stack by passing pending promise by reference.
211+
\assert($promise instanceof PromiseInterface);
212+
$promise->cancel();
213+
$promise = null;
214+
});
215+
}, function () use (&$promise): void {
216+
// Cancelling this promise will cancel the pending connection, thus triggering the rejection logic above.
217+
// Avoid garbage references in call stack by passing pending promise by reference.
218+
\assert($promise instanceof PromiseInterface);
219+
$promise->cancel();
220+
$promise = null;
186221
});
222+
223+
// variable assignment needed for legacy PHPStan on PHP 7.1 only
224+
return $ret;
187225
}
188226
}

tests/FunctionalTest.php

+11-7
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44

55
use Clue\React\Redis\RedisClient;
66
use React\EventLoop\Loop;
7-
use React\Promise\Deferred;
7+
use React\Promise\Promise;
88
use React\Promise\PromiseInterface;
99
use function React\Async\await;
10-
use function React\Promise\Timer\timeout;
1110

1211
class FunctionalTest extends TestCase
1312
{
@@ -144,19 +143,24 @@ public function testPubSub(): void
144143
$channel = 'channel:test:' . mt_rand();
145144

146145
// consumer receives a single message
147-
/** @var Deferred<void> */
148-
$deferred = new Deferred();
149146
$consumer->on('message', $this->expectCallableOnce());
150-
$consumer->on('message', [$deferred, 'resolve']);
151147
$once = $this->expectCallableOnceWith(1);
152148
$consumer->subscribe($channel)->then(function() use ($producer, $channel, $once){
153149
// producer sends a single message
154150
$producer->publish($channel, 'hello world')->then($once);
155151
})->then($this->expectCallableOnce());
156152

157153
// expect "message" event to take no longer than 0.1s
158-
159-
await(timeout($deferred->promise(), 0.1));
154+
await(new Promise(function (callable $resolve, callable $reject) use ($consumer): void {
155+
$timeout = Loop::addTimer(0.1, function () use ($consumer, $reject): void {
156+
$consumer->close();
157+
$reject(new \RuntimeException('Timed out'));
158+
});
159+
$consumer->on('message', function () use ($timeout, $resolve): void {
160+
Loop::cancelTimer($timeout);
161+
$resolve(null);
162+
});
163+
}));
160164

161165
/** @var PromiseInterface<array{0:"unsubscribe",1:string,2:0}> */
162166
$promise = $consumer->unsubscribe($channel);

tests/Io/FactoryStreamingClientTest.php

+32
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Clue\Tests\React\Redis\TestCase;
88
use PHPUnit\Framework\MockObject\MockObject;
99
use React\EventLoop\LoopInterface;
10+
use React\EventLoop\TimerInterface;
1011
use React\Promise\Deferred;
1112
use React\Socket\ConnectionInterface;
1213
use React\Socket\ConnectorInterface;
@@ -633,4 +634,35 @@ public function testCreateClientWithoutTimeoutParameterWillStartTimerWithDefault
633634
$this->factory->createClient('redis://127.0.0.1:2');
634635
ini_set('default_socket_timeout', $old);
635636
}
637+
638+
public function testCreateClientWillCancelTimerWhenConnectionResolves(): void
639+
{
640+
$timer = $this->createMock(TimerInterface::class);
641+
$this->loop->expects($this->once())->method('addTimer')->willReturn($timer);
642+
$this->loop->expects($this->once())->method('cancelTimer')->with($timer);
643+
644+
$deferred = new Deferred();
645+
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:6379')->willReturn($deferred->promise());
646+
647+
$promise = $this->factory->createClient('127.0.0.1');
648+
$promise->then($this->expectCallableOnce());
649+
650+
$deferred->resolve($this->createMock(ConnectionInterface::class));
651+
}
652+
653+
public function testCreateClientWillCancelTimerWhenConnectionRejects(): void
654+
{
655+
$timer = $this->createMock(TimerInterface::class);
656+
$this->loop->expects($this->once())->method('addTimer')->willReturn($timer);
657+
$this->loop->expects($this->once())->method('cancelTimer')->with($timer);
658+
659+
$deferred = new Deferred();
660+
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:6379')->willReturn($deferred->promise());
661+
662+
$promise = $this->factory->createClient('127.0.0.1');
663+
664+
$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
665+
666+
$deferred->reject(new \RuntimeException());
667+
}
636668
}

0 commit comments

Comments
 (0)