Skip to content

Commit 6b21227

Browse files
committed
Better Keep Alive support
1 parent 065b799 commit 6b21227

File tree

4 files changed

+52
-18
lines changed

4 files changed

+52
-18
lines changed

examples/config.php.example

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
$config = array(
44
'server' => 'yourMqttBroker.tld',
55
'port' => 1883,
6-
'options' => null,
6+
'options' => new \oliverlorenz\reactphpmqtt\packet\ConnectionOptions(array(
7+
'keepAlive' => 120,
8+
)),
79
);
810

911
return $config;

src/Connector.php

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ public function create(
7676
->then(function (Stream $stream) {
7777
return $this->listenForPackets($stream);
7878
})
79-
->then(function(Stream $stream) {
80-
return $this->keepAlive($stream);
79+
->then(function(Stream $stream) use ($options) {
80+
return $this->keepAlive($stream, $options->keepAlive);
8181
});
8282
}
8383

@@ -119,12 +119,16 @@ private function listenForPackets(Stream $stream)
119119
return $deferred->promise();
120120
}
121121

122-
private function keepAlive(Stream $stream)
122+
private function keepAlive(Stream $stream, $keepAlive)
123123
{
124-
$this->getLoop()->addPeriodicTimer(10, function(Timer $timer) use ($stream) {
125-
$packet = new PingRequest($this->version);
126-
$this->sendPacketToStream($stream, $packet);
127-
});
124+
if($keepAlive > 0) {
125+
$interval = (int) ($keepAlive / 2);
126+
127+
$this->getLoop()->addPeriodicTimer($interval, function(Timer $timer) use ($stream) {
128+
$packet = new PingRequest($this->version);
129+
$this->sendPacketToStream($stream, $packet);
130+
});
131+
}
128132

129133
return new FulfilledPromise($stream);
130134
}
@@ -142,7 +146,8 @@ public function connect(Stream $stream, ConnectionOptions $options) {
142146
$options->willTopic,
143147
$options->willMessage,
144148
$options->willQos,
145-
$options->willRetain
149+
$options->willRetain,
150+
$options->keepAlive
146151
);
147152
$message = $packet->get();
148153
echo MessageHelper::getReadableByRawString($message);

src/packet/Connect.php

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ class Connect extends ControlPacket {
3939
/** @var null */
4040
protected $willRetain;
4141

42+
/** @var int */
43+
private $keepAlive;
44+
4245
/**
4346
* @param Version $version
4447
* @param string|null $username
@@ -49,6 +52,7 @@ class Connect extends ControlPacket {
4952
* @param string|null $willMessage
5053
* @param bool|null $willQos
5154
* @param null $willRetain
55+
* @param int $keepAlive
5256
*/
5357
public function __construct(
5458
Version $version,
@@ -59,7 +63,9 @@ public function __construct(
5963
$willTopic = null,
6064
$willMessage = null,
6165
$willQos = null,
62-
$willRetain = null
66+
$willRetain = null,
67+
// $keepAlive = 0
68+
$keepAlive = 10
6369
) {
6470
parent::__construct($version);
6571
$this->clientId = $clientId;
@@ -70,6 +76,7 @@ public function __construct(
7076
$this->willMessage = $willMessage;
7177
$this->willQos = boolval($willQos);
7278
$this->willRetain = $willRetain;
79+
$this->keepAlive = $keepAlive;
7380
$this->buildPayload();
7481
}
7582

@@ -101,14 +108,24 @@ public static function getControlPacketType()
101108
*/
102109
protected function getVariableHeader()
103110
{
104-
return chr(ControlPacketType::MOST_SIGNIFICANT_BYTE) // byte 1
105-
. chr(strlen($this->version->getProtocolIdentifierString())) // byte 2
106-
. $this->version->getProtocolIdentifierString() // byte 3,4,5,6
107-
. chr($this->version->getProtocolVersion()) // byte 7
108-
. chr($this->getConnectFlags()) // byte 8
109-
. chr(0) // byte 9
110-
. chr(10) // byte 10
111-
;
111+
return chr(ControlPacketType::MOST_SIGNIFICANT_BYTE) // byte 1
112+
. chr(strlen($this->version->getProtocolIdentifierString())) // byte 2
113+
. $this->version->getProtocolIdentifierString() // byte 3,4,5,6
114+
. chr($this->version->getProtocolVersion()) // byte 7
115+
. chr($this->getConnectFlags()) // byte 8
116+
. $this->getKeepAlive(); // byte 9,10
117+
}
118+
119+
/**
120+
* @return string
121+
*/
122+
private function getKeepAlive()
123+
{
124+
$msb = $this->keepAlive >> 8;
125+
$lsb = $this->keepAlive % 256;
126+
127+
return chr($msb)
128+
. chr($lsb);
112129
}
113130

114131
/**

src/packet/ConnectionOptions.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ class ConnectionOptions
100100
*/
101101
public $willRetain = false;
102102

103+
/**
104+
* The Keep Alive is a time interval measured in seconds.
105+
*
106+
* @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Keep_Alive
107+
*
108+
* @var int
109+
*/
110+
// public $keepAlive = 0;
111+
public $keepAlive = 10;
112+
103113
/**
104114
* ConnectionOptions constructor.
105115
*

0 commit comments

Comments
 (0)