diff --git a/src/packet/ControlPacket.php b/src/packet/ControlPacket.php index 9f29c0d..39e91cc 100644 --- a/src/packet/ControlPacket.php +++ b/src/packet/ControlPacket.php @@ -25,10 +25,11 @@ public function __construct(Version $version) /** * @param Version $version - * @param string $rawInput + * @param string $rawInput + * @param int $topicStart * @return static */ - public static function parse(Version $version, $rawInput) + public static function parse(Version $version, $rawInput, $topicStart = 2) { static::checkRawInputValidControlPackageType($rawInput); diff --git a/src/packet/Factory.php b/src/packet/Factory.php index 4f138af..e807504 100644 --- a/src/packet/Factory.php +++ b/src/packet/Factory.php @@ -21,16 +21,22 @@ class Factory public static function getNextPacket(Version $version, $remainingData) { while(isset($remainingData{1})) { - $remainingLength = ord($remainingData{1}); - $packetLength = 2 + $remainingLength; + $byte = 1; + $packetLength = 0; + do { + $digit = ord($remainingData{$byte}); + $packetLength += $digit; + $byte++; + } while (($digit & 128) != 0); + $packetLength += 2; $nextPacketData = substr($remainingData, 0, $packetLength); $remainingData = substr($remainingData, $packetLength); - yield self::getByMessage($version, $nextPacketData); + yield self::getByMessage($version, $nextPacketData, $byte); } } - private static function getByMessage(Version $version, $input) + private static function getByMessage(Version $version, $input, $topicStart = 2) { $controlPacketType = ord($input{0}) >> 4; @@ -45,7 +51,7 @@ private static function getByMessage(Version $version, $input) return SubscribeAck::parse($version, $input); case Publish::getControlPacketType(): - return Publish::parse($version, $input); + return Publish::parse($version, $input, $topicStart); case PublishComplete::getControlPacketType(): return PublishComplete::parse($version, $input); diff --git a/src/packet/Publish.php b/src/packet/Publish.php index 2d630ab..20c7b69 100644 --- a/src/packet/Publish.php +++ b/src/packet/Publish.php @@ -32,13 +32,13 @@ public static function getControlPacketType() return ControlPacketType::PUBLISH; } - public static function parse(Version $version, $rawInput) + public static function parse(Version $version, $rawInput, $topicStart = 2) { /** @var Publish $packet */ $packet = parent::parse($version, $rawInput); //TODO 3.3.2.2 Packet Identifier not yet supported - $topic = static::getPayloadLengthPrefixFieldInRawInput(2, $rawInput); + $topic = static::getPayloadLengthPrefixFieldInRawInput($topicStart, $rawInput); $packet->setTopic($topic); $byte1 = $rawInput{0}; @@ -53,7 +53,7 @@ public static function parse(Version $version, $rawInput) } $packet->payload = substr( $rawInput, - 4 + strlen($topic) + $topicStart + 2 + strlen($topic) ); return $packet; diff --git a/src/packet/PublishComplete.php b/src/packet/PublishComplete.php index ec7576b..52f9a5b 100644 --- a/src/packet/PublishComplete.php +++ b/src/packet/PublishComplete.php @@ -13,6 +13,8 @@ */ class PublishComplete extends ControlPacket { + const EVENT = 'PUBLISH_COMPLETE'; + public static function getControlPacketType() { return ControlPacketType::PUBCOMP;