From f2692cd66069b3489d2bcfc16ba6be4107af40a4 Mon Sep 17 00:00:00 2001 From: ehong-tl Date: Fri, 13 Sep 2024 16:53:45 +0800 Subject: [PATCH 1/4] Explicit partial write length and delay settings Allow to set partial write length for each successive client write into network buffer, as well as delay for each successive partial write. --- src/MQTTClient.cpp | 36 ++++++++++++++++++++++++++++++++---- src/MQTTClient.h | 5 +++++ src/lwmqtt/client.c | 5 ++++- src/lwmqtt/lwmqtt.h | 7 ++++++- 4 files changed, 47 insertions(+), 6 deletions(-) diff --git a/src/MQTTClient.cpp b/src/MQTTClient.cpp index 5925ec2..2103dd1 100644 --- a/src/MQTTClient.cpp +++ b/src/MQTTClient.cpp @@ -86,15 +86,32 @@ inline lwmqtt_err_t lwmqtt_arduino_network_read(void *ref, uint8_t *buffer, size } inline lwmqtt_err_t lwmqtt_arduino_network_write(void *ref, uint8_t *buffer, size_t len, size_t *sent, - uint32_t /*timeout*/) { + uint32_t /*timeout*/, size_t maxPartialWriteLength, uint32_t partialWriteDelayms) { // cast network reference auto n = (lwmqtt_arduino_network_t *)ref; // write bytes - *sent = n->client->write(buffer, len); - if (*sent <= 0) { - return LWMQTT_NETWORK_FAILED_WRITE; + size_t partial_write = 0; + size_t written = 0; + + while (true) { + if (len - written > maxPartialWriteLength) { + partial_write = n->client->write(buffer + written, maxPartialWriteLength); + written += partial_write; + } + else { + partial_write = n->client->write(buffer + written, len - written); + written += partial_write; + break; + } + + delay(partialWriteDelayms); + + if (partial_write <= 0) { + return LWMQTT_NETWORK_FAILED_WRITE; + } } + *sent = written; return LWMQTT_SUCCESS; } @@ -195,6 +212,9 @@ void MQTTClient::begin(Client &_client) { // set callback lwmqtt_set_callback(&this->client, (void *)&this->callback, MQTTClientHandler); + + this->client.maxPartialWriteLength = this->_maxPartialWriteLength; + this->client.partialWriteDelayms = this->_partialWriteDelayms; } void MQTTClient::onMessage(MQTTClientCallbackSimple cb) { @@ -314,6 +334,14 @@ void MQTTClient::setCleanSession(bool _cleanSession) { this->cleanSession = _cle void MQTTClient::setTimeout(int _timeout) { this->timeout = _timeout; } +void MQTTClient::setPartialWriteSettings(size_t maxPartialWriteLength, uint32_t partialWriteDelayms) { + this->_maxPartialWriteLength = maxPartialWriteLength; + this->_partialWriteDelayms = partialWriteDelayms; + + this->client.maxPartialWriteLength = this->_maxPartialWriteLength; + this->client.partialWriteDelayms = this->_partialWriteDelayms; +} + void MQTTClient::dropOverflow(bool enabled) { // configure drop overflow lwmqtt_drop_overflow(&this->client, enabled, &this->_droppedMessages); diff --git a/src/MQTTClient.h b/src/MQTTClient.h index 55071e2..1c8514e 100644 --- a/src/MQTTClient.h +++ b/src/MQTTClient.h @@ -93,6 +93,9 @@ class MQTTClient { lwmqtt_err_t _lastError = (lwmqtt_err_t)0; uint32_t _droppedMessages = 0; + size_t _maxPartialWriteLength = 512; + uint32_t _partialWriteDelayms = 0; + public: void *ref = nullptr; @@ -141,6 +144,8 @@ class MQTTClient { this->setTimeout(_timeout); } + void setPartialWriteSettings(size_t maxPartialWriteLength, uint32_t partialWriteDelayms); + void dropOverflow(bool enabled); uint32_t droppedMessages() { return this->_droppedMessages; } diff --git a/src/lwmqtt/client.c b/src/lwmqtt/client.c index 97d0cf5..2dbd9b4 100644 --- a/src/lwmqtt/client.c +++ b/src/lwmqtt/client.c @@ -25,6 +25,9 @@ void lwmqtt_init(lwmqtt_client_t *client, uint8_t *write_buf, size_t write_buf_s client->drop_overflow = false; client->overflow_counter = NULL; + + client->maxPartialWriteLength = 512; + client->partialWriteDelayms = 0; } void lwmqtt_set_network(lwmqtt_client_t *client, void *ref, lwmqtt_network_read_t read, lwmqtt_network_write_t write) { @@ -144,7 +147,7 @@ static lwmqtt_err_t lwmqtt_write_to_network(lwmqtt_client_t *client, uint8_t *bu // write size_t partial_write = 0; lwmqtt_err_t err = - client->network_write(client->network, buf + written, len - written, &partial_write, (uint32_t)remaining_time); + client->network_write(client->network, buf + written, len - written, &partial_write, (uint32_t)remaining_time, client->maxPartialWriteLength, client->partialWriteDelayms); if (err != LWMQTT_SUCCESS) { return err; } diff --git a/src/lwmqtt/lwmqtt.h b/src/lwmqtt/lwmqtt.h index 435280d..0728ed2 100644 --- a/src/lwmqtt/lwmqtt.h +++ b/src/lwmqtt/lwmqtt.h @@ -178,8 +178,10 @@ typedef lwmqtt_err_t (*lwmqtt_network_read_t)(void *ref, uint8_t *buf, size_t le * @param len The length of the buffer. * @param sent Variable that must be set with the amount of written bytes. * @param timeout The timeout in milliseconds for the operation. + * @param maxPartialWriteLength Maximum partial write length for each successive partial client write to network buffer. + * @param partialWriteDelayms Delay in ms for each successive partial client write to network buffer. */ -typedef lwmqtt_err_t (*lwmqtt_network_write_t)(void *ref, uint8_t *buf, size_t len, size_t *sent, uint32_t timeout); +typedef lwmqtt_err_t (*lwmqtt_network_write_t)(void *ref, uint8_t *buf, size_t len, size_t *sent, uint32_t timeout, size_t maxPartialWriteLength, uint32_t partialWriteDelayms); /** * The callback used to set a timer. @@ -238,6 +240,9 @@ struct lwmqtt_client_t { bool drop_overflow; uint32_t *overflow_counter; + + size_t maxPartialWriteLength; + uint32_t partialWriteDelayms; }; /** From 3454da81a0dff04c99ebfc9a725ae24f75d2673b Mon Sep 17 00:00:00 2001 From: ehong-tl Date: Fri, 13 Sep 2024 17:00:50 +0800 Subject: [PATCH 2/4] Update README.md --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 023e70a..29a3104 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,13 @@ void setOptions(int keepAlive, bool cleanSession, int timeout); - The `cleanSession` option controls the session retention on the broker side (default: true). - The `timeout` option controls the default timeout for all commands in milliseconds (default: 1000). +```c++ +void setPartialWriteSettings(size_t maxPartialWriteLength, uint32_t partialWriteDelayms); +``` + +- The `maxPartialWriteLength` option controls maximum partial write length for each successive client write into network buffer (default: 512). +- The `partialWriteDelayms` option controls delay in ms for each successive partial client write into network buffer (default: 0). + Set a custom clock source "custom millis" callback to enable deep sleep applications: ```c++ From 30f8b94c35b882bda97f76a7595fc23451280f70 Mon Sep 17 00:00:00 2001 From: ehong-tl Date: Sat, 14 Sep 2024 09:45:20 +0800 Subject: [PATCH 3/4] Revert lwmqtt back to original, modify MQTTClient Revert lwmqtt back to original and modify MQTTClient instead. Added variables to lwmqtt_arduino_network_t structure for network segmented write and delay control. --- src/MQTTClient.cpp | 37 +++++++++++++++++++++---------------- src/MQTTClient.h | 8 +++++--- src/lwmqtt/client.c | 5 +---- src/lwmqtt/lwmqtt.h | 7 +------ 4 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/MQTTClient.cpp b/src/MQTTClient.cpp index 2103dd1..fc2c4c3 100644 --- a/src/MQTTClient.cpp +++ b/src/MQTTClient.cpp @@ -86,33 +86,38 @@ inline lwmqtt_err_t lwmqtt_arduino_network_read(void *ref, uint8_t *buffer, size } inline lwmqtt_err_t lwmqtt_arduino_network_write(void *ref, uint8_t *buffer, size_t len, size_t *sent, - uint32_t /*timeout*/, size_t maxPartialWriteLength, uint32_t partialWriteDelayms) { + uint32_t /*timeout*/) { // cast network reference auto n = (lwmqtt_arduino_network_t *)ref; // write bytes - size_t partial_write = 0; + size_t partial_written = 0; size_t written = 0; while (true) { - if (len - written > maxPartialWriteLength) { - partial_write = n->client->write(buffer + written, maxPartialWriteLength); - written += partial_write; + if (len - written > n->segmentLength) { + partial_written = n->client->write(buffer + written, n->segmentLength); + written += partial_written; } else { - partial_write = n->client->write(buffer + written, len - written); - written += partial_write; + partial_written = n->client->write(buffer + written, len - written); + written += partial_written; break; } - delay(partialWriteDelayms); + delay(n->writeDelayMs); - if (partial_write <= 0) { + if (partial_written <= 0) { return LWMQTT_NETWORK_FAILED_WRITE; } } + *sent = written; + if (*sent <= 0) { + return LWMQTT_NETWORK_FAILED_WRITE; + } + return LWMQTT_SUCCESS; } @@ -213,8 +218,8 @@ void MQTTClient::begin(Client &_client) { // set callback lwmqtt_set_callback(&this->client, (void *)&this->callback, MQTTClientHandler); - this->client.maxPartialWriteLength = this->_maxPartialWriteLength; - this->client.partialWriteDelayms = this->_partialWriteDelayms; + this->network.segmentLength = this->_segmentLength; + this->network.writeDelayMs = this->_writeDelayMs; } void MQTTClient::onMessage(MQTTClientCallbackSimple cb) { @@ -334,12 +339,12 @@ void MQTTClient::setCleanSession(bool _cleanSession) { this->cleanSession = _cle void MQTTClient::setTimeout(int _timeout) { this->timeout = _timeout; } -void MQTTClient::setPartialWriteSettings(size_t maxPartialWriteLength, uint32_t partialWriteDelayms) { - this->_maxPartialWriteLength = maxPartialWriteLength; - this->_partialWriteDelayms = partialWriteDelayms; +void MQTTClient::setNetworkSegmentedWrite(size_t segmentLength, uint32_t writeDelayMs) { + this->_segmentLength = segmentLength; + this->_writeDelayMs = writeDelayMs; - this->client.maxPartialWriteLength = this->_maxPartialWriteLength; - this->client.partialWriteDelayms = this->_partialWriteDelayms; + this->network.segmentLength = this->_segmentLength; + this->network.writeDelayMs = this->_writeDelayMs; } void MQTTClient::dropOverflow(bool enabled) { diff --git a/src/MQTTClient.h b/src/MQTTClient.h index 1c8514e..9ea718e 100644 --- a/src/MQTTClient.h +++ b/src/MQTTClient.h @@ -41,6 +41,8 @@ typedef struct { typedef struct { Client *client; + size_t segmentLength; + uint32_t writeDelayMs; } lwmqtt_arduino_network_t; class MQTTClient; @@ -93,8 +95,8 @@ class MQTTClient { lwmqtt_err_t _lastError = (lwmqtt_err_t)0; uint32_t _droppedMessages = 0; - size_t _maxPartialWriteLength = 512; - uint32_t _partialWriteDelayms = 0; + size_t _segmentLength = 65535; + uint32_t _writeDelayMs = 0; public: void *ref = nullptr; @@ -144,7 +146,7 @@ class MQTTClient { this->setTimeout(_timeout); } - void setPartialWriteSettings(size_t maxPartialWriteLength, uint32_t partialWriteDelayms); + void setNetworkSegmentedWrite(size_t segmentLength, uint32_t writeDelayMs); void dropOverflow(bool enabled); uint32_t droppedMessages() { return this->_droppedMessages; } diff --git a/src/lwmqtt/client.c b/src/lwmqtt/client.c index 2dbd9b4..97d0cf5 100644 --- a/src/lwmqtt/client.c +++ b/src/lwmqtt/client.c @@ -25,9 +25,6 @@ void lwmqtt_init(lwmqtt_client_t *client, uint8_t *write_buf, size_t write_buf_s client->drop_overflow = false; client->overflow_counter = NULL; - - client->maxPartialWriteLength = 512; - client->partialWriteDelayms = 0; } void lwmqtt_set_network(lwmqtt_client_t *client, void *ref, lwmqtt_network_read_t read, lwmqtt_network_write_t write) { @@ -147,7 +144,7 @@ static lwmqtt_err_t lwmqtt_write_to_network(lwmqtt_client_t *client, uint8_t *bu // write size_t partial_write = 0; lwmqtt_err_t err = - client->network_write(client->network, buf + written, len - written, &partial_write, (uint32_t)remaining_time, client->maxPartialWriteLength, client->partialWriteDelayms); + client->network_write(client->network, buf + written, len - written, &partial_write, (uint32_t)remaining_time); if (err != LWMQTT_SUCCESS) { return err; } diff --git a/src/lwmqtt/lwmqtt.h b/src/lwmqtt/lwmqtt.h index 0728ed2..435280d 100644 --- a/src/lwmqtt/lwmqtt.h +++ b/src/lwmqtt/lwmqtt.h @@ -178,10 +178,8 @@ typedef lwmqtt_err_t (*lwmqtt_network_read_t)(void *ref, uint8_t *buf, size_t le * @param len The length of the buffer. * @param sent Variable that must be set with the amount of written bytes. * @param timeout The timeout in milliseconds for the operation. - * @param maxPartialWriteLength Maximum partial write length for each successive partial client write to network buffer. - * @param partialWriteDelayms Delay in ms for each successive partial client write to network buffer. */ -typedef lwmqtt_err_t (*lwmqtt_network_write_t)(void *ref, uint8_t *buf, size_t len, size_t *sent, uint32_t timeout, size_t maxPartialWriteLength, uint32_t partialWriteDelayms); +typedef lwmqtt_err_t (*lwmqtt_network_write_t)(void *ref, uint8_t *buf, size_t len, size_t *sent, uint32_t timeout); /** * The callback used to set a timer. @@ -240,9 +238,6 @@ struct lwmqtt_client_t { bool drop_overflow; uint32_t *overflow_counter; - - size_t maxPartialWriteLength; - uint32_t partialWriteDelayms; }; /** From 6c2cbf05d0618f898ee263f90315b61c36aadf70 Mon Sep 17 00:00:00 2001 From: ehong-tl Date: Sat, 14 Sep 2024 09:52:20 +0800 Subject: [PATCH 4/4] Update README.md --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 29a3104..53ba60d 100644 --- a/README.md +++ b/README.md @@ -184,11 +184,12 @@ void setOptions(int keepAlive, bool cleanSession, int timeout); - The `timeout` option controls the default timeout for all commands in milliseconds (default: 1000). ```c++ -void setPartialWriteSettings(size_t maxPartialWriteLength, uint32_t partialWriteDelayms); +void setNetworkSegmentedWrite(size_t segmentLength, uint32_t writeDelayMs); ``` -- The `maxPartialWriteLength` option controls maximum partial write length for each successive client write into network buffer (default: 512). -- The `partialWriteDelayms` option controls delay in ms for each successive partial client write into network buffer (default: 0). +- The `segmentLength` option controls maximum segment write length for each successive segmented write into network buffer (default: 65535). +- The `writeDelayMs` option controls delay in ms between each successive segmented write into network buffer (default: 0). +- This function controls the flow of data into the network buffer, helping to prevent buffer overflows and network congestion, particularly when handling large payloads. Set a custom clock source "custom millis" callback to enable deep sleep applications: